upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-28 21:00:14 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-28 21:00:14 +0000
commitf148b3a0e4b032c0acf835cda6d2935e19b9f67e (patch)
treedceea944969465ae6855fa8b1c68d3553334fc1f /src/purgatory
parente9daa340ce1bd215e71d2dc86a81207b7d61df02 (diff)
feat(purgatory): track event source for filtered expiry logging
Add EventSource enum (Direct/Sync) to purgatory entries to distinguish between user-submitted events and sync-fetched events. This enables: - WARN-level logging for direct submissions that expire (user should know) - DEBUG-level logging for sync-fetched expirations (expected behavior) - Source upgrade from Sync→Direct if user submits after sync - Expiry timer reset on source upgrade (fresh 30-min window for user) The source is included in [PURGATORY_EXPIRED] logs as source=direct or source=sync for easy filtering.
Diffstat (limited to 'src/purgatory')
-rw-r--r--src/purgatory/mod.rs206
-rw-r--r--src/purgatory/types.rs30
2 files changed, 185 insertions, 51 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 8b75351..d442ad8 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -17,7 +17,7 @@ pub mod sync;
17mod types; 17mod types;
18 18
19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs};
20pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 20pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 21
22use dashmap::DashMap; 22use dashmap::DashMap;
23use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
@@ -58,6 +58,9 @@ struct SerializableStatePurgatoryEntry {
58 created_at_offset_secs: u64, 58 created_at_offset_secs: u64,
59 /// Duration offset from saved_at for expires_at 59 /// Duration offset from saved_at for expires_at
60 expires_at_offset_secs: u64, 60 expires_at_offset_secs: u64,
61 /// Source of this event (direct submission vs sync)
62 #[serde(default)]
63 source: types::EventSource,
61} 64}
62 65
63/// Serializable wrapper for `PrPurgatoryEntry` with time offsets. 66/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
@@ -75,6 +78,9 @@ struct SerializablePrPurgatoryEntry {
75 created_at_offset_secs: u64, 78 created_at_offset_secs: u64,
76 /// Duration offset from saved_at for expires_at 79 /// Duration offset from saved_at for expires_at
77 expires_at_offset_secs: u64, 80 expires_at_offset_secs: u64,
81 /// Source of this event (direct submission vs sync)
82 #[serde(default)]
83 source: types::EventSource,
78} 84}
79 85
80/// Serializable purgatory state for disk persistence. 86/// Serializable purgatory state for disk persistence.
@@ -271,11 +277,38 @@ impl Purgatory {
271 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately 277 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately
272 /// to override this delay. 278 /// to override this delay.
273 /// 279 ///
280 /// If an event already exists in purgatory with `Sync` source and the new submission
281 /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry.
282 ///
274 /// # Arguments 283 /// # Arguments
275 /// * `event` - The state event (kind 30618) to hold 284 /// * `event` - The state event (kind 30618) to hold
276 /// * `identifier` - The repository identifier from the 'd' tag 285 /// * `identifier` - The repository identifier from the 'd' tag
277 /// * `author` - The event author's public key 286 /// * `author` - The event author's public key
278 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { 287 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
288 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) {
289 let source = if from_sync {
290 types::EventSource::Sync
291 } else {
292 types::EventSource::Direct
293 };
294
295 // Check if event already exists - if so, potentially upgrade source
296 if let Some(mut entries) = self.state_events.get_mut(&identifier) {
297 if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) {
298 // Upgrade source from Sync to Direct if new submission is direct
299 if existing.source == types::EventSource::Sync && !from_sync {
300 existing.source = types::EventSource::Direct;
301 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
302 tracing::debug!(
303 event_id = %event.id,
304 identifier = %identifier,
305 "Upgraded purgatory entry source from Sync to Direct, reset expiry"
306 );
307 }
308 return; // Event already exists, don't add duplicate
309 }
310 }
311
279 let now = Instant::now(); 312 let now = Instant::now();
280 let entry = StatePurgatoryEntry { 313 let entry = StatePurgatoryEntry {
281 event, 314 event,
@@ -283,6 +316,7 @@ impl Purgatory {
283 author, 316 author,
284 created_at: now, 317 created_at: now,
285 expires_at: now + DEFAULT_EXPIRY, 318 expires_at: now + DEFAULT_EXPIRY,
319 source,
286 }; 320 };
287 321
288 self.state_events 322 self.state_events
@@ -302,11 +336,35 @@ impl Purgatory {
302 /// Automatically enqueues the referenced repository identifier for background sync 336 /// Automatically enqueues the referenced repository identifier for background sync
303 /// with the default delay (3 minutes), giving time for a git push to arrive. 337 /// with the default delay (3 minutes), giving time for a git push to arrive.
304 /// 338 ///
339 /// If an event already exists in purgatory with `Sync` source and the new submission
340 /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry.
341 ///
305 /// # Arguments 342 /// # Arguments
306 /// * `event` - The PR event (kind 1617/1618) to hold 343 /// * `event` - The PR event (kind 1617/1618) to hold
307 /// * `event_id` - The event ID (hex string) from the 'e' tag 344 /// * `event_id` - The event ID (hex string) from the 'e' tag
308 /// * `commit` - The commit SHA from the 'c' tag 345 /// * `commit` - The commit SHA from the 'c' tag
309 pub fn add_pr(&self, event: Event, event_id: String, commit: String) { 346 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
347 pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) {
348 let source = if from_sync {
349 types::EventSource::Sync
350 } else {
351 types::EventSource::Direct
352 };
353
354 // Check if event already exists - if so, potentially upgrade source
355 if let Some(mut existing) = self.pr_events.get_mut(&event_id) {
356 // Upgrade source from Sync to Direct if new submission is direct
357 if existing.source == types::EventSource::Sync && !from_sync {
358 existing.source = types::EventSource::Direct;
359 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
360 tracing::debug!(
361 event_id = %event_id,
362 "Upgraded PR purgatory entry source from Sync to Direct, reset expiry"
363 );
364 }
365 return; // Event already exists, don't add duplicate
366 }
367
310 // Extract identifier from the event's `a` tag for sync enqueueing 368 // Extract identifier from the event's `a` tag for sync enqueueing
311 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); 369 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event);
312 370
@@ -316,6 +374,7 @@ impl Purgatory {
316 commit, 374 commit,
317 created_at: now, 375 created_at: now,
318 expires_at: now + DEFAULT_EXPIRY, 376 expires_at: now + DEFAULT_EXPIRY,
377 source,
319 }; 378 };
320 379
321 self.pr_events.insert(event_id, entry); 380 self.pr_events.insert(event_id, entry);
@@ -329,6 +388,8 @@ impl Purgatory {
329 /// Add a PR placeholder (git data arrived before PR event). 388 /// Add a PR placeholder (git data arrived before PR event).
330 /// 389 ///
331 /// Creates a placeholder entry waiting for the corresponding PR event. 390 /// Creates a placeholder entry waiting for the corresponding PR event.
391 /// Placeholders are always marked as `Direct` source since they originate
392 /// from git pushes (direct user action).
332 /// 393 ///
333 /// # Arguments 394 /// # Arguments
334 /// * `event_id` - The expected event ID (from git ref name) 395 /// * `event_id` - The expected event ID (from git ref name)
@@ -340,6 +401,7 @@ impl Purgatory {
340 commit, 401 commit,
341 created_at: now, 402 created_at: now,
342 expires_at: now + DEFAULT_EXPIRY, 403 expires_at: now + DEFAULT_EXPIRY,
404 source: types::EventSource::Direct, // Git pushes are direct user actions
343 }; 405 };
344 406
345 self.pr_events.insert(event_id, entry); 407 self.pr_events.insert(event_id, entry);
@@ -626,15 +688,29 @@ impl Purgatory {
626 for entry in entries.iter().filter(|e| e.expires_at <= now) { 688 for entry in entries.iter().filter(|e| e.expires_at <= now) {
627 let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); 689 let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex());
628 let event_id_short = &entry.event.id.to_hex()[..12]; 690 let event_id_short = &entry.event.id.to_hex()[..12];
691 let source_str = if entry.source.is_direct() { "direct" } else { "sync" };
629 692
630 // Structured log for migration scripts 693 // Structured log for migration scripts
631 tracing::warn!( 694 // Direct submissions log at WARN, synced events at DEBUG
632 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} reason=\"git data not received within 30 minutes\"", 695 if entry.source.is_direct() {
633 identifier, 696 tracing::warn!(
634 npub, 697 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
635 event_id_short, 698 identifier,
636 entry.event.kind.as_u16() 699 npub,
637 ); 700 event_id_short,
701 entry.event.kind.as_u16(),
702 source_str
703 );
704 } else {
705 tracing::debug!(
706 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
707 identifier,
708 npub,
709 event_id_short,
710 entry.event.kind.as_u16(),
711 source_str
712 );
713 }
638 714
639 self.mark_expired(entry.event.id); 715 self.mark_expired(entry.event.id);
640 } 716 }
@@ -655,16 +731,18 @@ impl Purgatory {
655 let event_id_str = entry.key().clone(); 731 let event_id_str = entry.key().clone();
656 let event_opt = pr_entry.event.clone(); 732 let event_opt = pr_entry.event.clone();
657 let commit = pr_entry.commit.clone(); 733 let commit = pr_entry.commit.clone();
658 (event_id_str, event_opt, commit) 734 let source = pr_entry.source;
735 (event_id_str, event_opt, commit, source)
659 }) 736 })
660 .collect(); 737 .collect();
661 738
662 let pr_removed = expired_prs.len(); 739 let pr_removed = expired_prs.len();
663 for (event_id_str, event_opt, commit) in expired_prs { 740 for (event_id_str, event_opt, commit, source) in expired_prs {
664 // Log structured entry for PR events (not placeholders) 741 // Log structured entry for PR events (not placeholders)
665 if let Some(ref event) = event_opt { 742 if let Some(ref event) = event_opt {
666 let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex()); 743 let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex());
667 let event_id_short = &event.id.to_hex()[..12]; 744 let event_id_short = &event.id.to_hex()[..12];
745 let source_str = if source.is_direct() { "direct" } else { "sync" };
668 746
669 // Extract ALL repo identifiers from 'a' tags 747 // Extract ALL repo identifiers from 'a' tags
670 // (PR events can reference multiple repos when there are multiple maintainers) 748 // (PR events can reference multiple repos when there are multiple maintainers)
@@ -701,22 +779,37 @@ impl Purgatory {
701 }; 779 };
702 780
703 // Structured log for migration scripts - log once per repo 781 // Structured log for migration scripts - log once per repo
782 // Direct submissions log at WARN, synced events at DEBUG
704 for repo in &repos_to_log { 783 for repo in &repos_to_log {
705 tracing::warn!( 784 if source.is_direct() {
706 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} reason=\"git data not received within 30 minutes\"", 785 tracing::warn!(
707 repo, 786 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
708 npub, 787 repo,
709 event_id_short, 788 npub,
710 event.kind.as_u16(), 789 event_id_short,
711 &commit[..commit.len().min(12)] 790 event.kind.as_u16(),
712 ); 791 &commit[..commit.len().min(12)],
792 source_str
793 );
794 } else {
795 tracing::debug!(
796 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
797 repo,
798 npub,
799 event_id_short,
800 event.kind.as_u16(),
801 &commit[..commit.len().min(12)],
802 source_str
803 );
804 }
713 } 805 }
714 806
715 self.mark_expired(event.id); 807 self.mark_expired(event.id);
716 } else { 808 } else {
717 // Placeholder (git data arrived first, but PR event never came) 809 // Placeholder (git data arrived first, but PR event never came)
810 // Placeholders are always Direct source (from git push)
718 tracing::debug!( 811 tracing::debug!(
719 "[PURGATORY_EXPIRED] placeholder event_id={} commit={} reason=\"PR event not received within 30 minutes\"", 812 "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"",
720 &event_id_str[..event_id_str.len().min(12)], 813 &event_id_str[..event_id_str.len().min(12)],
721 &commit[..commit.len().min(12)] 814 &commit[..commit.len().min(12)]
722 ); 815 );
@@ -869,6 +962,7 @@ impl Purgatory {
869 author: e.author, 962 author: e.author,
870 created_at_offset_secs: created_offset.as_secs(), 963 created_at_offset_secs: created_offset.as_secs(),
871 expires_at_offset_secs: expires_offset.as_secs(), 964 expires_at_offset_secs: expires_offset.as_secs(),
965 source: e.source,
872 } 966 }
873 }) 967 })
874 .collect(); 968 .collect();
@@ -891,6 +985,7 @@ impl Purgatory {
891 commit: e.commit.clone(), 985 commit: e.commit.clone(),
892 created_at_offset_secs: created_offset.as_secs(), 986 created_at_offset_secs: created_offset.as_secs(),
893 expires_at_offset_secs: expires_offset.as_secs(), 987 expires_at_offset_secs: expires_offset.as_secs(),
988 source: e.source,
894 }; 989 };
895 pr_events.insert(event_id, serializable); 990 pr_events.insert(event_id, serializable);
896 } 991 }
@@ -992,6 +1087,7 @@ impl Purgatory {
992 author: e.author, 1087 author: e.author,
993 created_at, 1088 created_at,
994 expires_at, 1089 expires_at,
1090 source: e.source,
995 } 1091 }
996 }) 1092 })
997 .collect(); 1093 .collect();
@@ -1017,6 +1113,7 @@ impl Purgatory {
1017 commit: e.commit, 1113 commit: e.commit,
1018 created_at, 1114 created_at,
1019 expires_at, 1115 expires_at,
1116 source: e.source,
1020 }; 1117 };
1021 1118
1022 self.pr_events.insert(event_id, entry); 1119 self.pr_events.insert(event_id, entry);
@@ -1074,8 +1171,8 @@ mod tests {
1074 .sign_with_keys(&keys) 1171 .sign_with_keys(&keys)
1075 .unwrap(); 1172 .unwrap();
1076 1173
1077 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); 1174 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key(), false);
1078 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); 1175 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string(), false);
1079 1176
1080 let (state_count, pr_count) = purgatory.count(); 1177 let (state_count, pr_count) = purgatory.count();
1081 assert_eq!(state_count, 1); 1178 assert_eq!(state_count, 1);
@@ -1126,7 +1223,7 @@ mod tests {
1126 let event = EventBuilder::text_note("state") 1223 let event = EventBuilder::text_note("state")
1127 .sign_with_keys(&keys) 1224 .sign_with_keys(&keys)
1128 .unwrap(); 1225 .unwrap();
1129 purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); 1226 purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false);
1130 1227
1131 // Now should have pending events 1228 // Now should have pending events
1132 assert!(purgatory.has_pending_events("test-repo")); 1229 assert!(purgatory.has_pending_events("test-repo"));
@@ -1156,7 +1253,7 @@ mod tests {
1156 .sign_with_keys(&keys) 1253 .sign_with_keys(&keys)
1157 .unwrap(); 1254 .unwrap();
1158 1255
1159 purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); 1256 purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string(), false);
1160 1257
1161 // Now should have pending events for test-repo 1258 // Now should have pending events for test-repo
1162 assert!(purgatory.has_pending_events("test-repo")); 1259 assert!(purgatory.has_pending_events("test-repo"));
@@ -1221,6 +1318,7 @@ fn test_pr_event_vs_placeholder() {
1221 event.clone(), 1318 event.clone(),
1222 "event-id-1".to_string(), 1319 "event-id-1".to_string(),
1223 "commit-abc".to_string(), 1320 "commit-abc".to_string(),
1321 false,
1224 ); 1322 );
1225 1323
1226 // Add a placeholder (no event) 1324 // Add a placeholder (no event)
@@ -1277,8 +1375,9 @@ fn test_cleanup_removes_expired_entries() {
1277 state_event.clone(), 1375 state_event.clone(),
1278 "test-repo".to_string(), 1376 "test-repo".to_string(),
1279 keys.public_key(), 1377 keys.public_key(),
1378 false,
1280 ); 1379 );
1281 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); 1380 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string(), false);
1282 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); 1381 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string());
1283 1382
1284 // Verify entries are there 1383 // Verify entries are there
@@ -1325,8 +1424,8 @@ fn test_cleanup_preserves_non_expired_entries() {
1325 .unwrap(); 1424 .unwrap();
1326 1425
1327 // Add fresh entries 1426 // Add fresh entries
1328 purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); 1427 purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key(), false);
1329 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); 1428 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string(), false);
1330 1429
1331 // Run cleanup 1430 // Run cleanup
1332 let (state_removed, pr_removed) = purgatory.cleanup(); 1431 let (state_removed, pr_removed) = purgatory.cleanup();
@@ -1356,8 +1455,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1356 .sign_with_keys(&keys) 1455 .sign_with_keys(&keys)
1357 .unwrap(); 1456 .unwrap();
1358 1457
1359 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); 1458 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false);
1360 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); 1459 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false);
1361 1460
1362 // Expire only the first one 1461 // Expire only the first one
1363 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { 1462 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") {
@@ -1374,8 +1473,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1374 .sign_with_keys(&keys) 1473 .sign_with_keys(&keys)
1375 .unwrap(); 1474 .unwrap();
1376 1475
1377 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); 1476 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false);
1378 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); 1477 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false);
1379 1478
1380 // Expire only first PR 1479 // Expire only first PR
1381 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { 1480 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") {
@@ -1407,8 +1506,8 @@ fn test_remove_expired_legacy_method() {
1407 .unwrap(); 1506 .unwrap();
1408 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); 1507 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap();
1409 1508
1410 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 1509 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1411 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 1510 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1412 1511
1413 // Expire both 1512 // Expire both
1414 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1513 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1442,8 +1541,8 @@ fn test_expired_event_tracking() {
1442 let pr_event_id = pr_event.id; 1541 let pr_event_id = pr_event.id;
1443 1542
1444 // Add events to purgatory 1543 // Add events to purgatory
1445 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 1544 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1446 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 1545 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1447 1546
1448 // Events should not be marked as expired yet 1547 // Events should not be marked as expired yet
1449 assert!(!purgatory.is_expired(&state_event_id)); 1548 assert!(!purgatory.is_expired(&state_event_id));
@@ -1495,7 +1594,7 @@ fn test_cleanup_expired_events() {
1495 let event2_id = event2.id; 1594 let event2_id = event2.id;
1496 1595
1497 // Add and immediately expire event1 1596 // Add and immediately expire event1
1498 purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); 1597 purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false);
1499 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { 1598 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") {
1500 for entry in entries.iter_mut() { 1599 for entry in entries.iter_mut() {
1501 entry.expires_at = Instant::now() - Duration::from_secs(1); 1600 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1504,7 +1603,7 @@ fn test_cleanup_expired_events() {
1504 purgatory.cleanup(); 1603 purgatory.cleanup();
1505 1604
1506 // Add and expire event2 (will be more recent) 1605 // Add and expire event2 (will be more recent)
1507 purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); 1606 purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false);
1508 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { 1607 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") {
1509 for entry in entries.iter_mut() { 1608 for entry in entries.iter_mut() {
1510 entry.expires_at = Instant::now() - Duration::from_secs(1); 1609 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1546,7 +1645,7 @@ fn test_expired_events_prevent_readdition() {
1546 let event_id = event.id; 1645 let event_id = event.id;
1547 1646
1548 // Add event to purgatory 1647 // Add event to purgatory
1549 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 1648 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1550 1649
1551 // Expire it 1650 // Expire it
1552 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1651 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1566,7 +1665,7 @@ fn test_expired_events_prevent_readdition() {
1566 // This simulates what negentropy/REQ+EOSE should do: 1665 // This simulates what negentropy/REQ+EOSE should do:
1567 // Check if event is in event_ids() before adding 1666 // Check if event is in event_ids() before adding
1568 if !ids.contains(&event_id) { 1667 if !ids.contains(&event_id) {
1569 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 1668 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
1570 } 1669 }
1571 1670
1572 // Event should NOT be re-added 1671 // Event should NOT be re-added
@@ -1609,7 +1708,7 @@ fn test_user_can_resubmit_expired_event() {
1609 let event_id = event.id; 1708 let event_id = event.id;
1610 1709
1611 // Add event to purgatory 1710 // Add event to purgatory
1612 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 1711 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1613 1712
1614 // Expire it 1713 // Expire it
1615 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1714 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1658,8 +1757,8 @@ async fn test_save_and_restore_state_events() {
1658 let event1_id = event1.id; 1757 let event1_id = event1.id;
1659 let event2_id = event2.id; 1758 let event2_id = event2.id;
1660 1759
1661 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); 1760 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key(), false);
1662 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); 1761 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key(), false);
1663 1762
1664 // Save to disk 1763 // Save to disk
1665 purgatory.save_to_disk(&state_file).unwrap(); 1764 purgatory.save_to_disk(&state_file).unwrap();
@@ -1721,6 +1820,7 @@ async fn test_save_and_restore_pr_events() {
1721 pr_event.clone(), 1820 pr_event.clone(),
1722 "pr-event-id".to_string(), 1821 "pr-event-id".to_string(),
1723 "commit-abc".to_string(), 1822 "commit-abc".to_string(),
1823 false,
1724 ); 1824 );
1725 1825
1726 // Save to disk 1826 // Save to disk
@@ -1790,7 +1890,7 @@ async fn test_save_and_restore_expired_events() {
1790 let event_id = event.id; 1890 let event_id = event.id;
1791 1891
1792 // Add and expire event 1892 // Add and expire event
1793 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 1893 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
1794 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1894 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1795 for entry in entries.iter_mut() { 1895 for entry in entries.iter_mut() {
1796 entry.expires_at = Instant::now() - Duration::from_secs(1); 1896 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1929,7 +2029,7 @@ async fn test_downtime_calculation() {
1929 .sign_with_keys(&keys) 2029 .sign_with_keys(&keys)
1930 .unwrap(); 2030 .unwrap();
1931 2031
1932 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2032 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1933 2033
1934 // Get original expiry time 2034 // Get original expiry time
1935 let original_entries = purgatory.find_state("repo"); 2035 let original_entries = purgatory.find_state("repo");
@@ -1985,7 +2085,7 @@ async fn test_expiry_times_preserved() {
1985 .sign_with_keys(&keys) 2085 .sign_with_keys(&keys)
1986 .unwrap(); 2086 .unwrap();
1987 2087
1988 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2088 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1989 2089
1990 // Manually set expiry to a specific time in the future 2090 // Manually set expiry to a specific time in the future
1991 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes 2091 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
@@ -2044,16 +2144,19 @@ async fn test_multiple_state_events_same_identifier() {
2044 event1.clone(), 2144 event1.clone(),
2045 "shared-repo".to_string(), 2145 "shared-repo".to_string(),
2046 keys1.public_key(), 2146 keys1.public_key(),
2147 false,
2047 ); 2148 );
2048 purgatory.add_state( 2149 purgatory.add_state(
2049 event2.clone(), 2150 event2.clone(),
2050 "shared-repo".to_string(), 2151 "shared-repo".to_string(),
2051 keys2.public_key(), 2152 keys2.public_key(),
2153 false,
2052 ); 2154 );
2053 purgatory.add_state( 2155 purgatory.add_state(
2054 event3.clone(), 2156 event3.clone(),
2055 "shared-repo".to_string(), 2157 "shared-repo".to_string(),
2056 keys3.public_key(), 2158 keys3.public_key(),
2159 false,
2057 ); 2160 );
2058 2161
2059 // Save to disk 2162 // Save to disk
@@ -2100,6 +2203,7 @@ async fn test_mixed_pr_events_and_placeholders() {
2100 pr_event.clone(), 2203 pr_event.clone(),
2101 "pr-with-event".to_string(), 2204 "pr-with-event".to_string(),
2102 "commit-abc".to_string(), 2205 "commit-abc".to_string(),
2206 false,
2103 ); 2207 );
2104 2208
2105 // Add PR placeholder 2209 // Add PR placeholder
@@ -2145,7 +2249,7 @@ async fn test_file_cleanup_after_successful_restore() {
2145 let event = EventBuilder::text_note("test") 2249 let event = EventBuilder::text_note("test")
2146 .sign_with_keys(&keys) 2250 .sign_with_keys(&keys)
2147 .unwrap(); 2251 .unwrap();
2148 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2252 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
2149 2253
2150 // Save to disk 2254 // Save to disk
2151 purgatory.save_to_disk(&state_file).unwrap(); 2255 purgatory.save_to_disk(&state_file).unwrap();
@@ -2179,8 +2283,8 @@ async fn test_comprehensive_roundtrip() {
2179 .sign_with_keys(&keys2) 2283 .sign_with_keys(&keys2)
2180 .unwrap(); 2284 .unwrap();
2181 2285
2182 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); 2286 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key(), false);
2183 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); 2287 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key(), false);
2184 2288
2185 // Add PR event 2289 // Add PR event
2186 let tags = vec![Tag::custom( 2290 let tags = vec![Tag::custom(
@@ -2191,7 +2295,7 @@ async fn test_comprehensive_roundtrip() {
2191 .tags(tags) 2295 .tags(tags)
2192 .sign_with_keys(&keys1) 2296 .sign_with_keys(&keys1)
2193 .unwrap(); 2297 .unwrap();
2194 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); 2298 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string(), false);
2195 2299
2196 // Add PR placeholder 2300 // Add PR placeholder
2197 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); 2301 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
@@ -2201,7 +2305,7 @@ async fn test_comprehensive_roundtrip() {
2201 .sign_with_keys(&keys1) 2305 .sign_with_keys(&keys1)
2202 .unwrap(); 2306 .unwrap();
2203 let expired_id = expired_event.id; 2307 let expired_id = expired_event.id;
2204 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); 2308 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key(), false);
2205 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { 2309 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2206 for entry in entries.iter_mut() { 2310 for entry in entries.iter_mut() {
2207 entry.expires_at = Instant::now() - Duration::from_secs(1); 2311 entry.expires_at = Instant::now() - Duration::from_secs(1);
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 919504b..e37a3e1 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -8,6 +8,28 @@ use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize}; 8use serde::{Deserialize, Serialize};
9use std::time::Instant; 9use std::time::Instant;
10 10
11/// Source of an event entering purgatory.
12///
13/// Tracks whether an event was submitted directly by a user or fetched via
14/// proactive sync from another relay. This distinction is used for:
15/// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG
16/// - Operational monitoring: Helps identify user-facing issues vs sync noise
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
18pub enum EventSource {
19 /// Event was published directly to this relay by a user
20 #[default]
21 Direct,
22 /// Event was fetched via proactive sync from another relay
23 Sync,
24}
25
26impl EventSource {
27 /// Returns true if this is a direct submission (not synced)
28 pub fn is_direct(&self) -> bool {
29 matches!(self, EventSource::Direct)
30 }
31}
32
11/// Default value for Instant fields during deserialization 33/// Default value for Instant fields during deserialization
12fn instant_now() -> Instant { 34fn instant_now() -> Instant {
13 Instant::now() 35 Instant::now()
@@ -86,6 +108,10 @@ pub struct StatePurgatoryEntry {
86 /// Expiry deadline (30 min from creation, may be extended) 108 /// Expiry deadline (30 min from creation, may be extended)
87 #[serde(skip, default = "instant_now")] 109 #[serde(skip, default = "instant_now")]
88 pub expires_at: Instant, 110 pub expires_at: Instant,
111
112 /// Source of this event (direct submission vs sync)
113 #[serde(default)]
114 pub source: EventSource,
89} 115}
90 116
91/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. 117/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory.
@@ -112,4 +138,8 @@ pub struct PrPurgatoryEntry {
112 /// Expiry deadline (30 min from creation, may be extended) 138 /// Expiry deadline (30 min from creation, may be extended)
113 #[serde(skip, default = "instant_now")] 139 #[serde(skip, default = "instant_now")]
114 pub expires_at: Instant, 140 pub expires_at: Instant,
141
142 /// Source of this event (direct submission vs sync)
143 #[serde(default)]
144 pub source: EventSource,
115} 145}