upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/purgatory
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2026-02-23 15:20:59 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-02-23 15:20:59 +0000
commit: 113928aa84894ea8f65c247d9987527e792b32a9 (patch)
tree: ec967d6195d9f7ec4f061449596611afe3a0950f /src/purgatory
parent: 26f608e5011b9d1ad6036da75b89272835e69695 (diff)
parent: e0ad39a489b3398f8208713bf728db0cb11475b0 (diff)
Merge master into 3ca0-announcements-purgatory
Diffstat (limited to 'src/purgatory')
-rw-r--r-- src/purgatory/helpers.rs 204
-rw-r--r-- src/purgatory/mod.rs 341
-rw-r--r-- src/purgatory/sync/context.rs 163
-rw-r--r-- src/purgatory/sync/functions.rs 22
-rw-r--r-- src/purgatory/types.rs 30
5 files changed, 634 insertions, 126 deletions
diff --git a/src/purgatory/helpers.rs b/src/purgatory/helpers.rs
index 193ef99..a9f6e66 100644
--- a/src/purgatory/helpers.rs
+++ b/src/purgatory/helpers.rs
@@ -225,6 +225,117 @@ pub fn get_unpushed_refs(event: &Event, pushed_refs: &[RefPair]) -> Vec<RefPair>
225 .collect() 225 .collect()
226} 226}
227 227
228/// Diagnose why a state event doesn't match the push.
229///
230/// Returns a human-readable explanation of the mismatch between the state event
231/// and what would result from applying the push to local refs.
232///
233/// # Arguments
234/// * `event` - The state event to check
235/// * `pushed_updates` - Ref updates in the current push operation
236/// * `local_refs` - Refs already existing locally (ref_name -> SHA)
237///
238/// # Returns
239/// String explaining why the state doesn't match, or None if it matches
240pub fn diagnose_state_mismatch(
241 event: &Event,
242 pushed_updates: &[RefUpdate],
243 local_refs: &HashMap<String, String>,
244) -> Option<String> {
245 let state_refs = extract_refs_from_state(event);
246
247 // Filter local_refs to only branches and tags
248 let mut would_be_state: HashMap<String, String> = local_refs
249 .iter()
250 .filter(|(ref_name, _)| {
251 ref_name.starts_with("refs/heads/") || ref_name.starts_with("refs/tags/")
252 })
253 .map(|(k, v)| (k.clone(), v.clone()))
254 .collect();
255
256 // Apply all pushed updates to create the would-be state
257 for update in pushed_updates {
258 // Only process branches and tags
259 if !update.ref_name.starts_with("refs/heads/") && !update.ref_name.starts_with("refs/tags/")
260 {
261 continue;
262 }
263
264 if update.is_deletion() {
265 would_be_state.remove(&update.ref_name);
266 } else {
267 would_be_state.insert(update.ref_name.clone(), update.new_oid.clone());
268 }
269 }
270
271 // Convert event's state refs to a HashMap for comparison
272 let declared_state: HashMap<String, String> = state_refs
273 .into_iter()
274 .map(|r| (r.ref_name, r.object_sha))
275 .collect();
276
277 // Check if they match
278 if would_be_state == declared_state {
279 return None; // No mismatch
280 }
281
282 // Build diagnostic message
283 let mut reasons = Vec::new();
284
285 // Check each declared ref: report a SHA mismatch, or a ref missing from the would-be state
286 for (ref_name, declared_sha) in &declared_state {
287 if let Some(would_be_sha) = would_be_state.get(ref_name) {
288 if would_be_sha != declared_sha {
289 let would_be_short = if would_be_sha.len() >= 8 {
290 &would_be_sha[..8]
291 } else {
292 would_be_sha.as_str()
293 };
294 let declared_short = if declared_sha.len() >= 8 {
295 &declared_sha[..8]
296 } else {
297 declared_sha.as_str()
298 };
299 reasons.push(format!(
300 "{} would be at {} but state declares {}",
301 ref_name, would_be_short, declared_short
302 ));
303 }
304 } else {
305 let declared_short = if declared_sha.len() >= 8 {
306 &declared_sha[..8]
307 } else {
308 declared_sha.as_str()
309 };
310 reasons.push(format!(
311 "{} missing (state declares {})",
312 ref_name, declared_short
313 ));
314 }
315 }
316
317 // Check for refs in would-be state but not in declared state
318 for (ref_name, would_be_sha) in &would_be_state {
319 if !declared_state.contains_key(ref_name) {
320 let would_be_short = if would_be_sha.len() >= 8 {
321 &would_be_sha[..8]
322 } else {
323 would_be_sha.as_str()
324 };
325 reasons.push(format!(
326 "{} would exist at {} but state doesn't declare it",
327 ref_name, would_be_short
328 ));
329 }
330 }
331
332 if reasons.is_empty() {
333 Some("Unknown mismatch".to_string())
334 } else {
335 Some(reasons.join("; "))
336 }
337}
338
228#[cfg(test)] 339#[cfg(test)]
229mod tests { 340mod tests {
230 use super::*; 341 use super::*;
@@ -695,4 +806,97 @@ mod tests {
695 // Should return true - real OID exists, symbolic ref skipped 806 // Should return true - real OID exists, symbolic ref skipped
696 assert!(can_apply_state(&event, repo_path)); 807 assert!(can_apply_state(&event, repo_path));
697 } 808 }
809
810 #[test]
811 fn test_diagnose_state_mismatch_missing_ref() {
812 // State declares both main and test branches
813 let event = create_test_state_event(
814 "test-repo",
815 vec![("refs/heads/main", "abc123"), ("refs/heads/test", "def456")],
816 );
817
818 // Push only creates test branch
819 let pushed_updates = vec![RefUpdate {
820 old_oid: "0000000000000000000000000000000000000000".to_string(),
821 new_oid: "def456".to_string(),
822 ref_name: "refs/heads/test".to_string(),
823 }];
824
825 // No local refs
826 let local_refs = HashMap::new();
827
828 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
829 assert!(diagnosis.is_some());
830 let msg = diagnosis.unwrap();
831 assert!(msg.contains("refs/heads/main"));
832 assert!(msg.contains("missing"));
833 }
834
835 #[test]
836 fn test_diagnose_state_mismatch_wrong_sha() {
837 // State declares main at abc123
838 let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]);
839
840 // Push updates main to different SHA
841 let pushed_updates = vec![RefUpdate {
842 old_oid: "0000000000000000000000000000000000000000".to_string(),
843 new_oid: "wrong123".to_string(),
844 ref_name: "refs/heads/main".to_string(),
845 }];
846
847 let local_refs = HashMap::new();
848
849 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
850 assert!(diagnosis.is_some());
851 let msg = diagnosis.unwrap();
852 assert!(msg.contains("refs/heads/main"));
853 assert!(msg.contains("would be at"));
854 assert!(msg.contains("state declares"));
855 }
856
857 #[test]
858 fn test_diagnose_state_mismatch_extra_ref() {
859 // State declares only main
860 let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]);
861
862 // Push creates both main and test
863 let pushed_updates = vec![
864 RefUpdate {
865 old_oid: "0000000000000000000000000000000000000000".to_string(),
866 new_oid: "abc123".to_string(),
867 ref_name: "refs/heads/main".to_string(),
868 },
869 RefUpdate {
870 old_oid: "0000000000000000000000000000000000000000".to_string(),
871 new_oid: "def456".to_string(),
872 ref_name: "refs/heads/test".to_string(),
873 },
874 ];
875
876 let local_refs = HashMap::new();
877
878 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
879 assert!(diagnosis.is_some());
880 let msg = diagnosis.unwrap();
881 assert!(msg.contains("refs/heads/test"));
882 assert!(msg.contains("doesn't declare"));
883 }
884
885 #[test]
886 fn test_diagnose_state_mismatch_no_mismatch() {
887 // State declares main
888 let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]);
889
890 // Push creates main at correct SHA
891 let pushed_updates = vec![RefUpdate {
892 old_oid: "0000000000000000000000000000000000000000".to_string(),
893 new_oid: "abc123".to_string(),
894 ref_name: "refs/heads/main".to_string(),
895 }];
896
897 let local_refs = HashMap::new();
898
899 let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs);
900 assert!(diagnosis.is_none()); // No mismatch
901 }
698} 902}
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 9a63bf6..bb6ff54 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -16,11 +16,12 @@ pub mod persistence;
16pub mod sync; 16pub mod sync;
17mod types; 17mod types;
18 18
19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 19pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs};
20pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 20pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 21
22use dashmap::DashMap; 22use dashmap::DashMap;
23use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use nostr_sdk::ToBech32;
24use serde::{Deserialize, Serialize}; 25use serde::{Deserialize, Serialize};
25use std::collections::HashMap; 26use std::collections::HashMap;
26use std::collections::HashSet; 27use std::collections::HashSet;
@@ -64,6 +65,9 @@ struct SerializableStatePurgatoryEntry {
64 created_at_offset_secs: u64, 65 created_at_offset_secs: u64,
65 /// Duration offset from saved_at for expires_at 66 /// Duration offset from saved_at for expires_at
66 expires_at_offset_secs: u64, 67 expires_at_offset_secs: u64,
68 /// Source of this event (direct submission vs sync)
69 #[serde(default)]
70 source: types::EventSource,
67} 71}
68 72
69/// Serializable wrapper for `PrPurgatoryEntry` with time offsets. 73/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
@@ -81,6 +85,9 @@ struct SerializablePrPurgatoryEntry {
81 created_at_offset_secs: u64, 85 created_at_offset_secs: u64,
82 /// Duration offset from saved_at for expires_at 86 /// Duration offset from saved_at for expires_at
83 expires_at_offset_secs: u64, 87 expires_at_offset_secs: u64,
88 /// Source of this event (direct submission vs sync)
89 #[serde(default)]
90 source: types::EventSource,
84} 91}
85 92
86/// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. 93/// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets.
@@ -313,11 +320,38 @@ impl Purgatory {
313 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately 320 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately
314 /// to override this delay. 321 /// to override this delay.
315 /// 322 ///
323 /// If an event already exists in purgatory with `Sync` source and the new submission
324 /// is direct (`!from_sync`), the source is upgraded to `Direct` and the expiry is reset to `DEFAULT_EXPIRY` from now.
325 ///
316 /// # Arguments 326 /// # Arguments
317 /// * `event` - The state event (kind 30618) to hold 327 /// * `event` - The state event (kind 30618) to hold
318 /// * `identifier` - The repository identifier from the 'd' tag 328 /// * `identifier` - The repository identifier from the 'd' tag
319 /// * `author` - The event author's public key 329 /// * `author` - The event author's public key
320 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { 330 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
331 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) {
332 let source = if from_sync {
333 types::EventSource::Sync
334 } else {
335 types::EventSource::Direct
336 };
337
338 // Check if event already exists - if so, potentially upgrade source
339 if let Some(mut entries) = self.state_events.get_mut(&identifier) {
340 if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) {
341 // Upgrade source from Sync to Direct if new submission is direct
342 if existing.source == types::EventSource::Sync && !from_sync {
343 existing.source = types::EventSource::Direct;
344 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
345 tracing::debug!(
346 event_id = %event.id,
347 identifier = %identifier,
348 "Upgraded purgatory entry source from Sync to Direct, reset expiry"
349 );
350 }
351 return; // Event already exists, don't add duplicate
352 }
353 }
354
321 let now = Instant::now(); 355 let now = Instant::now();
322 let entry = StatePurgatoryEntry { 356 let entry = StatePurgatoryEntry {
323 event, 357 event,
@@ -325,6 +359,7 @@ impl Purgatory {
325 author, 359 author,
326 created_at: now, 360 created_at: now,
327 expires_at: now + DEFAULT_EXPIRY, 361 expires_at: now + DEFAULT_EXPIRY,
362 source,
328 }; 363 };
329 364
330 self.state_events 365 self.state_events
@@ -344,11 +379,35 @@ impl Purgatory {
344 /// Automatically enqueues the referenced repository identifier for background sync 379 /// Automatically enqueues the referenced repository identifier for background sync
345 /// with the default delay (3 minutes), giving time for a git push to arrive. 380 /// with the default delay (3 minutes), giving time for a git push to arrive.
346 /// 381 ///
382 /// If an event already exists in purgatory with `Sync` source and the new submission
383 /// is direct (`!from_sync`), the source is upgraded to `Direct` and the expiry is reset to `DEFAULT_EXPIRY` from now.
384 ///
347 /// # Arguments 385 /// # Arguments
348 /// * `event` - The PR event (kind 1617/1618) to hold 386 /// * `event` - The PR event (kind 1617/1618) to hold
349 /// * `event_id` - The event ID (hex string) from the 'e' tag 387 /// * `event_id` - The event ID (hex string) from the 'e' tag
350 /// * `commit` - The commit SHA from the 'c' tag 388 /// * `commit` - The commit SHA from the 'c' tag
351 pub fn add_pr(&self, event: Event, event_id: String, commit: String) { 389 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
390 pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) {
391 let source = if from_sync {
392 types::EventSource::Sync
393 } else {
394 types::EventSource::Direct
395 };
396
397 // Check if event already exists - if so, potentially upgrade source
398 if let Some(mut existing) = self.pr_events.get_mut(&event_id) {
399 // Upgrade source from Sync to Direct if new submission is direct
400 if existing.source == types::EventSource::Sync && !from_sync {
401 existing.source = types::EventSource::Direct;
402 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
403 tracing::debug!(
404 event_id = %event_id,
405 "Upgraded PR purgatory entry source from Sync to Direct, reset expiry"
406 );
407 }
408 return; // Event already exists, don't add duplicate
409 }
410
352 // Extract identifier from the event's `a` tag for sync enqueueing 411 // Extract identifier from the event's `a` tag for sync enqueueing
353 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); 412 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event);
354 413
@@ -358,6 +417,7 @@ impl Purgatory {
358 commit, 417 commit,
359 created_at: now, 418 created_at: now,
360 expires_at: now + DEFAULT_EXPIRY, 419 expires_at: now + DEFAULT_EXPIRY,
420 source,
361 }; 421 };
362 422
363 self.pr_events.insert(event_id, entry); 423 self.pr_events.insert(event_id, entry);
@@ -371,6 +431,8 @@ impl Purgatory {
371 /// Add a PR placeholder (git data arrived before PR event). 431 /// Add a PR placeholder (git data arrived before PR event).
372 /// 432 ///
373 /// Creates a placeholder entry waiting for the corresponding PR event. 433 /// Creates a placeholder entry waiting for the corresponding PR event.
434 /// Placeholders are always marked as `Direct` source since they originate
435 /// from git pushes (direct user action).
374 /// 436 ///
375 /// # Arguments 437 /// # Arguments
376 /// * `event_id` - The expected event ID (from git ref name) 438 /// * `event_id` - The expected event ID (from git ref name)
@@ -382,6 +444,7 @@ impl Purgatory {
382 commit, 444 commit,
383 created_at: now, 445 created_at: now,
384 expires_at: now + DEFAULT_EXPIRY, 446 expires_at: now + DEFAULT_EXPIRY,
447 source: types::EventSource::Direct, // Git pushes are direct user actions
385 }; 448 };
386 449
387 self.pr_events.insert(event_id, entry); 450 self.pr_events.insert(event_id, entry);
@@ -892,6 +955,9 @@ impl Purgatory {
892 /// prevent infinite re-sync loops. Events that expire without finding git data 955 /// prevent infinite re-sync loops. Events that expire without finding git data
893 /// will be filtered out during future negentropy/REQ sync operations. 956 /// will be filtered out during future negentropy/REQ sync operations.
894 /// 957 ///
958 /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event
959 /// to support migration scripts and operational monitoring.
960 ///
895 /// # Returns 961 /// # Returns
896 /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) 962 /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed)
897 pub fn cleanup(&self) -> (usize, usize, usize) { 963 pub fn cleanup(&self) -> (usize, usize, usize) {
@@ -976,18 +1042,38 @@ impl Purgatory {
976 let mut state_removed = 0; 1042 let mut state_removed = 0;
977 1043
978 // Remove expired state events and mark them as expired 1044 // Remove expired state events and mark them as expired
979 self.state_events.retain(|_, entries| { 1045 self.state_events.retain(|identifier, entries| {
980 let original_len = entries.len(); 1046 let original_len = entries.len();
981 // Collect event IDs before removing
982 let expired_ids: Vec<EventId> = entries
983 .iter()
984 .filter(|entry| entry.expires_at <= now)
985 .map(|entry| entry.event.id)
986 .collect();
987 1047
988 // Mark as expired to prevent re-sync 1048 // Log and collect expired entries before removing
989 for event_id in expired_ids { 1049 for entry in entries.iter().filter(|e| e.expires_at <= now) {
990 self.mark_expired(event_id); 1050 let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex());
1051 let event_id_short = &entry.event.id.to_hex()[..12];
1052 let source_str = if entry.source.is_direct() { "direct" } else { "sync" };
1053
1054 // Structured log for migration scripts
1055 // Direct submissions log at WARN, synced events at DEBUG
1056 if entry.source.is_direct() {
1057 tracing::warn!(
1058 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
1059 identifier,
1060 npub,
1061 event_id_short,
1062 entry.event.kind.as_u16(),
1063 source_str
1064 );
1065 } else {
1066 tracing::debug!(
1067 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
1068 identifier,
1069 npub,
1070 event_id_short,
1071 entry.event.kind.as_u16(),
1072 source_str
1073 );
1074 }
1075
1076 self.mark_expired(entry.event.id);
991 } 1077 }
992 1078
993 // Remove expired entries 1079 // Remove expired entries
@@ -997,21 +1083,103 @@ impl Purgatory {
997 }); 1083 });
998 1084
999 // Remove expired PR events and mark them as expired 1085 // Remove expired PR events and mark them as expired
1000 let expired_prs: Vec<(String, Option<EventId>)> = self 1086 let expired_prs: Vec<_> = self
1001 .pr_events 1087 .pr_events
1002 .iter() 1088 .iter()
1003 .filter(|entry| entry.value().expires_at <= now) 1089 .filter(|entry| entry.value().expires_at <= now)
1004 .map(|entry| { 1090 .map(|entry| {
1005 let event_id = entry.value().event.as_ref().map(|e| e.id); 1091 let pr_entry = entry.value();
1006 (entry.key().clone(), event_id) 1092 let event_id_str = entry.key().clone();
1093 let event_opt = pr_entry.event.clone();
1094 let commit = pr_entry.commit.clone();
1095 let source = pr_entry.source;
1096 (event_id_str, event_opt, commit, source)
1007 }) 1097 })
1008 .collect(); 1098 .collect();
1009 1099
1010 let pr_removed = expired_prs.len(); 1100 let pr_removed = expired_prs.len();
1011 for (event_id_str, event_id_opt) in expired_prs { 1101 for (event_id_str, event_opt, commit, source) in expired_prs {
1012 // Mark actual PR events as expired (not placeholders) 1102 // Log structured entry for PR events (not placeholders)
1013 if let Some(event_id) = event_id_opt { 1103 if let Some(ref event) = event_opt {
1014 self.mark_expired(event_id); 1104 let npub = event
1105 .pubkey
1106 .to_bech32()
1107 .unwrap_or_else(|_| event.pubkey.to_hex());
1108 let event_id_short = &event.id.to_hex()[..12];
1109 let source_str = if source.is_direct() { "direct" } else { "sync" };
1110
1111 // Extract ALL repo identifiers from 'a' tags
1112 // (PR events can reference multiple repos when there are multiple maintainers)
1113 let repos: Vec<String> = event
1114 .tags
1115 .iter()
1116 .filter_map(|tag| {
1117 let tag_vec = tag.clone().to_vec();
1118 if tag_vec.len() >= 2
1119 && tag_vec[0] == "a"
1120 && tag_vec[1].starts_with("30617:")
1121 {
1122 // Format: 30617:<owner_pubkey>:<identifier>
1123 let parts: Vec<&str> = tag_vec[1].split(':').collect();
1124 if parts.len() >= 3 {
1125 Some(parts[2].to_string())
1126 } else {
1127 None
1128 }
1129 } else {
1130 None
1131 }
1132 })
1133 .collect();
1134
1135 // Deduplicate while preserving order
1136 let mut seen = std::collections::HashSet::new();
1137 let unique_repos: Vec<String> = repos
1138 .into_iter()
1139 .filter(|r| seen.insert(r.clone()))
1140 .collect();
1141
1142 let repos_to_log = if unique_repos.is_empty() {
1143 vec!["unknown".to_string()]
1144 } else {
1145 unique_repos
1146 };
1147
1148 // Structured log for migration scripts - log once per repo
1149 // Direct submissions log at WARN, synced events at DEBUG
1150 for repo in &repos_to_log {
1151 if source.is_direct() {
1152 tracing::warn!(
1153 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
1154 repo,
1155 npub,
1156 event_id_short,
1157 event.kind.as_u16(),
1158 &commit[..commit.len().min(12)],
1159 source_str
1160 );
1161 } else {
1162 tracing::debug!(
1163 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
1164 repo,
1165 npub,
1166 event_id_short,
1167 event.kind.as_u16(),
1168 &commit[..commit.len().min(12)],
1169 source_str
1170 );
1171 }
1172 }
1173
1174 self.mark_expired(event.id);
1175 } else {
1176 // Placeholder (git data arrived first, but PR event never came)
1177 // Placeholders are always Direct source (from git push)
1178 tracing::debug!(
1179 "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"",
1180 &event_id_str[..event_id_str.len().min(12)],
1181 &commit[..commit.len().min(12)]
1182 );
1015 } 1183 }
1016 self.pr_events.remove(&event_id_str); 1184 self.pr_events.remove(&event_id_str);
1017 } 1185 }
@@ -1191,6 +1359,7 @@ impl Purgatory {
1191 author: e.author, 1359 author: e.author,
1192 created_at_offset_secs: created_offset.as_secs(), 1360 created_at_offset_secs: created_offset.as_secs(),
1193 expires_at_offset_secs: expires_offset.as_secs(), 1361 expires_at_offset_secs: expires_offset.as_secs(),
1362 source: e.source,
1194 } 1363 }
1195 }) 1364 })
1196 .collect(); 1365 .collect();
@@ -1213,6 +1382,7 @@ impl Purgatory {
1213 commit: e.commit.clone(), 1382 commit: e.commit.clone(),
1214 created_at_offset_secs: created_offset.as_secs(), 1383 created_at_offset_secs: created_offset.as_secs(),
1215 expires_at_offset_secs: expires_offset.as_secs(), 1384 expires_at_offset_secs: expires_offset.as_secs(),
1385 source: e.source,
1216 }; 1386 };
1217 pr_events.insert(event_id, serializable); 1387 pr_events.insert(event_id, serializable);
1218 } 1388 }
@@ -1355,6 +1525,7 @@ impl Purgatory {
1355 author: e.author, 1525 author: e.author,
1356 created_at, 1526 created_at,
1357 expires_at, 1527 expires_at,
1528 source: e.source,
1358 } 1529 }
1359 }) 1530 })
1360 .collect(); 1531 .collect();
@@ -1380,6 +1551,7 @@ impl Purgatory {
1380 commit: e.commit, 1551 commit: e.commit,
1381 created_at, 1552 created_at,
1382 expires_at, 1553 expires_at,
1554 source: e.source,
1383 }; 1555 };
1384 1556
1385 self.pr_events.insert(event_id, entry); 1557 self.pr_events.insert(event_id, entry);
@@ -1439,8 +1611,18 @@ mod tests {
1439 .sign_with_keys(&keys) 1611 .sign_with_keys(&keys)
1440 .unwrap(); 1612 .unwrap();
1441 1613
1442 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); 1614 purgatory.add_state(
1443 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); 1615 event.clone(),
1616 "test-repo".to_string(),
1617 keys.public_key(),
1618 false,
1619 );
1620 purgatory.add_pr(
1621 event,
1622 "test-event-id".to_string(),
1623 "abc123".to_string(),
1624 false,
1625 );
1444 1626
1445 let (announcement_count, state_count, pr_count) = purgatory.count(); 1627 let (announcement_count, state_count, pr_count) = purgatory.count();
1446 assert_eq!(announcement_count, 0); 1628 assert_eq!(announcement_count, 0);
@@ -1492,7 +1674,7 @@ mod tests {
1492 let event = EventBuilder::text_note("state") 1674 let event = EventBuilder::text_note("state")
1493 .sign_with_keys(&keys) 1675 .sign_with_keys(&keys)
1494 .unwrap(); 1676 .unwrap();
1495 purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); 1677 purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false);
1496 1678
1497 // Now should have pending events 1679 // Now should have pending events
1498 assert!(purgatory.has_pending_events("test-repo")); 1680 assert!(purgatory.has_pending_events("test-repo"));
@@ -1522,7 +1704,12 @@ mod tests {
1522 .sign_with_keys(&keys) 1704 .sign_with_keys(&keys)
1523 .unwrap(); 1705 .unwrap();
1524 1706
1525 purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); 1707 purgatory.add_pr(
1708 event,
1709 "pr-event-id".to_string(),
1710 "commit123".to_string(),
1711 false,
1712 );
1526 1713
1527 // Now should have pending events for test-repo 1714 // Now should have pending events for test-repo
1528 assert!(purgatory.has_pending_events("test-repo")); 1715 assert!(purgatory.has_pending_events("test-repo"));
@@ -1587,6 +1774,7 @@ fn test_pr_event_vs_placeholder() {
1587 event.clone(), 1774 event.clone(),
1588 "event-id-1".to_string(), 1775 "event-id-1".to_string(),
1589 "commit-abc".to_string(), 1776 "commit-abc".to_string(),
1777 false,
1590 ); 1778 );
1591 1779
1592 // Add a placeholder (no event) 1780 // Add a placeholder (no event)
@@ -1643,8 +1831,14 @@ fn test_cleanup_removes_expired_entries() {
1643 state_event.clone(), 1831 state_event.clone(),
1644 "test-repo".to_string(), 1832 "test-repo".to_string(),
1645 keys.public_key(), 1833 keys.public_key(),
1834 false,
1835 );
1836 purgatory.add_pr(
1837 pr_event,
1838 "pr-123".to_string(),
1839 "commit-abc".to_string(),
1840 false,
1646 ); 1841 );
1647 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string());
1648 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); 1842 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string());
1649 1843
1650 // Verify entries are there 1844 // Verify entries are there
@@ -1691,8 +1885,18 @@ fn test_cleanup_preserves_non_expired_entries() {
1691 .unwrap(); 1885 .unwrap();
1692 1886
1693 // Add fresh entries 1887 // Add fresh entries
1694 purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); 1888 purgatory.add_state(
1695 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); 1889 state_event,
1890 "test-repo".to_string(),
1891 keys.public_key(),
1892 false,
1893 );
1894 purgatory.add_pr(
1895 pr_event,
1896 "pr-123".to_string(),
1897 "commit-abc".to_string(),
1898 false,
1899 );
1696 1900
1697 // Run cleanup 1901 // Run cleanup
1698 let (_, state_removed, pr_removed) = purgatory.cleanup(); 1902 let (_, state_removed, pr_removed) = purgatory.cleanup();
@@ -1722,8 +1926,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1722 .sign_with_keys(&keys) 1926 .sign_with_keys(&keys)
1723 .unwrap(); 1927 .unwrap();
1724 1928
1725 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); 1929 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false);
1726 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); 1930 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false);
1727 1931
1728 // Expire only the first one 1932 // Expire only the first one
1729 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { 1933 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") {
@@ -1740,8 +1944,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1740 .sign_with_keys(&keys) 1944 .sign_with_keys(&keys)
1741 .unwrap(); 1945 .unwrap();
1742 1946
1743 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); 1947 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false);
1744 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); 1948 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false);
1745 1949
1746 // Expire only first PR 1950 // Expire only first PR
1747 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { 1951 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") {
@@ -1773,8 +1977,8 @@ fn test_remove_expired_legacy_method() {
1773 .unwrap(); 1977 .unwrap();
1774 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); 1978 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap();
1775 1979
1776 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 1980 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1777 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 1981 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1778 1982
1779 // Expire both 1983 // Expire both
1780 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1984 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1808,8 +2012,8 @@ fn test_expired_event_tracking() {
1808 let pr_event_id = pr_event.id; 2012 let pr_event_id = pr_event.id;
1809 2013
1810 // Add events to purgatory 2014 // Add events to purgatory
1811 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 2015 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1812 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 2016 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1813 2017
1814 // Events should not be marked as expired yet 2018 // Events should not be marked as expired yet
1815 assert!(!purgatory.is_expired(&state_event_id)); 2019 assert!(!purgatory.is_expired(&state_event_id));
@@ -1861,7 +2065,7 @@ fn test_cleanup_expired_events() {
1861 let event2_id = event2.id; 2065 let event2_id = event2.id;
1862 2066
1863 // Add and immediately expire event1 2067 // Add and immediately expire event1
1864 purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); 2068 purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false);
1865 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { 2069 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") {
1866 for entry in entries.iter_mut() { 2070 for entry in entries.iter_mut() {
1867 entry.expires_at = Instant::now() - Duration::from_secs(1); 2071 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1870,7 +2074,7 @@ fn test_cleanup_expired_events() {
1870 purgatory.cleanup(); 2074 purgatory.cleanup();
1871 2075
1872 // Add and expire event2 (will be more recent) 2076 // Add and expire event2 (will be more recent)
1873 purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); 2077 purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false);
1874 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { 2078 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") {
1875 for entry in entries.iter_mut() { 2079 for entry in entries.iter_mut() {
1876 entry.expires_at = Instant::now() - Duration::from_secs(1); 2080 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1912,7 +2116,7 @@ fn test_expired_events_prevent_readdition() {
1912 let event_id = event.id; 2116 let event_id = event.id;
1913 2117
1914 // Add event to purgatory 2118 // Add event to purgatory
1915 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2119 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1916 2120
1917 // Expire it 2121 // Expire it
1918 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 2122 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1932,7 +2136,7 @@ fn test_expired_events_prevent_readdition() {
1932 // This simulates what negentropy/REQ+EOSE should do: 2136 // This simulates what negentropy/REQ+EOSE should do:
1933 // Check if event is in event_ids() before adding 2137 // Check if event is in event_ids() before adding
1934 if !ids.contains(&event_id) { 2138 if !ids.contains(&event_id) {
1935 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2139 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
1936 } 2140 }
1937 2141
1938 // Event should NOT be re-added 2142 // Event should NOT be re-added
@@ -1975,7 +2179,7 @@ fn test_user_can_resubmit_expired_event() {
1975 let event_id = event.id; 2179 let event_id = event.id;
1976 2180
1977 // Add event to purgatory 2181 // Add event to purgatory
1978 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2182 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1979 2183
1980 // Expire it 2184 // Expire it
1981 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 2185 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -2024,8 +2228,18 @@ async fn test_save_and_restore_state_events() {
2024 let event1_id = event1.id; 2228 let event1_id = event1.id;
2025 let event2_id = event2.id; 2229 let event2_id = event2.id;
2026 2230
2027 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); 2231 purgatory.add_state(
2028 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); 2232 event1.clone(),
2233 "test-repo".to_string(),
2234 keys.public_key(),
2235 false,
2236 );
2237 purgatory.add_state(
2238 event2.clone(),
2239 "test-repo".to_string(),
2240 keys.public_key(),
2241 false,
2242 );
2029 2243
2030 // Save to disk 2244 // Save to disk
2031 purgatory.save_to_disk(&state_file).unwrap(); 2245 purgatory.save_to_disk(&state_file).unwrap();
@@ -2087,6 +2301,7 @@ async fn test_save_and_restore_pr_events() {
2087 pr_event.clone(), 2301 pr_event.clone(),
2088 "pr-event-id".to_string(), 2302 "pr-event-id".to_string(),
2089 "commit-abc".to_string(), 2303 "commit-abc".to_string(),
2304 false,
2090 ); 2305 );
2091 2306
2092 // Save to disk 2307 // Save to disk
@@ -2156,7 +2371,7 @@ async fn test_save_and_restore_expired_events() {
2156 let event_id = event.id; 2371 let event_id = event.id;
2157 2372
2158 // Add and expire event 2373 // Add and expire event
2159 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2374 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
2160 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 2375 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
2161 for entry in entries.iter_mut() { 2376 for entry in entries.iter_mut() {
2162 entry.expires_at = Instant::now() - Duration::from_secs(1); 2377 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -2295,7 +2510,7 @@ async fn test_downtime_calculation() {
2295 .sign_with_keys(&keys) 2510 .sign_with_keys(&keys)
2296 .unwrap(); 2511 .unwrap();
2297 2512
2298 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2513 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
2299 2514
2300 // Get original expiry time 2515 // Get original expiry time
2301 let original_entries = purgatory.find_state("repo"); 2516 let original_entries = purgatory.find_state("repo");
@@ -2351,7 +2566,7 @@ async fn test_expiry_times_preserved() {
2351 .sign_with_keys(&keys) 2566 .sign_with_keys(&keys)
2352 .unwrap(); 2567 .unwrap();
2353 2568
2354 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2569 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
2355 2570
2356 // Manually set expiry to a specific time in the future 2571 // Manually set expiry to a specific time in the future
2357 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes 2572 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
@@ -2410,16 +2625,19 @@ async fn test_multiple_state_events_same_identifier() {
2410 event1.clone(), 2625 event1.clone(),
2411 "shared-repo".to_string(), 2626 "shared-repo".to_string(),
2412 keys1.public_key(), 2627 keys1.public_key(),
2628 false,
2413 ); 2629 );
2414 purgatory.add_state( 2630 purgatory.add_state(
2415 event2.clone(), 2631 event2.clone(),
2416 "shared-repo".to_string(), 2632 "shared-repo".to_string(),
2417 keys2.public_key(), 2633 keys2.public_key(),
2634 false,
2418 ); 2635 );
2419 purgatory.add_state( 2636 purgatory.add_state(
2420 event3.clone(), 2637 event3.clone(),
2421 "shared-repo".to_string(), 2638 "shared-repo".to_string(),
2422 keys3.public_key(), 2639 keys3.public_key(),
2640 false,
2423 ); 2641 );
2424 2642
2425 // Save to disk 2643 // Save to disk
@@ -2466,6 +2684,7 @@ async fn test_mixed_pr_events_and_placeholders() {
2466 pr_event.clone(), 2684 pr_event.clone(),
2467 "pr-with-event".to_string(), 2685 "pr-with-event".to_string(),
2468 "commit-abc".to_string(), 2686 "commit-abc".to_string(),
2687 false,
2469 ); 2688 );
2470 2689
2471 // Add PR placeholder 2690 // Add PR placeholder
@@ -2511,7 +2730,7 @@ async fn test_file_cleanup_after_successful_restore() {
2511 let event = EventBuilder::text_note("test") 2730 let event = EventBuilder::text_note("test")
2512 .sign_with_keys(&keys) 2731 .sign_with_keys(&keys)
2513 .unwrap(); 2732 .unwrap();
2514 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2733 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
2515 2734
2516 // Save to disk 2735 // Save to disk
2517 purgatory.save_to_disk(&state_file).unwrap(); 2736 purgatory.save_to_disk(&state_file).unwrap();
@@ -2697,8 +2916,18 @@ async fn test_comprehensive_roundtrip() {
2697 .sign_with_keys(&keys2) 2916 .sign_with_keys(&keys2)
2698 .unwrap(); 2917 .unwrap();
2699 2918
2700 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); 2919 purgatory.add_state(
2701 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); 2920 state1.clone(),
2921 "repo1".to_string(),
2922 keys1.public_key(),
2923 false,
2924 );
2925 purgatory.add_state(
2926 state2.clone(),
2927 "repo2".to_string(),
2928 keys2.public_key(),
2929 false,
2930 );
2702 2931
2703 // Add PR event 2932 // Add PR event
2704 let tags = vec![Tag::custom( 2933 let tags = vec![Tag::custom(
@@ -2709,7 +2938,12 @@ async fn test_comprehensive_roundtrip() {
2709 .tags(tags) 2938 .tags(tags)
2710 .sign_with_keys(&keys1) 2939 .sign_with_keys(&keys1)
2711 .unwrap(); 2940 .unwrap();
2712 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); 2941 purgatory.add_pr(
2942 pr_event.clone(),
2943 "pr-1".to_string(),
2944 "commit-1".to_string(),
2945 false,
2946 );
2713 2947
2714 // Add PR placeholder 2948 // Add PR placeholder
2715 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); 2949 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
@@ -2719,7 +2953,12 @@ async fn test_comprehensive_roundtrip() {
2719 .sign_with_keys(&keys1) 2953 .sign_with_keys(&keys1)
2720 .unwrap(); 2954 .unwrap();
2721 let expired_id = expired_event.id; 2955 let expired_id = expired_event.id;
2722 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); 2956 purgatory.add_state(
2957 expired_event,
2958 "repo3".to_string(),
2959 keys1.public_key(),
2960 false,
2961 );
2723 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { 2962 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2724 for entry in entries.iter_mut() { 2963 for entry in entries.iter_mut() {
2725 entry.expires_at = Instant::now() - Duration::from_secs(1); 2964 entry.expires_at = Instant::now() - Duration::from_secs(1);
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index ece8cd6..8297515 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -375,94 +375,121 @@ impl SyncContext for RealSyncContext {
375 let naughty_list = self.git_naughty_list.clone(); 375 let naughty_list = self.git_naughty_list.clone();
376 376
377 tokio::task::spawn_blocking(move || -> Result<Vec<String>> { 377 tokio::task::spawn_blocking(move || -> Result<Vec<String>> {
378 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history 378 let mut remaining_oids = missing_oids.clone();
379 let mut args = vec!["fetch", &url]; 379 let mut missing_from_remote: Vec<String> = Vec::new();
380 args.extend(missing_oids.iter().map(|s| s.as_str())); 380
381 381 // Retry loop: keep fetching until success or no OIDs left
382 let output = Command::new("git") 382 loop {
383 .args(&args) 383 if remaining_oids.is_empty() {
384 .current_dir(&repo_path) 384 // All OIDs were missing from remote
385 .output(); 385 debug!(
386 386 url = %url,
387 match output { 387 missing_count = missing_from_remote.len(),
388 Ok(result) if result.status.success() => { 388 "All requested OIDs missing from remote"
389 // Count how many OIDs we now have 389 );
390 let fetched: Vec<String> = missing_oids 390 return Ok(vec![]);
391 .iter()
392 .filter(|oid| crate::git::oid_exists(&repo_path, oid))
393 .cloned()
394 .collect();
395
396 debug!(fetched_count = fetched.len(), "Successfully fetched OIDs");
397
398 Ok(fetched)
399 } 391 }
400 Ok(result) => { 392
401 let stderr = String::from_utf8_lossy(&result.stderr); 393 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history
402 394 let mut args = vec!["fetch".to_string(), url.clone()];
403 // Extract domain and classify error for naughty list 395 args.extend(remaining_oids.iter().cloned());
404 if let Some(domain) = extract_domain(&url) { 396
405 if let Some(category) = NaughtyListTracker::classify_error(&stderr) { 397 let output = Command::new("git")
406 let is_new = naughty_list.record(&domain, category, stderr.to_string()); 398 .args(&args)
407 399 .current_dir(&repo_path)
408 if is_new { 400 .output();
409 tracing::warn!( 401
410 domain = %domain, 402 match output {
411 category = %category, 403 Ok(result) if result.status.success() => {
412 error = %stderr, 404 // Fetch succeeded - count how many OIDs we now have
413 "Git remote domain added to naughty list" 405 let fetched: Vec<String> = missing_oids
414 ); 406 .iter()
415 } else { 407 .filter(|oid| crate::git::oid_exists(&repo_path, oid))
416 debug!( 408 .cloned()
417 domain = %domain, 409 .collect();
418 category = %category, 410
419 "Git remote domain still on naughty list" 411 if !missing_from_remote.is_empty() {
420 ); 412 debug!(
421 } 413 url = %url,
414 fetched_count = fetched.len(),
415 missing_count = missing_from_remote.len(),
416 missing_oids = ?missing_from_remote,
417 "Fetch completed after retries - some OIDs were missing from remote"
418 );
419 } else {
420 debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs");
422 } 421 }
422
423 return Ok(fetched);
423 } 424 }
425 Ok(result) => {
426 let stderr = String::from_utf8_lossy(&result.stderr);
424 427
425 // Check for "not our ref" errors and provide a clearer error message 428 // Check for "not our ref" error - this is retryable
426 let error_msg = if stderr.contains("upload-pack: not our ref") { 429 if stderr.contains("upload-pack: not our ref") {
427 // Parse out the missing OID from stderr (git only reports one at a time) 430 // Parse out the missing OID from stderr
428 let missing_oid = stderr 431 let missing_oid = stderr.lines().find_map(|line| {
429 .lines()
430 .find_map(|line| {
431 if line.contains("not our ref") { 432 if line.contains("not our ref") {
432 // Extract the OID from lines like: 433 // Extract the OID from lines like:
433 // "fatal: remote error: upload-pack: not our ref <oid>" 434 // "fatal: remote error: upload-pack: not our ref <oid>"
434 line.split("not our ref").nth(1).map(|s| s.trim().to_string()) 435 line.split("not our ref")
436 .nth(1)
437 .map(|s| s.trim().to_string())
435 } else { 438 } else {
436 None 439 None
437 } 440 }
438 }); 441 });
439 442
440 let total_requested = missing_oids.len(); 443 if let Some(ref oid) = missing_oid {
444 // Remove the missing OID and retry with remaining
445 remaining_oids.retain(|o| o != oid);
446 missing_from_remote.push(oid.clone());
441 447
442 if let Some(oid) = missing_oid { 448 debug!(
443 if total_requested > 1 {
444 // BUG: Git stops at first missing OID, so we don't know if the others exist
445 // We need retry logic to fetch remaining OIDs individually
446 tracing::warn!(
447 url = %url, 449 url = %url,
448 missing_oid = %oid, 450 missing_oid = %oid,
449 total_requested = total_requested, 451 remaining_count = remaining_oids.len(),
450 "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." 452 "OID not found on remote, retrying with remaining OIDs"
451 ); 453 );
452 format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) 454
453 } else { 455 continue; // Retry with remaining OIDs
454 format!("remote missing only oid requested: {}", oid) 456 }
457 }
458
459 // Non-retryable error - record to naughty list and return error
460 if let Some(domain) = extract_domain(&url) {
461 if let Some(category) = NaughtyListTracker::classify_error(&stderr) {
462 let is_new =
463 naughty_list.record(&domain, category, stderr.to_string());
464
465 if is_new {
466 tracing::warn!(
467 domain = %domain,
468 category = %category,
469 error = %stderr,
470 "Git remote domain added to naughty list"
471 );
472 } else {
473 debug!(
474 domain = %domain,
475 category = %category,
476 error = %stderr,
477 "Git fetch failed (domain on naughty list)"
478 );
479 }
455 } 480 }
456 } else {
457 format!("git fetch failed: {}", stderr)
458 } 481 }
459 } else {
460 format!("git fetch failed: {}", stderr)
461 };
462 482
463 Err(anyhow::anyhow!("{}", error_msg)) 483 return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr));
484 }
485 Err(e) => {
486 return Err(anyhow::anyhow!(
487 "git fetch command error for {}: {}",
488 url,
489 e
490 ))
491 }
464 } 492 }
465 Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)),
466 } 493 }
467 }) 494 })
468 .await 495 .await
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index 65d29af..9207d58 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
368 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; 368 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await;
369 throttle_manager.complete_request(&domain); 369 throttle_manager.complete_request(&domain);
370 370
371 let oids_fetched = match fetch_result { 371 let fetched_oids = match fetch_result {
372 Ok(fetched) => { 372 Ok(fetched) if !fetched.is_empty() => {
373 debug!( 373 debug!(
374 identifier = %identifier, 374 identifier = %identifier,
375 url = %url, 375 url = %url,
376 oids_fetched = fetched.len(), 376 oids_fetched = fetched.len(),
377 "Fetch succeeded" 377 "Fetch succeeded"
378 ); 378 );
379 fetched.len() 379 fetched
380 }
381 Ok(_) => {
382 debug!(
383 identifier = %identifier,
384 url = %url,
385 "Fetch returned no OIDs (not available on remote)"
386 );
387 vec![]
380 } 388 }
381 Err(e) => { 389 Err(e) => {
382 debug!( 390 debug!(
@@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
385 error = %e, 393 error = %e,
386 "Fetch failed" 394 "Fetch failed"
387 ); 395 );
388 0 396 vec![]
389 } 397 }
390 }; 398 };
391 399
392 // Try to process any events that can now be satisfied 400 // Try to process any events that can now be satisfied
393 if oids_fetched > 0 { 401 if !fetched_oids.is_empty() {
394 let new_oids: HashSet<String> = needed_oids.into_iter().collect(); 402 let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect();
395 if let Err(e) = ctx 403 if let Err(e) = ctx
396 .process_newly_available_git_data(&target_repo, &new_oids) 404 .process_newly_available_git_data(&target_repo, &new_oids)
397 .await 405 .await
@@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
404 } 412 }
405 } 413 }
406 414
407 oids_fetched 415 fetched_oids.len()
408} 416}
409 417
410/// Sync git data for an identifier. 418/// Sync git data for an identifier.
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index d891bc9..1af5c4e 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -10,6 +10,28 @@ use std::collections::HashSet;
10use std::path::PathBuf; 10use std::path::PathBuf;
11use std::time::Instant; 11use std::time::Instant;
12 12
13/// Source of an event entering purgatory.
14///
15/// Tracks whether an event was submitted directly by a user or fetched via
16/// proactive sync from another relay. This distinction is used for:
17/// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG
18/// - Operational monitoring: Helps identify user-facing issues vs sync noise
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
20pub enum EventSource {
21 /// Event was published directly to this relay by a user
22 #[default]
23 Direct,
24 /// Event was fetched via proactive sync from another relay
25 Sync,
26}
27
28impl EventSource {
29 /// Returns true if this is a direct submission (not synced)
30 pub fn is_direct(&self) -> bool {
31 matches!(self, EventSource::Direct)
32 }
33}
34
13/// Default value for Instant fields during deserialization 35/// Default value for Instant fields during deserialization
14fn instant_now() -> Instant { 36fn instant_now() -> Instant {
15 Instant::now() 37 Instant::now()
@@ -88,6 +110,10 @@ pub struct StatePurgatoryEntry {
88 /// Expiry deadline (30 min from creation, may be extended) 110 /// Expiry deadline (30 min from creation, may be extended)
89 #[serde(skip, default = "instant_now")] 111 #[serde(skip, default = "instant_now")]
90 pub expires_at: Instant, 112 pub expires_at: Instant,
113
114 /// Source of this event (direct submission vs sync)
115 #[serde(default)]
116 pub source: EventSource,
91} 117}
92 118
93/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. 119/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory.
@@ -114,6 +140,10 @@ pub struct PrPurgatoryEntry {
114 /// Expiry deadline (30 min from creation, may be extended) 140 /// Expiry deadline (30 min from creation, may be extended)
115 #[serde(skip, default = "instant_now")] 141 #[serde(skip, default = "instant_now")]
116 pub expires_at: Instant, 142 pub expires_at: Instant,
143
144 /// Source of this event (direct submission vs sync)
145 #[serde(default)]
146 pub source: EventSource,
117} 147}
118 148
119/// Entry for a repository announcement (kind 30617) waiting in purgatory. 149/// Entry for a repository announcement (kind 30617) waiting in purgatory.