upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 15:20:59 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 15:20:59 +0000
commit113928aa84894ea8f65c247d9987527e792b32a9 (patch)
treeec967d6195d9f7ec4f061449596611afe3a0950f /src
parent26f608e5011b9d1ad6036da75b89272835e69695 (diff)
parente0ad39a489b3398f8208713bf728db0cb11475b0 (diff)
Merge master into 3ca0-announcements-purgatory
Diffstat (limited to 'src')
-rw-r--r--src/config.rs30
-rw-r--r--src/git/authorization.rs102
-rw-r--r--src/git/handlers.rs108
-rw-r--r--src/git/mod.rs2
-rw-r--r--src/git/sync.rs6
-rw-r--r--src/http/mod.rs2
-rw-r--r--src/main.rs75
-rw-r--r--src/nostr/builder.rs165
-rw-r--r--src/nostr/events.rs2
-rw-r--r--src/nostr/policy/state.rs9
-rw-r--r--src/purgatory/helpers.rs204
-rw-r--r--src/purgatory/mod.rs341
-rw-r--r--src/purgatory/sync/context.rs163
-rw-r--r--src/purgatory/sync/functions.rs22
-rw-r--r--src/purgatory/types.rs30
-rw-r--r--src/sync/mod.rs2
-rw-r--r--src/sync/naughty_list.rs485
-rw-r--r--src/sync/self_subscriber.rs142
18 files changed, 1493 insertions, 397 deletions
diff --git a/src/config.rs b/src/config.rs
index 7062187..dd7b1e3 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -115,16 +115,19 @@ pub struct ArchiveConfig {
115 /// 115 ///
116 /// WARNING: Setting this to true allows anyone to mirror any repository 116 /// WARNING: Setting this to true allows anyone to mirror any repository
117 /// to this relay, potentially causing storage/bandwidth exhaustion. 117 /// to this relay, potentially causing storage/bandwidth exhaustion.
118 #[serde(default)]
118 pub archive_all: bool, 119 pub archive_all: bool,
119 120
120 /// Whitelist entries for selective archiving 121 /// Whitelist entries for selective archiving
121 /// 122 ///
122 /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode). 123 /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode).
124 #[serde(default)]
123 pub whitelist: Vec<WhitelistEntry>, 125 pub whitelist: Vec<WhitelistEntry>,
124 126
125 /// GRASP server domains to archive (archive all repositories from these domains) 127 /// GRASP server domains to archive (archive all repositories from these domains)
126 /// 128 ///
127 /// If non-empty, archives all repositories from the specified GRASP server domains. 129 /// If non-empty, archives all repositories from the specified GRASP server domains.
130 #[serde(default)]
128 pub grasp_services: Vec<String>, 131 pub grasp_services: Vec<String>,
129 132
130 /// Read-only archive mode: relay is a read-only sync of archived repositories 133 /// Read-only archive mode: relay is a read-only sync of archived repositories
@@ -132,6 +135,7 @@ pub struct ArchiveConfig {
132 /// When true, the relay ONLY accepts announcements matching the archive whitelist/all. 135 /// When true, the relay ONLY accepts announcements matching the archive whitelist/all.
133 /// Announcements listing the relay but not in the whitelist are rejected. 136 /// Announcements listing the relay but not in the whitelist are rejected.
134 /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos. 137 /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos.
138 #[serde(default)]
135 pub read_only: bool, 139 pub read_only: bool,
136} 140}
137 141
@@ -178,6 +182,7 @@ pub struct RepositoryConfig {
178 /// Whitelist entries for selective repository acceptance 182 /// Whitelist entries for selective repository acceptance
179 /// 183 ///
180 /// If empty, all repositories listing the service are accepted (GRASP-01 mode). 184 /// If empty, all repositories listing the service are accepted (GRASP-01 mode).
185 #[serde(default)]
181 pub whitelist: Vec<WhitelistEntry>, 186 pub whitelist: Vec<WhitelistEntry>,
182} 187}
183 188
@@ -204,6 +209,7 @@ pub struct BlacklistConfig {
204 /// 209 ///
205 /// If empty, no repositories are blacklisted. 210 /// If empty, no repositories are blacklisted.
206 /// Blacklist takes precedence over both archive and repository whitelists. 211 /// Blacklist takes precedence over both archive and repository whitelists.
212 #[serde(default)]
207 pub blacklist: Vec<WhitelistEntry>, 213 pub blacklist: Vec<WhitelistEntry>,
208} 214}
209 215
@@ -245,6 +251,7 @@ pub struct EventBlacklistConfig {
245 /// 251 ///
246 /// If empty, no events are blacklisted by author. 252 /// If empty, no events are blacklisted by author.
247 /// Applies to ALL event types, preventing events from reaching both the relay and purgatory. 253 /// Applies to ALL event types, preventing events from reaching both the relay and purgatory.
254 #[serde(default)]
248 pub blacklisted_npubs: Vec<String>, 255 pub blacklisted_npubs: Vec<String>,
249} 256}
250 257
@@ -466,6 +473,10 @@ pub struct Config {
466 /// Prevents connection exhaustion DoS attacks 473 /// Prevents connection exhaustion DoS attacks
467 #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)] 474 #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)]
468 pub max_connections: usize, 475 pub max_connections: usize,
476
477 /// Log level for application logging
478 #[arg(long, env = "NGIT_LOG_LEVEL", default_value = "info")]
479 pub log_level: String,
469} 480}
470 481
471impl Config { 482impl Config {
@@ -748,6 +759,7 @@ impl Config {
748 repository_blacklist: String::new(), 759 repository_blacklist: String::new(),
749 event_blacklist: String::new(), 760 event_blacklist: String::new(),
750 max_connections: 500, 761 max_connections: 500,
762 log_level: "debug".to_string(),
751 } 763 }
752 } 764 }
753} 765}
@@ -1069,14 +1081,14 @@ mod tests {
1069 fn test_archive_read_only_defaults() { 1081 fn test_archive_read_only_defaults() {
1070 // Default: false when no archive mode 1082 // Default: false when no archive mode
1071 let config = Config::for_testing(); 1083 let config = Config::for_testing();
1072 assert_eq!(config.archive_config().read_only, false); 1084 assert!(!config.archive_config().read_only);
1073 1085
1074 // Default: true when archive_all is set 1086 // Default: true when archive_all is set
1075 let config = Config { 1087 let config = Config {
1076 archive_all: true, 1088 archive_all: true,
1077 ..Config::for_testing() 1089 ..Config::for_testing()
1078 }; 1090 };
1079 assert_eq!(config.archive_config().read_only, true); 1091 assert!(config.archive_config().read_only);
1080 1092
1081 // Default: true when archive_whitelist is set 1093 // Default: true when archive_whitelist is set
1082 let keys = Keys::generate(); 1094 let keys = Keys::generate();
@@ -1085,7 +1097,7 @@ mod tests {
1085 archive_whitelist: test_npub, 1097 archive_whitelist: test_npub,
1086 ..Config::for_testing() 1098 ..Config::for_testing()
1087 }; 1099 };
1088 assert_eq!(config.archive_config().read_only, true); 1100 assert!(config.archive_config().read_only);
1089 } 1101 }
1090 1102
1091 #[test] 1103 #[test]
@@ -1096,7 +1108,7 @@ mod tests {
1096 archive_read_only: Some(true), 1108 archive_read_only: Some(true),
1097 ..Config::for_testing() 1109 ..Config::for_testing()
1098 }; 1110 };
1099 assert_eq!(config.archive_config().read_only, true); 1111 assert!(config.archive_config().read_only);
1100 1112
1101 // Explicit false with archive_all (unusual but allowed) 1113 // Explicit false with archive_all (unusual but allowed)
1102 let config = Config { 1114 let config = Config {
@@ -1104,14 +1116,14 @@ mod tests {
1104 archive_read_only: Some(false), 1116 archive_read_only: Some(false),
1105 ..Config::for_testing() 1117 ..Config::for_testing()
1106 }; 1118 };
1107 assert_eq!(config.archive_config().read_only, false); 1119 assert!(!config.archive_config().read_only);
1108 1120
1109 // Explicit false without archive mode 1121 // Explicit false without archive mode
1110 let config = Config { 1122 let config = Config {
1111 archive_read_only: Some(false), 1123 archive_read_only: Some(false),
1112 ..Config::for_testing() 1124 ..Config::for_testing()
1113 }; 1125 };
1114 assert_eq!(config.archive_config().read_only, false); 1126 assert!(!config.archive_config().read_only);
1115 } 1127 }
1116 1128
1117 #[test] 1129 #[test]
@@ -1514,7 +1526,7 @@ mod tests {
1514 }; 1526 };
1515 let archive_config = config.archive_config(); 1527 let archive_config = config.archive_config();
1516 assert!(archive_config.enabled()); 1528 assert!(archive_config.enabled());
1517 assert_eq!(archive_config.read_only, true); // Default to true 1529 assert!(archive_config.read_only); // Default to true
1518 } 1530 }
1519 1531
1520 #[test] 1532 #[test]
@@ -1524,7 +1536,7 @@ mod tests {
1524 archive_grasp_services: "git.example.com".to_string(), 1536 archive_grasp_services: "git.example.com".to_string(),
1525 ..Config::for_testing() 1537 ..Config::for_testing()
1526 }; 1538 };
1527 assert_eq!(config.archive_config().read_only, true); 1539 assert!(config.archive_config().read_only);
1528 } 1540 }
1529 1541
1530 #[test] 1542 #[test]
@@ -1535,7 +1547,7 @@ mod tests {
1535 archive_read_only: Some(false), 1547 archive_read_only: Some(false),
1536 ..Config::for_testing() 1548 ..Config::for_testing()
1537 }; 1549 };
1538 assert_eq!(config.archive_config().read_only, false); 1550 assert!(!config.archive_config().read_only);
1539 } 1551 }
1540 1552
1541 #[test] 1553 #[test]
diff --git a/src/git/authorization.rs b/src/git/authorization.rs
index 69a0751..df780bb 100644
--- a/src/git/authorization.rs
+++ b/src/git/authorization.rs
@@ -609,6 +609,28 @@ pub async fn get_state_authorization_for_specific_owner_repo(
609 owner_pubkey 609 owner_pubkey
610 ); 610 );
611 611
612 // Accept pushes where all refs are already at the desired state (old_oid == new_oid)
613 // This handles race conditions where state events are applied between fetch and push
614 if !pushed_refs.is_empty() {
615 let all_refs_unchanged = pushed_refs
616 .iter()
617 .all(|(old_oid, new_oid, _)| old_oid == new_oid);
618
619 if all_refs_unchanged {
620 debug!(
621 "All pushed refs unchanged (old_oid == new_oid) for {} owned by {}, accepting without purgatory check",
622 identifier, owner_pubkey
623 );
624 return Ok(AuthorizationResult {
625 authorized: true,
626 reason: "Push accepted: all refs already at desired state (no-op)".to_string(),
627 state: None,
628 maintainers: authorized.into_iter().collect(),
629 purgatory_events: vec![],
630 });
631 }
632 }
633
612 // Check purgatory for matching state events 634 // Check purgatory for matching state events
613 // Convert pushed refs to RefUpdate (filter out refs/nostr/* refs) 635 // Convert pushed refs to RefUpdate (filter out refs/nostr/* refs)
614 let pushed_updates: Vec<RefUpdate> = pushed_refs 636 let pushed_updates: Vec<RefUpdate> = pushed_refs
@@ -699,12 +721,88 @@ pub async fn get_state_authorization_for_specific_owner_repo(
699 debug!("Purgatory events found but none from authorized authors"); 721 debug!("Purgatory events found but none from authorized authors");
700 } 722 }
701 } else { 723 } else {
702 debug!("No matching state events found in purgatory"); 724 // Check if there are ANY state events in purgatory for this identifier
725 let all_purgatory_states = purgatory.find_state(identifier);
726
727 if !all_purgatory_states.is_empty() {
728 // There are state events but none match the push - diagnose why
729 debug!(
730 "Found {} state event(s) in purgatory for {} but none match the push",
731 all_purgatory_states.len(),
732 identifier
733 );
734
735 // Count authorized state events and collect diagnostic info
736 let mut authorized_count = 0;
737 let mut diagnostic_reasons = Vec::new();
738
739 // Diagnose why each authorized state event doesn't match
740 for entry in all_purgatory_states.iter() {
741 let author_hex = entry.event.pubkey.to_hex();
742 if authorized.contains(&author_hex) {
743 authorized_count += 1;
744 if let Some(reason) = crate::purgatory::diagnose_state_mismatch(
745 &entry.event,
746 &pushed_updates,
747 &local_refs,
748 ) {
749 debug!(
750 "State event {} from authorized author {} doesn't match push: {}",
751 entry.event.id,
752 entry
753 .event
754 .pubkey
755 .to_bech32()
756 .unwrap_or_else(|_| author_hex.clone()),
757 reason
758 );
759 diagnostic_reasons.push(reason);
760 }
761 }
762 }
763
764 // Create concise WARN message summarizing the rejection
765 let summary = if authorized_count > 0 {
766 let reason_summary = if !diagnostic_reasons.is_empty() {
767 // Take the first diagnostic reason as representative
768 format!(" ({})", diagnostic_reasons[0])
769 } else {
770 String::new()
771 };
772 format!(
773 "{} state event{} in purgatory from authorized publisher{} but doesn't match push{}",
774 authorized_count,
775 if authorized_count == 1 { "" } else { "s" },
776 if authorized_count == 1 { "" } else { "s" },
777 reason_summary
778 )
779 } else {
780 format!(
781 "{} state event{} in purgatory but none from authorized publishers",
782 all_purgatory_states.len(),
783 if all_purgatory_states.len() == 1 {
784 ""
785 } else {
786 "s"
787 }
788 )
789 };
790
791 warn!("Push rejected for {}: {}", identifier, summary);
792 return Ok(AuthorizationResult::denied(summary));
793 } else {
794 debug!("No state events found in purgatory for {}", identifier);
795 warn!(
796 "Push rejected for {}: No state events in purgatory",
797 identifier
798 );
799 return Ok(AuthorizationResult::denied("No state events in purgatory"));
800 }
703 } 801 }
704 802
705 // No matching state found in purgatory 803 // No matching state found in purgatory
706 Ok(AuthorizationResult::denied( 804 Ok(AuthorizationResult::denied(
707 "No state event found in purgatory from authorized publishers", 805 "No matching state event found in purgatory from authorized publishers",
708 )) 806 ))
709} 807}
710 808
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 13d6ba0..f43cbb6 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -100,6 +100,42 @@ pub async fn handle_info_refs(
100 .unwrap()) 100 .unwrap())
101} 101}
102 102
103/// Build an HTTP 200 OK response with an ERR pkt-line for git protocol errors.
104///
105/// Per the git smart HTTP protocol spec, protocol-level errors (like "not our ref")
106/// should be returned as HTTP 200 OK with the error message in pkt-line format:
107/// `PKT-LINE("ERR" SP explanation-text)`
108///
109/// This allows git clients to properly parse and display the error message.
110fn build_git_protocol_error_response(
111 service: GitService,
112 error_message: &str,
113) -> Response<Full<Bytes>> {
114 // Format: "ERR <message>\n"
115 let err_content = format!("ERR {}\n", error_message.trim());
116 let err_pktline = PktLine::data(err_content.as_bytes()).encode();
117
118 Response::builder()
119 .status(StatusCode::OK)
120 .header("content-type", service.result_content_type())
121 .header("cache-control", "no-cache")
122 .body(Full::new(Bytes::from(err_pktline)))
123 .unwrap()
124}
125
126/// Check if a git process failure is a protocol error (vs transport error).
127///
128/// Protocol errors are communicated via stderr when git exits with code 128.
129/// These should be returned to the client as HTTP 200 with ERR pkt-line.
130///
131/// Transport errors (process spawn failures, I/O errors, signals) should
132/// remain as HTTP 500 errors.
133fn is_git_protocol_error(exit_code: Option<i32>, stderr: &[u8]) -> bool {
134 // Git uses exit code 128 for protocol/usage errors
135 // If there's stderr content, it's a protocol error message
136 exit_code == Some(128) && !stderr.is_empty()
137}
138
103/// Handle POST /git-upload-pack (clone/fetch) 139/// Handle POST /git-upload-pack (clone/fetch)
104pub async fn handle_upload_pack( 140pub async fn handle_upload_pack(
105 repo_path: PathBuf, 141 repo_path: PathBuf,
@@ -121,7 +157,10 @@ pub async fn handle_upload_pack(
121 stdin 157 stdin
122 .write_all(&request_body) 158 .write_all(&request_body)
123 .await 159 .await
124 .map_err(GitError::IoError)?; 160 .map_err(|e| {
161 error!("Failed to write to git upload-pack stdin: {}", e);
162 GitError::IoError(e)
163 })?;
125 // Close stdin to signal end of input 164 // Close stdin to signal end of input
126 drop(stdin); 165 drop(stdin);
127 } 166 }
@@ -135,7 +174,10 @@ pub async fn handle_upload_pack(
135 stdout 174 stdout
136 .read_to_end(&mut output) 175 .read_to_end(&mut output)
137 .await 176 .await
138 .map_err(GitError::IoError)?; 177 .map_err(|e| {
178 error!("Failed to read git upload-pack stdout: {}", e);
179 GitError::IoError(e)
180 })?;
139 } 181 }
140 182
141 if let Some(stderr) = git.take_stderr() { 183 if let Some(stderr) = git.take_stderr() {
@@ -143,14 +185,35 @@ pub async fn handle_upload_pack(
143 stderr 185 stderr
144 .read_to_end(&mut stderr_output) 186 .read_to_end(&mut stderr_output)
145 .await 187 .await
146 .map_err(GitError::IoError)?; 188 .map_err(|e| {
189 error!("Failed to read git upload-pack stderr: {}", e);
190 GitError::IoError(e)
191 })?;
147 } 192 }
148 193
149 // Wait for process 194 // Wait for process
150 let status = git.wait().await.map_err(GitError::IoError)?; 195 let status = git.wait().await.map_err(|e| {
196 error!("Failed to wait for git upload-pack process: {}", e);
197 GitError::IoError(e)
198 })?;
151 199
152 if !status.success() { 200 if !status.success() {
153 let stderr_str = String::from_utf8_lossy(&stderr_output); 201 let stderr_str = String::from_utf8_lossy(&stderr_output);
202
203 // Check if this is a git protocol error (exit code 128 with stderr)
204 // Protocol errors should be returned as HTTP 200 with ERR pkt-line
205 if is_git_protocol_error(status.code(), &stderr_output) {
206 warn!(
207 "Git upload-pack protocol error (returning ERR pkt-line): {}",
208 stderr_str
209 );
210 return Ok(build_git_protocol_error_response(
211 GitService::UploadPack,
212 &stderr_str,
213 ));
214 }
215
216 // Transport errors (spawn failures, signals, etc.) remain as HTTP 500
154 error!("Git upload-pack failed: {}", stderr_str); 217 error!("Git upload-pack failed: {}", stderr_str);
155 return Err(GitError::GitFailed(status.code())); 218 return Err(GitError::GitFailed(status.code()));
156 } 219 }
@@ -206,7 +269,7 @@ pub async fn handle_receive_pack(
206 } 269 }
207 270
208 // GRASP Authorization Check 271 // GRASP Authorization Check
209 info!( 272 debug!(
210 "Authorizing push for {} owned by {} via database query", 273 "Authorizing push for {} owned by {} via database query",
211 identifier, owner_pubkey 274 identifier, owner_pubkey
212 ); 275 );
@@ -251,7 +314,10 @@ pub async fn handle_receive_pack(
251 stdin 314 stdin
252 .write_all(&request_body) 315 .write_all(&request_body)
253 .await 316 .await
254 .map_err(GitError::IoError)?; 317 .map_err(|e| {
318 error!("Failed to write to git receive-pack stdin: {}", e);
319 GitError::IoError(e)
320 })?;
255 drop(stdin); 321 drop(stdin);
256 } 322 }
257 323
@@ -264,7 +330,10 @@ pub async fn handle_receive_pack(
264 stdout 330 stdout
265 .read_to_end(&mut output) 331 .read_to_end(&mut output)
266 .await 332 .await
267 .map_err(GitError::IoError)?; 333 .map_err(|e| {
334 error!("Failed to read git receive-pack stdout: {}", e);
335 GitError::IoError(e)
336 })?;
268 } 337 }
269 338
270 if let Some(stderr) = git.take_stderr() { 339 if let Some(stderr) = git.take_stderr() {
@@ -272,14 +341,35 @@ pub async fn handle_receive_pack(
272 stderr 341 stderr
273 .read_to_end(&mut stderr_output) 342 .read_to_end(&mut stderr_output)
274 .await 343 .await
275 .map_err(GitError::IoError)?; 344 .map_err(|e| {
345 error!("Failed to read git receive-pack stderr: {}", e);
346 GitError::IoError(e)
347 })?;
276 } 348 }
277 349
278 // Wait for process 350 // Wait for process
279 let status = git.wait().await.map_err(GitError::IoError)?; 351 let status = git.wait().await.map_err(|e| {
352 error!("Failed to wait for git receive-pack process: {}", e);
353 GitError::IoError(e)
354 })?;
280 355
281 if !status.success() { 356 if !status.success() {
282 let stderr_str = String::from_utf8_lossy(&stderr_output); 357 let stderr_str = String::from_utf8_lossy(&stderr_output);
358
359 // Check if this is a git protocol error (exit code 128 with stderr)
360 // Protocol errors should be returned as HTTP 200 with ERR pkt-line
361 if is_git_protocol_error(status.code(), &stderr_output) {
362 warn!(
363 "Git receive-pack protocol error (returning ERR pkt-line): {}",
364 stderr_str
365 );
366 return Ok(build_git_protocol_error_response(
367 GitService::ReceivePack,
368 &stderr_str,
369 ));
370 }
371
372 // Transport errors (spawn failures, signals, etc.) remain as HTTP 500
283 error!("Git receive-pack failed: {}", stderr_str); 373 error!("Git receive-pack failed: {}", stderr_str);
284 return Err(GitError::GitFailed(status.code())); 374 return Err(GitError::GitFailed(status.code()));
285 } 375 }
diff --git a/src/git/mod.rs b/src/git/mod.rs
index b3fee69..1255b6f 100644
--- a/src/git/mod.rs
+++ b/src/git/mod.rs
@@ -253,7 +253,7 @@ pub fn update_ref(repo_path: &Path, ref_name: &str, commit_hash: &str) -> Result
253 return Err(format!("git update-ref failed: {}", stderr)); 253 return Err(format!("git update-ref failed: {}", stderr));
254 } 254 }
255 255
256 info!( 256 debug!(
257 "Updated ref {} to {} in {}", 257 "Updated ref {} to {} in {}",
258 ref_name, 258 ref_name,
259 commit_hash, 259 commit_hash,
diff --git a/src/git/sync.rs b/src/git/sync.rs
index 8401736..c24d16b 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -673,7 +673,7 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) ->
673 match git::update_ref(repo_path, ref_name, expected_commit) { 673 match git::update_ref(repo_path, ref_name, expected_commit) {
674 Ok(()) => { 674 Ok(()) => {
675 if current_commit.is_some() { 675 if current_commit.is_some() {
676 info!( 676 debug!(
677 "Updated {} to {} in {}", 677 "Updated {} to {} in {}",
678 ref_name, 678 ref_name,
679 expected_commit, 679 expected_commit,
@@ -681,7 +681,7 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) ->
681 ); 681 );
682 result.refs_updated += 1; 682 result.refs_updated += 1;
683 } else { 683 } else {
684 info!( 684 debug!(
685 "Created {} at {} in {}", 685 "Created {} at {} in {}",
686 ref_name, 686 ref_name,
687 expected_commit, 687 expected_commit,
@@ -707,7 +707,7 @@ pub fn align_repository_with_state(repo_path: &Path, state: &RepositoryState) ->
707 if let Some(head_commit) = state.get_branch_commit(branch_name) { 707 if let Some(head_commit) = state.get_branch_commit(branch_name) {
708 match git::try_set_head_if_available(repo_path, head_ref, head_commit) { 708 match git::try_set_head_if_available(repo_path, head_ref, head_commit) {
709 Ok(true) => { 709 Ok(true) => {
710 info!("Set HEAD to {} in {}", head_ref, repo_path.display()); 710 debug!("Set HEAD to {} in {}", head_ref, repo_path.display());
711 result.head_set = true; 711 result.head_set = true;
712 } 712 }
713 Ok(false) => { 713 Ok(false) => {
diff --git a/src/http/mod.rs b/src/http/mod.rs
index cfd7c52..76ffef3 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -345,7 +345,7 @@ impl Service<Request<Incoming>> for HttpService {
345 .unwrap()) 345 .unwrap())
346 } 346 }
347 Err(e) => { 347 Err(e) => {
348 tracing::error!("Git handler error: {}", e); 348 // Errors are already logged at their source with full context
349 let error_msg = format!("Git error: {}", e); 349 let error_msg = format!("Git error: {}", e);
350 Ok(add_cors_headers(Response::builder()) 350 Ok(add_cors_headers(Response::builder())
351 .status(e.status_code()) 351 .status(e.status_code())
diff --git a/src/main.rs b/src/main.rs
index 6769cf3..bf3aefb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,8 +3,8 @@ use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
6use tracing::{error, info, warn, Level}; 6use tracing::{error, info, warn};
7use tracing_subscriber::FmtSubscriber; 7use tracing_subscriber::{EnvFilter, FmtSubscriber};
8 8
9use ngit_grasp::{ 9use ngit_grasp::{
10 config::{Config, DatabaseBackend}, 10 config::{Config, DatabaseBackend},
@@ -17,16 +17,16 @@ use ngit_grasp::{
17 17
18#[tokio::main] 18#[tokio::main]
19async fn main() -> Result<()> { 19async fn main() -> Result<()> {
20 // Initialize tracing 20 // Load configuration first (priority: CLI flags > env vars > .env file > defaults)
21 let config = Config::load()?;
22
23 // Initialize tracing with configured log level
21 let subscriber = FmtSubscriber::builder() 24 let subscriber = FmtSubscriber::builder()
22 .with_max_level(Level::DEBUG) 25 .with_env_filter(EnvFilter::new(&config.log_level))
23 .finish(); 26 .finish();
24 tracing::subscriber::set_global_default(subscriber)?; 27 tracing::subscriber::set_global_default(subscriber)?;
25 28
26 info!("Starting ngit-grasp with nostr-relay-builder..."); 29 info!("Starting ngit-grasp with log level: {}", config.log_level);
27
28 // Load configuration (priority: CLI flags > env vars > .env file > defaults)
29 let config = Config::load()?;
30 30
31 // Validate configuration and fail fast on fatal errors 31 // Validate configuration and fail fast on fatal errors
32 // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings 32 // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings
@@ -189,8 +189,8 @@ async fn main() -> Result<()> {
189 )); 189 ));
190 190
191 // Create throttle manager for rate limiting remote git servers 191 // Create throttle manager for rate limiting remote git servers
192 // Default: 5 concurrent requests per domain, 30 requests per minute per domain 192 // Default: 5 concurrent requests per domain, 60 requests per minute per domain
193 let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); 193 let throttle_manager = Arc::new(ThrottleManager::new(5, 60));
194 throttle_manager.set_context(sync_ctx.clone()); 194 throttle_manager.set_context(sync_ctx.clone());
195 throttle_manager.set_git_naughty_list(git_naughty_list.clone()); 195 throttle_manager.set_git_naughty_list(git_naughty_list.clone());
196 196
@@ -212,20 +212,49 @@ async fn main() -> Result<()> {
212 let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); 212 let http_write_policy = Arc::new(relay_with_db.write_policy.clone());
213 213
214 // Run server until shutdown signal, then cleanup 214 // Run server until shutdown signal, then cleanup
215 tokio::select! { 215 #[cfg(unix)]
216 result = http::run_server( 216 {
217 config, 217 use tokio::signal::unix::{signal, SignalKind};
218 relay_with_db.relay, 218 let mut sigterm = signal(SignalKind::terminate())?;
219 relay_with_db.database, 219
220 metrics, 220 tokio::select! {
221 purgatory, 221 result = http::run_server(
222 http_write_policy, 222 config,
223 http_rejected_index, 223 relay_with_db.relay,
224 ) => { 224 relay_with_db.database,
225 result? 225 metrics,
226 purgatory,
227 http_write_policy,
228 http_rejected_index,
229 ) => {
230 result?
231 }
232 _ = signal::ctrl_c() => {
233 info!("Received SIGINT (Ctrl+C), cleaning up...");
234 }
235 _ = sigterm.recv() => {
236 info!("Received SIGTERM, cleaning up...");
237 }
226 } 238 }
227 _ = signal::ctrl_c() => { 239 }
228 info!("Received shutdown signal, cleaning up..."); 240
241 #[cfg(not(unix))]
242 {
243 tokio::select! {
244 result = http::run_server(
245 config,
246 relay_with_db.relay,
247 relay_with_db.database,
248 metrics,
249 purgatory,
250 http_write_policy,
251 http_rejected_index,
252 ) => {
253 result?
254 }
255 _ = signal::ctrl_c() => {
256 info!("Received SIGINT (Ctrl+C), cleaning up...");
257 }
229 } 258 }
230 } 259 }
231 260
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index d056e46..7a05348 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -102,6 +102,62 @@ impl Nip34WritePolicy {
102 self.ctx.set_local_relay(relay); 102 self.ctx.set_local_relay(relay);
103 } 103 }
104 104
105 /// Extract repository identifier from event's 'd' tag.
106 ///
107 /// Used for structured logging when parsing fails - we try to extract
108 /// the identifier even if full parsing failed.
109 fn extract_identifier_from_event(event: &Event) -> String {
110 use nostr_relay_builder::prelude::TagKind;
111 event
112 .tags
113 .iter()
114 .find(|t| t.kind() == TagKind::d())
115 .and_then(|t| t.content())
116 .map(|s| s.to_string())
117 .unwrap_or_else(|| "unknown".to_string())
118 }
119
120 /// Extract ALL repository identifiers from PR event's 'a' tags.
121 ///
122 /// PR events can reference multiple repositories via multiple 'a' tags
123 /// (e.g., when there are multiple maintainers). Each tag has format
124 /// `30617:<owner_pubkey>:<identifier>`.
125 ///
126 /// Returns a vector of unique identifiers, or `["unknown"]` if none found.
127 fn extract_repos_from_pr_event(event: &Event) -> Vec<String> {
128 let repos: Vec<String> = event
129 .tags
130 .iter()
131 .filter_map(|tag| {
132 let tag_vec = tag.clone().to_vec();
133 if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") {
134 // Format: 30617:<owner_pubkey>:<identifier>
135 let parts: Vec<&str> = tag_vec[1].split(':').collect();
136 if parts.len() >= 3 {
137 Some(parts[2].to_string())
138 } else {
139 None
140 }
141 } else {
142 None
143 }
144 })
145 .collect();
146
147 // Deduplicate while preserving order
148 let mut seen = std::collections::HashSet::new();
149 let unique_repos: Vec<String> = repos
150 .into_iter()
151 .filter(|r| seen.insert(r.clone()))
152 .collect();
153
154 if unique_repos.is_empty() {
155 vec!["unknown".to_string()]
156 } else {
157 unique_repos
158 }
159 }
160
105 /// Handle repository announcement event 161 /// Handle repository announcement event
106 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { 162 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult {
107 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 163 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
@@ -133,10 +189,21 @@ impl Nip34WritePolicy {
133 WritePolicyResult::Accept 189 WritePolicyResult::Accept
134 } 190 }
135 Err(e) => { 191 Err(e) => {
192 let npub = event
193 .pubkey
194 .to_bech32()
195 .unwrap_or_else(|_| event.pubkey.to_hex());
196 let event_id_short = &event.id.to_hex()[..12];
197 // Try to extract repo identifier from 'd' tag even if parsing failed
198 let repo = Self::extract_identifier_from_event(event);
199 // Structured log for migration scripts
136 tracing::warn!( 200 tracing::warn!(
137 "Failed to parse repository announcement {}: {}", 201 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
138 event_id_str, 202 event.kind.as_u16(),
139 e 203 event_id_short,
204 e,
205 repo,
206 npub
140 ); 207 );
141 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) 208 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e))
142 } 209 }
@@ -185,10 +252,21 @@ impl Nip34WritePolicy {
185 WritePolicyResult::Accept 252 WritePolicyResult::Accept
186 } 253 }
187 Err(e) => { 254 Err(e) => {
255 let npub = event
256 .pubkey
257 .to_bech32()
258 .unwrap_or_else(|_| event.pubkey.to_hex());
259 let event_id_short = &event.id.to_hex()[..12];
260 // Try to extract repo identifier from 'd' tag even if parsing failed
261 let repo = Self::extract_identifier_from_event(event);
262 // Structured log for migration scripts
188 tracing::warn!( 263 tracing::warn!(
189 "Failed to parse maintainer announcement {}: {}", 264 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
190 event_id_str, 265 event.kind.as_u16(),
191 e 266 event_id_short,
267 e,
268 repo,
269 npub
192 ); 270 );
193 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) 271 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e))
194 } 272 }
@@ -211,8 +289,6 @@ impl Nip34WritePolicy {
211 /// * `event` - The state event to validate 289 /// * `event` - The state event to validate
212 /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) 290 /// * `is_synced` - True if this event came from proactive sync (vs user-submitted)
213 async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult { 291 async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult {
214 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
215
216 match self.state_policy.validate(event) { 292 match self.state_policy.validate(event) {
217 StateResult::Accept => { 293 StateResult::Accept => {
218 // Process state alignment asynchronously 294 // Process state alignment asynchronously
@@ -223,7 +299,22 @@ impl Nip34WritePolicy {
223 { 299 {
224 Ok(poilicy_result) => poilicy_result, 300 Ok(poilicy_result) => poilicy_result,
225 Err(e) => { 301 Err(e) => {
226 tracing::warn!("Failed to process state event {}: {}", event_id_str, e); 302 let npub = event
303 .pubkey
304 .to_bech32()
305 .unwrap_or_else(|_| event.pubkey.to_hex());
306 let event_id_short = &event.id.to_hex()[..12];
307 // Try to extract repo identifier from 'd' tag even if parsing failed
308 let repo = Self::extract_identifier_from_event(event);
309 // Structured log for migration scripts
310 tracing::warn!(
311 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
312 event.kind.as_u16(),
313 event_id_short,
314 e,
315 repo,
316 npub
317 );
227 // reject if processing failed 318 // reject if processing failed
228 WritePolicyResult::Reject { 319 WritePolicyResult::Reject {
229 status: false, 320 status: false,
@@ -233,7 +324,22 @@ impl Nip34WritePolicy {
233 } 324 }
234 } 325 }
235 StateResult::Reject(reason) => { 326 StateResult::Reject(reason) => {
236 tracing::warn!("Rejected repository state {}: {}", event_id_str, reason); 327 let npub = event
328 .pubkey
329 .to_bech32()
330 .unwrap_or_else(|_| event.pubkey.to_hex());
331 let event_id_short = &event.id.to_hex()[..12];
332 // Try to extract repo identifier from 'd' tag even if parsing failed
333 let repo = Self::extract_identifier_from_event(event);
334 // Structured log for migration scripts
335 tracing::warn!(
336 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
337 event.kind.as_u16(),
338 event_id_short,
339 reason,
340 repo,
341 npub
342 );
237 WritePolicyResult::reject(reason) 343 WritePolicyResult::reject(reason)
238 } 344 }
239 } 345 }
@@ -331,9 +437,12 @@ impl Nip34WritePolicy {
331 ); 437 );
332 438
333 // Add to purgatory 439 // Add to purgatory
334 self.ctx 440 self.ctx.purgatory.add_pr(
335 .purgatory 441 event.clone(),
336 .add_pr(event.clone(), event.id.to_hex(), commit.clone()); 442 event.id.to_hex(),
443 commit.clone(),
444 is_synced,
445 );
337 446
338 WritePolicyResult::Reject { 447 WritePolicyResult::Reject {
339 status: true, // Client sees OK 448 status: true, // Client sees OK
@@ -351,11 +460,25 @@ impl Nip34WritePolicy {
351 } 460 }
352 Err(e) => { 461 Err(e) => {
353 // Error checking git data - reject event 462 // Error checking git data - reject event
354 tracing::warn!( 463 let npub = event
355 "Failed to check git data for PR event {}: {}", 464 .pubkey
356 event_id_str, 465 .to_bech32()
357 e 466 .unwrap_or_else(|_| event.pubkey.to_hex());
358 ); 467 let event_id_short = &event.id.to_hex()[..12];
468 // Extract ALL repo identifiers from 'a' tags for PR events
469 // (PR events can reference multiple repos when there are multiple maintainers)
470 let repos = Self::extract_repos_from_pr_event(event);
471 // Structured log for migration scripts - log once per repo
472 for repo in &repos {
473 tracing::warn!(
474 "[PARSE_FAIL] kind={} event_id={}... reason=\"git data check failed: {}\" repo={} npub={}",
475 event.kind.as_u16(),
476 event_id_short,
477 e,
478 repo,
479 npub
480 );
481 }
359 WritePolicyResult::reject(format!("Failed to check git data: {}", e)) 482 WritePolicyResult::reject(format!("Failed to check git data: {}", e))
360 } 483 }
361 } 484 }
@@ -462,9 +585,11 @@ impl Nip34WritePolicy {
462 let (addressable_refs, event_refs) = 585 let (addressable_refs, event_refs) =
463 RelatedEventPolicy::extract_reference_tags(event); 586 RelatedEventPolicy::extract_reference_tags(event);
464 tracing::info!( 587 tracing::info!(
465 "Rejected orphan {} event {}: no references to accepted repos or events (checked {} addressable, {} event refs)", 588 "Rejected orphan {} event {} (kind={}, pubkey={}): no references to accepted repos or events (checked {} addressable, {} event refs)",
466 event_type, 589 event_type,
467 event_id_str, 590 event.id.to_hex(),
591 event.kind.as_u16(),
592 event.pubkey.to_hex(),
468 addressable_refs.len(), 593 addressable_refs.len(),
469 event_refs.len() 594 event_refs.len()
470 ); 595 );
diff --git a/src/nostr/events.rs b/src/nostr/events.rs
index b9784f7..a441742 100644
--- a/src/nostr/events.rs
+++ b/src/nostr/events.rs
@@ -424,7 +424,7 @@ pub fn validate_announcement(
424 { 424 {
425 return AnnouncementResult::Reject(format!( 425 return AnnouncementResult::Reject(format!(
426 "Announcement lists service but does not match repository whitelist. \ 426 "Announcement lists service but does not match repository whitelist. \
427 Repository {}/{} not in whitelist", 427 Repository {}/{} not in whitelist",
428 npub, announcement.identifier 428 npub, announcement.identifier
429 )); 429 ));
430 } 430 }
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index e6de54e..df743ae 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -276,9 +276,12 @@ impl StatePolicy {
276 276
277 // If no git data - add to purgatory 277 // If no git data - add to purgatory
278 // (add_state automatically enqueues for background sync) 278 // (add_state automatically enqueues for background sync)
279 self.ctx 279 self.ctx.purgatory.add_state(
280 .purgatory 280 event.clone(),
281 .add_state(event.clone(), state.identifier.clone(), event.pubkey); 281 state.identifier.clone(),
282 event.pubkey,
283 is_synced,
284 );
282 285
283 tracing::info!( 286 tracing::info!(
284 "state event added to purgatory: eventid: {}, identifier: {}", 287 "state event added to purgatory: eventid: {}, identifier: {}",
diff --git a/src/purgatory/helpers.rs b/src/purgatory/helpers.rs
index 193ef99..a9f6e66 100644
--- a/src/purgatory/helpers.rs
+++ b/src/purgatory/helpers.rs
@@ -225,6 +225,117 @@ pub fn get_unpushed_refs(event: &Event, pushed_refs: &[RefPair]) -> Vec<RefPair>
225 .collect() 225 .collect()
226} 226}
227 227
228/// Diagnose why a state event doesn't match the push.
229///
230/// Returns a human-readable explanation of the mismatch between the state event
231/// and what would result from applying the push to local refs.
232///
233/// # Arguments
234/// * `event` - The state event to check
235/// * `pushed_updates` - Ref updates in the current push operation
236/// * `local_refs` - Refs already existing locally (ref_name -> SHA)
237///
238/// # Returns
239/// String explaining why the state doesn't match, or None if it matches
240pub fn diagnose_state_mismatch(
241 event: &Event,
242 pushed_updates: &[RefUpdate],
243 local_refs: &HashMap<String, String>,
244) -> Option<String> {
245 let state_refs = extract_refs_from_state(event);
246
247 // Filter local_refs to only branches and tags
248 let mut would_be_state: HashMap<String, String> = local_refs
249 .iter()
250 .filter(|(ref_name, _)| {
251 ref_name.starts_with("refs/heads/") || ref_name.starts_with("refs/tags/")
252 })
253 .map(|(k, v)| (k.clone(), v.clone()))
254 .collect();
255
256 // Apply all pushed updates to create the would-be state
257 for update in pushed_updates {
258 // Only process branches and tags
259 if !update.ref_name.starts_with("refs/heads/") && !update.ref_name.starts_with("refs/tags/")
260 {
261 continue;
262 }
263
264 if update.is_deletion() {
265 would_be_state.remove(&update.ref_name);
266 } else {
267 would_be_state.insert(update.ref_name.clone(), update.new_oid.clone());
268 }
269 }
270
271 // Convert event's state refs to a HashMap for comparison
272 let declared_state: HashMap<String, String> = state_refs
273 .into_iter()
274 .map(|r| (r.ref_name, r.object_sha))
275 .collect();
276
277 // Check if they match
278 if would_be_state == declared_state {
279 return None; // No mismatch
280 }
281
282 // Build diagnostic message
283 let mut reasons = Vec::new();
284
285 // Check for refs in declared state but not in would-be state
286 for (ref_name, declared_sha) in &declared_state {
287 if let Some(would_be_sha) = would_be_state.get(ref_name) {
288 if would_be_sha != declared_sha {
289 let would_be_short = if would_be_sha.len() >= 8 {
290 &would_be_sha[..8]
291 } else {
292 would_be_sha.as_str()
293 };
294 let declared_short = if declared_sha.len() >= 8 {
295 &declared_sha[..8]
296 } else {
297 declared_sha.as_str()
298 };
299 reasons.push(format!(
300 "{} would be at {} but state declares {}",
301 ref_name, would_be_short, declared_short
302 ));
303 }
304 } else {
305 let declared_short = if declared_sha.len() >= 8 {
306 &declared_sha[..8]
307 } else {
308 declared_sha.as_str()
309 };
310 reasons.push(format!(
311 "{} missing (state declares {})",
312 ref_name, declared_short
313 ));
314 }
315 }
316
317 // Check for refs in would-be state but not in declared state
318 for (ref_name, would_be_sha) in &would_be_state {
319 if !declared_state.contains_key(ref_name) {
320 let would_be_short = if would_be_sha.len() >= 8 {
321 &would_be_sha[..8]
322 } else {
323 would_be_sha.as_str()
324 };
325 reasons.push(format!(
326 "{} would exist at {} but state doesn't declare it",
327 ref_name, would_be_short
328 ));
329 }
330 }
331
332 if reasons.is_empty() {
333 Some("Unknown mismatch".to_string())
334 } else {
335 Some(reasons.join("; "))
336 }
337}
338
228#[cfg(test)] 339#[cfg(test)]
229mod tests { 340mod tests {
230 use super::*; 341 use super::*;
@@ -695,4 +806,97 @@ mod tests {
695 // Should return true - real OID exists, symbolic ref skipped 806 // Should return true - real OID exists, symbolic ref skipped
696 assert!(can_apply_state(&event, repo_path)); 807 assert!(can_apply_state(&event, repo_path));
697 } 808 }
809
810 #[test]
811 fn test_diagnose_state_mismatch_missing_ref() {
812 // State declares both main and test branches
813 let event = create_test_state_event(
814 "test-repo",
815 vec![("refs/heads/main", "abc123"), ("refs/heads/test", "def456")],
816 );
817
818 // Push only creates test branch
819 let pushed_updates = vec![RefUpdate {
820 old_oid: "0000000000000000000000000000000000000000".to_string(),
821 new_oid: "def456".to_string(),
822 ref_name: "refs/heads/test".to_string(),
823 }];
824
825 // No local refs
826 let local_refs = HashMap::new();
827
828 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
829 assert!(diagnosis.is_some());
830 let msg = diagnosis.unwrap();
831 assert!(msg.contains("refs/heads/main"));
832 assert!(msg.contains("missing"));
833 }
834
835 #[test]
836 fn test_diagnose_state_mismatch_wrong_sha() {
837 // State declares main at abc123
838 let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]);
839
840 // Push updates main to different SHA
841 let pushed_updates = vec![RefUpdate {
842 old_oid: "0000000000000000000000000000000000000000".to_string(),
843 new_oid: "wrong123".to_string(),
844 ref_name: "refs/heads/main".to_string(),
845 }];
846
847 let local_refs = HashMap::new();
848
849 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
850 assert!(diagnosis.is_some());
851 let msg = diagnosis.unwrap();
852 assert!(msg.contains("refs/heads/main"));
853 assert!(msg.contains("would be at"));
854 assert!(msg.contains("state declares"));
855 }
856
857 #[test]
858 fn test_diagnose_state_mismatch_extra_ref() {
859 // State declares only main
860 let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]);
861
862 // Push creates both main and test
863 let pushed_updates = vec![
864 RefUpdate {
865 old_oid: "0000000000000000000000000000000000000000".to_string(),
866 new_oid: "abc123".to_string(),
867 ref_name: "refs/heads/main".to_string(),
868 },
869 RefUpdate {
870 old_oid: "0000000000000000000000000000000000000000".to_string(),
871 new_oid: "def456".to_string(),
872 ref_name: "refs/heads/test".to_string(),
873 },
874 ];
875
876 let local_refs = HashMap::new();
877
878 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
879 assert!(diagnosis.is_some());
880 let msg = diagnosis.unwrap();
881 assert!(msg.contains("refs/heads/test"));
882 assert!(msg.contains("doesn't declare"));
883 }
884
885 #[test]
886 fn test_diagnose_state_mismatch_no_mismatch() {
887 // State declares main
888 let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]);
889
890 // Push creates main at correct SHA
891 let pushed_updates = vec![RefUpdate {
892 old_oid: "0000000000000000000000000000000000000000".to_string(),
893 new_oid: "abc123".to_string(),
894 ref_name: "refs/heads/main".to_string(),
895 }];
896
897 let local_refs = HashMap::new();
898
899 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
900 assert!(diagnosis.is_none()); // No mismatch
901 }
698} 902}
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 9a63bf6..bb6ff54 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -16,11 +16,12 @@ pub mod persistence;
16pub mod sync; 16pub mod sync;
17mod types; 17mod types;
18 18
19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 19pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs};
20pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 20pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 21
22use dashmap::DashMap; 22use dashmap::DashMap;
23use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use nostr_sdk::ToBech32;
24use serde::{Deserialize, Serialize}; 25use serde::{Deserialize, Serialize};
25use std::collections::HashMap; 26use std::collections::HashMap;
26use std::collections::HashSet; 27use std::collections::HashSet;
@@ -64,6 +65,9 @@ struct SerializableStatePurgatoryEntry {
64 created_at_offset_secs: u64, 65 created_at_offset_secs: u64,
65 /// Duration offset from saved_at for expires_at 66 /// Duration offset from saved_at for expires_at
66 expires_at_offset_secs: u64, 67 expires_at_offset_secs: u64,
68 /// Source of this event (direct submission vs sync)
69 #[serde(default)]
70 source: types::EventSource,
67} 71}
68 72
69/// Serializable wrapper for `PrPurgatoryEntry` with time offsets. 73/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
@@ -81,6 +85,9 @@ struct SerializablePrPurgatoryEntry {
81 created_at_offset_secs: u64, 85 created_at_offset_secs: u64,
82 /// Duration offset from saved_at for expires_at 86 /// Duration offset from saved_at for expires_at
83 expires_at_offset_secs: u64, 87 expires_at_offset_secs: u64,
88 /// Source of this event (direct submission vs sync)
89 #[serde(default)]
90 source: types::EventSource,
84} 91}
85 92
86/// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. 93/// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets.
@@ -313,11 +320,38 @@ impl Purgatory {
313 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately 320 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately
314 /// to override this delay. 321 /// to override this delay.
315 /// 322 ///
323 /// If an event already exists in purgatory with `Sync` source and the new submission
324 /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry.
325 ///
316 /// # Arguments 326 /// # Arguments
317 /// * `event` - The state event (kind 30618) to hold 327 /// * `event` - The state event (kind 30618) to hold
318 /// * `identifier` - The repository identifier from the 'd' tag 328 /// * `identifier` - The repository identifier from the 'd' tag
319 /// * `author` - The event author's public key 329 /// * `author` - The event author's public key
320 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { 330 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
331 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) {
332 let source = if from_sync {
333 types::EventSource::Sync
334 } else {
335 types::EventSource::Direct
336 };
337
338 // Check if event already exists - if so, potentially upgrade source
339 if let Some(mut entries) = self.state_events.get_mut(&identifier) {
340 if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) {
341 // Upgrade source from Sync to Direct if new submission is direct
342 if existing.source == types::EventSource::Sync && !from_sync {
343 existing.source = types::EventSource::Direct;
344 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
345 tracing::debug!(
346 event_id = %event.id,
347 identifier = %identifier,
348 "Upgraded purgatory entry source from Sync to Direct, reset expiry"
349 );
350 }
351 return; // Event already exists, don't add duplicate
352 }
353 }
354
321 let now = Instant::now(); 355 let now = Instant::now();
322 let entry = StatePurgatoryEntry { 356 let entry = StatePurgatoryEntry {
323 event, 357 event,
@@ -325,6 +359,7 @@ impl Purgatory {
325 author, 359 author,
326 created_at: now, 360 created_at: now,
327 expires_at: now + DEFAULT_EXPIRY, 361 expires_at: now + DEFAULT_EXPIRY,
362 source,
328 }; 363 };
329 364
330 self.state_events 365 self.state_events
@@ -344,11 +379,35 @@ impl Purgatory {
344 /// Automatically enqueues the referenced repository identifier for background sync 379 /// Automatically enqueues the referenced repository identifier for background sync
345 /// with the default delay (3 minutes), giving time for a git push to arrive. 380 /// with the default delay (3 minutes), giving time for a git push to arrive.
346 /// 381 ///
382 /// If an event already exists in purgatory with `Sync` source and the new submission
383 /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry.
384 ///
347 /// # Arguments 385 /// # Arguments
348 /// * `event` - The PR event (kind 1617/1618) to hold 386 /// * `event` - The PR event (kind 1617/1618) to hold
349 /// * `event_id` - The event ID (hex string) from the 'e' tag 387 /// * `event_id` - The event ID (hex string) from the 'e' tag
350 /// * `commit` - The commit SHA from the 'c' tag 388 /// * `commit` - The commit SHA from the 'c' tag
351 pub fn add_pr(&self, event: Event, event_id: String, commit: String) { 389 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
390 pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) {
391 let source = if from_sync {
392 types::EventSource::Sync
393 } else {
394 types::EventSource::Direct
395 };
396
397 // Check if event already exists - if so, potentially upgrade source
398 if let Some(mut existing) = self.pr_events.get_mut(&event_id) {
399 // Upgrade source from Sync to Direct if new submission is direct
400 if existing.source == types::EventSource::Sync && !from_sync {
401 existing.source = types::EventSource::Direct;
402 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
403 tracing::debug!(
404 event_id = %event_id,
405 "Upgraded PR purgatory entry source from Sync to Direct, reset expiry"
406 );
407 }
408 return; // Event already exists, don't add duplicate
409 }
410
352 // Extract identifier from the event's `a` tag for sync enqueueing 411 // Extract identifier from the event's `a` tag for sync enqueueing
353 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); 412 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event);
354 413
@@ -358,6 +417,7 @@ impl Purgatory {
358 commit, 417 commit,
359 created_at: now, 418 created_at: now,
360 expires_at: now + DEFAULT_EXPIRY, 419 expires_at: now + DEFAULT_EXPIRY,
420 source,
361 }; 421 };
362 422
363 self.pr_events.insert(event_id, entry); 423 self.pr_events.insert(event_id, entry);
@@ -371,6 +431,8 @@ impl Purgatory {
371 /// Add a PR placeholder (git data arrived before PR event). 431 /// Add a PR placeholder (git data arrived before PR event).
372 /// 432 ///
373 /// Creates a placeholder entry waiting for the corresponding PR event. 433 /// Creates a placeholder entry waiting for the corresponding PR event.
434 /// Placeholders are always marked as `Direct` source since they originate
435 /// from git pushes (direct user action).
374 /// 436 ///
375 /// # Arguments 437 /// # Arguments
376 /// * `event_id` - The expected event ID (from git ref name) 438 /// * `event_id` - The expected event ID (from git ref name)
@@ -382,6 +444,7 @@ impl Purgatory {
382 commit, 444 commit,
383 created_at: now, 445 created_at: now,
384 expires_at: now + DEFAULT_EXPIRY, 446 expires_at: now + DEFAULT_EXPIRY,
447 source: types::EventSource::Direct, // Git pushes are direct user actions
385 }; 448 };
386 449
387 self.pr_events.insert(event_id, entry); 450 self.pr_events.insert(event_id, entry);
@@ -892,6 +955,9 @@ impl Purgatory {
892 /// prevent infinite re-sync loops. Events that expire without finding git data 955 /// prevent infinite re-sync loops. Events that expire without finding git data
893 /// will be filtered out during future negentropy/REQ sync operations. 956 /// will be filtered out during future negentropy/REQ sync operations.
894 /// 957 ///
958 /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event
959 /// to support migration scripts and operational monitoring.
960 ///
895 /// # Returns 961 /// # Returns
896 /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) 962 /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed)
897 pub fn cleanup(&self) -> (usize, usize, usize) { 963 pub fn cleanup(&self) -> (usize, usize, usize) {
@@ -976,18 +1042,38 @@ impl Purgatory {
976 let mut state_removed = 0; 1042 let mut state_removed = 0;
977 1043
978 // Remove expired state events and mark them as expired 1044 // Remove expired state events and mark them as expired
979 self.state_events.retain(|_, entries| { 1045 self.state_events.retain(|identifier, entries| {
980 let original_len = entries.len(); 1046 let original_len = entries.len();
981 // Collect event IDs before removing
982 let expired_ids: Vec<EventId> = entries
983 .iter()
984 .filter(|entry| entry.expires_at <= now)
985 .map(|entry| entry.event.id)
986 .collect();
987 1047
988 // Mark as expired to prevent re-sync 1048 // Log and collect expired entries before removing
989 for event_id in expired_ids { 1049 for entry in entries.iter().filter(|e| e.expires_at <= now) {
990 self.mark_expired(event_id); 1050 let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex());
1051 let event_id_short = &entry.event.id.to_hex()[..12];
1052 let source_str = if entry.source.is_direct() { "direct" } else { "sync" };
1053
1054 // Structured log for migration scripts
1055 // Direct submissions log at WARN, synced events at DEBUG
1056 if entry.source.is_direct() {
1057 tracing::warn!(
1058 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
1059 identifier,
1060 npub,
1061 event_id_short,
1062 entry.event.kind.as_u16(),
1063 source_str
1064 );
1065 } else {
1066 tracing::debug!(
1067 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
1068 identifier,
1069 npub,
1070 event_id_short,
1071 entry.event.kind.as_u16(),
1072 source_str
1073 );
1074 }
1075
1076 self.mark_expired(entry.event.id);
991 } 1077 }
992 1078
993 // Remove expired entries 1079 // Remove expired entries
@@ -997,21 +1083,103 @@ impl Purgatory {
997 }); 1083 });
998 1084
999 // Remove expired PR events and mark them as expired 1085 // Remove expired PR events and mark them as expired
1000 let expired_prs: Vec<(String, Option<EventId>)> = self 1086 let expired_prs: Vec<_> = self
1001 .pr_events 1087 .pr_events
1002 .iter() 1088 .iter()
1003 .filter(|entry| entry.value().expires_at <= now) 1089 .filter(|entry| entry.value().expires_at <= now)
1004 .map(|entry| { 1090 .map(|entry| {
1005 let event_id = entry.value().event.as_ref().map(|e| e.id); 1091 let pr_entry = entry.value();
1006 (entry.key().clone(), event_id) 1092 let event_id_str = entry.key().clone();
1093 let event_opt = pr_entry.event.clone();
1094 let commit = pr_entry.commit.clone();
1095 let source = pr_entry.source;
1096 (event_id_str, event_opt, commit, source)
1007 }) 1097 })
1008 .collect(); 1098 .collect();
1009 1099
1010 let pr_removed = expired_prs.len(); 1100 let pr_removed = expired_prs.len();
1011 for (event_id_str, event_id_opt) in expired_prs { 1101 for (event_id_str, event_opt, commit, source) in expired_prs {
1012 // Mark actual PR events as expired (not placeholders) 1102 // Log structured entry for PR events (not placeholders)
1013 if let Some(event_id) = event_id_opt { 1103 if let Some(ref event) = event_opt {
1014 self.mark_expired(event_id); 1104 let npub = event
1105 .pubkey
1106 .to_bech32()
1107 .unwrap_or_else(|_| event.pubkey.to_hex());
1108 let event_id_short = &event.id.to_hex()[..12];
1109 let source_str = if source.is_direct() { "direct" } else { "sync" };
1110
1111 // Extract ALL repo identifiers from 'a' tags
1112 // (PR events can reference multiple repos when there are multiple maintainers)
1113 let repos: Vec<String> = event
1114 .tags
1115 .iter()
1116 .filter_map(|tag| {
1117 let tag_vec = tag.clone().to_vec();
1118 if tag_vec.len() >= 2
1119 && tag_vec[0] == "a"
1120 && tag_vec[1].starts_with("30617:")
1121 {
1122 // Format: 30617:<owner_pubkey>:<identifier>
1123 let parts: Vec<&str> = tag_vec[1].split(':').collect();
1124 if parts.len() >= 3 {
1125 Some(parts[2].to_string())
1126 } else {
1127 None
1128 }
1129 } else {
1130 None
1131 }
1132 })
1133 .collect();
1134
1135 // Deduplicate while preserving order
1136 let mut seen = std::collections::HashSet::new();
1137 let unique_repos: Vec<String> = repos
1138 .into_iter()
1139 .filter(|r| seen.insert(r.clone()))
1140 .collect();
1141
1142 let repos_to_log = if unique_repos.is_empty() {
1143 vec!["unknown".to_string()]
1144 } else {
1145 unique_repos
1146 };
1147
1148 // Structured log for migration scripts - log once per repo
1149 // Direct submissions log at WARN, synced events at DEBUG
1150 for repo in &repos_to_log {
1151 if source.is_direct() {
1152 tracing::warn!(
1153 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
1154 repo,
1155 npub,
1156 event_id_short,
1157 event.kind.as_u16(),
1158 &commit[..commit.len().min(12)],
1159 source_str
1160 );
1161 } else {
1162 tracing::debug!(
1163 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
1164 repo,
1165 npub,
1166 event_id_short,
1167 event.kind.as_u16(),
1168 &commit[..commit.len().min(12)],
1169 source_str
1170 );
1171 }
1172 }
1173
1174 self.mark_expired(event.id);
1175 } else {
1176 // Placeholder (git data arrived first, but PR event never came)
1177 // Placeholders are always Direct source (from git push)
1178 tracing::debug!(
1179 "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"",
1180 &event_id_str[..event_id_str.len().min(12)],
1181 &commit[..commit.len().min(12)]
1182 );
1015 } 1183 }
1016 self.pr_events.remove(&event_id_str); 1184 self.pr_events.remove(&event_id_str);
1017 } 1185 }
@@ -1191,6 +1359,7 @@ impl Purgatory {
1191 author: e.author, 1359 author: e.author,
1192 created_at_offset_secs: created_offset.as_secs(), 1360 created_at_offset_secs: created_offset.as_secs(),
1193 expires_at_offset_secs: expires_offset.as_secs(), 1361 expires_at_offset_secs: expires_offset.as_secs(),
1362 source: e.source,
1194 } 1363 }
1195 }) 1364 })
1196 .collect(); 1365 .collect();
@@ -1213,6 +1382,7 @@ impl Purgatory {
1213 commit: e.commit.clone(), 1382 commit: e.commit.clone(),
1214 created_at_offset_secs: created_offset.as_secs(), 1383 created_at_offset_secs: created_offset.as_secs(),
1215 expires_at_offset_secs: expires_offset.as_secs(), 1384 expires_at_offset_secs: expires_offset.as_secs(),
1385 source: e.source,
1216 }; 1386 };
1217 pr_events.insert(event_id, serializable); 1387 pr_events.insert(event_id, serializable);
1218 } 1388 }
@@ -1355,6 +1525,7 @@ impl Purgatory {
1355 author: e.author, 1525 author: e.author,
1356 created_at, 1526 created_at,
1357 expires_at, 1527 expires_at,
1528 source: e.source,
1358 } 1529 }
1359 }) 1530 })
1360 .collect(); 1531 .collect();
@@ -1380,6 +1551,7 @@ impl Purgatory {
1380 commit: e.commit, 1551 commit: e.commit,
1381 created_at, 1552 created_at,
1382 expires_at, 1553 expires_at,
1554 source: e.source,
1383 }; 1555 };
1384 1556
1385 self.pr_events.insert(event_id, entry); 1557 self.pr_events.insert(event_id, entry);
@@ -1439,8 +1611,18 @@ mod tests {
1439 .sign_with_keys(&keys) 1611 .sign_with_keys(&keys)
1440 .unwrap(); 1612 .unwrap();
1441 1613
1442 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); 1614 purgatory.add_state(
1443 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); 1615 event.clone(),
1616 "test-repo".to_string(),
1617 keys.public_key(),
1618 false,
1619 );
1620 purgatory.add_pr(
1621 event,
1622 "test-event-id".to_string(),
1623 "abc123".to_string(),
1624 false,
1625 );
1444 1626
1445 let (announcement_count, state_count, pr_count) = purgatory.count(); 1627 let (announcement_count, state_count, pr_count) = purgatory.count();
1446 assert_eq!(announcement_count, 0); 1628 assert_eq!(announcement_count, 0);
@@ -1492,7 +1674,7 @@ mod tests {
1492 let event = EventBuilder::text_note("state") 1674 let event = EventBuilder::text_note("state")
1493 .sign_with_keys(&keys) 1675 .sign_with_keys(&keys)
1494 .unwrap(); 1676 .unwrap();
1495 purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); 1677 purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false);
1496 1678
1497 // Now should have pending events 1679 // Now should have pending events
1498 assert!(purgatory.has_pending_events("test-repo")); 1680 assert!(purgatory.has_pending_events("test-repo"));
@@ -1522,7 +1704,12 @@ mod tests {
1522 .sign_with_keys(&keys) 1704 .sign_with_keys(&keys)
1523 .unwrap(); 1705 .unwrap();
1524 1706
1525 purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); 1707 purgatory.add_pr(
1708 event,
1709 "pr-event-id".to_string(),
1710 "commit123".to_string(),
1711 false,
1712 );
1526 1713
1527 // Now should have pending events for test-repo 1714 // Now should have pending events for test-repo
1528 assert!(purgatory.has_pending_events("test-repo")); 1715 assert!(purgatory.has_pending_events("test-repo"));
@@ -1587,6 +1774,7 @@ fn test_pr_event_vs_placeholder() {
1587 event.clone(), 1774 event.clone(),
1588 "event-id-1".to_string(), 1775 "event-id-1".to_string(),
1589 "commit-abc".to_string(), 1776 "commit-abc".to_string(),
1777 false,
1590 ); 1778 );
1591 1779
1592 // Add a placeholder (no event) 1780 // Add a placeholder (no event)
@@ -1643,8 +1831,14 @@ fn test_cleanup_removes_expired_entries() {
1643 state_event.clone(), 1831 state_event.clone(),
1644 "test-repo".to_string(), 1832 "test-repo".to_string(),
1645 keys.public_key(), 1833 keys.public_key(),
1834 false,
1835 );
1836 purgatory.add_pr(
1837 pr_event,
1838 "pr-123".to_string(),
1839 "commit-abc".to_string(),
1840 false,
1646 ); 1841 );
1647 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string());
1648 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); 1842 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string());
1649 1843
1650 // Verify entries are there 1844 // Verify entries are there
@@ -1691,8 +1885,18 @@ fn test_cleanup_preserves_non_expired_entries() {
1691 .unwrap(); 1885 .unwrap();
1692 1886
1693 // Add fresh entries 1887 // Add fresh entries
1694 purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); 1888 purgatory.add_state(
1695 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); 1889 state_event,
1890 "test-repo".to_string(),
1891 keys.public_key(),
1892 false,
1893 );
1894 purgatory.add_pr(
1895 pr_event,
1896 "pr-123".to_string(),
1897 "commit-abc".to_string(),
1898 false,
1899 );
1696 1900
1697 // Run cleanup 1901 // Run cleanup
1698 let (_, state_removed, pr_removed) = purgatory.cleanup(); 1902 let (_, state_removed, pr_removed) = purgatory.cleanup();
@@ -1722,8 +1926,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1722 .sign_with_keys(&keys) 1926 .sign_with_keys(&keys)
1723 .unwrap(); 1927 .unwrap();
1724 1928
1725 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); 1929 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false);
1726 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); 1930 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false);
1727 1931
1728 // Expire only the first one 1932 // Expire only the first one
1729 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { 1933 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") {
@@ -1740,8 +1944,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1740 .sign_with_keys(&keys) 1944 .sign_with_keys(&keys)
1741 .unwrap(); 1945 .unwrap();
1742 1946
1743 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); 1947 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false);
1744 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); 1948 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false);
1745 1949
1746 // Expire only first PR 1950 // Expire only first PR
1747 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { 1951 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") {
@@ -1773,8 +1977,8 @@ fn test_remove_expired_legacy_method() {
1773 .unwrap(); 1977 .unwrap();
1774 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); 1978 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap();
1775 1979
1776 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 1980 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1777 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 1981 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1778 1982
1779 // Expire both 1983 // Expire both
1780 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1984 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1808,8 +2012,8 @@ fn test_expired_event_tracking() {
1808 let pr_event_id = pr_event.id; 2012 let pr_event_id = pr_event.id;
1809 2013
1810 // Add events to purgatory 2014 // Add events to purgatory
1811 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 2015 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1812 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 2016 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1813 2017
1814 // Events should not be marked as expired yet 2018 // Events should not be marked as expired yet
1815 assert!(!purgatory.is_expired(&state_event_id)); 2019 assert!(!purgatory.is_expired(&state_event_id));
@@ -1861,7 +2065,7 @@ fn test_cleanup_expired_events() {
1861 let event2_id = event2.id; 2065 let event2_id = event2.id;
1862 2066
1863 // Add and immediately expire event1 2067 // Add and immediately expire event1
1864 purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); 2068 purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false);
1865 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { 2069 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") {
1866 for entry in entries.iter_mut() { 2070 for entry in entries.iter_mut() {
1867 entry.expires_at = Instant::now() - Duration::from_secs(1); 2071 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1870,7 +2074,7 @@ fn test_cleanup_expired_events() {
1870 purgatory.cleanup(); 2074 purgatory.cleanup();
1871 2075
1872 // Add and expire event2 (will be more recent) 2076 // Add and expire event2 (will be more recent)
1873 purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); 2077 purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false);
1874 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { 2078 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") {
1875 for entry in entries.iter_mut() { 2079 for entry in entries.iter_mut() {
1876 entry.expires_at = Instant::now() - Duration::from_secs(1); 2080 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1912,7 +2116,7 @@ fn test_expired_events_prevent_readdition() {
1912 let event_id = event.id; 2116 let event_id = event.id;
1913 2117
1914 // Add event to purgatory 2118 // Add event to purgatory
1915 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2119 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1916 2120
1917 // Expire it 2121 // Expire it
1918 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 2122 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1932,7 +2136,7 @@ fn test_expired_events_prevent_readdition() {
1932 // This simulates what negentropy/REQ+EOSE should do: 2136 // This simulates what negentropy/REQ+EOSE should do:
1933 // Check if event is in event_ids() before adding 2137 // Check if event is in event_ids() before adding
1934 if !ids.contains(&event_id) { 2138 if !ids.contains(&event_id) {
1935 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2139 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
1936 } 2140 }
1937 2141
1938 // Event should NOT be re-added 2142 // Event should NOT be re-added
@@ -1975,7 +2179,7 @@ fn test_user_can_resubmit_expired_event() {
1975 let event_id = event.id; 2179 let event_id = event.id;
1976 2180
1977 // Add event to purgatory 2181 // Add event to purgatory
1978 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2182 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1979 2183
1980 // Expire it 2184 // Expire it
1981 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 2185 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -2024,8 +2228,18 @@ async fn test_save_and_restore_state_events() {
2024 let event1_id = event1.id; 2228 let event1_id = event1.id;
2025 let event2_id = event2.id; 2229 let event2_id = event2.id;
2026 2230
2027 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); 2231 purgatory.add_state(
2028 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); 2232 event1.clone(),
2233 "test-repo".to_string(),
2234 keys.public_key(),
2235 false,
2236 );
2237 purgatory.add_state(
2238 event2.clone(),
2239 "test-repo".to_string(),
2240 keys.public_key(),
2241 false,
2242 );
2029 2243
2030 // Save to disk 2244 // Save to disk
2031 purgatory.save_to_disk(&state_file).unwrap(); 2245 purgatory.save_to_disk(&state_file).unwrap();
@@ -2087,6 +2301,7 @@ async fn test_save_and_restore_pr_events() {
2087 pr_event.clone(), 2301 pr_event.clone(),
2088 "pr-event-id".to_string(), 2302 "pr-event-id".to_string(),
2089 "commit-abc".to_string(), 2303 "commit-abc".to_string(),
2304 false,
2090 ); 2305 );
2091 2306
2092 // Save to disk 2307 // Save to disk
@@ -2156,7 +2371,7 @@ async fn test_save_and_restore_expired_events() {
2156 let event_id = event.id; 2371 let event_id = event.id;
2157 2372
2158 // Add and expire event 2373 // Add and expire event
2159 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2374 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
2160 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 2375 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
2161 for entry in entries.iter_mut() { 2376 for entry in entries.iter_mut() {
2162 entry.expires_at = Instant::now() - Duration::from_secs(1); 2377 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -2295,7 +2510,7 @@ async fn test_downtime_calculation() {
2295 .sign_with_keys(&keys) 2510 .sign_with_keys(&keys)
2296 .unwrap(); 2511 .unwrap();
2297 2512
2298 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2513 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
2299 2514
2300 // Get original expiry time 2515 // Get original expiry time
2301 let original_entries = purgatory.find_state("repo"); 2516 let original_entries = purgatory.find_state("repo");
@@ -2351,7 +2566,7 @@ async fn test_expiry_times_preserved() {
2351 .sign_with_keys(&keys) 2566 .sign_with_keys(&keys)
2352 .unwrap(); 2567 .unwrap();
2353 2568
2354 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2569 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
2355 2570
2356 // Manually set expiry to a specific time in the future 2571 // Manually set expiry to a specific time in the future
2357 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes 2572 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
@@ -2410,16 +2625,19 @@ async fn test_multiple_state_events_same_identifier() {
2410 event1.clone(), 2625 event1.clone(),
2411 "shared-repo".to_string(), 2626 "shared-repo".to_string(),
2412 keys1.public_key(), 2627 keys1.public_key(),
2628 false,
2413 ); 2629 );
2414 purgatory.add_state( 2630 purgatory.add_state(
2415 event2.clone(), 2631 event2.clone(),
2416 "shared-repo".to_string(), 2632 "shared-repo".to_string(),
2417 keys2.public_key(), 2633 keys2.public_key(),
2634 false,
2418 ); 2635 );
2419 purgatory.add_state( 2636 purgatory.add_state(
2420 event3.clone(), 2637 event3.clone(),
2421 "shared-repo".to_string(), 2638 "shared-repo".to_string(),
2422 keys3.public_key(), 2639 keys3.public_key(),
2640 false,
2423 ); 2641 );
2424 2642
2425 // Save to disk 2643 // Save to disk
@@ -2466,6 +2684,7 @@ async fn test_mixed_pr_events_and_placeholders() {
2466 pr_event.clone(), 2684 pr_event.clone(),
2467 "pr-with-event".to_string(), 2685 "pr-with-event".to_string(),
2468 "commit-abc".to_string(), 2686 "commit-abc".to_string(),
2687 false,
2469 ); 2688 );
2470 2689
2471 // Add PR placeholder 2690 // Add PR placeholder
@@ -2511,7 +2730,7 @@ async fn test_file_cleanup_after_successful_restore() {
2511 let event = EventBuilder::text_note("test") 2730 let event = EventBuilder::text_note("test")
2512 .sign_with_keys(&keys) 2731 .sign_with_keys(&keys)
2513 .unwrap(); 2732 .unwrap();
2514 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2733 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
2515 2734
2516 // Save to disk 2735 // Save to disk
2517 purgatory.save_to_disk(&state_file).unwrap(); 2736 purgatory.save_to_disk(&state_file).unwrap();
@@ -2697,8 +2916,18 @@ async fn test_comprehensive_roundtrip() {
2697 .sign_with_keys(&keys2) 2916 .sign_with_keys(&keys2)
2698 .unwrap(); 2917 .unwrap();
2699 2918
2700 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); 2919 purgatory.add_state(
2701 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); 2920 state1.clone(),
2921 "repo1".to_string(),
2922 keys1.public_key(),
2923 false,
2924 );
2925 purgatory.add_state(
2926 state2.clone(),
2927 "repo2".to_string(),
2928 keys2.public_key(),
2929 false,
2930 );
2702 2931
2703 // Add PR event 2932 // Add PR event
2704 let tags = vec![Tag::custom( 2933 let tags = vec![Tag::custom(
@@ -2709,7 +2938,12 @@ async fn test_comprehensive_roundtrip() {
2709 .tags(tags) 2938 .tags(tags)
2710 .sign_with_keys(&keys1) 2939 .sign_with_keys(&keys1)
2711 .unwrap(); 2940 .unwrap();
2712 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); 2941 purgatory.add_pr(
2942 pr_event.clone(),
2943 "pr-1".to_string(),
2944 "commit-1".to_string(),
2945 false,
2946 );
2713 2947
2714 // Add PR placeholder 2948 // Add PR placeholder
2715 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); 2949 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
@@ -2719,7 +2953,12 @@ async fn test_comprehensive_roundtrip() {
2719 .sign_with_keys(&keys1) 2953 .sign_with_keys(&keys1)
2720 .unwrap(); 2954 .unwrap();
2721 let expired_id = expired_event.id; 2955 let expired_id = expired_event.id;
2722 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); 2956 purgatory.add_state(
2957 expired_event,
2958 "repo3".to_string(),
2959 keys1.public_key(),
2960 false,
2961 );
2723 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { 2962 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2724 for entry in entries.iter_mut() { 2963 for entry in entries.iter_mut() {
2725 entry.expires_at = Instant::now() - Duration::from_secs(1); 2964 entry.expires_at = Instant::now() - Duration::from_secs(1);
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index ece8cd6..8297515 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -375,94 +375,121 @@ impl SyncContext for RealSyncContext {
375 let naughty_list = self.git_naughty_list.clone(); 375 let naughty_list = self.git_naughty_list.clone();
376 376
377 tokio::task::spawn_blocking(move || -> Result<Vec<String>> { 377 tokio::task::spawn_blocking(move || -> Result<Vec<String>> {
378 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history 378 let mut remaining_oids = missing_oids.clone();
379 let mut args = vec!["fetch", &url]; 379 let mut missing_from_remote: Vec<String> = Vec::new();
380 args.extend(missing_oids.iter().map(|s| s.as_str())); 380
381 381 // Retry loop: keep fetching until success or no OIDs left
382 let output = Command::new("git") 382 loop {
383 .args(&args) 383 if remaining_oids.is_empty() {
384 .current_dir(&repo_path) 384 // All OIDs were missing from remote
385 .output(); 385 debug!(
386 386 url = %url,
387 match output { 387 missing_count = missing_from_remote.len(),
388 Ok(result) if result.status.success() => { 388 "All requested OIDs missing from remote"
389 // Count how many OIDs we now have 389 );
390 let fetched: Vec<String> = missing_oids 390 return Ok(vec![]);
391 .iter()
392 .filter(|oid| crate::git::oid_exists(&repo_path, oid))
393 .cloned()
394 .collect();
395
396 debug!(fetched_count = fetched.len(), "Successfully fetched OIDs");
397
398 Ok(fetched)
399 } 391 }
400 Ok(result) => { 392
401 let stderr = String::from_utf8_lossy(&result.stderr); 393 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history
402 394 let mut args = vec!["fetch".to_string(), url.clone()];
403 // Extract domain and classify error for naughty list 395 args.extend(remaining_oids.iter().cloned());
404 if let Some(domain) = extract_domain(&url) { 396
405 if let Some(category) = NaughtyListTracker::classify_error(&stderr) { 397 let output = Command::new("git")
406 let is_new = naughty_list.record(&domain, category, stderr.to_string()); 398 .args(&args)
407 399 .current_dir(&repo_path)
408 if is_new { 400 .output();
409 tracing::warn!( 401
410 domain = %domain, 402 match output {
411 category = %category, 403 Ok(result) if result.status.success() => {
412 error = %stderr, 404 // Fetch succeeded - count how many OIDs we now have
413 "Git remote domain added to naughty list" 405 let fetched: Vec<String> = missing_oids
414 ); 406 .iter()
415 } else { 407 .filter(|oid| crate::git::oid_exists(&repo_path, oid))
416 debug!( 408 .cloned()
417 domain = %domain, 409 .collect();
418 category = %category, 410
419 "Git remote domain still on naughty list" 411 if !missing_from_remote.is_empty() {
420 ); 412 debug!(
421 } 413 url = %url,
414 fetched_count = fetched.len(),
415 missing_count = missing_from_remote.len(),
416 missing_oids = ?missing_from_remote,
417 "Fetch completed after retries - some OIDs were missing from remote"
418 );
419 } else {
420 debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs");
422 } 421 }
422
423 return Ok(fetched);
423 } 424 }
425 Ok(result) => {
426 let stderr = String::from_utf8_lossy(&result.stderr);
424 427
425 // Check for "not our ref" errors and provide a clearer error message 428 // Check for "not our ref" error - this is retryable
426 let error_msg = if stderr.contains("upload-pack: not our ref") { 429 if stderr.contains("upload-pack: not our ref") {
427 // Parse out the missing OID from stderr (git only reports one at a time) 430 // Parse out the missing OID from stderr
428 let missing_oid = stderr 431 let missing_oid = stderr.lines().find_map(|line| {
429 .lines()
430 .find_map(|line| {
431 if line.contains("not our ref") { 432 if line.contains("not our ref") {
432 // Extract the OID from lines like: 433 // Extract the OID from lines like:
433 // "fatal: remote error: upload-pack: not our ref <oid>" 434 // "fatal: remote error: upload-pack: not our ref <oid>"
434 line.split("not our ref").nth(1).map(|s| s.trim().to_string()) 435 line.split("not our ref")
436 .nth(1)
437 .map(|s| s.trim().to_string())
435 } else { 438 } else {
436 None 439 None
437 } 440 }
438 }); 441 });
439 442
440 let total_requested = missing_oids.len(); 443 if let Some(ref oid) = missing_oid {
444 // Remove the missing OID and retry with remaining
445 remaining_oids.retain(|o| o != oid);
446 missing_from_remote.push(oid.clone());
441 447
442 if let Some(oid) = missing_oid { 448 debug!(
443 if total_requested > 1 {
444 // BUG: Git stops at first missing OID, so we don't know if the others exist
445 // We need retry logic to fetch remaining OIDs individually
446 tracing::warn!(
447 url = %url, 449 url = %url,
448 missing_oid = %oid, 450 missing_oid = %oid,
449 total_requested = total_requested, 451 remaining_count = remaining_oids.len(),
450 "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." 452 "OID not found on remote, retrying with remaining OIDs"
451 ); 453 );
452 format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) 454
453 } else { 455 continue; // Retry with remaining OIDs
454 format!("remote missing only oid requested: {}", oid) 456 }
457 }
458
459 // Non-retryable error - record to naughty list and return error
460 if let Some(domain) = extract_domain(&url) {
461 if let Some(category) = NaughtyListTracker::classify_error(&stderr) {
462 let is_new =
463 naughty_list.record(&domain, category, stderr.to_string());
464
465 if is_new {
466 tracing::warn!(
467 domain = %domain,
468 category = %category,
469 error = %stderr,
470 "Git remote domain added to naughty list"
471 );
472 } else {
473 debug!(
474 domain = %domain,
475 category = %category,
476 error = %stderr,
477 "Git fetch failed (domain on naughty list)"
478 );
479 }
455 } 480 }
456 } else {
457 format!("git fetch failed: {}", stderr)
458 } 481 }
459 } else {
460 format!("git fetch failed: {}", stderr)
461 };
462 482
463 Err(anyhow::anyhow!("{}", error_msg)) 483 return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr));
484 }
485 Err(e) => {
486 return Err(anyhow::anyhow!(
487 "git fetch command error for {}: {}",
488 url,
489 e
490 ))
491 }
464 } 492 }
465 Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)),
466 } 493 }
467 }) 494 })
468 .await 495 .await
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index 65d29af..9207d58 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
368 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; 368 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await;
369 throttle_manager.complete_request(&domain); 369 throttle_manager.complete_request(&domain);
370 370
371 let oids_fetched = match fetch_result { 371 let fetched_oids = match fetch_result {
372 Ok(fetched) => { 372 Ok(fetched) if !fetched.is_empty() => {
373 debug!( 373 debug!(
374 identifier = %identifier, 374 identifier = %identifier,
375 url = %url, 375 url = %url,
376 oids_fetched = fetched.len(), 376 oids_fetched = fetched.len(),
377 "Fetch succeeded" 377 "Fetch succeeded"
378 ); 378 );
379 fetched.len() 379 fetched
380 }
381 Ok(_) => {
382 debug!(
383 identifier = %identifier,
384 url = %url,
385 "Fetch returned no OIDs (not available on remote)"
386 );
387 vec![]
380 } 388 }
381 Err(e) => { 389 Err(e) => {
382 debug!( 390 debug!(
@@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
385 error = %e, 393 error = %e,
386 "Fetch failed" 394 "Fetch failed"
387 ); 395 );
388 0 396 vec![]
389 } 397 }
390 }; 398 };
391 399
392 // Try to process any events that can now be satisfied 400 // Try to process any events that can now be satisfied
393 if oids_fetched > 0 { 401 if !fetched_oids.is_empty() {
394 let new_oids: HashSet<String> = needed_oids.into_iter().collect(); 402 let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect();
395 if let Err(e) = ctx 403 if let Err(e) = ctx
396 .process_newly_available_git_data(&target_repo, &new_oids) 404 .process_newly_available_git_data(&target_repo, &new_oids)
397 .await 405 .await
@@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
404 } 412 }
405 } 413 }
406 414
407 oids_fetched 415 fetched_oids.len()
408} 416}
409 417
410/// Sync git data for an identifier. 418/// Sync git data for an identifier.
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index d891bc9..1af5c4e 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -10,6 +10,28 @@ use std::collections::HashSet;
10use std::path::PathBuf; 10use std::path::PathBuf;
11use std::time::Instant; 11use std::time::Instant;
12 12
13/// Source of an event entering purgatory.
14///
15/// Tracks whether an event was submitted directly by a user or fetched via
16/// proactive sync from another relay. This distinction is used for:
17/// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG
18/// - Operational monitoring: Helps identify user-facing issues vs sync noise
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
20pub enum EventSource {
21 /// Event was published directly to this relay by a user
22 #[default]
23 Direct,
24 /// Event was fetched via proactive sync from another relay
25 Sync,
26}
27
28impl EventSource {
29 /// Returns true if this is a direct submission (not synced)
30 pub fn is_direct(&self) -> bool {
31 matches!(self, EventSource::Direct)
32 }
33}
34
13/// Default value for Instant fields during deserialization 35/// Default value for Instant fields during deserialization
14fn instant_now() -> Instant { 36fn instant_now() -> Instant {
15 Instant::now() 37 Instant::now()
@@ -88,6 +110,10 @@ pub struct StatePurgatoryEntry {
88 /// Expiry deadline (30 min from creation, may be extended) 110 /// Expiry deadline (30 min from creation, may be extended)
89 #[serde(skip, default = "instant_now")] 111 #[serde(skip, default = "instant_now")]
90 pub expires_at: Instant, 112 pub expires_at: Instant,
113
114 /// Source of this event (direct submission vs sync)
115 #[serde(default)]
116 pub source: EventSource,
91} 117}
92 118
93/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. 119/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory.
@@ -114,6 +140,10 @@ pub struct PrPurgatoryEntry {
114 /// Expiry deadline (30 min from creation, may be extended) 140 /// Expiry deadline (30 min from creation, may be extended)
115 #[serde(skip, default = "instant_now")] 141 #[serde(skip, default = "instant_now")]
116 pub expires_at: Instant, 142 pub expires_at: Instant,
143
144 /// Source of this event (direct submission vs sync)
145 #[serde(default)]
146 pub source: EventSource,
117} 147}
118 148
119/// Entry for a repository announcement (kind 30617) waiting in purgatory. 149/// Entry for a repository announcement (kind 30617) waiting in purgatory.
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 44efbf0..cd62380 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1512,6 +1512,7 @@ impl SyncManager {
1512 self.service_domain.clone(), 1512 self.service_domain.clone(),
1513 Arc::clone(&self.repo_sync_index), 1513 Arc::clone(&self.repo_sync_index),
1514 action_tx, 1514 action_tx,
1515 self.database.clone(),
1515 ); 1516 );
1516 let subscriber_shutdown = shutdown_tx.subscribe(); 1517 let subscriber_shutdown = shutdown_tx.subscribe();
1517 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); 1518 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });
@@ -2969,6 +2970,7 @@ impl SyncManager {
2969 event_id = %event.id, 2970 event_id = %event.id,
2970 kind = %event.kind.as_u16(), 2971 kind = %event.kind.as_u16(),
2971 identifier = %identifier, 2972 identifier = %identifier,
2973 pubkey = %event.pubkey,
2972 "Added rejected announcement to two-tier index" 2974 "Added rejected announcement to two-tier index"
2973 ); 2975 );
2974 } 2976 }
diff --git a/src/sync/naughty_list.rs b/src/sync/naughty_list.rs
index 35fcc0f..0abb986 100644
--- a/src/sync/naughty_list.rs
+++ b/src/sync/naughty_list.rs
@@ -101,6 +101,69 @@ impl NaughtyListTracker {
101 Self::new(12) 101 Self::new(12)
102 } 102 }
103 103
104 /// Strip URLs from an error message to prevent false positives from URL components.
105 ///
106 /// URLs can contain path components, repository names, or user identifiers that
107 /// accidentally match error patterns (e.g., "my-openssl-project", "ssl-team",
108 /// "certificate-manager"). By stripping URLs before classification, we ensure
109 /// only the actual error message text is analyzed.
110 ///
111 /// Handles: http://, https://, git://, ws://, wss://
112 fn strip_urls(error: &str) -> String {
113 let mut result = String::with_capacity(error.len());
114 let mut chars = error.chars().peekable();
115
116 while let Some(c) = chars.next() {
117 // Check for URL start patterns
118 let potential_url = match c {
119 'h' => {
120 // Check for http:// or https://
121 let rest: String = chars.clone().take(7).collect();
122 rest.starts_with("ttp://") || rest.starts_with("ttps://")
123 }
124 'g' => {
125 // Check for git://
126 let rest: String = chars.clone().take(5).collect();
127 rest.starts_with("it://")
128 }
129 'w' => {
130 // Check for ws:// or wss://
131 let rest: String = chars.clone().take(5).collect();
132 rest.starts_with("s://") || rest.starts_with("ss://")
133 }
134 _ => false,
135 };
136
137 if potential_url {
138 // Found URL start, consume until URL end
139 result.push_str("[URL]");
140
141 // Skip until we hit a URL terminator
142 loop {
143 match chars.peek() {
144 Some(&ch) if Self::is_url_char(ch) => {
145 chars.next();
146 }
147 _ => break,
148 }
149 }
150 } else {
151 result.push(c);
152 }
153 }
154
155 result
156 }
157
158 /// Check if a character can be part of a URL
159 #[inline]
160 fn is_url_char(c: char) -> bool {
161 // URLs end at whitespace, quotes, or certain brackets
162 // This is conservative - real URLs can contain more, but git errors
163 // typically have URLs followed by these terminators
164 !matches!(c, ' ' | '\t' | '\n' | '\r' | '"' | '\'' | '>' | ']' | ')')
165 }
166
104 /// Classify an error string into a naughty category or return None for transient errors 167 /// Classify an error string into a naughty category or return None for transient errors
105 /// 168 ///
106 /// # Arguments 169 /// # Arguments
@@ -112,21 +175,56 @@ impl NaughtyListTracker {
112 /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue 175 /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue
113 /// - `None` if the error is a transient network issue (use HealthTracker backoff) 176 /// - `None` if the error is a transient network issue (use HealthTracker backoff)
114 pub fn classify_error(error: &str) -> Option<NaughtyCategory> { 177 pub fn classify_error(error: &str) -> Option<NaughtyCategory> {
115 let error_lower = error.to_lowercase(); 178 // Filter out remote warnings - these are informational messages from the remote
179 // server that don't indicate infrastructure problems with the domain itself.
180 // Example: "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"
181 // These warnings are about the remote server's internal configuration, not connectivity.
182 let filtered_error: String = error
183 .lines()
184 .filter(|line| {
185 let line_lower = line.to_lowercase();
186 // Keep lines that are NOT remote warnings
187 !(line_lower.starts_with("remote: warning:")
188 || line_lower.starts_with("warning: remote"))
189 })
190 .collect::<Vec<_>>()
191 .join("\n");
192
193 // If after filtering we have no content, this was just warnings - not a real error
194 if filtered_error.trim().is_empty() {
195 return None;
196 }
197
198 // Strip URLs to prevent false positives from URL components
199 // (e.g., repository named "openssl-test" or path containing "certificate")
200 let url_stripped = Self::strip_urls(&filtered_error);
201 let error_lower = url_stripped.to_lowercase();
116 202
117 // DNS lookup failures 203 // DNS lookup failures
118 if error_lower.contains("failed to lookup address") 204 if error_lower.contains("failed to lookup address")
119 || error_lower.contains("name or service not known") 205 || error_lower.contains("name or service not known")
120 || error_lower.contains("nodename nor servname provided") 206 || error_lower.contains("nodename nor servname provided")
121 || (error_lower.contains("dns") && !error_lower.contains("timeout")) 207 || error_lower.contains("dns error")
208 || error_lower.contains("dns lookup")
209 || error_lower.contains("dns resolution")
210 || error_lower.contains("getaddrinfo")
122 { 211 {
123 return Some(NaughtyCategory::DnsLookupFailed); 212 return Some(NaughtyCategory::DnsLookupFailed);
124 } 213 }
125 214
126 // TLS certificate errors 215 // TLS certificate errors
127 if error_lower.contains("certificate") 216 if error_lower.contains("certificate")
128 || error_lower.contains("ssl") 217 || error_lower.contains("ssl error")
129 || error_lower.contains("tls") 218 || error_lower.contains("ssl certificate")
219 || error_lower.contains("ssl handshake")
220 || error_lower.contains("ssl_error")
221 || error_lower.contains("tls error")
222 || error_lower.contains("tls handshake")
223 || error_lower.contains("tls alert")
224 || error_lower.contains("tls_error")
225 || error_lower.contains("openssl")
226 || error_lower.contains("schannel")
227 || error_lower.contains("secure channel")
130 { 228 {
131 // Exclude timeout errors that mention TLS 229 // Exclude timeout errors that mention TLS
132 if !error_lower.contains("timeout") && !error_lower.contains("timed out") { 230 if !error_lower.contains("timeout") && !error_lower.contains("timed out") {
@@ -134,12 +232,12 @@ impl NaughtyListTracker {
134 } 232 }
135 } 233 }
136 234
137 // Protocol errors 235 // Protocol errors - specifically WebSocket/Nostr protocol violations
138 if error_lower.contains("websocket") 236 // Note: We check for "websocket" specifically, NOT generic "protocol" keyword
139 || error_lower.contains("protocol") 237 // because git errors often contain "protocol error" (e.g., "fatal: protocol error: bad line length")
140 || error_lower.contains("invalid frame") 238 // which are transient network issues, not persistent infrastructure problems.
141 { 239 if error_lower.contains("websocket") || error_lower.contains("invalid frame") {
142 // Exclude connection errors 240 // Exclude connection errors (transient)
143 if !error_lower.contains("connection") 241 if !error_lower.contains("connection")
144 && !error_lower.contains("timeout") 242 && !error_lower.contains("timeout")
145 && !error_lower.contains("refused") 243 && !error_lower.contains("refused")
@@ -290,183 +388,216 @@ impl NaughtyListTracker {
290mod tests { 388mod tests {
291 use super::*; 389 use super::*;
292 390
391 // =========================================================================
392 // URL STRIPPING TESTS
393 // =========================================================================
394
293 #[test] 395 #[test]
294 fn test_classify_dns_errors() { 396 fn test_strip_urls_basic_protocols() {
295 assert_eq!( 397 // HTTP/HTTPS
296 NaughtyListTracker::classify_error("failed to lookup address information"),
297 Some(NaughtyCategory::DnsLookupFailed)
298 );
299 assert_eq!( 398 assert_eq!(
300 NaughtyListTracker::classify_error("Name or service not known"), 399 NaughtyListTracker::strip_urls("error: https://example.com/repo.git failed"),
301 Some(NaughtyCategory::DnsLookupFailed) 400 "error: [URL] failed"
302 );
303 assert_eq!(
304 NaughtyListTracker::classify_error("nodename nor servname provided"),
305 Some(NaughtyCategory::DnsLookupFailed)
306 ); 401 );
307 assert_eq!( 402 assert_eq!(
308 NaughtyListTracker::classify_error("dns error: NXDOMAIN"), 403 NaughtyListTracker::strip_urls("error: http://example.com/path failed"),
309 Some(NaughtyCategory::DnsLookupFailed) 404 "error: [URL] failed"
310 ); 405 );
311 }
312 406
313 #[test] 407 // Git protocol
314 fn test_classify_tls_errors() {
315 assert_eq!( 408 assert_eq!(
316 NaughtyListTracker::classify_error("certificate not valid for 'example.com'"), 409 NaughtyListTracker::strip_urls("fatal: git://github.com/user/repo.git not found"),
317 Some(NaughtyCategory::TlsCertificateInvalid) 410 "fatal: [URL] not found"
318 ); 411 );
412
413 // WebSocket protocols (used for relay URLs)
319 assert_eq!( 414 assert_eq!(
320 NaughtyListTracker::classify_error("SSL certificate problem"), 415 NaughtyListTracker::strip_urls("error: wss://relay.example.com failed"),
321 Some(NaughtyCategory::TlsCertificateInvalid) 416 "error: [URL] failed"
322 ); 417 );
323 assert_eq!( 418 assert_eq!(
324 NaughtyListTracker::classify_error("TLS handshake failed"), 419 NaughtyListTracker::strip_urls("error: ws://localhost:8080 failed"),
325 Some(NaughtyCategory::TlsCertificateInvalid) 420 "error: [URL] failed"
326 ); 421 );
422 }
327 423
328 // TLS timeout should NOT be classified as naughty 424 #[test]
329 assert_eq!( 425 fn test_strip_urls_multiple() {
330 NaughtyListTracker::classify_error("TLS connection timed out"), 426 let error = "failed to clone https://a.com/repo.git and wss://relay.com";
331 None 427 let stripped = NaughtyListTracker::strip_urls(error);
332 ); 428 assert_eq!(stripped, "failed to clone [URL] and [URL]");
333 } 429 }
334 430
335 #[test] 431 #[test]
336 fn test_classify_protocol_errors() { 432 fn test_strip_urls_preserves_error_text() {
337 assert_eq!( 433 let error =
338 NaughtyListTracker::classify_error("websocket protocol error"), 434 "fatal: unable to access 'https://example.com/repo.git/': SSL certificate problem";
339 Some(NaughtyCategory::ProtocolError) 435 let stripped = NaughtyListTracker::strip_urls(error);
340 ); 436 assert!(stripped.contains("SSL certificate problem"));
437 assert!(!stripped.contains("example.com"));
438 }
439
440 // =========================================================================
441 // EDGE CASES: TIMEOUT/CONNECTION EXCEPTIONS
442 // These are the "unusual rules" where a pattern matches but should be excluded
443 // =========================================================================
444
445 #[test]
446 fn test_tls_timeout_not_naughty() {
447 // TLS errors with timeout should NOT be classified as naughty
448 // (timeout is transient, not a certificate problem)
341 assert_eq!( 449 assert_eq!(
342 NaughtyListTracker::classify_error("invalid frame header"), 450 NaughtyListTracker::classify_error("TLS connection timed out"),
343 Some(NaughtyCategory::ProtocolError) 451 None
344 ); 452 );
345
346 // WebSocket connection errors should NOT be classified as naughty
347 assert_eq!( 453 assert_eq!(
348 NaughtyListTracker::classify_error("websocket connection refused"), 454 NaughtyListTracker::classify_error("SSL handshake timeout"),
349 None 455 None
350 ); 456 );
351 } 457 }
352 458
353 #[test] 459 #[test]
354 fn test_classify_transient_errors() { 460 fn test_websocket_connection_errors_not_naughty() {
355 // Timeouts are transient 461 // WebSocket connection errors are transient, not protocol violations
356 assert_eq!( 462 assert_eq!(
357 NaughtyListTracker::classify_error("connection timed out"), 463 NaughtyListTracker::classify_error("websocket connection refused"),
358 None 464 None
359 ); 465 );
360 assert_eq!( 466 assert_eq!(
361 NaughtyListTracker::classify_error("operation timed out"), 467 NaughtyListTracker::classify_error("websocket connection timeout"),
362 None 468 None
363 ); 469 );
470 }
364 471
365 // Connection refused is transient 472 #[test]
473 fn test_remote_warnings_filtered() {
474 // Remote warnings should be filtered out before classification
475 let warning_only =
476 "remote: warning: unable to access '/root/.config/git/attributes': Permission denied";
477 assert_eq!(NaughtyListTracker::classify_error(warning_only), None);
478
479 // But real errors after warnings should still be classified
480 let warning_with_error = "remote: warning: something\nfatal: failed to lookup address";
366 assert_eq!( 481 assert_eq!(
367 NaughtyListTracker::classify_error("connection refused"), 482 NaughtyListTracker::classify_error(warning_with_error),
368 None 483 Some(NaughtyCategory::DnsLookupFailed)
369 ); 484 );
485 }
370 486
371 // Generic network errors are transient 487 // =========================================================================
488 // INTEGRATION: FULL CLASSIFICATION FLOW
489 // Verify URL stripping + classification work together correctly
490 // =========================================================================
491
492 #[test]
493 fn test_url_with_keywords_not_false_positive() {
494 // URLs containing keywords should NOT trigger classification
495 let cases = [
496 ("https://example.com/my-openssl-project.git", "not found"),
497 ("https://example.com/ssl-team/repo.git", "not found"),
498 ("https://example.com/certificate-manager.git", "not found"),
499 ("https://example.com/dns-tools.git", "not found"),
500 ("wss://relay-tls-test.example.com", "connection refused"),
501 ];
502
503 for (url, suffix) in cases {
504 let error = format!("fatal: repository '{}/' {}", url, suffix);
505 assert_eq!(
506 NaughtyListTracker::classify_error(&error),
507 None,
508 "URL '{}' should not trigger false positive",
509 url
510 );
511 }
512 }
513
514 #[test]
515 fn test_real_errors_still_detected() {
516 // Real errors in the message text (not URL) should still be detected
372 assert_eq!( 517 assert_eq!(
373 NaughtyListTracker::classify_error("network unreachable"), 518 NaughtyListTracker::classify_error(
374 None 519 "fatal: 'https://example.com/repo.git': SSL certificate problem"
520 ),
521 Some(NaughtyCategory::TlsCertificateInvalid)
522 );
523 assert_eq!(
524 NaughtyListTracker::classify_error(
525 "fatal: 'https://example.com/repo.git': failed to lookup address"
526 ),
527 Some(NaughtyCategory::DnsLookupFailed)
528 );
529 assert_eq!(
530 NaughtyListTracker::classify_error("websocket protocol error"),
531 Some(NaughtyCategory::ProtocolError)
375 ); 532 );
376 } 533 }
377 534
378 #[test] 535 #[test]
379 fn test_record_new_entry() { 536 fn test_url_with_keyword_and_real_error() {
380 let tracker = NaughtyListTracker::with_defaults(); 537 // URL contains keyword AND there's a real error - should detect the error
381 let url = "wss://bad-relay.example.com"; 538 let error = "fatal: 'https://example.com/ssl-tools/repo.git': SSL certificate problem";
382 539 assert_eq!(
383 let is_new = tracker.record( 540 NaughtyListTracker::classify_error(error),
384 url, 541 Some(NaughtyCategory::TlsCertificateInvalid)
385 NaughtyCategory::DnsLookupFailed,
386 "failed to lookup address".to_string(),
387 ); 542 );
388
389 assert!(is_new);
390 assert!(tracker.is_naughty(url));
391
392 let entry = tracker.get_entry(url).unwrap();
393 assert_eq!(entry.category, NaughtyCategory::DnsLookupFailed);
394 assert_eq!(entry.occurrence_count, 1);
395 } 543 }
396 544
545 // =========================================================================
546 // TRACKER FUNCTIONALITY
547 // =========================================================================
548
397 #[test] 549 #[test]
398 fn test_record_updates_existing() { 550 fn test_tracker_record_and_update() {
399 let tracker = NaughtyListTracker::with_defaults(); 551 let tracker = NaughtyListTracker::with_defaults();
400 let url = "wss://bad-relay.example.com"; 552 let url = "wss://bad-relay.example.com";
401 553
402 // First occurrence 554 // First occurrence
403 let is_new1 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); 555 let is_new = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string());
404 assert!(is_new1); 556 assert!(is_new);
557 assert!(tracker.is_naughty(url));
405 558
406 // Second occurrence 559 // Second occurrence updates existing
407 let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); 560 let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string());
408 assert!(!is_new2); 561 assert!(!is_new2);
409 562
410 let entry = tracker.get_entry(url).unwrap(); 563 let entry = tracker.get_entry(url).unwrap();
411 assert_eq!(entry.occurrence_count, 2); 564 assert_eq!(entry.occurrence_count, 2);
412 assert_eq!(entry.reason, "error 2"); // Updated to latest 565 assert_eq!(entry.reason, "error 2");
413 } 566 }
414 567
415 #[test] 568 #[test]
416 fn test_is_naughty() { 569 fn test_tracker_expiration() {
417 let tracker = NaughtyListTracker::with_defaults(); 570 let tracker = NaughtyListTracker::new(0); // Expire immediately
418 let url = "wss://bad-relay.example.com";
419
420 assert!(!tracker.is_naughty(url));
421 571
422 tracker.record( 572 tracker.record(
423 url, 573 "wss://relay.example.com",
424 NaughtyCategory::TlsCertificateInvalid, 574 NaughtyCategory::DnsLookupFailed,
425 "cert error".to_string(), 575 "error".to_string(),
426 ); 576 );
427 577
428 assert!(tracker.is_naughty(url)); 578 // Entry exists but is expired
429 } 579 assert!(!tracker.is_naughty("wss://relay.example.com"));
430 580
431 #[test] 581 std::thread::sleep(std::time::Duration::from_millis(10));
432 fn test_get_all() {
433 let tracker = NaughtyListTracker::with_defaults();
434
435 tracker.record(
436 "wss://relay1.example.com",
437 NaughtyCategory::DnsLookupFailed,
438 "dns error".to_string(),
439 );
440 tracker.record(
441 "wss://relay2.example.com",
442 NaughtyCategory::TlsCertificateInvalid,
443 "tls error".to_string(),
444 );
445 582
446 let all = tracker.get_all(); 583 let expired = tracker.expire_old_entries();
447 assert_eq!(all.len(), 2); 584 assert_eq!(expired.len(), 1);
585 assert_eq!(tracker.total_count(), 0);
448 } 586 }
449 587
450 #[test] 588 #[test]
451 fn test_count_by_category() { 589 fn test_tracker_counts() {
452 let tracker = NaughtyListTracker::with_defaults(); 590 let tracker = NaughtyListTracker::with_defaults();
453 591
592 tracker.record("wss://r1.com", NaughtyCategory::DnsLookupFailed, "e".into());
593 tracker.record("wss://r2.com", NaughtyCategory::DnsLookupFailed, "e".into());
454 tracker.record( 594 tracker.record(
455 "wss://relay1.example.com", 595 "wss://r3.com",
456 NaughtyCategory::DnsLookupFailed,
457 "error".to_string(),
458 );
459 tracker.record(
460 "wss://relay2.example.com",
461 NaughtyCategory::DnsLookupFailed,
462 "error".to_string(),
463 );
464 tracker.record(
465 "wss://relay3.example.com",
466 NaughtyCategory::TlsCertificateInvalid, 596 NaughtyCategory::TlsCertificateInvalid,
467 "error".to_string(), 597 "e".into(),
468 ); 598 );
469 599
600 assert_eq!(tracker.total_count(), 3);
470 assert_eq!( 601 assert_eq!(
471 tracker.count_by_category(NaughtyCategory::DnsLookupFailed), 602 tracker.count_by_category(NaughtyCategory::DnsLookupFailed),
472 2 603 2
@@ -475,82 +606,84 @@ mod tests {
475 tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), 606 tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid),
476 1 607 1
477 ); 608 );
478 assert_eq!(tracker.count_by_category(NaughtyCategory::ProtocolError), 0); 609 assert_eq!(tracker.get_all().len(), 3);
479 } 610 }
480 611
481 #[test] 612 #[test]
482 fn test_total_count() { 613 fn test_category_display() {
483 let tracker = NaughtyListTracker::with_defaults(); 614 assert_eq!(
484 assert_eq!(tracker.total_count(), 0); 615 NaughtyCategory::DnsLookupFailed.as_str(),
485 616 "dns_lookup_failed"
486 tracker.record(
487 "wss://relay1.example.com",
488 NaughtyCategory::DnsLookupFailed,
489 "error".to_string(),
490 ); 617 );
491 assert_eq!(tracker.total_count(), 1); 618 assert_eq!(
492 619 NaughtyCategory::TlsCertificateInvalid.as_str(),
493 tracker.record( 620 "tls_certificate_invalid"
494 "wss://relay2.example.com",
495 NaughtyCategory::TlsCertificateInvalid,
496 "error".to_string(),
497 ); 621 );
498 assert_eq!(tracker.total_count(), 2); 622 assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error");
499 } 623 }
624}
500 625
501 #[test] 626#[cfg(test)]
502 fn test_expire_old_entries() { 627mod production_tests {
503 // Use very short expiration for testing 628 use super::*;
504 let tracker = NaughtyListTracker::new(0); // Expire immediately (0 hours)
505
506 tracker.record(
507 "wss://relay1.example.com",
508 NaughtyCategory::DnsLookupFailed,
509 "error".to_string(),
510 );
511
512 // Entry should exist in the map
513 assert_eq!(tracker.total_count(), 1);
514
515 // But is_naughty should return false since it's already expired (0 hours)
516 assert!(!tracker.is_naughty("wss://relay1.example.com"));
517
518 // Sleep to ensure time passes
519 std::thread::sleep(std::time::Duration::from_millis(10));
520 629
521 // Expire old entries (should remove the 0-hour expired entry) 630 /// Production case from relay.ngit.dev - remote warning should not be classified
522 let expired = tracker.expire_old_entries(); 631 #[test]
523 assert_eq!(expired.len(), 1); 632 fn test_classify_production_relay_ngit_dev_warning() {
524 assert_eq!(expired[0], "wss://relay1.example.com"); 633 let error =
634 "remote: warning: unable to access '/root/.config/git/attributes': Permission denied";
635 assert_eq!(NaughtyListTracker::classify_error(error), None);
636 }
525 637
526 // Entry should be gone 638 /// Git protocol errors are transient, not persistent infrastructure issues
527 assert!(!tracker.is_naughty("wss://relay1.example.com")); 639 #[test]
528 assert_eq!(tracker.total_count(), 0); 640 fn test_git_protocol_errors_not_naughty() {
641 // These are common git protocol errors that should NOT be classified as naughty
642 let git_protocol_errors = [
643 "fatal: protocol error: bad line length character: remo",
644 "fatal: protocol error: expected old/new/ref, got 'shallow",
645 "fatal: git upload-pack: protocol error",
646 "error: protocol error: bad pack header",
647 "fatal: protocol error: bad band #3",
648 ];
649
650 for error in git_protocol_errors {
651 assert_eq!(
652 NaughtyListTracker::classify_error(error),
653 None,
654 "Git protocol error should not be classified as naughty: {}",
655 error
656 );
657 }
529 } 658 }
530 659
660 /// Remote warning followed by git protocol error - both should be filtered/ignored
531 #[test] 661 #[test]
532 fn test_category_display() { 662 fn test_warning_with_git_protocol_error() {
663 let error = "remote: warning: unable to access '/root/.config/git/attributes': Permission denied\nfatal: protocol error: bad line length character: remo";
533 assert_eq!( 664 assert_eq!(
534 NaughtyCategory::DnsLookupFailed.to_string(), 665 NaughtyListTracker::classify_error(error),
535 "dns_lookup_failed" 666 None,
667 "Warning + git protocol error should not be classified as naughty"
536 ); 668 );
537 assert_eq!(
538 NaughtyCategory::TlsCertificateInvalid.to_string(),
539 "tls_certificate_invalid"
540 );
541 assert_eq!(NaughtyCategory::ProtocolError.to_string(), "protocol_error");
542 } 669 }
543 670
671 /// WebSocket protocol errors ARE naughty (persistent infrastructure issues)
544 #[test] 672 #[test]
545 fn test_category_as_str() { 673 fn test_websocket_errors_still_naughty() {
546 assert_eq!( 674 let websocket_errors = [
547 NaughtyCategory::DnsLookupFailed.as_str(), 675 "websocket protocol error",
548 "dns_lookup_failed" 676 "websocket handshake failed",
549 ); 677 "invalid frame received",
550 assert_eq!( 678 ];
551 NaughtyCategory::TlsCertificateInvalid.as_str(), 679
552 "tls_certificate_invalid" 680 for error in websocket_errors {
553 ); 681 assert_eq!(
554 assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); 682 NaughtyListTracker::classify_error(error),
683 Some(NaughtyCategory::ProtocolError),
684 "WebSocket error should be classified as protocol_error: {}",
685 error
686 );
687 }
555 } 688 }
556} 689}
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index ab10c49..4d69c9a 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,6 +16,8 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase;
20
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel};
20 22
21// ============================================================================= 23// =============================================================================
@@ -99,6 +101,8 @@ pub struct SelfSubscriber {
99 action_tx: mpsc::Sender<AddFilters>, 101 action_tx: mpsc::Sender<AddFilters>,
100 /// Last time we connected - used for since filter on reconnect 102 /// Last time we connected - used for since filter on reconnect
101 last_connected: Option<Timestamp>, 103 last_connected: Option<Timestamp>,
104 /// Database for querying existing events on startup
105 database: SharedDatabase,
102} 106}
103 107
104impl SelfSubscriber { 108impl SelfSubscriber {
@@ -109,11 +113,13 @@ impl SelfSubscriber {
109 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 113 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
110 /// * `repo_sync_index` - Shared index to update with discovered repos 114 /// * `repo_sync_index` - Shared index to update with discovered repos
111 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager 115 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
116 /// * `database` - Database for querying existing events on startup
112 pub fn new( 117 pub fn new(
113 own_relay_url: String, 118 own_relay_url: String,
114 relay_domain: String, 119 relay_domain: String,
115 repo_sync_index: RepoSyncIndex, 120 repo_sync_index: RepoSyncIndex,
116 action_tx: mpsc::Sender<AddFilters>, 121 action_tx: mpsc::Sender<AddFilters>,
122 database: SharedDatabase,
117 ) -> Self { 123 ) -> Self {
118 Self { 124 Self {
119 own_relay_url, 125 own_relay_url,
@@ -121,6 +127,7 @@ impl SelfSubscriber {
121 repo_sync_index, 127 repo_sync_index,
122 action_tx, 128 action_tx,
123 last_connected: None, 129 last_connected: None,
130 database,
124 } 131 }
125 } 132 }
126 133
@@ -136,6 +143,102 @@ impl SelfSubscriber {
136 } 143 }
137 } 144 }
138 145
146 /// Load existing events from database on startup
147 ///
148 /// Queries the database with two separate queries to build the initial
149 /// PendingUpdates state. This ensures all repos get Layer 2/3 filters
150 /// created, not just those returned by the WebSocket subscription
151 /// (which has limits on the number of events returned).
152 ///
153 /// Query order:
154 /// 1. First query: Get announcements (30617) to populate repo_sync_index
155 /// with repos and their relays
156 /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event()
157 /// to add root event IDs for Layer 3 filter creation
158 ///
159 /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters.
160 async fn load_existing_events(&self) -> PendingUpdates {
161 let mut pending = PendingUpdates::new();
162
163 tracing::info!("Loading all events from database");
164
165 // First query: Get all announcements to populate repo_sync_index
166 let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement);
167
168 let announcements = match self.database.query(announcement_filter).await {
169 Ok(events) => {
170 tracing::info!(count = events.len(), "Loaded announcements from database");
171 events
172 }
173 Err(e) => {
174 tracing::error!(
175 error = %e,
176 "Failed to query announcements from database"
177 );
178 return pending;
179 }
180 };
181
182 // Process announcements
183 let mut announcements_loaded = 0;
184 for event in announcements.iter() {
185 if let Some(repo_id) = Self::extract_repo_id(event) {
186 let relays = Self::extract_relay_urls(event);
187 pending.add_repo(repo_id, relays, HashSet::new());
188 announcements_loaded += 1;
189 }
190 }
191
192 // Update repo_sync_index with announcements BEFORE querying root events
193 {
194 let mut index = self.repo_sync_index.write().await;
195 for (repo_id, needs) in &pending.repos {
196 let entry = index
197 .entry(repo_id.clone())
198 .or_insert_with(|| RepoSyncNeeds {
199 relays: HashSet::new(),
200 root_events: HashSet::new(),
201 sync_level: SyncLevel::StateOnly,
202 });
203 entry.relays.extend(needs.relays.clone());
204 }
205 }
206
207 // Second query: Get all root events for handle_root_event()
208 let root_filter =
209 Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]);
210
211 let root_events = match self.database.query(root_filter).await {
212 Ok(events) => {
213 tracing::info!(count = events.len(), "Loaded root events from database");
214 events
215 }
216 Err(e) => {
217 tracing::error!(
218 error = %e,
219 "Failed to query root events from database"
220 );
221 // Continue with just announcements
222 return pending;
223 }
224 };
225
226 // Process root events
227 let mut root_events_processed = 0;
228 for event in root_events.iter() {
229 self.handle_root_event(event, &mut pending).await;
230 root_events_processed += 1;
231 }
232
233 tracing::info!(
234 announcements_loaded = announcements_loaded,
235 root_events_processed = root_events_processed,
236 "Processed existing events from database"
237 );
238
239 pending
240 }
241
139 /// Process a relay pool notification 242 /// Process a relay pool notification
140 /// 243 ///
141 /// Handles incoming events from the subscription, queueing 30617 announcements 244 /// Handles incoming events from the subscription, queueing 30617 announcements
@@ -277,33 +380,22 @@ impl SelfSubscriber {
277 // Subscribe to announcement and root event kinds 380 // Subscribe to announcement and root event kinds
278 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) 381 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618)
279 // Plus kind 10317 (User Grasp List) for GRASP discovery 382 // Plus kind 10317 (User Grasp List) for GRASP discovery
280 // Check if we have a last_connected time for reconnect filtering 383 let mut filter = Filter::new().kinds(vec![
281 let filter = if let Some(last) = self.last_connected { 384 Kind::GitRepoAnnouncement,
385 Kind::GitPatch,
386 Kind::GitIssue,
387 Kind::GitPullRequest,
388 Kind::GitUserGraspList,
389 ]);
390 if let Some(timestamp) = self.last_connected {
282 // Quick reconnect - use since filter (15 min buffer) 391 // Quick reconnect - use since filter (15 min buffer)
283 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); 392 let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60));
284 tracing::debug!( 393 tracing::debug!(
285 since = %since, 394 since = %since,
286 "Using since filter for reconnect" 395 "Using since filter for reconnect"
287 ); 396 );
288 Filter::new() 397 filter = filter.since(since);
289 .kinds(vec![ 398 }
290 Kind::GitRepoAnnouncement, // Repository Announcements
291 Kind::GitPatch, // Patches
292 Kind::GitIssue, // Issues
293 Kind::GitPullRequest, // Pull Requests
294 Kind::GitUserGraspList, // User Grasp List
295 ])
296 .since(since)
297 } else {
298 // First connection - no since filter
299 Filter::new().kinds(vec![
300 Kind::GitRepoAnnouncement, // Repository Announcements
301 Kind::GitPatch, // Patches
302 Kind::GitIssue, // Issues
303 Kind::GitPullRequest, // Pull Requests
304 Kind::GitUserGraspList, // User Grasp List
305 ])
306 };
307 399
308 // Update last_connected AFTER creating filter but BEFORE subscribing 400 // Update last_connected AFTER creating filter but BEFORE subscribing
309 self.last_connected = Some(Timestamp::now()); 401 self.last_connected = Some(Timestamp::now());
@@ -324,7 +416,11 @@ impl SelfSubscriber {
324 416
325 let mut notifications = client.notifications(); 417 let mut notifications = client.notifications();
326 let batch_window = Self::get_batch_window(); 418 let batch_window = Self::get_batch_window();
327 let mut pending = PendingUpdates::new(); 419
420 // Load existing events from database on startup
421 // This ensures all repos get Layer 2/3 filters created, not just those
422 // returned by the WebSocket subscription (which has limits)
423 let mut pending = self.load_existing_events().await;
328 424
329 // Timer does NOT reset on new events - use interval 425 // Timer does NOT reset on new events - use interval
330 let mut timer = tokio::time::interval(batch_window); 426 let mut timer = tokio::time::interval(batch_window);