diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 30 | ||||
| -rw-r--r-- | src/git/authorization.rs | 102 | ||||
| -rw-r--r-- | src/git/handlers.rs | 108 | ||||
| -rw-r--r-- | src/git/mod.rs | 2 | ||||
| -rw-r--r-- | src/git/sync.rs | 6 | ||||
| -rw-r--r-- | src/http/mod.rs | 2 | ||||
| -rw-r--r-- | src/main.rs | 75 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 165 | ||||
| -rw-r--r-- | src/nostr/events.rs | 2 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 9 | ||||
| -rw-r--r-- | src/purgatory/helpers.rs | 204 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 341 | ||||
| -rw-r--r-- | src/purgatory/sync/context.rs | 163 | ||||
| -rw-r--r-- | src/purgatory/sync/functions.rs | 22 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 30 | ||||
| -rw-r--r-- | src/sync/mod.rs | 2 | ||||
| -rw-r--r-- | src/sync/naughty_list.rs | 485 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 142 |
18 files changed, 1493 insertions, 397 deletions
diff --git a/src/config.rs b/src/config.rs index 7062187..dd7b1e3 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -115,16 +115,19 @@ pub struct ArchiveConfig { | |||
| 115 | /// | 115 | /// |
| 116 | /// WARNING: Setting this to true allows anyone to mirror any repository | 116 | /// WARNING: Setting this to true allows anyone to mirror any repository |
| 117 | /// to this relay, potentially causing storage/bandwidth exhaustion. | 117 | /// to this relay, potentially causing storage/bandwidth exhaustion. |
| 118 | #[serde(default)] | ||
| 118 | pub archive_all: bool, | 119 | pub archive_all: bool, |
| 119 | 120 | ||
| 120 | /// Whitelist entries for selective archiving | 121 | /// Whitelist entries for selective archiving |
| 121 | /// | 122 | /// |
| 122 | /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode). | 123 | /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode). |
| 124 | #[serde(default)] | ||
| 123 | pub whitelist: Vec<WhitelistEntry>, | 125 | pub whitelist: Vec<WhitelistEntry>, |
| 124 | 126 | ||
| 125 | /// GRASP server domains to archive (archive all repositories from these domains) | 127 | /// GRASP server domains to archive (archive all repositories from these domains) |
| 126 | /// | 128 | /// |
| 127 | /// If non-empty, archives all repositories from the specified GRASP server domains. | 129 | /// If non-empty, archives all repositories from the specified GRASP server domains. |
| 130 | #[serde(default)] | ||
| 128 | pub grasp_services: Vec<String>, | 131 | pub grasp_services: Vec<String>, |
| 129 | 132 | ||
| 130 | /// Read-only archive mode: relay is a read-only sync of archived repositories | 133 | /// Read-only archive mode: relay is a read-only sync of archived repositories |
| @@ -132,6 +135,7 @@ pub struct ArchiveConfig { | |||
| 132 | /// When true, the relay ONLY accepts announcements matching the archive whitelist/all. | 135 | /// When true, the relay ONLY accepts announcements matching the archive whitelist/all. |
| 133 | /// Announcements listing the relay but not in the whitelist are rejected. | 136 | /// Announcements listing the relay but not in the whitelist are rejected. |
| 134 | /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos. | 137 | /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos. |
| 138 | #[serde(default)] | ||
| 135 | pub read_only: bool, | 139 | pub read_only: bool, |
| 136 | } | 140 | } |
| 137 | 141 | ||
| @@ -178,6 +182,7 @@ pub struct RepositoryConfig { | |||
| 178 | /// Whitelist entries for selective repository acceptance | 182 | /// Whitelist entries for selective repository acceptance |
| 179 | /// | 183 | /// |
| 180 | /// If empty, all repositories listing the service are accepted (GRASP-01 mode). | 184 | /// If empty, all repositories listing the service are accepted (GRASP-01 mode). |
| 185 | #[serde(default)] | ||
| 181 | pub whitelist: Vec<WhitelistEntry>, | 186 | pub whitelist: Vec<WhitelistEntry>, |
| 182 | } | 187 | } |
| 183 | 188 | ||
| @@ -204,6 +209,7 @@ pub struct BlacklistConfig { | |||
| 204 | /// | 209 | /// |
| 205 | /// If empty, no repositories are blacklisted. | 210 | /// If empty, no repositories are blacklisted. |
| 206 | /// Blacklist takes precedence over both archive and repository whitelists. | 211 | /// Blacklist takes precedence over both archive and repository whitelists. |
| 212 | #[serde(default)] | ||
| 207 | pub blacklist: Vec<WhitelistEntry>, | 213 | pub blacklist: Vec<WhitelistEntry>, |
| 208 | } | 214 | } |
| 209 | 215 | ||
| @@ -245,6 +251,7 @@ pub struct EventBlacklistConfig { | |||
| 245 | /// | 251 | /// |
| 246 | /// If empty, no events are blacklisted by author. | 252 | /// If empty, no events are blacklisted by author. |
| 247 | /// Applies to ALL event types, preventing events from reaching both the relay and purgatory. | 253 | /// Applies to ALL event types, preventing events from reaching both the relay and purgatory. |
| 254 | #[serde(default)] | ||
| 248 | pub blacklisted_npubs: Vec<String>, | 255 | pub blacklisted_npubs: Vec<String>, |
| 249 | } | 256 | } |
| 250 | 257 | ||
| @@ -466,6 +473,10 @@ pub struct Config { | |||
| 466 | /// Prevents connection exhaustion DoS attacks | 473 | /// Prevents connection exhaustion DoS attacks |
| 467 | #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)] | 474 | #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)] |
| 468 | pub max_connections: usize, | 475 | pub max_connections: usize, |
| 476 | |||
| 477 | /// Log level for application logging | ||
| 478 | #[arg(long, env = "NGIT_LOG_LEVEL", default_value = "info")] | ||
| 479 | pub log_level: String, | ||
| 469 | } | 480 | } |
| 470 | 481 | ||
| 471 | impl Config { | 482 | impl Config { |
| @@ -748,6 +759,7 @@ impl Config { | |||
| 748 | repository_blacklist: String::new(), | 759 | repository_blacklist: String::new(), |
| 749 | event_blacklist: String::new(), | 760 | event_blacklist: String::new(), |
| 750 | max_connections: 500, | 761 | max_connections: 500, |
| 762 | log_level: "debug".to_string(), | ||
| 751 | } | 763 | } |
| 752 | } | 764 | } |
| 753 | } | 765 | } |
| @@ -1069,14 +1081,14 @@ mod tests { | |||
| 1069 | fn test_archive_read_only_defaults() { | 1081 | fn test_archive_read_only_defaults() { |
| 1070 | // Default: false when no archive mode | 1082 | // Default: false when no archive mode |
| 1071 | let config = Config::for_testing(); | 1083 | let config = Config::for_testing(); |
| 1072 | assert_eq!(config.archive_config().read_only, false); | 1084 | assert!(!config.archive_config().read_only); |
| 1073 | 1085 | ||
| 1074 | // Default: true when archive_all is set | 1086 | // Default: true when archive_all is set |
| 1075 | let config = Config { | 1087 | let config = Config { |
| 1076 | archive_all: true, | 1088 | archive_all: true, |
| 1077 | ..Config::for_testing() | 1089 | ..Config::for_testing() |
| 1078 | }; | 1090 | }; |
| 1079 | assert_eq!(config.archive_config().read_only, true); | 1091 | assert!(config.archive_config().read_only); |
| 1080 | 1092 | ||
| 1081 | // Default: true when archive_whitelist is set | 1093 | // Default: true when archive_whitelist is set |
| 1082 | let keys = Keys::generate(); | 1094 | let keys = Keys::generate(); |
| @@ -1085,7 +1097,7 @@ mod tests { | |||
| 1085 | archive_whitelist: test_npub, | 1097 | archive_whitelist: test_npub, |
| 1086 | ..Config::for_testing() | 1098 | ..Config::for_testing() |
| 1087 | }; | 1099 | }; |
| 1088 | assert_eq!(config.archive_config().read_only, true); | 1100 | assert!(config.archive_config().read_only); |
| 1089 | } | 1101 | } |
| 1090 | 1102 | ||
| 1091 | #[test] | 1103 | #[test] |
| @@ -1096,7 +1108,7 @@ mod tests { | |||
| 1096 | archive_read_only: Some(true), | 1108 | archive_read_only: Some(true), |
| 1097 | ..Config::for_testing() | 1109 | ..Config::for_testing() |
| 1098 | }; | 1110 | }; |
| 1099 | assert_eq!(config.archive_config().read_only, true); | 1111 | assert!(config.archive_config().read_only); |
| 1100 | 1112 | ||
| 1101 | // Explicit false with archive_all (unusual but allowed) | 1113 | // Explicit false with archive_all (unusual but allowed) |
| 1102 | let config = Config { | 1114 | let config = Config { |
| @@ -1104,14 +1116,14 @@ mod tests { | |||
| 1104 | archive_read_only: Some(false), | 1116 | archive_read_only: Some(false), |
| 1105 | ..Config::for_testing() | 1117 | ..Config::for_testing() |
| 1106 | }; | 1118 | }; |
| 1107 | assert_eq!(config.archive_config().read_only, false); | 1119 | assert!(!config.archive_config().read_only); |
| 1108 | 1120 | ||
| 1109 | // Explicit false without archive mode | 1121 | // Explicit false without archive mode |
| 1110 | let config = Config { | 1122 | let config = Config { |
| 1111 | archive_read_only: Some(false), | 1123 | archive_read_only: Some(false), |
| 1112 | ..Config::for_testing() | 1124 | ..Config::for_testing() |
| 1113 | }; | 1125 | }; |
| 1114 | assert_eq!(config.archive_config().read_only, false); | 1126 | assert!(!config.archive_config().read_only); |
| 1115 | } | 1127 | } |
| 1116 | 1128 | ||
| 1117 | #[test] | 1129 | #[test] |
| @@ -1514,7 +1526,7 @@ mod tests { | |||
| 1514 | }; | 1526 | }; |
| 1515 | let archive_config = config.archive_config(); | 1527 | let archive_config = config.archive_config(); |
| 1516 | assert!(archive_config.enabled()); | 1528 | assert!(archive_config.enabled()); |
| 1517 | assert_eq!(archive_config.read_only, true); // Default to true | 1529 | assert!(archive_config.read_only); // Default to true |
| 1518 | } | 1530 | } |
| 1519 | 1531 | ||
| 1520 | #[test] | 1532 | #[test] |
| @@ -1524,7 +1536,7 @@ mod tests { | |||
| 1524 | archive_grasp_services: "git.example.com".to_string(), | 1536 | archive_grasp_services: "git.example.com".to_string(), |
| 1525 | ..Config::for_testing() | 1537 | ..Config::for_testing() |
| 1526 | }; | 1538 | }; |
| 1527 | assert_eq!(config.archive_config().read_only, true); | 1539 | assert!(config.archive_config().read_only); |
| 1528 | } | 1540 | } |
| 1529 | 1541 | ||
| 1530 | #[test] | 1542 | #[test] |
| @@ -1535,7 +1547,7 @@ mod tests { | |||
| 1535 | archive_read_only: Some(false), | 1547 | archive_read_only: Some(false), |
| 1536 | ..Config::for_testing() | 1548 | ..Config::for_testing() |
| 1537 | }; | 1549 | }; |
| 1538 | assert_eq!(config.archive_config().read_only, false); | 1550 | assert!(!config.archive_config().read_only); |
| 1539 | } | 1551 | } |
| 1540 | 1552 | ||
| 1541 | #[test] | 1553 | #[test] |
diff --git a/src/git/authorization.rs b/src/git/authorization.rs index 69a0751..df780bb 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs | |||
| @@ -609,6 +609,28 @@ pub async fn get_state_authorization_for_specific_owner_repo( | |||
| 609 | owner_pubkey | 609 | owner_pubkey |
| 610 | ); | 610 | ); |
| 611 | 611 | ||
| 612 | // Accept pushes where all refs are already at the desired state (old_oid == new_oid) | ||
| 613 | // This handles race conditions where state events are applied between fetch and push | ||
| 614 | if !pushed_refs.is_empty() { | ||
| 615 | let all_refs_unchanged = pushed_refs | ||
| 616 | .iter() | ||
| 617 | .all(|(old_oid, new_oid, _)| old_oid == new_oid); | ||
| 618 | |||
| 619 | if all_refs_unchanged { | ||
| 620 | debug!( | ||
| 621 | "All pushed refs unchanged (old_oid == new_oid) for {} owned by {}, accepting without purgatory check", | ||
| 622 | identifier, owner_pubkey | ||
| 623 | ); | ||
| 624 | return Ok(AuthorizationResult { | ||
| 625 | authorized: true, | ||
| 626 | reason: "Push accepted: all refs already at desired state (no-op)".to_string(), | ||
| 627 | state: None, | ||
| 628 | maintainers: authorized.into_iter().collect(), | ||
| 629 | purgatory_events: vec![], | ||
| 630 | }); | ||
| 631 | } | ||
| 632 | } | ||
| 633 | |||
| 612 | // Check purgatory for matching state events | 634 | // Check purgatory for matching state events |
| 613 | // Convert pushed refs to RefUpdate (filter out refs/nostr/* refs) | 635 | // Convert pushed refs to RefUpdate (filter out refs/nostr/* refs) |
| 614 | let pushed_updates: Vec<RefUpdate> = pushed_refs | 636 | let pushed_updates: Vec<RefUpdate> = pushed_refs |
| @@ -699,12 +721,88 @@ pub async fn get_state_authorization_for_specific_owner_repo( | |||
| 699 | debug!("Purgatory events found but none from authorized authors"); | 721 | debug!("Purgatory events found but none from authorized authors"); |
| 700 | } | 722 | } |
| 701 | } else { | 723 | } else { |
| 702 | debug!("No matching state events found in purgatory"); | 724 | // Check if there are ANY state events in purgatory for this identifier |
| 725 | let all_purgatory_states = purgatory.find_state(identifier); | ||
| 726 | |||
| 727 | if !all_purgatory_states.is_empty() { | ||
| 728 | // There are state events but none match the push - diagnose why | ||
| 729 | debug!( | ||
| 730 | "Found {} state event(s) in purgatory for {} but none match the push", | ||
| 731 | all_purgatory_states.len(), | ||
| 732 | identifier | ||
| 733 | ); | ||
| 734 | |||
| 735 | // Count authorized state events and collect diagnostic info | ||
| 736 | let mut authorized_count = 0; | ||
| 737 | let mut diagnostic_reasons = Vec::new(); | ||
| 738 | |||
| 739 | // Diagnose why each authorized state event doesn't match | ||
| 740 | for entry in all_purgatory_states.iter() { | ||
| 741 | let author_hex = entry.event.pubkey.to_hex(); | ||
| 742 | if authorized.contains(&author_hex) { | ||
| 743 | authorized_count += 1; | ||
| 744 | if let Some(reason) = crate::purgatory::diagnose_state_mismatch( | ||
| 745 | &entry.event, | ||
| 746 | &pushed_updates, | ||
| 747 | &local_refs, | ||
| 748 | ) { | ||
| 749 | debug!( | ||
| 750 | "State event {} from authorized author {} doesn't match push: {}", | ||
| 751 | entry.event.id, | ||
| 752 | entry | ||
| 753 | .event | ||
| 754 | .pubkey | ||
| 755 | .to_bech32() | ||
| 756 | .unwrap_or_else(|_| author_hex.clone()), | ||
| 757 | reason | ||
| 758 | ); | ||
| 759 | diagnostic_reasons.push(reason); | ||
| 760 | } | ||
| 761 | } | ||
| 762 | } | ||
| 763 | |||
| 764 | // Create concise WARN message summarizing the rejection | ||
| 765 | let summary = if authorized_count > 0 { | ||
| 766 | let reason_summary = if !diagnostic_reasons.is_empty() { | ||
| 767 | // Take the first diagnostic reason as representative | ||
| 768 | format!(" ({})", diagnostic_reasons[0]) | ||
| 769 | } else { | ||
| 770 | String::new() | ||
| 771 | }; | ||
| 772 | format!( | ||
| 773 | "{} state event{} in purgatory from authorized publisher{} but doesn't match push{}", | ||
| 774 | authorized_count, | ||
| 775 | if authorized_count == 1 { "" } else { "s" }, | ||
| 776 | if authorized_count == 1 { "" } else { "s" }, | ||
| 777 | reason_summary | ||
| 778 | ) | ||
| 779 | } else { | ||
| 780 | format!( | ||
| 781 | "{} state event{} in purgatory but none from authorized publishers", | ||
| 782 | all_purgatory_states.len(), | ||
| 783 | if all_purgatory_states.len() == 1 { | ||
| 784 | "" | ||
| 785 | } else { | ||
| 786 | "s" | ||
| 787 | } | ||
| 788 | ) | ||
| 789 | }; | ||
| 790 | |||
| 791 | warn!("Push rejected for {}: {}", identifier, summary); | ||
| 792 | return Ok(AuthorizationResult::denied(summary)); | ||
| 793 | } else { | ||
| 794 | debug!("No state events found in purgatory for {}", identifier); | ||
| 795 | warn!( | ||
| 796 | "Push rejected for {}: No state events in purgatory", | ||
| 797 | identifier | ||
| 798 | ); | ||
| 799 | return Ok(AuthorizationResult::denied("No state events in purgatory")); | ||
| 800 | } | ||
| 703 | } | 801 | } |
| 704 | 802 | ||
| 705 | // No matching state found in purgatory | 803 | // No matching state found in purgatory |
| 706 | Ok(AuthorizationResult::denied( | 804 | Ok(AuthorizationResult::denied( |
| 707 | "No state event found in purgatory from authorized publishers", | 805 | "No matching state event found in purgatory from authorized publishers", |
| 708 | )) | 806 | )) |
| 709 | } | 807 | } |
| 710 | 808 | ||
diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 13d6ba0..f43cbb6 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs | |||
| @@ -100,6 +100,42 @@ pub async fn handle_info_refs( | |||
| 100 | .unwrap()) | 100 | .unwrap()) |
| 101 | } | 101 | } |
| 102 | 102 | ||
| 103 | /// Build an HTTP 200 OK response with an ERR pkt-line for git protocol errors. | ||
| 104 | /// | ||
| 105 | /// Per the git smart HTTP protocol spec, protocol-level errors (like "not our ref") | ||
| 106 | /// should be returned as HTTP 200 OK with the error message in pkt-line format: | ||
| 107 | /// `PKT-LINE("ERR" SP explanation-text)` | ||
| 108 | /// | ||
| 109 | /// This allows git clients to properly parse and display the error message. | ||
| 110 | fn build_git_protocol_error_response( | ||
| 111 | service: GitService, | ||
| 112 | error_message: &str, | ||
| 113 | ) -> Response<Full<Bytes>> { | ||
| 114 | // Format: "ERR <message>\n" | ||
| 115 | let err_content = format!("ERR {}\n", error_message.trim()); | ||
| 116 | let err_pktline = PktLine::data(err_content.as_bytes()).encode(); | ||
| 117 | |||
| 118 | Response::builder() | ||
| 119 | .status(StatusCode::OK) | ||
| 120 | .header("content-type", service.result_content_type()) | ||
| 121 | .header("cache-control", "no-cache") | ||
| 122 | .body(Full::new(Bytes::from(err_pktline))) | ||
| 123 | .unwrap() | ||
| 124 | } | ||
| 125 | |||
| 126 | /// Check if a git process failure is a protocol error (vs transport error). | ||
| 127 | /// | ||
| 128 | /// Protocol errors are communicated via stderr when git exits with code 128. | ||
| 129 | /// These should be returned to the client as HTTP 200 with ERR pkt-line. | ||
| 130 | /// | ||
| 131 | /// Transport errors (process spawn failures, I/O errors, signals) should | ||
| 132 | /// remain as HTTP 500 errors. | ||
| 133 | fn is_git_protocol_error(exit_code: Option<i32>, stderr: &[u8]) -> bool { | ||
| 134 | // Git uses exit code 128 for protocol/usage errors | ||
| 135 | // If there's stderr content, it's a protocol error message | ||
| 136 | exit_code == Some(128) && !stderr.is_empty() | ||
| 137 | } | ||
| 138 | |||
| 103 | /// Handle POST /git-upload-pack (clone/fetch) | 139 | /// Handle POST /git-upload-pack (clone/fetch) |
| 104 | pub async fn handle_upload_pack( | 140 | pub async fn handle_upload_pack( |
| 105 | repo_path: PathBuf, | 141 | repo_path: PathBuf, |
| @@ -121,7 +157,10 @@ pub async fn handle_upload_pack( | |||
| 121 | stdin | 157 | stdin |
| 122 | .write_all(&request_body) | 158 | .write_all(&request_body) |
| 123 | .await | 159 | .await |
| 124 | .map_err(GitError::IoError)?; | 160 | .map_err(|e| { |
| 161 | error!("Failed to write to git upload-pack stdin: {}", e); | ||
| 162 | GitError::IoError(e) | ||
| 163 | })?; | ||
| 125 | // Close stdin to signal end of input | 164 | // Close stdin to signal end of input |
| 126 | drop(stdin); | 165 | drop(stdin); |
| 127 | } | 166 | } |
| @@ -135,7 +174,10 @@ pub async fn handle_upload_pack( | |||
| 135 | stdout | 174 | stdout |
| 136 | .read_to_end(&mut output) | 175 | .read_to_end(&mut output) |
| 137 | .await | 176 | .await |
| 138 | .map_err(GitError::IoError)?; | 177 | .map_err(|e| { |
| 178 | error!("Failed to read git upload-pack stdout: {}", e); | ||
| 179 | GitError::IoError(e) | ||
| 180 | })?; | ||
| 139 | } | 181 | } |
| 140 | 182 | ||
| 141 | if let Some(stderr) = git.take_stderr() { | 183 | if let Some(stderr) = git.take_stderr() { |
| @@ -143,14 +185,35 @@ pub async fn handle_upload_pack( | |||
| 143 | stderr | 185 | stderr |
| 144 | .read_to_end(&mut stderr_output) | 186 | .read_to_end(&mut stderr_output) |
| 145 | .await | 187 | .await |
| 146 | .map_err(GitError::IoError)?; | 188 | .map_err(|e| { |
| 189 | error!("Failed to read git upload-pack stderr: {}", e); | ||
| 190 | GitError::IoError(e) | ||
| 191 | })?; | ||
| 147 | } | 192 | } |
| 148 | 193 | ||
| 149 | // Wait for process | 194 | // Wait for process |
| 150 | let status = git.wait().await.map_err(GitError::IoError)?; | 195 | let status = git.wait().await.map_err(|e| { |
| 196 | error!("Failed to wait for git upload-pack process: {}", e); | ||
| 197 | GitError::IoError(e) | ||
| 198 | })?; | ||
| 151 | 199 | ||
| 152 | if !status.success() { | 200 | if !status.success() { |
| 153 | let stderr_str = String::from_utf8_lossy(&stderr_output); | 201 | let stderr_str = String::from_utf8_lossy(&stderr_output); |
| 202 | |||
| 203 | // Check if this is a git protocol error (exit code 128 with stderr) | ||
| 204 | // Protocol errors should be returned as HTTP 200 with ERR pkt-line | ||
| 205 | if is_git_protocol_error(status.code(), &stderr_output) { | ||
| 206 | warn!( | ||
| 207 | "Git upload-pack protocol error (returning ERR pkt-line): {}", | ||
| 208 | stderr_str | ||
| 209 | ); | ||
| 210 | return Ok(build_git_protocol_error_response( | ||
| 211 | GitService::UploadPack, | ||
| 212 | &stderr_str, | ||
| 213 | )); | ||
| 214 | } | ||
| 215 | |||
| 216 | // Transport errors (spawn failures, signals, etc.) remain as HTTP 500 | ||
| 154 | error!("Git upload-pack failed: {}", stderr_str); | 217 | error!("Git upload-pack failed: {}", stderr_str); |
| 155 | return Err(GitError::GitFailed(status.code())); | 218 | return Err(GitError::GitFailed(status.code())); |
| 156 | } | 219 | } |
| @@ -206,7 +269,7 @@ pub async fn handle_receive_pack( | |||
| 206 | } | 269 | } |
| 207 | 270 | ||
| 208 | // GRASP Authorization Check | 271 | // GRASP Authorization Check |
| 209 | info!( | 272 | debug!( |
| 210 | "Authorizing push for {} owned by {} via database query", | 273 | "Authorizing push for {} owned by {} via database query", |
| 211 | identifier, owner_pubkey | 274 | identifier, owner_pubkey |
| 212 | ); | 275 | ); |
| @@ -251,7 +314,10 @@ pub async fn handle_receive_pack( | |||
| 251 | stdin | 314 | stdin |
| 252 | .write_all(&request_body) | 315 | .write_all(&request_body) |
| 253 | .await | 316 | .await |
| 254 | .map_err(GitError::IoError)?; | 317 | .map_err(|e| { |
| 318 | error!("Failed to write to git receive-pack stdin: {}", e); | ||
| 319 | GitError::IoError(e) | ||
| 320 | })?; | ||
| 255 | drop(stdin); | 321 | drop(stdin); |
| 256 | } | 322 | } |
| 257 | 323 | ||
| @@ -264,7 +330,10 @@ pub async fn handle_receive_pack( | |||
| 264 | stdout | 330 | stdout |
| 265 | .read_to_end(&mut output) | 331 | .read_to_end(&mut output) |
| 266 | .await | 332 | .await |
| 267 | .map_err(GitError::IoError)?; | 333 | .map_err(|e| { |
| 334 | error!("Failed to read git receive-pack stdout: {}", e); | ||
| 335 | GitError::IoError(e) | ||
| 336 | })?; | ||
| 268 | } | 337 | } |
| 269 | 338 | ||
| 270 | if let Some(stderr) = git.take_stderr() { | 339 | if let Some(stderr) = git.take_stderr() { |
| @@ -272,14 +341,35 @@ pub async fn handle_receive_pack( | |||
| 272 | stderr | 341 | stderr |
| 273 | .read_to_end(&mut stderr_output) | 342 | .read_to_end(&mut stderr_output) |
| 274 | .await | 343 | .await |
| 275 | .map_err(GitError::IoError)?; | 344 | .map_err(|e| { |
| 345 | error!("Failed to read git receive-pack stderr: {}", e); | ||
| 346 | GitError::IoError(e) | ||
| 347 | })?; | ||
| 276 | } | 348 | } |
| 277 | 349 | ||
| 278 | // Wait for process | 350 | // Wait for process |
| 279 | let status = git.wait().await.map_err(GitError::IoError)?; | 351 | let status = git.wait().await.map_err(|e| { |
| 352 | error!("Failed to wait for git receive-pack process: {}", e); | ||
| 353 | GitError::IoError(e) | ||
| 354 | })?; | ||
| 280 | 355 | ||
| 281 | if !status.success() { | 356 | if !status.success() { |
| 282 | let stderr_str = String::from_utf8_lossy(&stderr_output); | 357 | let stderr_str = String::from_utf8_lossy(&stderr_output); |
| 358 | |||
| 359 | // Check if this is a git protocol error (exit code 128 with stderr) | ||
| 360 | // Protocol errors should be returned as HTTP 200 with ERR pkt-line | ||
| 361 | if is_git_protocol_error(status.code(), &stderr_output) { | ||
| 362 | warn!( | ||
| 363 | "Git receive-pack protocol error (returning ERR pkt-line): {}", | ||
| 364 | stderr_str | ||
| 365 | ); | ||
| 366 | return Ok(build_git_protocol_error_response( | ||
| 367 | GitService::ReceivePack, | ||
| 368 | &stderr_str, | ||
| 369 | )); | ||
| 370 | } | ||
| 371 | |||
| 372 | // Transport errors (spawn failures, signals, etc.) remain as HTTP 500 | ||
| 283 | error!("Git receive-pack failed: {}", stderr_str); | 373 | error!("Git receive-pack failed: {}", stderr_str); |
| 284 | return Err(GitError::GitFailed(status.code())); | 374 | return Err(GitError::GitFailed(status.code())); |
| 285 | } | 375 | } |
diff --git a/src/git/mod.rs b/src/git/mod.rs index b3fee69..1255b6f 100644 --- a/src/git/mod.rs +++ b/src/git/mod.rs | |||
| @@ -253,7 +253,7 @@ pub fn update_ref(repo_path: &Path, ref_name: &str, commit_hash: &str) -> Result | |||
| 253 | return Err(format!("git update-ref failed: {}", stderr)); | 253 | return Err(format!("git update-ref failed: {}", stderr)); |
| 254 | } | 254 | } |
| 255 | 255 | ||
| 256 | info!( | 256 | debug!( |
| 257 | "Updated ref {} to {} in {}", | 257 | "Updated ref {} to {} in {}", |
| 258 | ref_name, | 258 | ref_name, |
| 259 | commit_hash, | 259 | commit_hash, |
diff --git a/src/git/sync.rs b/src/git/sync.rs index 8401736..c24d16b 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs | |||
| @@ -673,7 +673,7 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) -> | |||
| 673 | match git::update_ref(repo_path, ref_name, expected_commit) { | 673 | match git::update_ref(repo_path, ref_name, expected_commit) { |
| 674 | Ok(()) => { | 674 | Ok(()) => { |
| 675 | if current_commit.is_some() { | 675 | if current_commit.is_some() { |
| 676 | info!( | 676 | debug!( |
| 677 | "Updated {} to {} in {}", | 677 | "Updated {} to {} in {}", |
| 678 | ref_name, | 678 | ref_name, |
| 679 | expected_commit, | 679 | expected_commit, |
| @@ -681,7 +681,7 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) -> | |||
| 681 | ); | 681 | ); |
| 682 | result.refs_updated += 1; | 682 | result.refs_updated += 1; |
| 683 | } else { | 683 | } else { |
| 684 | info!( | 684 | debug!( |
| 685 | "Created {} at {} in {}", | 685 | "Created {} at {} in {}", |
| 686 | ref_name, | 686 | ref_name, |
| 687 | expected_commit, | 687 | expected_commit, |
| @@ -707,7 +707,7 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) -> | |||
| 707 | if let Some(head_commit) = state.get_branch_commit(branch_name) { | 707 | if let Some(head_commit) = state.get_branch_commit(branch_name) { |
| 708 | match git::try_set_head_if_available(repo_path, head_ref, head_commit) { | 708 | match git::try_set_head_if_available(repo_path, head_ref, head_commit) { |
| 709 | Ok(true) => { | 709 | Ok(true) => { |
| 710 | info!("Set HEAD to {} in {}", head_ref, repo_path.display()); | 710 | debug!("Set HEAD to {} in {}", head_ref, repo_path.display()); |
| 711 | result.head_set = true; | 711 | result.head_set = true; |
| 712 | } | 712 | } |
| 713 | Ok(false) => { | 713 | Ok(false) => { |
diff --git a/src/http/mod.rs b/src/http/mod.rs index cfd7c52..76ffef3 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs | |||
| @@ -345,7 +345,7 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 345 | .unwrap()) | 345 | .unwrap()) |
| 346 | } | 346 | } |
| 347 | Err(e) => { | 347 | Err(e) => { |
| 348 | tracing::error!("Git handler error: {}", e); | 348 | // Errors are already logged at their source with full context |
| 349 | let error_msg = format!("Git error: {}", e); | 349 | let error_msg = format!("Git error: {}", e); |
| 350 | Ok(add_cors_headers(Response::builder()) | 350 | Ok(add_cors_headers(Response::builder()) |
| 351 | .status(e.status_code()) | 351 | .status(e.status_code()) |
diff --git a/src/main.rs b/src/main.rs index 6769cf3..bf3aefb 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,8 +3,8 @@ use std::{path::PathBuf, sync::Arc}; | |||
| 3 | 3 | ||
| 4 | use anyhow::Result; | 4 | use anyhow::Result; |
| 5 | use tokio::signal; | 5 | use tokio::signal; |
| 6 | use tracing::{error, info, warn, Level}; | 6 | use tracing::{error, info, warn}; |
| 7 | use tracing_subscriber::FmtSubscriber; | 7 | use tracing_subscriber::{EnvFilter, FmtSubscriber}; |
| 8 | 8 | ||
| 9 | use ngit_grasp::{ | 9 | use ngit_grasp::{ |
| 10 | config::{Config, DatabaseBackend}, | 10 | config::{Config, DatabaseBackend}, |
| @@ -17,16 +17,16 @@ use ngit_grasp::{ | |||
| 17 | 17 | ||
| 18 | #[tokio::main] | 18 | #[tokio::main] |
| 19 | async fn main() -> Result<()> { | 19 | async fn main() -> Result<()> { |
| 20 | // Initialize tracing | 20 | // Load configuration first (priority: CLI flags > env vars > .env file > defaults) |
| 21 | let config = Config::load()?; | ||
| 22 | |||
| 23 | // Initialize tracing with configured log level | ||
| 21 | let subscriber = FmtSubscriber::builder() | 24 | let subscriber = FmtSubscriber::builder() |
| 22 | .with_max_level(Level::DEBUG) | 25 | .with_env_filter(EnvFilter::new(&config.log_level)) |
| 23 | .finish(); | 26 | .finish(); |
| 24 | tracing::subscriber::set_global_default(subscriber)?; | 27 | tracing::subscriber::set_global_default(subscriber)?; |
| 25 | 28 | ||
| 26 | info!("Starting ngit-grasp with nostr-relay-builder..."); | 29 | info!("Starting ngit-grasp with log level: {}", config.log_level); |
| 27 | |||
| 28 | // Load configuration (priority: CLI flags > env vars > .env file > defaults) | ||
| 29 | let config = Config::load()?; | ||
| 30 | 30 | ||
| 31 | // Validate configuration and fail fast on fatal errors | 31 | // Validate configuration and fail fast on fatal errors |
| 32 | // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings | 32 | // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings |
| @@ -189,8 +189,8 @@ async fn main() -> Result<()> { | |||
| 189 | )); | 189 | )); |
| 190 | 190 | ||
| 191 | // Create throttle manager for rate limiting remote git servers | 191 | // Create throttle manager for rate limiting remote git servers |
| 192 | // Default: 5 concurrent requests per domain, 30 requests per minute per domain | 192 | // Default: 5 concurrent requests per domain, 60 requests per minute per domain |
| 193 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | 193 | let throttle_manager = Arc::new(ThrottleManager::new(5, 60)); |
| 194 | throttle_manager.set_context(sync_ctx.clone()); | 194 | throttle_manager.set_context(sync_ctx.clone()); |
| 195 | throttle_manager.set_git_naughty_list(git_naughty_list.clone()); | 195 | throttle_manager.set_git_naughty_list(git_naughty_list.clone()); |
| 196 | 196 | ||
| @@ -212,20 +212,49 @@ async fn main() -> Result<()> { | |||
| 212 | let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); | 212 | let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); |
| 213 | 213 | ||
| 214 | // Run server until shutdown signal, then cleanup | 214 | // Run server until shutdown signal, then cleanup |
| 215 | tokio::select! { | 215 | #[cfg(unix)] |
| 216 | result = http::run_server( | 216 | { |
| 217 | config, | 217 | use tokio::signal::unix::{signal, SignalKind}; |
| 218 | relay_with_db.relay, | 218 | let mut sigterm = signal(SignalKind::terminate())?; |
| 219 | relay_with_db.database, | 219 | |
| 220 | metrics, | 220 | tokio::select! { |
| 221 | purgatory, | 221 | result = http::run_server( |
| 222 | http_write_policy, | 222 | config, |
| 223 | http_rejected_index, | 223 | relay_with_db.relay, |
| 224 | ) => { | 224 | relay_with_db.database, |
| 225 | result? | 225 | metrics, |
| 226 | purgatory, | ||
| 227 | http_write_policy, | ||
| 228 | http_rejected_index, | ||
| 229 | ) => { | ||
| 230 | result? | ||
| 231 | } | ||
| 232 | _ = signal::ctrl_c() => { | ||
| 233 | info!("Received SIGINT (Ctrl+C), cleaning up..."); | ||
| 234 | } | ||
| 235 | _ = sigterm.recv() => { | ||
| 236 | info!("Received SIGTERM, cleaning up..."); | ||
| 237 | } | ||
| 226 | } | 238 | } |
| 227 | _ = signal::ctrl_c() => { | 239 | } |
| 228 | info!("Received shutdown signal, cleaning up..."); | 240 | |
| 241 | #[cfg(not(unix))] | ||
| 242 | { | ||
| 243 | tokio::select! { | ||
| 244 | result = http::run_server( | ||
| 245 | config, | ||
| 246 | relay_with_db.relay, | ||
| 247 | relay_with_db.database, | ||
| 248 | metrics, | ||
| 249 | purgatory, | ||
| 250 | http_write_policy, | ||
| 251 | http_rejected_index, | ||
| 252 | ) => { | ||
| 253 | result? | ||
| 254 | } | ||
| 255 | _ = signal::ctrl_c() => { | ||
| 256 | info!("Received SIGINT (Ctrl+C), cleaning up..."); | ||
| 257 | } | ||
| 229 | } | 258 | } |
| 230 | } | 259 | } |
| 231 | 260 | ||
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index d056e46..7a05348 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -102,6 +102,62 @@ impl Nip34WritePolicy { | |||
| 102 | self.ctx.set_local_relay(relay); | 102 | self.ctx.set_local_relay(relay); |
| 103 | } | 103 | } |
| 104 | 104 | ||
| 105 | /// Extract repository identifier from event's 'd' tag. | ||
| 106 | /// | ||
| 107 | /// Used for structured logging when parsing fails - we try to extract | ||
| 108 | /// the identifier even if full parsing failed. | ||
| 109 | fn extract_identifier_from_event(event: &Event) -> String { | ||
| 110 | use nostr_relay_builder::prelude::TagKind; | ||
| 111 | event | ||
| 112 | .tags | ||
| 113 | .iter() | ||
| 114 | .find(|t| t.kind() == TagKind::d()) | ||
| 115 | .and_then(|t| t.content()) | ||
| 116 | .map(|s| s.to_string()) | ||
| 117 | .unwrap_or_else(|| "unknown".to_string()) | ||
| 118 | } | ||
| 119 | |||
| 120 | /// Extract ALL repository identifiers from PR event's 'a' tags. | ||
| 121 | /// | ||
| 122 | /// PR events can reference multiple repositories via multiple 'a' tags | ||
| 123 | /// (e.g., when there are multiple maintainers). Each tag has format | ||
| 124 | /// `30617:<owner_pubkey>:<identifier>`. | ||
| 125 | /// | ||
| 126 | /// Returns a vector of unique identifiers, or `["unknown"]` if none found. | ||
| 127 | fn extract_repos_from_pr_event(event: &Event) -> Vec<String> { | ||
| 128 | let repos: Vec<String> = event | ||
| 129 | .tags | ||
| 130 | .iter() | ||
| 131 | .filter_map(|tag| { | ||
| 132 | let tag_vec = tag.clone().to_vec(); | ||
| 133 | if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { | ||
| 134 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 135 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 136 | if parts.len() >= 3 { | ||
| 137 | Some(parts[2].to_string()) | ||
| 138 | } else { | ||
| 139 | None | ||
| 140 | } | ||
| 141 | } else { | ||
| 142 | None | ||
| 143 | } | ||
| 144 | }) | ||
| 145 | .collect(); | ||
| 146 | |||
| 147 | // Deduplicate while preserving order | ||
| 148 | let mut seen = std::collections::HashSet::new(); | ||
| 149 | let unique_repos: Vec<String> = repos | ||
| 150 | .into_iter() | ||
| 151 | .filter(|r| seen.insert(r.clone())) | ||
| 152 | .collect(); | ||
| 153 | |||
| 154 | if unique_repos.is_empty() { | ||
| 155 | vec!["unknown".to_string()] | ||
| 156 | } else { | ||
| 157 | unique_repos | ||
| 158 | } | ||
| 159 | } | ||
| 160 | |||
| 105 | /// Handle repository announcement event | 161 | /// Handle repository announcement event |
| 106 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { | 162 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { |
| 107 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | 163 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); |
| @@ -133,10 +189,21 @@ impl Nip34WritePolicy { | |||
| 133 | WritePolicyResult::Accept | 189 | WritePolicyResult::Accept |
| 134 | } | 190 | } |
| 135 | Err(e) => { | 191 | Err(e) => { |
| 192 | let npub = event | ||
| 193 | .pubkey | ||
| 194 | .to_bech32() | ||
| 195 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 196 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 197 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 198 | let repo = Self::extract_identifier_from_event(event); | ||
| 199 | // Structured log for migration scripts | ||
| 136 | tracing::warn!( | 200 | tracing::warn!( |
| 137 | "Failed to parse repository announcement {}: {}", | 201 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", |
| 138 | event_id_str, | 202 | event.kind.as_u16(), |
| 139 | e | 203 | event_id_short, |
| 204 | e, | ||
| 205 | repo, | ||
| 206 | npub | ||
| 140 | ); | 207 | ); |
| 141 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) | 208 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) |
| 142 | } | 209 | } |
| @@ -185,10 +252,21 @@ impl Nip34WritePolicy { | |||
| 185 | WritePolicyResult::Accept | 252 | WritePolicyResult::Accept |
| 186 | } | 253 | } |
| 187 | Err(e) => { | 254 | Err(e) => { |
| 255 | let npub = event | ||
| 256 | .pubkey | ||
| 257 | .to_bech32() | ||
| 258 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 259 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 260 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 261 | let repo = Self::extract_identifier_from_event(event); | ||
| 262 | // Structured log for migration scripts | ||
| 188 | tracing::warn!( | 263 | tracing::warn!( |
| 189 | "Failed to parse maintainer announcement {}: {}", | 264 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", |
| 190 | event_id_str, | 265 | event.kind.as_u16(), |
| 191 | e | 266 | event_id_short, |
| 267 | e, | ||
| 268 | repo, | ||
| 269 | npub | ||
| 192 | ); | 270 | ); |
| 193 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) | 271 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) |
| 194 | } | 272 | } |
| @@ -211,8 +289,6 @@ impl Nip34WritePolicy { | |||
| 211 | /// * `event` - The state event to validate | 289 | /// * `event` - The state event to validate |
| 212 | /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) | 290 | /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) |
| 213 | async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult { | 291 | async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult { |
| 214 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | ||
| 215 | |||
| 216 | match self.state_policy.validate(event) { | 292 | match self.state_policy.validate(event) { |
| 217 | StateResult::Accept => { | 293 | StateResult::Accept => { |
| 218 | // Process state alignment asynchronously | 294 | // Process state alignment asynchronously |
| @@ -223,7 +299,22 @@ impl Nip34WritePolicy { | |||
| 223 | { | 299 | { |
| 224 | Ok(poilicy_result) => poilicy_result, | 300 | Ok(poilicy_result) => poilicy_result, |
| 225 | Err(e) => { | 301 | Err(e) => { |
| 226 | tracing::warn!("Failed to process state event {}: {}", event_id_str, e); | 302 | let npub = event |
| 303 | .pubkey | ||
| 304 | .to_bech32() | ||
| 305 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 306 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 307 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 308 | let repo = Self::extract_identifier_from_event(event); | ||
| 309 | // Structured log for migration scripts | ||
| 310 | tracing::warn!( | ||
| 311 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", | ||
| 312 | event.kind.as_u16(), | ||
| 313 | event_id_short, | ||
| 314 | e, | ||
| 315 | repo, | ||
| 316 | npub | ||
| 317 | ); | ||
| 227 | // reject if processing failed | 318 | // reject if processing failed |
| 228 | WritePolicyResult::Reject { | 319 | WritePolicyResult::Reject { |
| 229 | status: false, | 320 | status: false, |
| @@ -233,7 +324,22 @@ impl Nip34WritePolicy { | |||
| 233 | } | 324 | } |
| 234 | } | 325 | } |
| 235 | StateResult::Reject(reason) => { | 326 | StateResult::Reject(reason) => { |
| 236 | tracing::warn!("Rejected repository state {}: {}", event_id_str, reason); | 327 | let npub = event |
| 328 | .pubkey | ||
| 329 | .to_bech32() | ||
| 330 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 331 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 332 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 333 | let repo = Self::extract_identifier_from_event(event); | ||
| 334 | // Structured log for migration scripts | ||
| 335 | tracing::warn!( | ||
| 336 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", | ||
| 337 | event.kind.as_u16(), | ||
| 338 | event_id_short, | ||
| 339 | reason, | ||
| 340 | repo, | ||
| 341 | npub | ||
| 342 | ); | ||
| 237 | WritePolicyResult::reject(reason) | 343 | WritePolicyResult::reject(reason) |
| 238 | } | 344 | } |
| 239 | } | 345 | } |
| @@ -331,9 +437,12 @@ impl Nip34WritePolicy { | |||
| 331 | ); | 437 | ); |
| 332 | 438 | ||
| 333 | // Add to purgatory | 439 | // Add to purgatory |
| 334 | self.ctx | 440 | self.ctx.purgatory.add_pr( |
| 335 | .purgatory | 441 | event.clone(), |
| 336 | .add_pr(event.clone(), event.id.to_hex(), commit.clone()); | 442 | event.id.to_hex(), |
| 443 | commit.clone(), | ||
| 444 | is_synced, | ||
| 445 | ); | ||
| 337 | 446 | ||
| 338 | WritePolicyResult::Reject { | 447 | WritePolicyResult::Reject { |
| 339 | status: true, // Client sees OK | 448 | status: true, // Client sees OK |
| @@ -351,11 +460,25 @@ impl Nip34WritePolicy { | |||
| 351 | } | 460 | } |
| 352 | Err(e) => { | 461 | Err(e) => { |
| 353 | // Error checking git data - reject event | 462 | // Error checking git data - reject event |
| 354 | tracing::warn!( | 463 | let npub = event |
| 355 | "Failed to check git data for PR event {}: {}", | 464 | .pubkey |
| 356 | event_id_str, | 465 | .to_bech32() |
| 357 | e | 466 | .unwrap_or_else(|_| event.pubkey.to_hex()); |
| 358 | ); | 467 | let event_id_short = &event.id.to_hex()[..12]; |
| 468 | // Extract ALL repo identifiers from 'a' tags for PR events | ||
| 469 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 470 | let repos = Self::extract_repos_from_pr_event(event); | ||
| 471 | // Structured log for migration scripts - log once per repo | ||
| 472 | for repo in &repos { | ||
| 473 | tracing::warn!( | ||
| 474 | "[PARSE_FAIL] kind={} event_id={}... reason=\"git data check failed: {}\" repo={} npub={}", | ||
| 475 | event.kind.as_u16(), | ||
| 476 | event_id_short, | ||
| 477 | e, | ||
| 478 | repo, | ||
| 479 | npub | ||
| 480 | ); | ||
| 481 | } | ||
| 359 | WritePolicyResult::reject(format!("Failed to check git data: {}", e)) | 482 | WritePolicyResult::reject(format!("Failed to check git data: {}", e)) |
| 360 | } | 483 | } |
| 361 | } | 484 | } |
| @@ -462,9 +585,11 @@ impl Nip34WritePolicy { | |||
| 462 | let (addressable_refs, event_refs) = | 585 | let (addressable_refs, event_refs) = |
| 463 | RelatedEventPolicy::extract_reference_tags(event); | 586 | RelatedEventPolicy::extract_reference_tags(event); |
| 464 | tracing::info!( | 587 | tracing::info!( |
| 465 | "Rejected orphan {} event {}: no references to accepted repos or events (checked {} addressable, {} event refs)", | 588 | "Rejected orphan {} event {} (kind={}, pubkey={}): no references to accepted repos or events (checked {} addressable, {} event refs)", |
| 466 | event_type, | 589 | event_type, |
| 467 | event_id_str, | 590 | event.id.to_hex(), |
| 591 | event.kind.as_u16(), | ||
| 592 | event.pubkey.to_hex(), | ||
| 468 | addressable_refs.len(), | 593 | addressable_refs.len(), |
| 469 | event_refs.len() | 594 | event_refs.len() |
| 470 | ); | 595 | ); |
diff --git a/src/nostr/events.rs b/src/nostr/events.rs index b9784f7..a441742 100644 --- a/src/nostr/events.rs +++ b/src/nostr/events.rs | |||
| @@ -424,7 +424,7 @@ pub fn validate_announcement( | |||
| 424 | { | 424 | { |
| 425 | return AnnouncementResult::Reject(format!( | 425 | return AnnouncementResult::Reject(format!( |
| 426 | "Announcement lists service but does not match repository whitelist. \ | 426 | "Announcement lists service but does not match repository whitelist. \ |
| 427 | Repository {}/{} not in whitelist", | 427 | Repository {}/{} not in whitelist", |
| 428 | npub, announcement.identifier | 428 | npub, announcement.identifier |
| 429 | )); | 429 | )); |
| 430 | } | 430 | } |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index e6de54e..df743ae 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -276,9 +276,12 @@ impl StatePolicy { | |||
| 276 | 276 | ||
| 277 | // If no git data - add to purgatory | 277 | // If no git data - add to purgatory |
| 278 | // (add_state automatically enqueues for background sync) | 278 | // (add_state automatically enqueues for background sync) |
| 279 | self.ctx | 279 | self.ctx.purgatory.add_state( |
| 280 | .purgatory | 280 | event.clone(), |
| 281 | .add_state(event.clone(), state.identifier.clone(), event.pubkey); | 281 | state.identifier.clone(), |
| 282 | event.pubkey, | ||
| 283 | is_synced, | ||
| 284 | ); | ||
| 282 | 285 | ||
| 283 | tracing::info!( | 286 | tracing::info!( |
| 284 | "state event added to purgatory: eventid: {}, identifier: {}", | 287 | "state event added to purgatory: eventid: {}, identifier: {}", |
diff --git a/src/purgatory/helpers.rs b/src/purgatory/helpers.rs index 193ef99..a9f6e66 100644 --- a/src/purgatory/helpers.rs +++ b/src/purgatory/helpers.rs | |||
| @@ -225,6 +225,117 @@ pub fn get_unpushed_refs(event: &Event, pushed_refs: &[RefPair]) -> Vec<RefPair> | |||
| 225 | .collect() | 225 | .collect() |
| 226 | } | 226 | } |
| 227 | 227 | ||
| 228 | /// Diagnose why a state event doesn't match the push. | ||
| 229 | /// | ||
| 230 | /// Returns a human-readable explanation of the mismatch between the state event | ||
| 231 | /// and what would result from applying the push to local refs. | ||
| 232 | /// | ||
| 233 | /// # Arguments | ||
| 234 | /// * `event` - The state event to check | ||
| 235 | /// * `pushed_updates` - Ref updates in the current push operation | ||
| 236 | /// * `local_refs` - Refs already existing locally (ref_name -> SHA) | ||
| 237 | /// | ||
| 238 | /// # Returns | ||
| 239 | /// String explaining why the state doesn't match, or None if it matches | ||
| 240 | pub fn diagnose_state_mismatch( | ||
| 241 | event: &Event, | ||
| 242 | pushed_updates: &[RefUpdate], | ||
| 243 | local_refs: &HashMap<String, String>, | ||
| 244 | ) -> Option<String> { | ||
| 245 | let state_refs = extract_refs_from_state(event); | ||
| 246 | |||
| 247 | // Filter local_refs to only branches and tags | ||
| 248 | let mut would_be_state: HashMap<String, String> = local_refs | ||
| 249 | .iter() | ||
| 250 | .filter(|(ref_name, _)| { | ||
| 251 | ref_name.starts_with("refs/heads/") || ref_name.starts_with("refs/tags/") | ||
| 252 | }) | ||
| 253 | .map(|(k, v)| (k.clone(), v.clone())) | ||
| 254 | .collect(); | ||
| 255 | |||
| 256 | // Apply all pushed updates to create the would-be state | ||
| 257 | for update in pushed_updates { | ||
| 258 | // Only process branches and tags | ||
| 259 | if !update.ref_name.starts_with("refs/heads/") && !update.ref_name.starts_with("refs/tags/") | ||
| 260 | { | ||
| 261 | continue; | ||
| 262 | } | ||
| 263 | |||
| 264 | if update.is_deletion() { | ||
| 265 | would_be_state.remove(&update.ref_name); | ||
| 266 | } else { | ||
| 267 | would_be_state.insert(update.ref_name.clone(), update.new_oid.clone()); | ||
| 268 | } | ||
| 269 | } | ||
| 270 | |||
| 271 | // Convert event's state refs to a HashMap for comparison | ||
| 272 | let declared_state: HashMap<String, String> = state_refs | ||
| 273 | .into_iter() | ||
| 274 | .map(|r| (r.ref_name, r.object_sha)) | ||
| 275 | .collect(); | ||
| 276 | |||
| 277 | // Check if they match | ||
| 278 | if would_be_state == declared_state { | ||
| 279 | return None; // No mismatch | ||
| 280 | } | ||
| 281 | |||
| 282 | // Build diagnostic message | ||
| 283 | let mut reasons = Vec::new(); | ||
| 284 | |||
| 285 | // Check for refs in declared state but not in would-be state | ||
| 286 | for (ref_name, declared_sha) in &declared_state { | ||
| 287 | if let Some(would_be_sha) = would_be_state.get(ref_name) { | ||
| 288 | if would_be_sha != declared_sha { | ||
| 289 | let would_be_short = if would_be_sha.len() >= 8 { | ||
| 290 | &would_be_sha[..8] | ||
| 291 | } else { | ||
| 292 | would_be_sha.as_str() | ||
| 293 | }; | ||
| 294 | let declared_short = if declared_sha.len() >= 8 { | ||
| 295 | &declared_sha[..8] | ||
| 296 | } else { | ||
| 297 | declared_sha.as_str() | ||
| 298 | }; | ||
| 299 | reasons.push(format!( | ||
| 300 | "{} would be at {} but state declares {}", | ||
| 301 | ref_name, would_be_short, declared_short | ||
| 302 | )); | ||
| 303 | } | ||
| 304 | } else { | ||
| 305 | let declared_short = if declared_sha.len() >= 8 { | ||
| 306 | &declared_sha[..8] | ||
| 307 | } else { | ||
| 308 | declared_sha.as_str() | ||
| 309 | }; | ||
| 310 | reasons.push(format!( | ||
| 311 | "{} missing (state declares {})", | ||
| 312 | ref_name, declared_short | ||
| 313 | )); | ||
| 314 | } | ||
| 315 | } | ||
| 316 | |||
| 317 | // Check for refs in would-be state but not in declared state | ||
| 318 | for (ref_name, would_be_sha) in &would_be_state { | ||
| 319 | if !declared_state.contains_key(ref_name) { | ||
| 320 | let would_be_short = if would_be_sha.len() >= 8 { | ||
| 321 | &would_be_sha[..8] | ||
| 322 | } else { | ||
| 323 | would_be_sha.as_str() | ||
| 324 | }; | ||
| 325 | reasons.push(format!( | ||
| 326 | "{} would exist at {} but state doesn't declare it", | ||
| 327 | ref_name, would_be_short | ||
| 328 | )); | ||
| 329 | } | ||
| 330 | } | ||
| 331 | |||
| 332 | if reasons.is_empty() { | ||
| 333 | Some("Unknown mismatch".to_string()) | ||
| 334 | } else { | ||
| 335 | Some(reasons.join("; ")) | ||
| 336 | } | ||
| 337 | } | ||
| 338 | |||
| 228 | #[cfg(test)] | 339 | #[cfg(test)] |
| 229 | mod tests { | 340 | mod tests { |
| 230 | use super::*; | 341 | use super::*; |
| @@ -695,4 +806,97 @@ mod tests { | |||
| 695 | // Should return true - real OID exists, symbolic ref skipped | 806 | // Should return true - real OID exists, symbolic ref skipped |
| 696 | assert!(can_apply_state(&event, repo_path)); | 807 | assert!(can_apply_state(&event, repo_path)); |
| 697 | } | 808 | } |
| 809 | |||
| 810 | #[test] | ||
| 811 | fn test_diagnose_state_mismatch_missing_ref() { | ||
| 812 | // State declares both main and test branches | ||
| 813 | let event = create_test_state_event( | ||
| 814 | "test-repo", | ||
| 815 | vec![("refs/heads/main", "abc123"), ("refs/heads/test", "def456")], | ||
| 816 | ); | ||
| 817 | |||
| 818 | // Push only creates test branch | ||
| 819 | let pushed_updates = vec![RefUpdate { | ||
| 820 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 821 | new_oid: "def456".to_string(), | ||
| 822 | ref_name: "refs/heads/test".to_string(), | ||
| 823 | }]; | ||
| 824 | |||
| 825 | // No local refs | ||
| 826 | let local_refs = HashMap::new(); | ||
| 827 | |||
| 828 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 829 | assert!(diagnosis.is_some()); | ||
| 830 | let msg = diagnosis.unwrap(); | ||
| 831 | assert!(msg.contains("refs/heads/main")); | ||
| 832 | assert!(msg.contains("missing")); | ||
| 833 | } | ||
| 834 | |||
| 835 | #[test] | ||
| 836 | fn test_diagnose_state_mismatch_wrong_sha() { | ||
| 837 | // State declares main at abc123 | ||
| 838 | let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]); | ||
| 839 | |||
| 840 | // Push updates main to different SHA | ||
| 841 | let pushed_updates = vec![RefUpdate { | ||
| 842 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 843 | new_oid: "wrong123".to_string(), | ||
| 844 | ref_name: "refs/heads/main".to_string(), | ||
| 845 | }]; | ||
| 846 | |||
| 847 | let local_refs = HashMap::new(); | ||
| 848 | |||
| 849 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 850 | assert!(diagnosis.is_some()); | ||
| 851 | let msg = diagnosis.unwrap(); | ||
| 852 | assert!(msg.contains("refs/heads/main")); | ||
| 853 | assert!(msg.contains("would be at")); | ||
| 854 | assert!(msg.contains("state declares")); | ||
| 855 | } | ||
| 856 | |||
| 857 | #[test] | ||
| 858 | fn test_diagnose_state_mismatch_extra_ref() { | ||
| 859 | // State declares only main | ||
| 860 | let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]); | ||
| 861 | |||
| 862 | // Push creates both main and test | ||
| 863 | let pushed_updates = vec![ | ||
| 864 | RefUpdate { | ||
| 865 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 866 | new_oid: "abc123".to_string(), | ||
| 867 | ref_name: "refs/heads/main".to_string(), | ||
| 868 | }, | ||
| 869 | RefUpdate { | ||
| 870 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 871 | new_oid: "def456".to_string(), | ||
| 872 | ref_name: "refs/heads/test".to_string(), | ||
| 873 | }, | ||
| 874 | ]; | ||
| 875 | |||
| 876 | let local_refs = HashMap::new(); | ||
| 877 | |||
| 878 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 879 | assert!(diagnosis.is_some()); | ||
| 880 | let msg = diagnosis.unwrap(); | ||
| 881 | assert!(msg.contains("refs/heads/test")); | ||
| 882 | assert!(msg.contains("doesn't declare")); | ||
| 883 | } | ||
| 884 | |||
| 885 | #[test] | ||
| 886 | fn test_diagnose_state_mismatch_no_mismatch() { | ||
| 887 | // State declares main | ||
| 888 | let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]); | ||
| 889 | |||
| 890 | // Push creates main at correct SHA | ||
| 891 | let pushed_updates = vec![RefUpdate { | ||
| 892 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 893 | new_oid: "abc123".to_string(), | ||
| 894 | ref_name: "refs/heads/main".to_string(), | ||
| 895 | }]; | ||
| 896 | |||
| 897 | let local_refs = HashMap::new(); | ||
| 898 | |||
| 899 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 900 | assert!(diagnosis.is_none()); // No mismatch | ||
| 901 | } | ||
| 698 | } | 902 | } |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 9a63bf6..bb6ff54 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -16,11 +16,12 @@ pub mod persistence; | |||
| 16 | pub mod sync; | 16 | pub mod sync; |
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use nostr_sdk::ToBech32; | ||
| 24 | use serde::{Deserialize, Serialize}; | 25 | use serde::{Deserialize, Serialize}; |
| 25 | use std::collections::HashMap; | 26 | use std::collections::HashMap; |
| 26 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| @@ -64,6 +65,9 @@ struct SerializableStatePurgatoryEntry { | |||
| 64 | created_at_offset_secs: u64, | 65 | created_at_offset_secs: u64, |
| 65 | /// Duration offset from saved_at for expires_at | 66 | /// Duration offset from saved_at for expires_at |
| 66 | expires_at_offset_secs: u64, | 67 | expires_at_offset_secs: u64, |
| 68 | /// Source of this event (direct submission vs sync) | ||
| 69 | #[serde(default)] | ||
| 70 | source: types::EventSource, | ||
| 67 | } | 71 | } |
| 68 | 72 | ||
| 69 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | 73 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. |
| @@ -81,6 +85,9 @@ struct SerializablePrPurgatoryEntry { | |||
| 81 | created_at_offset_secs: u64, | 85 | created_at_offset_secs: u64, |
| 82 | /// Duration offset from saved_at for expires_at | 86 | /// Duration offset from saved_at for expires_at |
| 83 | expires_at_offset_secs: u64, | 87 | expires_at_offset_secs: u64, |
| 88 | /// Source of this event (direct submission vs sync) | ||
| 89 | #[serde(default)] | ||
| 90 | source: types::EventSource, | ||
| 84 | } | 91 | } |
| 85 | 92 | ||
| 86 | /// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. | 93 | /// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. |
| @@ -313,11 +320,38 @@ impl Purgatory { | |||
| 313 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately | 320 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately |
| 314 | /// to override this delay. | 321 | /// to override this delay. |
| 315 | /// | 322 | /// |
| 323 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 324 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 325 | /// | ||
| 316 | /// # Arguments | 326 | /// # Arguments |
| 317 | /// * `event` - The state event (kind 30618) to hold | 327 | /// * `event` - The state event (kind 30618) to hold |
| 318 | /// * `identifier` - The repository identifier from the 'd' tag | 328 | /// * `identifier` - The repository identifier from the 'd' tag |
| 319 | /// * `author` - The event author's public key | 329 | /// * `author` - The event author's public key |
| 320 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 330 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 331 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) { | ||
| 332 | let source = if from_sync { | ||
| 333 | types::EventSource::Sync | ||
| 334 | } else { | ||
| 335 | types::EventSource::Direct | ||
| 336 | }; | ||
| 337 | |||
| 338 | // Check if event already exists - if so, potentially upgrade source | ||
| 339 | if let Some(mut entries) = self.state_events.get_mut(&identifier) { | ||
| 340 | if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) { | ||
| 341 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 342 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 343 | existing.source = types::EventSource::Direct; | ||
| 344 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 345 | tracing::debug!( | ||
| 346 | event_id = %event.id, | ||
| 347 | identifier = %identifier, | ||
| 348 | "Upgraded purgatory entry source from Sync to Direct, reset expiry" | ||
| 349 | ); | ||
| 350 | } | ||
| 351 | return; // Event already exists, don't add duplicate | ||
| 352 | } | ||
| 353 | } | ||
| 354 | |||
| 321 | let now = Instant::now(); | 355 | let now = Instant::now(); |
| 322 | let entry = StatePurgatoryEntry { | 356 | let entry = StatePurgatoryEntry { |
| 323 | event, | 357 | event, |
| @@ -325,6 +359,7 @@ impl Purgatory { | |||
| 325 | author, | 359 | author, |
| 326 | created_at: now, | 360 | created_at: now, |
| 327 | expires_at: now + DEFAULT_EXPIRY, | 361 | expires_at: now + DEFAULT_EXPIRY, |
| 362 | source, | ||
| 328 | }; | 363 | }; |
| 329 | 364 | ||
| 330 | self.state_events | 365 | self.state_events |
| @@ -344,11 +379,35 @@ impl Purgatory { | |||
| 344 | /// Automatically enqueues the referenced repository identifier for background sync | 379 | /// Automatically enqueues the referenced repository identifier for background sync |
| 345 | /// with the default delay (3 minutes), giving time for a git push to arrive. | 380 | /// with the default delay (3 minutes), giving time for a git push to arrive. |
| 346 | /// | 381 | /// |
| 382 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 383 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 384 | /// | ||
| 347 | /// # Arguments | 385 | /// # Arguments |
| 348 | /// * `event` - The PR event (kind 1617/1618) to hold | 386 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 349 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 387 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 350 | /// * `commit` - The commit SHA from the 'c' tag | 388 | /// * `commit` - The commit SHA from the 'c' tag |
| 351 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 389 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 390 | pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) { | ||
| 391 | let source = if from_sync { | ||
| 392 | types::EventSource::Sync | ||
| 393 | } else { | ||
| 394 | types::EventSource::Direct | ||
| 395 | }; | ||
| 396 | |||
| 397 | // Check if event already exists - if so, potentially upgrade source | ||
| 398 | if let Some(mut existing) = self.pr_events.get_mut(&event_id) { | ||
| 399 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 400 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 401 | existing.source = types::EventSource::Direct; | ||
| 402 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 403 | tracing::debug!( | ||
| 404 | event_id = %event_id, | ||
| 405 | "Upgraded PR purgatory entry source from Sync to Direct, reset expiry" | ||
| 406 | ); | ||
| 407 | } | ||
| 408 | return; // Event already exists, don't add duplicate | ||
| 409 | } | ||
| 410 | |||
| 352 | // Extract identifier from the event's `a` tag for sync enqueueing | 411 | // Extract identifier from the event's `a` tag for sync enqueueing |
| 353 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | 412 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); |
| 354 | 413 | ||
| @@ -358,6 +417,7 @@ impl Purgatory { | |||
| 358 | commit, | 417 | commit, |
| 359 | created_at: now, | 418 | created_at: now, |
| 360 | expires_at: now + DEFAULT_EXPIRY, | 419 | expires_at: now + DEFAULT_EXPIRY, |
| 420 | source, | ||
| 361 | }; | 421 | }; |
| 362 | 422 | ||
| 363 | self.pr_events.insert(event_id, entry); | 423 | self.pr_events.insert(event_id, entry); |
| @@ -371,6 +431,8 @@ impl Purgatory { | |||
| 371 | /// Add a PR placeholder (git data arrived before PR event). | 431 | /// Add a PR placeholder (git data arrived before PR event). |
| 372 | /// | 432 | /// |
| 373 | /// Creates a placeholder entry waiting for the corresponding PR event. | 433 | /// Creates a placeholder entry waiting for the corresponding PR event. |
| 434 | /// Placeholders are always marked as `Direct` source since they originate | ||
| 435 | /// from git pushes (direct user action). | ||
| 374 | /// | 436 | /// |
| 375 | /// # Arguments | 437 | /// # Arguments |
| 376 | /// * `event_id` - The expected event ID (from git ref name) | 438 | /// * `event_id` - The expected event ID (from git ref name) |
| @@ -382,6 +444,7 @@ impl Purgatory { | |||
| 382 | commit, | 444 | commit, |
| 383 | created_at: now, | 445 | created_at: now, |
| 384 | expires_at: now + DEFAULT_EXPIRY, | 446 | expires_at: now + DEFAULT_EXPIRY, |
| 447 | source: types::EventSource::Direct, // Git pushes are direct user actions | ||
| 385 | }; | 448 | }; |
| 386 | 449 | ||
| 387 | self.pr_events.insert(event_id, entry); | 450 | self.pr_events.insert(event_id, entry); |
| @@ -892,6 +955,9 @@ impl Purgatory { | |||
| 892 | /// prevent infinite re-sync loops. Events that expire without finding git data | 955 | /// prevent infinite re-sync loops. Events that expire without finding git data |
| 893 | /// will be filtered out during future negentropy/REQ sync operations. | 956 | /// will be filtered out during future negentropy/REQ sync operations. |
| 894 | /// | 957 | /// |
| 958 | /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event | ||
| 959 | /// to support migration scripts and operational monitoring. | ||
| 960 | /// | ||
| 895 | /// # Returns | 961 | /// # Returns |
| 896 | /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) | 962 | /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) |
| 897 | pub fn cleanup(&self) -> (usize, usize, usize) { | 963 | pub fn cleanup(&self) -> (usize, usize, usize) { |
| @@ -976,18 +1042,38 @@ impl Purgatory { | |||
| 976 | let mut state_removed = 0; | 1042 | let mut state_removed = 0; |
| 977 | 1043 | ||
| 978 | // Remove expired state events and mark them as expired | 1044 | // Remove expired state events and mark them as expired |
| 979 | self.state_events.retain(|_, entries| { | 1045 | self.state_events.retain(|identifier, entries| { |
| 980 | let original_len = entries.len(); | 1046 | let original_len = entries.len(); |
| 981 | // Collect event IDs before removing | ||
| 982 | let expired_ids: Vec<EventId> = entries | ||
| 983 | .iter() | ||
| 984 | .filter(|entry| entry.expires_at <= now) | ||
| 985 | .map(|entry| entry.event.id) | ||
| 986 | .collect(); | ||
| 987 | 1047 | ||
| 988 | // Mark as expired to prevent re-sync | 1048 | // Log and collect expired entries before removing |
| 989 | for event_id in expired_ids { | 1049 | for entry in entries.iter().filter(|e| e.expires_at <= now) { |
| 990 | self.mark_expired(event_id); | 1050 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); |
| 1051 | let event_id_short = &entry.event.id.to_hex()[..12]; | ||
| 1052 | let source_str = if entry.source.is_direct() { "direct" } else { "sync" }; | ||
| 1053 | |||
| 1054 | // Structured log for migration scripts | ||
| 1055 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 1056 | if entry.source.is_direct() { | ||
| 1057 | tracing::warn!( | ||
| 1058 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1059 | identifier, | ||
| 1060 | npub, | ||
| 1061 | event_id_short, | ||
| 1062 | entry.event.kind.as_u16(), | ||
| 1063 | source_str | ||
| 1064 | ); | ||
| 1065 | } else { | ||
| 1066 | tracing::debug!( | ||
| 1067 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1068 | identifier, | ||
| 1069 | npub, | ||
| 1070 | event_id_short, | ||
| 1071 | entry.event.kind.as_u16(), | ||
| 1072 | source_str | ||
| 1073 | ); | ||
| 1074 | } | ||
| 1075 | |||
| 1076 | self.mark_expired(entry.event.id); | ||
| 991 | } | 1077 | } |
| 992 | 1078 | ||
| 993 | // Remove expired entries | 1079 | // Remove expired entries |
| @@ -997,21 +1083,103 @@ impl Purgatory { | |||
| 997 | }); | 1083 | }); |
| 998 | 1084 | ||
| 999 | // Remove expired PR events and mark them as expired | 1085 | // Remove expired PR events and mark them as expired |
| 1000 | let expired_prs: Vec<(String, Option<EventId>)> = self | 1086 | let expired_prs: Vec<_> = self |
| 1001 | .pr_events | 1087 | .pr_events |
| 1002 | .iter() | 1088 | .iter() |
| 1003 | .filter(|entry| entry.value().expires_at <= now) | 1089 | .filter(|entry| entry.value().expires_at <= now) |
| 1004 | .map(|entry| { | 1090 | .map(|entry| { |
| 1005 | let event_id = entry.value().event.as_ref().map(|e| e.id); | 1091 | let pr_entry = entry.value(); |
| 1006 | (entry.key().clone(), event_id) | 1092 | let event_id_str = entry.key().clone(); |
| 1093 | let event_opt = pr_entry.event.clone(); | ||
| 1094 | let commit = pr_entry.commit.clone(); | ||
| 1095 | let source = pr_entry.source; | ||
| 1096 | (event_id_str, event_opt, commit, source) | ||
| 1007 | }) | 1097 | }) |
| 1008 | .collect(); | 1098 | .collect(); |
| 1009 | 1099 | ||
| 1010 | let pr_removed = expired_prs.len(); | 1100 | let pr_removed = expired_prs.len(); |
| 1011 | for (event_id_str, event_id_opt) in expired_prs { | 1101 | for (event_id_str, event_opt, commit, source) in expired_prs { |
| 1012 | // Mark actual PR events as expired (not placeholders) | 1102 | // Log structured entry for PR events (not placeholders) |
| 1013 | if let Some(event_id) = event_id_opt { | 1103 | if let Some(ref event) = event_opt { |
| 1014 | self.mark_expired(event_id); | 1104 | let npub = event |
| 1105 | .pubkey | ||
| 1106 | .to_bech32() | ||
| 1107 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 1108 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 1109 | let source_str = if source.is_direct() { "direct" } else { "sync" }; | ||
| 1110 | |||
| 1111 | // Extract ALL repo identifiers from 'a' tags | ||
| 1112 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 1113 | let repos: Vec<String> = event | ||
| 1114 | .tags | ||
| 1115 | .iter() | ||
| 1116 | .filter_map(|tag| { | ||
| 1117 | let tag_vec = tag.clone().to_vec(); | ||
| 1118 | if tag_vec.len() >= 2 | ||
| 1119 | && tag_vec[0] == "a" | ||
| 1120 | && tag_vec[1].starts_with("30617:") | ||
| 1121 | { | ||
| 1122 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 1123 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 1124 | if parts.len() >= 3 { | ||
| 1125 | Some(parts[2].to_string()) | ||
| 1126 | } else { | ||
| 1127 | None | ||
| 1128 | } | ||
| 1129 | } else { | ||
| 1130 | None | ||
| 1131 | } | ||
| 1132 | }) | ||
| 1133 | .collect(); | ||
| 1134 | |||
| 1135 | // Deduplicate while preserving order | ||
| 1136 | let mut seen = std::collections::HashSet::new(); | ||
| 1137 | let unique_repos: Vec<String> = repos | ||
| 1138 | .into_iter() | ||
| 1139 | .filter(|r| seen.insert(r.clone())) | ||
| 1140 | .collect(); | ||
| 1141 | |||
| 1142 | let repos_to_log = if unique_repos.is_empty() { | ||
| 1143 | vec!["unknown".to_string()] | ||
| 1144 | } else { | ||
| 1145 | unique_repos | ||
| 1146 | }; | ||
| 1147 | |||
| 1148 | // Structured log for migration scripts - log once per repo | ||
| 1149 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 1150 | for repo in &repos_to_log { | ||
| 1151 | if source.is_direct() { | ||
| 1152 | tracing::warn!( | ||
| 1153 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1154 | repo, | ||
| 1155 | npub, | ||
| 1156 | event_id_short, | ||
| 1157 | event.kind.as_u16(), | ||
| 1158 | &commit[..commit.len().min(12)], | ||
| 1159 | source_str | ||
| 1160 | ); | ||
| 1161 | } else { | ||
| 1162 | tracing::debug!( | ||
| 1163 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1164 | repo, | ||
| 1165 | npub, | ||
| 1166 | event_id_short, | ||
| 1167 | event.kind.as_u16(), | ||
| 1168 | &commit[..commit.len().min(12)], | ||
| 1169 | source_str | ||
| 1170 | ); | ||
| 1171 | } | ||
| 1172 | } | ||
| 1173 | |||
| 1174 | self.mark_expired(event.id); | ||
| 1175 | } else { | ||
| 1176 | // Placeholder (git data arrived first, but PR event never came) | ||
| 1177 | // Placeholders are always Direct source (from git push) | ||
| 1178 | tracing::debug!( | ||
| 1179 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"", | ||
| 1180 | &event_id_str[..event_id_str.len().min(12)], | ||
| 1181 | &commit[..commit.len().min(12)] | ||
| 1182 | ); | ||
| 1015 | } | 1183 | } |
| 1016 | self.pr_events.remove(&event_id_str); | 1184 | self.pr_events.remove(&event_id_str); |
| 1017 | } | 1185 | } |
| @@ -1191,6 +1359,7 @@ impl Purgatory { | |||
| 1191 | author: e.author, | 1359 | author: e.author, |
| 1192 | created_at_offset_secs: created_offset.as_secs(), | 1360 | created_at_offset_secs: created_offset.as_secs(), |
| 1193 | expires_at_offset_secs: expires_offset.as_secs(), | 1361 | expires_at_offset_secs: expires_offset.as_secs(), |
| 1362 | source: e.source, | ||
| 1194 | } | 1363 | } |
| 1195 | }) | 1364 | }) |
| 1196 | .collect(); | 1365 | .collect(); |
| @@ -1213,6 +1382,7 @@ impl Purgatory { | |||
| 1213 | commit: e.commit.clone(), | 1382 | commit: e.commit.clone(), |
| 1214 | created_at_offset_secs: created_offset.as_secs(), | 1383 | created_at_offset_secs: created_offset.as_secs(), |
| 1215 | expires_at_offset_secs: expires_offset.as_secs(), | 1384 | expires_at_offset_secs: expires_offset.as_secs(), |
| 1385 | source: e.source, | ||
| 1216 | }; | 1386 | }; |
| 1217 | pr_events.insert(event_id, serializable); | 1387 | pr_events.insert(event_id, serializable); |
| 1218 | } | 1388 | } |
| @@ -1355,6 +1525,7 @@ impl Purgatory { | |||
| 1355 | author: e.author, | 1525 | author: e.author, |
| 1356 | created_at, | 1526 | created_at, |
| 1357 | expires_at, | 1527 | expires_at, |
| 1528 | source: e.source, | ||
| 1358 | } | 1529 | } |
| 1359 | }) | 1530 | }) |
| 1360 | .collect(); | 1531 | .collect(); |
| @@ -1380,6 +1551,7 @@ impl Purgatory { | |||
| 1380 | commit: e.commit, | 1551 | commit: e.commit, |
| 1381 | created_at, | 1552 | created_at, |
| 1382 | expires_at, | 1553 | expires_at, |
| 1554 | source: e.source, | ||
| 1383 | }; | 1555 | }; |
| 1384 | 1556 | ||
| 1385 | self.pr_events.insert(event_id, entry); | 1557 | self.pr_events.insert(event_id, entry); |
| @@ -1439,8 +1611,18 @@ mod tests { | |||
| 1439 | .sign_with_keys(&keys) | 1611 | .sign_with_keys(&keys) |
| 1440 | .unwrap(); | 1612 | .unwrap(); |
| 1441 | 1613 | ||
| 1442 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); | 1614 | purgatory.add_state( |
| 1443 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); | 1615 | event.clone(), |
| 1616 | "test-repo".to_string(), | ||
| 1617 | keys.public_key(), | ||
| 1618 | false, | ||
| 1619 | ); | ||
| 1620 | purgatory.add_pr( | ||
| 1621 | event, | ||
| 1622 | "test-event-id".to_string(), | ||
| 1623 | "abc123".to_string(), | ||
| 1624 | false, | ||
| 1625 | ); | ||
| 1444 | 1626 | ||
| 1445 | let (announcement_count, state_count, pr_count) = purgatory.count(); | 1627 | let (announcement_count, state_count, pr_count) = purgatory.count(); |
| 1446 | assert_eq!(announcement_count, 0); | 1628 | assert_eq!(announcement_count, 0); |
| @@ -1492,7 +1674,7 @@ mod tests { | |||
| 1492 | let event = EventBuilder::text_note("state") | 1674 | let event = EventBuilder::text_note("state") |
| 1493 | .sign_with_keys(&keys) | 1675 | .sign_with_keys(&keys) |
| 1494 | .unwrap(); | 1676 | .unwrap(); |
| 1495 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | 1677 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false); |
| 1496 | 1678 | ||
| 1497 | // Now should have pending events | 1679 | // Now should have pending events |
| 1498 | assert!(purgatory.has_pending_events("test-repo")); | 1680 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1522,7 +1704,12 @@ mod tests { | |||
| 1522 | .sign_with_keys(&keys) | 1704 | .sign_with_keys(&keys) |
| 1523 | .unwrap(); | 1705 | .unwrap(); |
| 1524 | 1706 | ||
| 1525 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | 1707 | purgatory.add_pr( |
| 1708 | event, | ||
| 1709 | "pr-event-id".to_string(), | ||
| 1710 | "commit123".to_string(), | ||
| 1711 | false, | ||
| 1712 | ); | ||
| 1526 | 1713 | ||
| 1527 | // Now should have pending events for test-repo | 1714 | // Now should have pending events for test-repo |
| 1528 | assert!(purgatory.has_pending_events("test-repo")); | 1715 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1587,6 +1774,7 @@ fn test_pr_event_vs_placeholder() { | |||
| 1587 | event.clone(), | 1774 | event.clone(), |
| 1588 | "event-id-1".to_string(), | 1775 | "event-id-1".to_string(), |
| 1589 | "commit-abc".to_string(), | 1776 | "commit-abc".to_string(), |
| 1777 | false, | ||
| 1590 | ); | 1778 | ); |
| 1591 | 1779 | ||
| 1592 | // Add a placeholder (no event) | 1780 | // Add a placeholder (no event) |
| @@ -1643,8 +1831,14 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1643 | state_event.clone(), | 1831 | state_event.clone(), |
| 1644 | "test-repo".to_string(), | 1832 | "test-repo".to_string(), |
| 1645 | keys.public_key(), | 1833 | keys.public_key(), |
| 1834 | false, | ||
| 1835 | ); | ||
| 1836 | purgatory.add_pr( | ||
| 1837 | pr_event, | ||
| 1838 | "pr-123".to_string(), | ||
| 1839 | "commit-abc".to_string(), | ||
| 1840 | false, | ||
| 1646 | ); | 1841 | ); |
| 1647 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | ||
| 1648 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1842 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1649 | 1843 | ||
| 1650 | // Verify entries are there | 1844 | // Verify entries are there |
| @@ -1691,8 +1885,18 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1691 | .unwrap(); | 1885 | .unwrap(); |
| 1692 | 1886 | ||
| 1693 | // Add fresh entries | 1887 | // Add fresh entries |
| 1694 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); | 1888 | purgatory.add_state( |
| 1695 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1889 | state_event, |
| 1890 | "test-repo".to_string(), | ||
| 1891 | keys.public_key(), | ||
| 1892 | false, | ||
| 1893 | ); | ||
| 1894 | purgatory.add_pr( | ||
| 1895 | pr_event, | ||
| 1896 | "pr-123".to_string(), | ||
| 1897 | "commit-abc".to_string(), | ||
| 1898 | false, | ||
| 1899 | ); | ||
| 1696 | 1900 | ||
| 1697 | // Run cleanup | 1901 | // Run cleanup |
| 1698 | let (_, state_removed, pr_removed) = purgatory.cleanup(); | 1902 | let (_, state_removed, pr_removed) = purgatory.cleanup(); |
| @@ -1722,8 +1926,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1722 | .sign_with_keys(&keys) | 1926 | .sign_with_keys(&keys) |
| 1723 | .unwrap(); | 1927 | .unwrap(); |
| 1724 | 1928 | ||
| 1725 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); | 1929 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false); |
| 1726 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); | 1930 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false); |
| 1727 | 1931 | ||
| 1728 | // Expire only the first one | 1932 | // Expire only the first one |
| 1729 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { | 1933 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { |
| @@ -1740,8 +1944,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1740 | .sign_with_keys(&keys) | 1944 | .sign_with_keys(&keys) |
| 1741 | .unwrap(); | 1945 | .unwrap(); |
| 1742 | 1946 | ||
| 1743 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); | 1947 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false); |
| 1744 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); | 1948 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false); |
| 1745 | 1949 | ||
| 1746 | // Expire only first PR | 1950 | // Expire only first PR |
| 1747 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { | 1951 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { |
| @@ -1773,8 +1977,8 @@ fn test_remove_expired_legacy_method() { | |||
| 1773 | .unwrap(); | 1977 | .unwrap(); |
| 1774 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); | 1978 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); |
| 1775 | 1979 | ||
| 1776 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1980 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1777 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1981 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1778 | 1982 | ||
| 1779 | // Expire both | 1983 | // Expire both |
| 1780 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1984 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1808,8 +2012,8 @@ fn test_expired_event_tracking() { | |||
| 1808 | let pr_event_id = pr_event.id; | 2012 | let pr_event_id = pr_event.id; |
| 1809 | 2013 | ||
| 1810 | // Add events to purgatory | 2014 | // Add events to purgatory |
| 1811 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 2015 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1812 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 2016 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1813 | 2017 | ||
| 1814 | // Events should not be marked as expired yet | 2018 | // Events should not be marked as expired yet |
| 1815 | assert!(!purgatory.is_expired(&state_event_id)); | 2019 | assert!(!purgatory.is_expired(&state_event_id)); |
| @@ -1861,7 +2065,7 @@ fn test_cleanup_expired_events() { | |||
| 1861 | let event2_id = event2.id; | 2065 | let event2_id = event2.id; |
| 1862 | 2066 | ||
| 1863 | // Add and immediately expire event1 | 2067 | // Add and immediately expire event1 |
| 1864 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); | 2068 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false); |
| 1865 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { | 2069 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { |
| 1866 | for entry in entries.iter_mut() { | 2070 | for entry in entries.iter_mut() { |
| 1867 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2071 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1870,7 +2074,7 @@ fn test_cleanup_expired_events() { | |||
| 1870 | purgatory.cleanup(); | 2074 | purgatory.cleanup(); |
| 1871 | 2075 | ||
| 1872 | // Add and expire event2 (will be more recent) | 2076 | // Add and expire event2 (will be more recent) |
| 1873 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); | 2077 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false); |
| 1874 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { | 2078 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { |
| 1875 | for entry in entries.iter_mut() { | 2079 | for entry in entries.iter_mut() { |
| 1876 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2080 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1912,7 +2116,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1912 | let event_id = event.id; | 2116 | let event_id = event.id; |
| 1913 | 2117 | ||
| 1914 | // Add event to purgatory | 2118 | // Add event to purgatory |
| 1915 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2119 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1916 | 2120 | ||
| 1917 | // Expire it | 2121 | // Expire it |
| 1918 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 2122 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1932,7 +2136,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1932 | // This simulates what negentropy/REQ+EOSE should do: | 2136 | // This simulates what negentropy/REQ+EOSE should do: |
| 1933 | // Check if event is in event_ids() before adding | 2137 | // Check if event is in event_ids() before adding |
| 1934 | if !ids.contains(&event_id) { | 2138 | if !ids.contains(&event_id) { |
| 1935 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2139 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1936 | } | 2140 | } |
| 1937 | 2141 | ||
| 1938 | // Event should NOT be re-added | 2142 | // Event should NOT be re-added |
| @@ -1975,7 +2179,7 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1975 | let event_id = event.id; | 2179 | let event_id = event.id; |
| 1976 | 2180 | ||
| 1977 | // Add event to purgatory | 2181 | // Add event to purgatory |
| 1978 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2182 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1979 | 2183 | ||
| 1980 | // Expire it | 2184 | // Expire it |
| 1981 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 2185 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -2024,8 +2228,18 @@ async fn test_save_and_restore_state_events() { | |||
| 2024 | let event1_id = event1.id; | 2228 | let event1_id = event1.id; |
| 2025 | let event2_id = event2.id; | 2229 | let event2_id = event2.id; |
| 2026 | 2230 | ||
| 2027 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | 2231 | purgatory.add_state( |
| 2028 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | 2232 | event1.clone(), |
| 2233 | "test-repo".to_string(), | ||
| 2234 | keys.public_key(), | ||
| 2235 | false, | ||
| 2236 | ); | ||
| 2237 | purgatory.add_state( | ||
| 2238 | event2.clone(), | ||
| 2239 | "test-repo".to_string(), | ||
| 2240 | keys.public_key(), | ||
| 2241 | false, | ||
| 2242 | ); | ||
| 2029 | 2243 | ||
| 2030 | // Save to disk | 2244 | // Save to disk |
| 2031 | purgatory.save_to_disk(&state_file).unwrap(); | 2245 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2087,6 +2301,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 2087 | pr_event.clone(), | 2301 | pr_event.clone(), |
| 2088 | "pr-event-id".to_string(), | 2302 | "pr-event-id".to_string(), |
| 2089 | "commit-abc".to_string(), | 2303 | "commit-abc".to_string(), |
| 2304 | false, | ||
| 2090 | ); | 2305 | ); |
| 2091 | 2306 | ||
| 2092 | // Save to disk | 2307 | // Save to disk |
| @@ -2156,7 +2371,7 @@ async fn test_save_and_restore_expired_events() { | |||
| 2156 | let event_id = event.id; | 2371 | let event_id = event.id; |
| 2157 | 2372 | ||
| 2158 | // Add and expire event | 2373 | // Add and expire event |
| 2159 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2374 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2160 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 2375 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| 2161 | for entry in entries.iter_mut() { | 2376 | for entry in entries.iter_mut() { |
| 2162 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2377 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -2295,7 +2510,7 @@ async fn test_downtime_calculation() { | |||
| 2295 | .sign_with_keys(&keys) | 2510 | .sign_with_keys(&keys) |
| 2296 | .unwrap(); | 2511 | .unwrap(); |
| 2297 | 2512 | ||
| 2298 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2513 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 2299 | 2514 | ||
| 2300 | // Get original expiry time | 2515 | // Get original expiry time |
| 2301 | let original_entries = purgatory.find_state("repo"); | 2516 | let original_entries = purgatory.find_state("repo"); |
| @@ -2351,7 +2566,7 @@ async fn test_expiry_times_preserved() { | |||
| 2351 | .sign_with_keys(&keys) | 2566 | .sign_with_keys(&keys) |
| 2352 | .unwrap(); | 2567 | .unwrap(); |
| 2353 | 2568 | ||
| 2354 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2569 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 2355 | 2570 | ||
| 2356 | // Manually set expiry to a specific time in the future | 2571 | // Manually set expiry to a specific time in the future |
| 2357 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | 2572 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes |
| @@ -2410,16 +2625,19 @@ async fn test_multiple_state_events_same_identifier() { | |||
| 2410 | event1.clone(), | 2625 | event1.clone(), |
| 2411 | "shared-repo".to_string(), | 2626 | "shared-repo".to_string(), |
| 2412 | keys1.public_key(), | 2627 | keys1.public_key(), |
| 2628 | false, | ||
| 2413 | ); | 2629 | ); |
| 2414 | purgatory.add_state( | 2630 | purgatory.add_state( |
| 2415 | event2.clone(), | 2631 | event2.clone(), |
| 2416 | "shared-repo".to_string(), | 2632 | "shared-repo".to_string(), |
| 2417 | keys2.public_key(), | 2633 | keys2.public_key(), |
| 2634 | false, | ||
| 2418 | ); | 2635 | ); |
| 2419 | purgatory.add_state( | 2636 | purgatory.add_state( |
| 2420 | event3.clone(), | 2637 | event3.clone(), |
| 2421 | "shared-repo".to_string(), | 2638 | "shared-repo".to_string(), |
| 2422 | keys3.public_key(), | 2639 | keys3.public_key(), |
| 2640 | false, | ||
| 2423 | ); | 2641 | ); |
| 2424 | 2642 | ||
| 2425 | // Save to disk | 2643 | // Save to disk |
| @@ -2466,6 +2684,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2466 | pr_event.clone(), | 2684 | pr_event.clone(), |
| 2467 | "pr-with-event".to_string(), | 2685 | "pr-with-event".to_string(), |
| 2468 | "commit-abc".to_string(), | 2686 | "commit-abc".to_string(), |
| 2687 | false, | ||
| 2469 | ); | 2688 | ); |
| 2470 | 2689 | ||
| 2471 | // Add PR placeholder | 2690 | // Add PR placeholder |
| @@ -2511,7 +2730,7 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2511 | let event = EventBuilder::text_note("test") | 2730 | let event = EventBuilder::text_note("test") |
| 2512 | .sign_with_keys(&keys) | 2731 | .sign_with_keys(&keys) |
| 2513 | .unwrap(); | 2732 | .unwrap(); |
| 2514 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2733 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2515 | 2734 | ||
| 2516 | // Save to disk | 2735 | // Save to disk |
| 2517 | purgatory.save_to_disk(&state_file).unwrap(); | 2736 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2697,8 +2916,18 @@ async fn test_comprehensive_roundtrip() { | |||
| 2697 | .sign_with_keys(&keys2) | 2916 | .sign_with_keys(&keys2) |
| 2698 | .unwrap(); | 2917 | .unwrap(); |
| 2699 | 2918 | ||
| 2700 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | 2919 | purgatory.add_state( |
| 2701 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | 2920 | state1.clone(), |
| 2921 | "repo1".to_string(), | ||
| 2922 | keys1.public_key(), | ||
| 2923 | false, | ||
| 2924 | ); | ||
| 2925 | purgatory.add_state( | ||
| 2926 | state2.clone(), | ||
| 2927 | "repo2".to_string(), | ||
| 2928 | keys2.public_key(), | ||
| 2929 | false, | ||
| 2930 | ); | ||
| 2702 | 2931 | ||
| 2703 | // Add PR event | 2932 | // Add PR event |
| 2704 | let tags = vec![Tag::custom( | 2933 | let tags = vec![Tag::custom( |
| @@ -2709,7 +2938,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2709 | .tags(tags) | 2938 | .tags(tags) |
| 2710 | .sign_with_keys(&keys1) | 2939 | .sign_with_keys(&keys1) |
| 2711 | .unwrap(); | 2940 | .unwrap(); |
| 2712 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | 2941 | purgatory.add_pr( |
| 2942 | pr_event.clone(), | ||
| 2943 | "pr-1".to_string(), | ||
| 2944 | "commit-1".to_string(), | ||
| 2945 | false, | ||
| 2946 | ); | ||
| 2713 | 2947 | ||
| 2714 | // Add PR placeholder | 2948 | // Add PR placeholder |
| 2715 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | 2949 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); |
| @@ -2719,7 +2953,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2719 | .sign_with_keys(&keys1) | 2953 | .sign_with_keys(&keys1) |
| 2720 | .unwrap(); | 2954 | .unwrap(); |
| 2721 | let expired_id = expired_event.id; | 2955 | let expired_id = expired_event.id; |
| 2722 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | 2956 | purgatory.add_state( |
| 2957 | expired_event, | ||
| 2958 | "repo3".to_string(), | ||
| 2959 | keys1.public_key(), | ||
| 2960 | false, | ||
| 2961 | ); | ||
| 2723 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | 2962 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { |
| 2724 | for entry in entries.iter_mut() { | 2963 | for entry in entries.iter_mut() { |
| 2725 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2964 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index ece8cd6..8297515 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -375,94 +375,121 @@ impl SyncContext for RealSyncContext { | |||
| 375 | let naughty_list = self.git_naughty_list.clone(); | 375 | let naughty_list = self.git_naughty_list.clone(); |
| 376 | 376 | ||
| 377 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { | 377 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { |
| 378 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history | 378 | let mut remaining_oids = missing_oids.clone(); |
| 379 | let mut args = vec!["fetch", &url]; | 379 | let mut missing_from_remote: Vec<String> = Vec::new(); |
| 380 | args.extend(missing_oids.iter().map(|s| s.as_str())); | 380 | |
| 381 | 381 | // Retry loop: keep fetching until success or no OIDs left | |
| 382 | let output = Command::new("git") | 382 | loop { |
| 383 | .args(&args) | 383 | if remaining_oids.is_empty() { |
| 384 | .current_dir(&repo_path) | 384 | // All OIDs were missing from remote |
| 385 | .output(); | 385 | debug!( |
| 386 | 386 | url = %url, | |
| 387 | match output { | 387 | missing_count = missing_from_remote.len(), |
| 388 | Ok(result) if result.status.success() => { | 388 | "All requested OIDs missing from remote" |
| 389 | // Count how many OIDs we now have | 389 | ); |
| 390 | let fetched: Vec<String> = missing_oids | 390 | return Ok(vec![]); |
| 391 | .iter() | ||
| 392 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) | ||
| 393 | .cloned() | ||
| 394 | .collect(); | ||
| 395 | |||
| 396 | debug!(fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 397 | |||
| 398 | Ok(fetched) | ||
| 399 | } | 391 | } |
| 400 | Ok(result) => { | 392 | |
| 401 | let stderr = String::from_utf8_lossy(&result.stderr); | 393 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history |
| 402 | 394 | let mut args = vec!["fetch".to_string(), url.clone()]; | |
| 403 | // Extract domain and classify error for naughty list | 395 | args.extend(remaining_oids.iter().cloned()); |
| 404 | if let Some(domain) = extract_domain(&url) { | 396 | |
| 405 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | 397 | let output = Command::new("git") |
| 406 | let is_new = naughty_list.record(&domain, category, stderr.to_string()); | 398 | .args(&args) |
| 407 | 399 | .current_dir(&repo_path) | |
| 408 | if is_new { | 400 | .output(); |
| 409 | tracing::warn!( | 401 | |
| 410 | domain = %domain, | 402 | match output { |
| 411 | category = %category, | 403 | Ok(result) if result.status.success() => { |
| 412 | error = %stderr, | 404 | // Fetch succeeded - count how many OIDs we now have |
| 413 | "Git remote domain added to naughty list" | 405 | let fetched: Vec<String> = missing_oids |
| 414 | ); | 406 | .iter() |
| 415 | } else { | 407 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) |
| 416 | debug!( | 408 | .cloned() |
| 417 | domain = %domain, | 409 | .collect(); |
| 418 | category = %category, | 410 | |
| 419 | "Git remote domain still on naughty list" | 411 | if !missing_from_remote.is_empty() { |
| 420 | ); | 412 | debug!( |
| 421 | } | 413 | url = %url, |
| 414 | fetched_count = fetched.len(), | ||
| 415 | missing_count = missing_from_remote.len(), | ||
| 416 | missing_oids = ?missing_from_remote, | ||
| 417 | "Fetch completed after retries - some OIDs were missing from remote" | ||
| 418 | ); | ||
| 419 | } else { | ||
| 420 | debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 422 | } | 421 | } |
| 422 | |||
| 423 | return Ok(fetched); | ||
| 423 | } | 424 | } |
| 425 | Ok(result) => { | ||
| 426 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 424 | 427 | ||
| 425 | // Check for "not our ref" errors and provide a clearer error message | 428 | // Check for "not our ref" error - this is retryable |
| 426 | let error_msg = if stderr.contains("upload-pack: not our ref") { | 429 | if stderr.contains("upload-pack: not our ref") { |
| 427 | // Parse out the missing OID from stderr (git only reports one at a time) | 430 | // Parse out the missing OID from stderr |
| 428 | let missing_oid = stderr | 431 | let missing_oid = stderr.lines().find_map(|line| { |
| 429 | .lines() | ||
| 430 | .find_map(|line| { | ||
| 431 | if line.contains("not our ref") { | 432 | if line.contains("not our ref") { |
| 432 | // Extract the OID from lines like: | 433 | // Extract the OID from lines like: |
| 433 | // "fatal: remote error: upload-pack: not our ref <oid>" | 434 | // "fatal: remote error: upload-pack: not our ref <oid>" |
| 434 | line.split("not our ref").nth(1).map(|s| s.trim().to_string()) | 435 | line.split("not our ref") |
| 436 | .nth(1) | ||
| 437 | .map(|s| s.trim().to_string()) | ||
| 435 | } else { | 438 | } else { |
| 436 | None | 439 | None |
| 437 | } | 440 | } |
| 438 | }); | 441 | }); |
| 439 | 442 | ||
| 440 | let total_requested = missing_oids.len(); | 443 | if let Some(ref oid) = missing_oid { |
| 444 | // Remove the missing OID and retry with remaining | ||
| 445 | remaining_oids.retain(|o| o != oid); | ||
| 446 | missing_from_remote.push(oid.clone()); | ||
| 441 | 447 | ||
| 442 | if let Some(oid) = missing_oid { | 448 | debug!( |
| 443 | if total_requested > 1 { | ||
| 444 | // BUG: Git stops at first missing OID, so we don't know if the others exist | ||
| 445 | // We need retry logic to fetch remaining OIDs individually | ||
| 446 | tracing::warn!( | ||
| 447 | url = %url, | 449 | url = %url, |
| 448 | missing_oid = %oid, | 450 | missing_oid = %oid, |
| 449 | total_requested = total_requested, | 451 | remaining_count = remaining_oids.len(), |
| 450 | "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." | 452 | "OID not found on remote, retrying with remaining OIDs" |
| 451 | ); | 453 | ); |
| 452 | format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) | 454 | |
| 453 | } else { | 455 | continue; // Retry with remaining OIDs |
| 454 | format!("remote missing only oid requested: {}", oid) | 456 | } |
| 457 | } | ||
| 458 | |||
| 459 | // Non-retryable error - record to naughty list and return error | ||
| 460 | if let Some(domain) = extract_domain(&url) { | ||
| 461 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | ||
| 462 | let is_new = | ||
| 463 | naughty_list.record(&domain, category, stderr.to_string()); | ||
| 464 | |||
| 465 | if is_new { | ||
| 466 | tracing::warn!( | ||
| 467 | domain = %domain, | ||
| 468 | category = %category, | ||
| 469 | error = %stderr, | ||
| 470 | "Git remote domain added to naughty list" | ||
| 471 | ); | ||
| 472 | } else { | ||
| 473 | debug!( | ||
| 474 | domain = %domain, | ||
| 475 | category = %category, | ||
| 476 | error = %stderr, | ||
| 477 | "Git fetch failed (domain on naughty list)" | ||
| 478 | ); | ||
| 479 | } | ||
| 455 | } | 480 | } |
| 456 | } else { | ||
| 457 | format!("git fetch failed: {}", stderr) | ||
| 458 | } | 481 | } |
| 459 | } else { | ||
| 460 | format!("git fetch failed: {}", stderr) | ||
| 461 | }; | ||
| 462 | 482 | ||
| 463 | Err(anyhow::anyhow!("{}", error_msg)) | 483 | return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr)); |
| 484 | } | ||
| 485 | Err(e) => { | ||
| 486 | return Err(anyhow::anyhow!( | ||
| 487 | "git fetch command error for {}: {}", | ||
| 488 | url, | ||
| 489 | e | ||
| 490 | )) | ||
| 491 | } | ||
| 464 | } | 492 | } |
| 465 | Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)), | ||
| 466 | } | 493 | } |
| 467 | }) | 494 | }) |
| 468 | .await | 495 | .await |
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index 65d29af..9207d58 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; | 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; |
| 369 | throttle_manager.complete_request(&domain); | 369 | throttle_manager.complete_request(&domain); |
| 370 | 370 | ||
| 371 | let oids_fetched = match fetch_result { | 371 | let fetched_oids = match fetch_result { |
| 372 | Ok(fetched) => { | 372 | Ok(fetched) if !fetched.is_empty() => { |
| 373 | debug!( | 373 | debug!( |
| 374 | identifier = %identifier, | 374 | identifier = %identifier, |
| 375 | url = %url, | 375 | url = %url, |
| 376 | oids_fetched = fetched.len(), | 376 | oids_fetched = fetched.len(), |
| 377 | "Fetch succeeded" | 377 | "Fetch succeeded" |
| 378 | ); | 378 | ); |
| 379 | fetched.len() | 379 | fetched |
| 380 | } | ||
| 381 | Ok(_) => { | ||
| 382 | debug!( | ||
| 383 | identifier = %identifier, | ||
| 384 | url = %url, | ||
| 385 | "Fetch returned no OIDs (not available on remote)" | ||
| 386 | ); | ||
| 387 | vec![] | ||
| 380 | } | 388 | } |
| 381 | Err(e) => { | 389 | Err(e) => { |
| 382 | debug!( | 390 | debug!( |
| @@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 385 | error = %e, | 393 | error = %e, |
| 386 | "Fetch failed" | 394 | "Fetch failed" |
| 387 | ); | 395 | ); |
| 388 | 0 | 396 | vec![] |
| 389 | } | 397 | } |
| 390 | }; | 398 | }; |
| 391 | 399 | ||
| 392 | // Try to process any events that can now be satisfied | 400 | // Try to process any events that can now be satisfied |
| 393 | if oids_fetched > 0 { | 401 | if !fetched_oids.is_empty() { |
| 394 | let new_oids: HashSet<String> = needed_oids.into_iter().collect(); | 402 | let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect(); |
| 395 | if let Err(e) = ctx | 403 | if let Err(e) = ctx |
| 396 | .process_newly_available_git_data(&target_repo, &new_oids) | 404 | .process_newly_available_git_data(&target_repo, &new_oids) |
| 397 | .await | 405 | .await |
| @@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 404 | } | 412 | } |
| 405 | } | 413 | } |
| 406 | 414 | ||
| 407 | oids_fetched | 415 | fetched_oids.len() |
| 408 | } | 416 | } |
| 409 | 417 | ||
| 410 | /// Sync git data for an identifier. | 418 | /// Sync git data for an identifier. |
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index d891bc9..1af5c4e 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -10,6 +10,28 @@ use std::collections::HashSet; | |||
| 10 | use std::path::PathBuf; | 10 | use std::path::PathBuf; |
| 11 | use std::time::Instant; | 11 | use std::time::Instant; |
| 12 | 12 | ||
| 13 | /// Source of an event entering purgatory. | ||
| 14 | /// | ||
| 15 | /// Tracks whether an event was submitted directly by a user or fetched via | ||
| 16 | /// proactive sync from another relay. This distinction is used for: | ||
| 17 | /// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG | ||
| 18 | /// - Operational monitoring: Helps identify user-facing issues vs sync noise | ||
| 19 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] | ||
| 20 | pub enum EventSource { | ||
| 21 | /// Event was published directly to this relay by a user | ||
| 22 | #[default] | ||
| 23 | Direct, | ||
| 24 | /// Event was fetched via proactive sync from another relay | ||
| 25 | Sync, | ||
| 26 | } | ||
| 27 | |||
| 28 | impl EventSource { | ||
| 29 | /// Returns true if this is a direct submission (not synced) | ||
| 30 | pub fn is_direct(&self) -> bool { | ||
| 31 | matches!(self, EventSource::Direct) | ||
| 32 | } | ||
| 33 | } | ||
| 34 | |||
| 13 | /// Default value for Instant fields during deserialization | 35 | /// Default value for Instant fields during deserialization |
| 14 | fn instant_now() -> Instant { | 36 | fn instant_now() -> Instant { |
| 15 | Instant::now() | 37 | Instant::now() |
| @@ -88,6 +110,10 @@ pub struct StatePurgatoryEntry { | |||
| 88 | /// Expiry deadline (30 min from creation, may be extended) | 110 | /// Expiry deadline (30 min from creation, may be extended) |
| 89 | #[serde(skip, default = "instant_now")] | 111 | #[serde(skip, default = "instant_now")] |
| 90 | pub expires_at: Instant, | 112 | pub expires_at: Instant, |
| 113 | |||
| 114 | /// Source of this event (direct submission vs sync) | ||
| 115 | #[serde(default)] | ||
| 116 | pub source: EventSource, | ||
| 91 | } | 117 | } |
| 92 | 118 | ||
| 93 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. | 119 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. |
| @@ -114,6 +140,10 @@ pub struct PrPurgatoryEntry { | |||
| 114 | /// Expiry deadline (30 min from creation, may be extended) | 140 | /// Expiry deadline (30 min from creation, may be extended) |
| 115 | #[serde(skip, default = "instant_now")] | 141 | #[serde(skip, default = "instant_now")] |
| 116 | pub expires_at: Instant, | 142 | pub expires_at: Instant, |
| 143 | |||
| 144 | /// Source of this event (direct submission vs sync) | ||
| 145 | #[serde(default)] | ||
| 146 | pub source: EventSource, | ||
| 117 | } | 147 | } |
| 118 | 148 | ||
| 119 | /// Entry for a repository announcement (kind 30617) waiting in purgatory. | 149 | /// Entry for a repository announcement (kind 30617) waiting in purgatory. |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 44efbf0..cd62380 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1512,6 +1512,7 @@ impl SyncManager { | |||
| 1512 | self.service_domain.clone(), | 1512 | self.service_domain.clone(), |
| 1513 | Arc::clone(&self.repo_sync_index), | 1513 | Arc::clone(&self.repo_sync_index), |
| 1514 | action_tx, | 1514 | action_tx, |
| 1515 | self.database.clone(), | ||
| 1515 | ); | 1516 | ); |
| 1516 | let subscriber_shutdown = shutdown_tx.subscribe(); | 1517 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 1517 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | 1518 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); |
| @@ -2969,6 +2970,7 @@ impl SyncManager { | |||
| 2969 | event_id = %event.id, | 2970 | event_id = %event.id, |
| 2970 | kind = %event.kind.as_u16(), | 2971 | kind = %event.kind.as_u16(), |
| 2971 | identifier = %identifier, | 2972 | identifier = %identifier, |
| 2973 | pubkey = %event.pubkey, | ||
| 2972 | "Added rejected announcement to two-tier index" | 2974 | "Added rejected announcement to two-tier index" |
| 2973 | ); | 2975 | ); |
| 2974 | } | 2976 | } |
diff --git a/src/sync/naughty_list.rs b/src/sync/naughty_list.rs index 35fcc0f..0abb986 100644 --- a/src/sync/naughty_list.rs +++ b/src/sync/naughty_list.rs | |||
| @@ -101,6 +101,69 @@ impl NaughtyListTracker { | |||
| 101 | Self::new(12) | 101 | Self::new(12) |
| 102 | } | 102 | } |
| 103 | 103 | ||
| 104 | /// Strip URLs from an error message to prevent false positives from URL components. | ||
| 105 | /// | ||
| 106 | /// URLs can contain path components, repository names, or user identifiers that | ||
| 107 | /// accidentally match error patterns (e.g., "my-openssl-project", "ssl-team", | ||
| 108 | /// "certificate-manager"). By stripping URLs before classification, we ensure | ||
| 109 | /// only the actual error message text is analyzed. | ||
| 110 | /// | ||
| 111 | /// Handles: http://, https://, git://, ws://, wss:// | ||
| 112 | fn strip_urls(error: &str) -> String { | ||
| 113 | let mut result = String::with_capacity(error.len()); | ||
| 114 | let mut chars = error.chars().peekable(); | ||
| 115 | |||
| 116 | while let Some(c) = chars.next() { | ||
| 117 | // Check for URL start patterns | ||
| 118 | let potential_url = match c { | ||
| 119 | 'h' => { | ||
| 120 | // Check for http:// or https:// | ||
| 121 | let rest: String = chars.clone().take(7).collect(); | ||
| 122 | rest.starts_with("ttp://") || rest.starts_with("ttps://") | ||
| 123 | } | ||
| 124 | 'g' => { | ||
| 125 | // Check for git:// | ||
| 126 | let rest: String = chars.clone().take(5).collect(); | ||
| 127 | rest.starts_with("it://") | ||
| 128 | } | ||
| 129 | 'w' => { | ||
| 130 | // Check for ws:// or wss:// | ||
| 131 | let rest: String = chars.clone().take(5).collect(); | ||
| 132 | rest.starts_with("s://") || rest.starts_with("ss://") | ||
| 133 | } | ||
| 134 | _ => false, | ||
| 135 | }; | ||
| 136 | |||
| 137 | if potential_url { | ||
| 138 | // Found URL start, consume until URL end | ||
| 139 | result.push_str("[URL]"); | ||
| 140 | |||
| 141 | // Skip until we hit a URL terminator | ||
| 142 | loop { | ||
| 143 | match chars.peek() { | ||
| 144 | Some(&ch) if Self::is_url_char(ch) => { | ||
| 145 | chars.next(); | ||
| 146 | } | ||
| 147 | _ => break, | ||
| 148 | } | ||
| 149 | } | ||
| 150 | } else { | ||
| 151 | result.push(c); | ||
| 152 | } | ||
| 153 | } | ||
| 154 | |||
| 155 | result | ||
| 156 | } | ||
| 157 | |||
| 158 | /// Check if a character can be part of a URL | ||
| 159 | #[inline] | ||
| 160 | fn is_url_char(c: char) -> bool { | ||
| 161 | // URLs end at whitespace, quotes, or certain brackets | ||
| 162 | // This is conservative - real URLs can contain more, but git errors | ||
| 163 | // typically have URLs followed by these terminators | ||
| 164 | !matches!(c, ' ' | '\t' | '\n' | '\r' | '"' | '\'' | '>' | ']' | ')') | ||
| 165 | } | ||
| 166 | |||
| 104 | /// Classify an error string into a naughty category or return None for transient errors | 167 | /// Classify an error string into a naughty category or return None for transient errors |
| 105 | /// | 168 | /// |
| 106 | /// # Arguments | 169 | /// # Arguments |
| @@ -112,21 +175,56 @@ impl NaughtyListTracker { | |||
| 112 | /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue | 175 | /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue |
| 113 | /// - `None` if the error is a transient network issue (use HealthTracker backoff) | 176 | /// - `None` if the error is a transient network issue (use HealthTracker backoff) |
| 114 | pub fn classify_error(error: &str) -> Option<NaughtyCategory> { | 177 | pub fn classify_error(error: &str) -> Option<NaughtyCategory> { |
| 115 | let error_lower = error.to_lowercase(); | 178 | // Filter out remote warnings - these are informational messages from the remote |
| 179 | // server that don't indicate infrastructure problems with the domain itself. | ||
| 180 | // Example: "remote: warning: unable to access '/root/.config/git/attributes': Permission denied" | ||
| 181 | // These warnings are about the remote server's internal configuration, not connectivity. | ||
| 182 | let filtered_error: String = error | ||
| 183 | .lines() | ||
| 184 | .filter(|line| { | ||
| 185 | let line_lower = line.to_lowercase(); | ||
| 186 | // Keep lines that are NOT remote warnings | ||
| 187 | !(line_lower.starts_with("remote: warning:") | ||
| 188 | || line_lower.starts_with("warning: remote")) | ||
| 189 | }) | ||
| 190 | .collect::<Vec<_>>() | ||
| 191 | .join("\n"); | ||
| 192 | |||
| 193 | // If after filtering we have no content, this was just warnings - not a real error | ||
| 194 | if filtered_error.trim().is_empty() { | ||
| 195 | return None; | ||
| 196 | } | ||
| 197 | |||
| 198 | // Strip URLs to prevent false positives from URL components | ||
| 199 | // (e.g., repository named "openssl-test" or path containing "certificate") | ||
| 200 | let url_stripped = Self::strip_urls(&filtered_error); | ||
| 201 | let error_lower = url_stripped.to_lowercase(); | ||
| 116 | 202 | ||
| 117 | // DNS lookup failures | 203 | // DNS lookup failures |
| 118 | if error_lower.contains("failed to lookup address") | 204 | if error_lower.contains("failed to lookup address") |
| 119 | || error_lower.contains("name or service not known") | 205 | || error_lower.contains("name or service not known") |
| 120 | || error_lower.contains("nodename nor servname provided") | 206 | || error_lower.contains("nodename nor servname provided") |
| 121 | || (error_lower.contains("dns") && !error_lower.contains("timeout")) | 207 | || error_lower.contains("dns error") |
| 208 | || error_lower.contains("dns lookup") | ||
| 209 | || error_lower.contains("dns resolution") | ||
| 210 | || error_lower.contains("getaddrinfo") | ||
| 122 | { | 211 | { |
| 123 | return Some(NaughtyCategory::DnsLookupFailed); | 212 | return Some(NaughtyCategory::DnsLookupFailed); |
| 124 | } | 213 | } |
| 125 | 214 | ||
| 126 | // TLS certificate errors | 215 | // TLS certificate errors |
| 127 | if error_lower.contains("certificate") | 216 | if error_lower.contains("certificate") |
| 128 | || error_lower.contains("ssl") | 217 | || error_lower.contains("ssl error") |
| 129 | || error_lower.contains("tls") | 218 | || error_lower.contains("ssl certificate") |
| 219 | || error_lower.contains("ssl handshake") | ||
| 220 | || error_lower.contains("ssl_error") | ||
| 221 | || error_lower.contains("tls error") | ||
| 222 | || error_lower.contains("tls handshake") | ||
| 223 | || error_lower.contains("tls alert") | ||
| 224 | || error_lower.contains("tls_error") | ||
| 225 | || error_lower.contains("openssl") | ||
| 226 | || error_lower.contains("schannel") | ||
| 227 | || error_lower.contains("secure channel") | ||
| 130 | { | 228 | { |
| 131 | // Exclude timeout errors that mention TLS | 229 | // Exclude timeout errors that mention TLS |
| 132 | if !error_lower.contains("timeout") && !error_lower.contains("timed out") { | 230 | if !error_lower.contains("timeout") && !error_lower.contains("timed out") { |
| @@ -134,12 +232,12 @@ impl NaughtyListTracker { | |||
| 134 | } | 232 | } |
| 135 | } | 233 | } |
| 136 | 234 | ||
| 137 | // Protocol errors | 235 | // Protocol errors - specifically WebSocket/Nostr protocol violations |
| 138 | if error_lower.contains("websocket") | 236 | // Note: We check for "websocket" specifically, NOT generic "protocol" keyword |
| 139 | || error_lower.contains("protocol") | 237 | // because git errors often contain "protocol error" (e.g., "fatal: protocol error: bad line length") |
| 140 | || error_lower.contains("invalid frame") | 238 | // which are transient network issues, not persistent infrastructure problems. |
| 141 | { | 239 | if error_lower.contains("websocket") || error_lower.contains("invalid frame") { |
| 142 | // Exclude connection errors | 240 | // Exclude connection errors (transient) |
| 143 | if !error_lower.contains("connection") | 241 | if !error_lower.contains("connection") |
| 144 | && !error_lower.contains("timeout") | 242 | && !error_lower.contains("timeout") |
| 145 | && !error_lower.contains("refused") | 243 | && !error_lower.contains("refused") |
| @@ -290,183 +388,216 @@ impl NaughtyListTracker { | |||
| 290 | mod tests { | 388 | mod tests { |
| 291 | use super::*; | 389 | use super::*; |
| 292 | 390 | ||
| 391 | // ========================================================================= | ||
| 392 | // URL STRIPPING TESTS | ||
| 393 | // ========================================================================= | ||
| 394 | |||
| 293 | #[test] | 395 | #[test] |
| 294 | fn test_classify_dns_errors() { | 396 | fn test_strip_urls_basic_protocols() { |
| 295 | assert_eq!( | 397 | // HTTP/HTTPS |
| 296 | NaughtyListTracker::classify_error("failed to lookup address information"), | ||
| 297 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 298 | ); | ||
| 299 | assert_eq!( | 398 | assert_eq!( |
| 300 | NaughtyListTracker::classify_error("Name or service not known"), | 399 | NaughtyListTracker::strip_urls("error: https://example.com/repo.git failed"), |
| 301 | Some(NaughtyCategory::DnsLookupFailed) | 400 | "error: [URL] failed" |
| 302 | ); | ||
| 303 | assert_eq!( | ||
| 304 | NaughtyListTracker::classify_error("nodename nor servname provided"), | ||
| 305 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 306 | ); | 401 | ); |
| 307 | assert_eq!( | 402 | assert_eq!( |
| 308 | NaughtyListTracker::classify_error("dns error: NXDOMAIN"), | 403 | NaughtyListTracker::strip_urls("error: http://example.com/path failed"), |
| 309 | Some(NaughtyCategory::DnsLookupFailed) | 404 | "error: [URL] failed" |
| 310 | ); | 405 | ); |
| 311 | } | ||
| 312 | 406 | ||
| 313 | #[test] | 407 | // Git protocol |
| 314 | fn test_classify_tls_errors() { | ||
| 315 | assert_eq!( | 408 | assert_eq!( |
| 316 | NaughtyListTracker::classify_error("certificate not valid for 'example.com'"), | 409 | NaughtyListTracker::strip_urls("fatal: git://github.com/user/repo.git not found"), |
| 317 | Some(NaughtyCategory::TlsCertificateInvalid) | 410 | "fatal: [URL] not found" |
| 318 | ); | 411 | ); |
| 412 | |||
| 413 | // WebSocket protocols (used for relay URLs) | ||
| 319 | assert_eq!( | 414 | assert_eq!( |
| 320 | NaughtyListTracker::classify_error("SSL certificate problem"), | 415 | NaughtyListTracker::strip_urls("error: wss://relay.example.com failed"), |
| 321 | Some(NaughtyCategory::TlsCertificateInvalid) | 416 | "error: [URL] failed" |
| 322 | ); | 417 | ); |
| 323 | assert_eq!( | 418 | assert_eq!( |
| 324 | NaughtyListTracker::classify_error("TLS handshake failed"), | 419 | NaughtyListTracker::strip_urls("error: ws://localhost:8080 failed"), |
| 325 | Some(NaughtyCategory::TlsCertificateInvalid) | 420 | "error: [URL] failed" |
| 326 | ); | 421 | ); |
| 422 | } | ||
| 327 | 423 | ||
| 328 | // TLS timeout should NOT be classified as naughty | 424 | #[test] |
| 329 | assert_eq!( | 425 | fn test_strip_urls_multiple() { |
| 330 | NaughtyListTracker::classify_error("TLS connection timed out"), | 426 | let error = "failed to clone https://a.com/repo.git and wss://relay.com"; |
| 331 | None | 427 | let stripped = NaughtyListTracker::strip_urls(error); |
| 332 | ); | 428 | assert_eq!(stripped, "failed to clone [URL] and [URL]"); |
| 333 | } | 429 | } |
| 334 | 430 | ||
| 335 | #[test] | 431 | #[test] |
| 336 | fn test_classify_protocol_errors() { | 432 | fn test_strip_urls_preserves_error_text() { |
| 337 | assert_eq!( | 433 | let error = |
| 338 | NaughtyListTracker::classify_error("websocket protocol error"), | 434 | "fatal: unable to access 'https://example.com/repo.git/': SSL certificate problem"; |
| 339 | Some(NaughtyCategory::ProtocolError) | 435 | let stripped = NaughtyListTracker::strip_urls(error); |
| 340 | ); | 436 | assert!(stripped.contains("SSL certificate problem")); |
| 437 | assert!(!stripped.contains("example.com")); | ||
| 438 | } | ||
| 439 | |||
| 440 | // ========================================================================= | ||
| 441 | // EDGE CASES: TIMEOUT/CONNECTION EXCEPTIONS | ||
| 442 | // These are the "unusual rules" where a pattern matches but should be excluded | ||
| 443 | // ========================================================================= | ||
| 444 | |||
| 445 | #[test] | ||
| 446 | fn test_tls_timeout_not_naughty() { | ||
| 447 | // TLS errors with timeout should NOT be classified as naughty | ||
| 448 | // (timeout is transient, not a certificate problem) | ||
| 341 | assert_eq!( | 449 | assert_eq!( |
| 342 | NaughtyListTracker::classify_error("invalid frame header"), | 450 | NaughtyListTracker::classify_error("TLS connection timed out"), |
| 343 | Some(NaughtyCategory::ProtocolError) | 451 | None |
| 344 | ); | 452 | ); |
| 345 | |||
| 346 | // WebSocket connection errors should NOT be classified as naughty | ||
| 347 | assert_eq!( | 453 | assert_eq!( |
| 348 | NaughtyListTracker::classify_error("websocket connection refused"), | 454 | NaughtyListTracker::classify_error("SSL handshake timeout"), |
| 349 | None | 455 | None |
| 350 | ); | 456 | ); |
| 351 | } | 457 | } |
| 352 | 458 | ||
| 353 | #[test] | 459 | #[test] |
| 354 | fn test_classify_transient_errors() { | 460 | fn test_websocket_connection_errors_not_naughty() { |
| 355 | // Timeouts are transient | 461 | // WebSocket connection errors are transient, not protocol violations |
| 356 | assert_eq!( | 462 | assert_eq!( |
| 357 | NaughtyListTracker::classify_error("connection timed out"), | 463 | NaughtyListTracker::classify_error("websocket connection refused"), |
| 358 | None | 464 | None |
| 359 | ); | 465 | ); |
| 360 | assert_eq!( | 466 | assert_eq!( |
| 361 | NaughtyListTracker::classify_error("operation timed out"), | 467 | NaughtyListTracker::classify_error("websocket connection timeout"), |
| 362 | None | 468 | None |
| 363 | ); | 469 | ); |
| 470 | } | ||
| 364 | 471 | ||
| 365 | // Connection refused is transient | 472 | #[test] |
| 473 | fn test_remote_warnings_filtered() { | ||
| 474 | // Remote warnings should be filtered out before classification | ||
| 475 | let warning_only = | ||
| 476 | "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"; | ||
| 477 | assert_eq!(NaughtyListTracker::classify_error(warning_only), None); | ||
| 478 | |||
| 479 | // But real errors after warnings should still be classified | ||
| 480 | let warning_with_error = "remote: warning: something\nfatal: failed to lookup address"; | ||
| 366 | assert_eq!( | 481 | assert_eq!( |
| 367 | NaughtyListTracker::classify_error("connection refused"), | 482 | NaughtyListTracker::classify_error(warning_with_error), |
| 368 | None | 483 | Some(NaughtyCategory::DnsLookupFailed) |
| 369 | ); | 484 | ); |
| 485 | } | ||
| 370 | 486 | ||
| 371 | // Generic network errors are transient | 487 | // ========================================================================= |
| 488 | // INTEGRATION: FULL CLASSIFICATION FLOW | ||
| 489 | // Verify URL stripping + classification work together correctly | ||
| 490 | // ========================================================================= | ||
| 491 | |||
| 492 | #[test] | ||
| 493 | fn test_url_with_keywords_not_false_positive() { | ||
| 494 | // URLs containing keywords should NOT trigger classification | ||
| 495 | let cases = [ | ||
| 496 | ("https://example.com/my-openssl-project.git", "not found"), | ||
| 497 | ("https://example.com/ssl-team/repo.git", "not found"), | ||
| 498 | ("https://example.com/certificate-manager.git", "not found"), | ||
| 499 | ("https://example.com/dns-tools.git", "not found"), | ||
| 500 | ("wss://relay-tls-test.example.com", "connection refused"), | ||
| 501 | ]; | ||
| 502 | |||
| 503 | for (url, suffix) in cases { | ||
| 504 | let error = format!("fatal: repository '{}/' {}", url, suffix); | ||
| 505 | assert_eq!( | ||
| 506 | NaughtyListTracker::classify_error(&error), | ||
| 507 | None, | ||
| 508 | "URL '{}' should not trigger false positive", | ||
| 509 | url | ||
| 510 | ); | ||
| 511 | } | ||
| 512 | } | ||
| 513 | |||
| 514 | #[test] | ||
| 515 | fn test_real_errors_still_detected() { | ||
| 516 | // Real errors in the message text (not URL) should still be detected | ||
| 372 | assert_eq!( | 517 | assert_eq!( |
| 373 | NaughtyListTracker::classify_error("network unreachable"), | 518 | NaughtyListTracker::classify_error( |
| 374 | None | 519 | "fatal: 'https://example.com/repo.git': SSL certificate problem" |
| 520 | ), | ||
| 521 | Some(NaughtyCategory::TlsCertificateInvalid) | ||
| 522 | ); | ||
| 523 | assert_eq!( | ||
| 524 | NaughtyListTracker::classify_error( | ||
| 525 | "fatal: 'https://example.com/repo.git': failed to lookup address" | ||
| 526 | ), | ||
| 527 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 528 | ); | ||
| 529 | assert_eq!( | ||
| 530 | NaughtyListTracker::classify_error("websocket protocol error"), | ||
| 531 | Some(NaughtyCategory::ProtocolError) | ||
| 375 | ); | 532 | ); |
| 376 | } | 533 | } |
| 377 | 534 | ||
| 378 | #[test] | 535 | #[test] |
| 379 | fn test_record_new_entry() { | 536 | fn test_url_with_keyword_and_real_error() { |
| 380 | let tracker = NaughtyListTracker::with_defaults(); | 537 | // URL contains keyword AND there's a real error - should detect the error |
| 381 | let url = "wss://bad-relay.example.com"; | 538 | let error = "fatal: 'https://example.com/ssl-tools/repo.git': SSL certificate problem"; |
| 382 | 539 | assert_eq!( | |
| 383 | let is_new = tracker.record( | 540 | NaughtyListTracker::classify_error(error), |
| 384 | url, | 541 | Some(NaughtyCategory::TlsCertificateInvalid) |
| 385 | NaughtyCategory::DnsLookupFailed, | ||
| 386 | "failed to lookup address".to_string(), | ||
| 387 | ); | 542 | ); |
| 388 | |||
| 389 | assert!(is_new); | ||
| 390 | assert!(tracker.is_naughty(url)); | ||
| 391 | |||
| 392 | let entry = tracker.get_entry(url).unwrap(); | ||
| 393 | assert_eq!(entry.category, NaughtyCategory::DnsLookupFailed); | ||
| 394 | assert_eq!(entry.occurrence_count, 1); | ||
| 395 | } | 543 | } |
| 396 | 544 | ||
| 545 | // ========================================================================= | ||
| 546 | // TRACKER FUNCTIONALITY | ||
| 547 | // ========================================================================= | ||
| 548 | |||
| 397 | #[test] | 549 | #[test] |
| 398 | fn test_record_updates_existing() { | 550 | fn test_tracker_record_and_update() { |
| 399 | let tracker = NaughtyListTracker::with_defaults(); | 551 | let tracker = NaughtyListTracker::with_defaults(); |
| 400 | let url = "wss://bad-relay.example.com"; | 552 | let url = "wss://bad-relay.example.com"; |
| 401 | 553 | ||
| 402 | // First occurrence | 554 | // First occurrence |
| 403 | let is_new1 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); | 555 | let is_new = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); |
| 404 | assert!(is_new1); | 556 | assert!(is_new); |
| 557 | assert!(tracker.is_naughty(url)); | ||
| 405 | 558 | ||
| 406 | // Second occurrence | 559 | // Second occurrence updates existing |
| 407 | let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); | 560 | let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); |
| 408 | assert!(!is_new2); | 561 | assert!(!is_new2); |
| 409 | 562 | ||
| 410 | let entry = tracker.get_entry(url).unwrap(); | 563 | let entry = tracker.get_entry(url).unwrap(); |
| 411 | assert_eq!(entry.occurrence_count, 2); | 564 | assert_eq!(entry.occurrence_count, 2); |
| 412 | assert_eq!(entry.reason, "error 2"); // Updated to latest | 565 | assert_eq!(entry.reason, "error 2"); |
| 413 | } | 566 | } |
| 414 | 567 | ||
| 415 | #[test] | 568 | #[test] |
| 416 | fn test_is_naughty() { | 569 | fn test_tracker_expiration() { |
| 417 | let tracker = NaughtyListTracker::with_defaults(); | 570 | let tracker = NaughtyListTracker::new(0); // Expire immediately |
| 418 | let url = "wss://bad-relay.example.com"; | ||
| 419 | |||
| 420 | assert!(!tracker.is_naughty(url)); | ||
| 421 | 571 | ||
| 422 | tracker.record( | 572 | tracker.record( |
| 423 | url, | 573 | "wss://relay.example.com", |
| 424 | NaughtyCategory::TlsCertificateInvalid, | 574 | NaughtyCategory::DnsLookupFailed, |
| 425 | "cert error".to_string(), | 575 | "error".to_string(), |
| 426 | ); | 576 | ); |
| 427 | 577 | ||
| 428 | assert!(tracker.is_naughty(url)); | 578 | // Entry exists but is expired |
| 429 | } | 579 | assert!(!tracker.is_naughty("wss://relay.example.com")); |
| 430 | 580 | ||
| 431 | #[test] | 581 | std::thread::sleep(std::time::Duration::from_millis(10)); |
| 432 | fn test_get_all() { | ||
| 433 | let tracker = NaughtyListTracker::with_defaults(); | ||
| 434 | |||
| 435 | tracker.record( | ||
| 436 | "wss://relay1.example.com", | ||
| 437 | NaughtyCategory::DnsLookupFailed, | ||
| 438 | "dns error".to_string(), | ||
| 439 | ); | ||
| 440 | tracker.record( | ||
| 441 | "wss://relay2.example.com", | ||
| 442 | NaughtyCategory::TlsCertificateInvalid, | ||
| 443 | "tls error".to_string(), | ||
| 444 | ); | ||
| 445 | 582 | ||
| 446 | let all = tracker.get_all(); | 583 | let expired = tracker.expire_old_entries(); |
| 447 | assert_eq!(all.len(), 2); | 584 | assert_eq!(expired.len(), 1); |
| 585 | assert_eq!(tracker.total_count(), 0); | ||
| 448 | } | 586 | } |
| 449 | 587 | ||
| 450 | #[test] | 588 | #[test] |
| 451 | fn test_count_by_category() { | 589 | fn test_tracker_counts() { |
| 452 | let tracker = NaughtyListTracker::with_defaults(); | 590 | let tracker = NaughtyListTracker::with_defaults(); |
| 453 | 591 | ||
| 592 | tracker.record("wss://r1.com", NaughtyCategory::DnsLookupFailed, "e".into()); | ||
| 593 | tracker.record("wss://r2.com", NaughtyCategory::DnsLookupFailed, "e".into()); | ||
| 454 | tracker.record( | 594 | tracker.record( |
| 455 | "wss://relay1.example.com", | 595 | "wss://r3.com", |
| 456 | NaughtyCategory::DnsLookupFailed, | ||
| 457 | "error".to_string(), | ||
| 458 | ); | ||
| 459 | tracker.record( | ||
| 460 | "wss://relay2.example.com", | ||
| 461 | NaughtyCategory::DnsLookupFailed, | ||
| 462 | "error".to_string(), | ||
| 463 | ); | ||
| 464 | tracker.record( | ||
| 465 | "wss://relay3.example.com", | ||
| 466 | NaughtyCategory::TlsCertificateInvalid, | 596 | NaughtyCategory::TlsCertificateInvalid, |
| 467 | "error".to_string(), | 597 | "e".into(), |
| 468 | ); | 598 | ); |
| 469 | 599 | ||
| 600 | assert_eq!(tracker.total_count(), 3); | ||
| 470 | assert_eq!( | 601 | assert_eq!( |
| 471 | tracker.count_by_category(NaughtyCategory::DnsLookupFailed), | 602 | tracker.count_by_category(NaughtyCategory::DnsLookupFailed), |
| 472 | 2 | 603 | 2 |
| @@ -475,82 +606,84 @@ mod tests { | |||
| 475 | tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), | 606 | tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), |
| 476 | 1 | 607 | 1 |
| 477 | ); | 608 | ); |
| 478 | assert_eq!(tracker.count_by_category(NaughtyCategory::ProtocolError), 0); | 609 | assert_eq!(tracker.get_all().len(), 3); |
| 479 | } | 610 | } |
| 480 | 611 | ||
| 481 | #[test] | 612 | #[test] |
| 482 | fn test_total_count() { | 613 | fn test_category_display() { |
| 483 | let tracker = NaughtyListTracker::with_defaults(); | 614 | assert_eq!( |
| 484 | assert_eq!(tracker.total_count(), 0); | 615 | NaughtyCategory::DnsLookupFailed.as_str(), |
| 485 | 616 | "dns_lookup_failed" | |
| 486 | tracker.record( | ||
| 487 | "wss://relay1.example.com", | ||
| 488 | NaughtyCategory::DnsLookupFailed, | ||
| 489 | "error".to_string(), | ||
| 490 | ); | 617 | ); |
| 491 | assert_eq!(tracker.total_count(), 1); | 618 | assert_eq!( |
| 492 | 619 | NaughtyCategory::TlsCertificateInvalid.as_str(), | |
| 493 | tracker.record( | 620 | "tls_certificate_invalid" |
| 494 | "wss://relay2.example.com", | ||
| 495 | NaughtyCategory::TlsCertificateInvalid, | ||
| 496 | "error".to_string(), | ||
| 497 | ); | 621 | ); |
| 498 | assert_eq!(tracker.total_count(), 2); | 622 | assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); |
| 499 | } | 623 | } |
| 624 | } | ||
| 500 | 625 | ||
| 501 | #[test] | 626 | #[cfg(test)] |
| 502 | fn test_expire_old_entries() { | 627 | mod production_tests { |
| 503 | // Use very short expiration for testing | 628 | use super::*; |
| 504 | let tracker = NaughtyListTracker::new(0); // Expire immediately (0 hours) | ||
| 505 | |||
| 506 | tracker.record( | ||
| 507 | "wss://relay1.example.com", | ||
| 508 | NaughtyCategory::DnsLookupFailed, | ||
| 509 | "error".to_string(), | ||
| 510 | ); | ||
| 511 | |||
| 512 | // Entry should exist in the map | ||
| 513 | assert_eq!(tracker.total_count(), 1); | ||
| 514 | |||
| 515 | // But is_naughty should return false since it's already expired (0 hours) | ||
| 516 | assert!(!tracker.is_naughty("wss://relay1.example.com")); | ||
| 517 | |||
| 518 | // Sleep to ensure time passes | ||
| 519 | std::thread::sleep(std::time::Duration::from_millis(10)); | ||
| 520 | 629 | ||
| 521 | // Expire old entries (should remove the 0-hour expired entry) | 630 | /// Production case from relay.ngit.dev - remote warning should not be classified |
| 522 | let expired = tracker.expire_old_entries(); | 631 | #[test] |
| 523 | assert_eq!(expired.len(), 1); | 632 | fn test_classify_production_relay_ngit_dev_warning() { |
| 524 | assert_eq!(expired[0], "wss://relay1.example.com"); | 633 | let error = |
| 634 | "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"; | ||
| 635 | assert_eq!(NaughtyListTracker::classify_error(error), None); | ||
| 636 | } | ||
| 525 | 637 | ||
| 526 | // Entry should be gone | 638 | /// Git protocol errors are transient, not persistent infrastructure issues |
| 527 | assert!(!tracker.is_naughty("wss://relay1.example.com")); | 639 | #[test] |
| 528 | assert_eq!(tracker.total_count(), 0); | 640 | fn test_git_protocol_errors_not_naughty() { |
| 641 | // These are common git protocol errors that should NOT be classified as naughty | ||
| 642 | let git_protocol_errors = [ | ||
| 643 | "fatal: protocol error: bad line length character: remo", | ||
| 644 | "fatal: protocol error: expected old/new/ref, got 'shallow", | ||
| 645 | "fatal: git upload-pack: protocol error", | ||
| 646 | "error: protocol error: bad pack header", | ||
| 647 | "fatal: protocol error: bad band #3", | ||
| 648 | ]; | ||
| 649 | |||
| 650 | for error in git_protocol_errors { | ||
| 651 | assert_eq!( | ||
| 652 | NaughtyListTracker::classify_error(error), | ||
| 653 | None, | ||
| 654 | "Git protocol error should not be classified as naughty: {}", | ||
| 655 | error | ||
| 656 | ); | ||
| 657 | } | ||
| 529 | } | 658 | } |
| 530 | 659 | ||
| 660 | /// Remote warning followed by git protocol error - both should be filtered/ignored | ||
| 531 | #[test] | 661 | #[test] |
| 532 | fn test_category_display() { | 662 | fn test_warning_with_git_protocol_error() { |
| 663 | let error = "remote: warning: unable to access '/root/.config/git/attributes': Permission denied\nfatal: protocol error: bad line length character: remo"; | ||
| 533 | assert_eq!( | 664 | assert_eq!( |
| 534 | NaughtyCategory::DnsLookupFailed.to_string(), | 665 | NaughtyListTracker::classify_error(error), |
| 535 | "dns_lookup_failed" | 666 | None, |
| 667 | "Warning + git protocol error should not be classified as naughty" | ||
| 536 | ); | 668 | ); |
| 537 | assert_eq!( | ||
| 538 | NaughtyCategory::TlsCertificateInvalid.to_string(), | ||
| 539 | "tls_certificate_invalid" | ||
| 540 | ); | ||
| 541 | assert_eq!(NaughtyCategory::ProtocolError.to_string(), "protocol_error"); | ||
| 542 | } | 669 | } |
| 543 | 670 | ||
| 671 | /// WebSocket protocol errors ARE naughty (persistent infrastructure issues) | ||
| 544 | #[test] | 672 | #[test] |
| 545 | fn test_category_as_str() { | 673 | fn test_websocket_errors_still_naughty() { |
| 546 | assert_eq!( | 674 | let websocket_errors = [ |
| 547 | NaughtyCategory::DnsLookupFailed.as_str(), | 675 | "websocket protocol error", |
| 548 | "dns_lookup_failed" | 676 | "websocket handshake failed", |
| 549 | ); | 677 | "invalid frame received", |
| 550 | assert_eq!( | 678 | ]; |
| 551 | NaughtyCategory::TlsCertificateInvalid.as_str(), | 679 | |
| 552 | "tls_certificate_invalid" | 680 | for error in websocket_errors { |
| 553 | ); | 681 | assert_eq!( |
| 554 | assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); | 682 | NaughtyListTracker::classify_error(error), |
| 683 | Some(NaughtyCategory::ProtocolError), | ||
| 684 | "WebSocket error should be classified as protocol_error: {}", | ||
| 685 | error | ||
| 686 | ); | ||
| 687 | } | ||
| 555 | } | 688 | } |
| 556 | } | 689 | } |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index ab10c49..4d69c9a 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -99,6 +101,8 @@ pub struct SelfSubscriber { | |||
| 99 | action_tx: mpsc::Sender<AddFilters>, | 101 | action_tx: mpsc::Sender<AddFilters>, |
| 100 | /// Last time we connected - used for since filter on reconnect | 102 | /// Last time we connected - used for since filter on reconnect |
| 101 | last_connected: Option<Timestamp>, | 103 | last_connected: Option<Timestamp>, |
| 104 | /// Database for querying existing events on startup | ||
| 105 | database: SharedDatabase, | ||
| 102 | } | 106 | } |
| 103 | 107 | ||
| 104 | impl SelfSubscriber { | 108 | impl SelfSubscriber { |
| @@ -109,11 +113,13 @@ impl SelfSubscriber { | |||
| 109 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 113 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 110 | /// * `repo_sync_index` - Shared index to update with discovered repos | 114 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 111 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 115 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 116 | /// * `database` - Database for querying existing events on startup | ||
| 112 | pub fn new( | 117 | pub fn new( |
| 113 | own_relay_url: String, | 118 | own_relay_url: String, |
| 114 | relay_domain: String, | 119 | relay_domain: String, |
| 115 | repo_sync_index: RepoSyncIndex, | 120 | repo_sync_index: RepoSyncIndex, |
| 116 | action_tx: mpsc::Sender<AddFilters>, | 121 | action_tx: mpsc::Sender<AddFilters>, |
| 122 | database: SharedDatabase, | ||
| 117 | ) -> Self { | 123 | ) -> Self { |
| 118 | Self { | 124 | Self { |
| 119 | own_relay_url, | 125 | own_relay_url, |
| @@ -121,6 +127,7 @@ impl SelfSubscriber { | |||
| 121 | repo_sync_index, | 127 | repo_sync_index, |
| 122 | action_tx, | 128 | action_tx, |
| 123 | last_connected: None, | 129 | last_connected: None, |
| 130 | database, | ||
| 124 | } | 131 | } |
| 125 | } | 132 | } |
| 126 | 133 | ||
| @@ -136,6 +143,102 @@ impl SelfSubscriber { | |||
| 136 | } | 143 | } |
| 137 | } | 144 | } |
| 138 | 145 | ||
| 146 | /// Load existing events from database on startup | ||
| 147 | /// | ||
| 148 | /// Queries the database with two separate queries to build the initial | ||
| 149 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 150 | /// created, not just those returned by the WebSocket subscription | ||
| 151 | /// (which has limits on the number of events returned). | ||
| 152 | /// | ||
| 153 | /// Query order: | ||
| 154 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 155 | /// with repos and their relays | ||
| 156 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 157 | /// to add root event IDs for Layer 3 filter creation | ||
| 158 | /// | ||
| 159 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 160 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 161 | let mut pending = PendingUpdates::new(); | ||
| 162 | |||
| 163 | tracing::info!("Loading all events from database"); | ||
| 164 | |||
| 165 | // First query: Get all announcements to populate repo_sync_index | ||
| 166 | let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 167 | |||
| 168 | let announcements = match self.database.query(announcement_filter).await { | ||
| 169 | Ok(events) => { | ||
| 170 | tracing::info!(count = events.len(), "Loaded announcements from database"); | ||
| 171 | events | ||
| 172 | } | ||
| 173 | Err(e) => { | ||
| 174 | tracing::error!( | ||
| 175 | error = %e, | ||
| 176 | "Failed to query announcements from database" | ||
| 177 | ); | ||
| 178 | return pending; | ||
| 179 | } | ||
| 180 | }; | ||
| 181 | |||
| 182 | // Process announcements | ||
| 183 | let mut announcements_loaded = 0; | ||
| 184 | for event in announcements.iter() { | ||
| 185 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 186 | let relays = Self::extract_relay_urls(event); | ||
| 187 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 188 | announcements_loaded += 1; | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 192 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 193 | { | ||
| 194 | let mut index = self.repo_sync_index.write().await; | ||
| 195 | for (repo_id, needs) in &pending.repos { | ||
| 196 | let entry = index | ||
| 197 | .entry(repo_id.clone()) | ||
| 198 | .or_insert_with(|| RepoSyncNeeds { | ||
| 199 | relays: HashSet::new(), | ||
| 200 | root_events: HashSet::new(), | ||
| 201 | sync_level: SyncLevel::StateOnly, | ||
| 202 | }); | ||
| 203 | entry.relays.extend(needs.relays.clone()); | ||
| 204 | } | ||
| 205 | } | ||
| 206 | |||
| 207 | // Second query: Get all root events for handle_root_event() | ||
| 208 | let root_filter = | ||
| 209 | Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]); | ||
| 210 | |||
| 211 | let root_events = match self.database.query(root_filter).await { | ||
| 212 | Ok(events) => { | ||
| 213 | tracing::info!(count = events.len(), "Loaded root events from database"); | ||
| 214 | events | ||
| 215 | } | ||
| 216 | Err(e) => { | ||
| 217 | tracing::error!( | ||
| 218 | error = %e, | ||
| 219 | "Failed to query root events from database" | ||
| 220 | ); | ||
| 221 | // Continue with just announcements | ||
| 222 | return pending; | ||
| 223 | } | ||
| 224 | }; | ||
| 225 | |||
| 226 | // Process root events | ||
| 227 | let mut root_events_processed = 0; | ||
| 228 | for event in root_events.iter() { | ||
| 229 | self.handle_root_event(event, &mut pending).await; | ||
| 230 | root_events_processed += 1; | ||
| 231 | } | ||
| 232 | |||
| 233 | tracing::info!( | ||
| 234 | announcements_loaded = announcements_loaded, | ||
| 235 | root_events_processed = root_events_processed, | ||
| 236 | "Processed existing events from database" | ||
| 237 | ); | ||
| 238 | |||
| 239 | pending | ||
| 240 | } | ||
| 241 | |||
| 139 | /// Process a relay pool notification | 242 | /// Process a relay pool notification |
| 140 | /// | 243 | /// |
| 141 | /// Handles incoming events from the subscription, queueing 30617 announcements | 244 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -277,33 +380,22 @@ impl SelfSubscriber { | |||
| 277 | // Subscribe to announcement and root event kinds | 380 | // Subscribe to announcement and root event kinds |
| 278 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 381 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 279 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 382 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 280 | // Check if we have a last_connected time for reconnect filtering | 383 | let mut filter = Filter::new().kinds(vec![ |
| 281 | let filter = if let Some(last) = self.last_connected { | 384 | Kind::GitRepoAnnouncement, |
| 385 | Kind::GitPatch, | ||
| 386 | Kind::GitIssue, | ||
| 387 | Kind::GitPullRequest, | ||
| 388 | Kind::GitUserGraspList, | ||
| 389 | ]); | ||
| 390 | if let Some(timestamp) = self.last_connected { | ||
| 282 | // Quick reconnect - use since filter (15 min buffer) | 391 | // Quick reconnect - use since filter (15 min buffer) |
| 283 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 392 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 284 | tracing::debug!( | 393 | tracing::debug!( |
| 285 | since = %since, | 394 | since = %since, |
| 286 | "Using since filter for reconnect" | 395 | "Using since filter for reconnect" |
| 287 | ); | 396 | ); |
| 288 | Filter::new() | 397 | filter = filter.since(since); |
| 289 | .kinds(vec![ | 398 | } |
| 290 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 291 | Kind::GitPatch, // Patches | ||
| 292 | Kind::GitIssue, // Issues | ||
| 293 | Kind::GitPullRequest, // Pull Requests | ||
| 294 | Kind::GitUserGraspList, // User Grasp List | ||
| 295 | ]) | ||
| 296 | .since(since) | ||
| 297 | } else { | ||
| 298 | // First connection - no since filter | ||
| 299 | Filter::new().kinds(vec![ | ||
| 300 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 301 | Kind::GitPatch, // Patches | ||
| 302 | Kind::GitIssue, // Issues | ||
| 303 | Kind::GitPullRequest, // Pull Requests | ||
| 304 | Kind::GitUserGraspList, // User Grasp List | ||
| 305 | ]) | ||
| 306 | }; | ||
| 307 | 399 | ||
| 308 | // Update last_connected AFTER creating filter but BEFORE subscribing | 400 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 309 | self.last_connected = Some(Timestamp::now()); | 401 | self.last_connected = Some(Timestamp::now()); |
| @@ -324,7 +416,11 @@ impl SelfSubscriber { | |||
| 324 | 416 | ||
| 325 | let mut notifications = client.notifications(); | 417 | let mut notifications = client.notifications(); |
| 326 | let batch_window = Self::get_batch_window(); | 418 | let batch_window = Self::get_batch_window(); |
| 327 | let mut pending = PendingUpdates::new(); | 419 | |
| 420 | // Load existing events from database on startup | ||
| 421 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 422 | // returned by the WebSocket subscription (which has limits) | ||
| 423 | let mut pending = self.load_existing_events().await; | ||
| 328 | 424 | ||
| 329 | // Timer does NOT reset on new events - use interval | 425 | // Timer does NOT reset on new events - use interval |
| 330 | let mut timer = tokio::time::interval(batch_window); | 426 | let mut timer = tokio::time::interval(batch_window); |