diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-28 21:00:14 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-28 21:00:14 +0000 |
| commit | f148b3a0e4b032c0acf835cda6d2935e19b9f67e (patch) | |
| tree | dceea944969465ae6855fa8b1c68d3553334fc1f /src/purgatory | |
| parent | e9daa340ce1bd215e71d2dc86a81207b7d61df02 (diff) | |
feat(purgatory): track event source for filtered expiry logging
Add EventSource enum (Direct/Sync) to purgatory entries to distinguish
between user-submitted events and sync-fetched events. This enables:
- WARN-level logging for direct submissions that expire (user should know)
- DEBUG-level logging for sync-fetched expirations (expected behavior)
- Source upgrade from Sync→Direct if user submits after sync
- Expiry timer reset on source upgrade (fresh 30-min window for user)
The source is included in [PURGATORY_EXPIRED] logs as source=direct or
source=sync for easy filtering.
Diffstat (limited to 'src/purgatory')
| -rw-r--r-- | src/purgatory/mod.rs | 206 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 30 |
2 files changed, 185 insertions, 51 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 8b75351..d442ad8 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -17,7 +17,7 @@ pub mod sync; | |||
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| @@ -58,6 +58,9 @@ struct SerializableStatePurgatoryEntry { | |||
| 58 | created_at_offset_secs: u64, | 58 | created_at_offset_secs: u64, |
| 59 | /// Duration offset from saved_at for expires_at | 59 | /// Duration offset from saved_at for expires_at |
| 60 | expires_at_offset_secs: u64, | 60 | expires_at_offset_secs: u64, |
| 61 | /// Source of this event (direct submission vs sync) | ||
| 62 | #[serde(default)] | ||
| 63 | source: types::EventSource, | ||
| 61 | } | 64 | } |
| 62 | 65 | ||
| 63 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | 66 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. |
| @@ -75,6 +78,9 @@ struct SerializablePrPurgatoryEntry { | |||
| 75 | created_at_offset_secs: u64, | 78 | created_at_offset_secs: u64, |
| 76 | /// Duration offset from saved_at for expires_at | 79 | /// Duration offset from saved_at for expires_at |
| 77 | expires_at_offset_secs: u64, | 80 | expires_at_offset_secs: u64, |
| 81 | /// Source of this event (direct submission vs sync) | ||
| 82 | #[serde(default)] | ||
| 83 | source: types::EventSource, | ||
| 78 | } | 84 | } |
| 79 | 85 | ||
| 80 | /// Serializable purgatory state for disk persistence. | 86 | /// Serializable purgatory state for disk persistence. |
| @@ -271,11 +277,38 @@ impl Purgatory { | |||
| 271 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately | 277 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately |
| 272 | /// to override this delay. | 278 | /// to override this delay. |
| 273 | /// | 279 | /// |
| 280 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 281 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 282 | /// | ||
| 274 | /// # Arguments | 283 | /// # Arguments |
| 275 | /// * `event` - The state event (kind 30618) to hold | 284 | /// * `event` - The state event (kind 30618) to hold |
| 276 | /// * `identifier` - The repository identifier from the 'd' tag | 285 | /// * `identifier` - The repository identifier from the 'd' tag |
| 277 | /// * `author` - The event author's public key | 286 | /// * `author` - The event author's public key |
| 278 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 287 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 288 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) { | ||
| 289 | let source = if from_sync { | ||
| 290 | types::EventSource::Sync | ||
| 291 | } else { | ||
| 292 | types::EventSource::Direct | ||
| 293 | }; | ||
| 294 | |||
| 295 | // Check if event already exists - if so, potentially upgrade source | ||
| 296 | if let Some(mut entries) = self.state_events.get_mut(&identifier) { | ||
| 297 | if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) { | ||
| 298 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 299 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 300 | existing.source = types::EventSource::Direct; | ||
| 301 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 302 | tracing::debug!( | ||
| 303 | event_id = %event.id, | ||
| 304 | identifier = %identifier, | ||
| 305 | "Upgraded purgatory entry source from Sync to Direct, reset expiry" | ||
| 306 | ); | ||
| 307 | } | ||
| 308 | return; // Event already exists, don't add duplicate | ||
| 309 | } | ||
| 310 | } | ||
| 311 | |||
| 279 | let now = Instant::now(); | 312 | let now = Instant::now(); |
| 280 | let entry = StatePurgatoryEntry { | 313 | let entry = StatePurgatoryEntry { |
| 281 | event, | 314 | event, |
| @@ -283,6 +316,7 @@ impl Purgatory { | |||
| 283 | author, | 316 | author, |
| 284 | created_at: now, | 317 | created_at: now, |
| 285 | expires_at: now + DEFAULT_EXPIRY, | 318 | expires_at: now + DEFAULT_EXPIRY, |
| 319 | source, | ||
| 286 | }; | 320 | }; |
| 287 | 321 | ||
| 288 | self.state_events | 322 | self.state_events |
| @@ -302,11 +336,35 @@ impl Purgatory { | |||
| 302 | /// Automatically enqueues the referenced repository identifier for background sync | 336 | /// Automatically enqueues the referenced repository identifier for background sync |
| 303 | /// with the default delay (3 minutes), giving time for a git push to arrive. | 337 | /// with the default delay (3 minutes), giving time for a git push to arrive. |
| 304 | /// | 338 | /// |
| 339 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 340 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 341 | /// | ||
| 305 | /// # Arguments | 342 | /// # Arguments |
| 306 | /// * `event` - The PR event (kind 1617/1618) to hold | 343 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 307 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 344 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 308 | /// * `commit` - The commit SHA from the 'c' tag | 345 | /// * `commit` - The commit SHA from the 'c' tag |
| 309 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 346 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 347 | pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) { | ||
| 348 | let source = if from_sync { | ||
| 349 | types::EventSource::Sync | ||
| 350 | } else { | ||
| 351 | types::EventSource::Direct | ||
| 352 | }; | ||
| 353 | |||
| 354 | // Check if event already exists - if so, potentially upgrade source | ||
| 355 | if let Some(mut existing) = self.pr_events.get_mut(&event_id) { | ||
| 356 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 357 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 358 | existing.source = types::EventSource::Direct; | ||
| 359 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 360 | tracing::debug!( | ||
| 361 | event_id = %event_id, | ||
| 362 | "Upgraded PR purgatory entry source from Sync to Direct, reset expiry" | ||
| 363 | ); | ||
| 364 | } | ||
| 365 | return; // Event already exists, don't add duplicate | ||
| 366 | } | ||
| 367 | |||
| 310 | // Extract identifier from the event's `a` tag for sync enqueueing | 368 | // Extract identifier from the event's `a` tag for sync enqueueing |
| 311 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | 369 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); |
| 312 | 370 | ||
| @@ -316,6 +374,7 @@ impl Purgatory { | |||
| 316 | commit, | 374 | commit, |
| 317 | created_at: now, | 375 | created_at: now, |
| 318 | expires_at: now + DEFAULT_EXPIRY, | 376 | expires_at: now + DEFAULT_EXPIRY, |
| 377 | source, | ||
| 319 | }; | 378 | }; |
| 320 | 379 | ||
| 321 | self.pr_events.insert(event_id, entry); | 380 | self.pr_events.insert(event_id, entry); |
| @@ -329,6 +388,8 @@ impl Purgatory { | |||
| 329 | /// Add a PR placeholder (git data arrived before PR event). | 388 | /// Add a PR placeholder (git data arrived before PR event). |
| 330 | /// | 389 | /// |
| 331 | /// Creates a placeholder entry waiting for the corresponding PR event. | 390 | /// Creates a placeholder entry waiting for the corresponding PR event. |
| 391 | /// Placeholders are always marked as `Direct` source since they originate | ||
| 392 | /// from git pushes (direct user action). | ||
| 332 | /// | 393 | /// |
| 333 | /// # Arguments | 394 | /// # Arguments |
| 334 | /// * `event_id` - The expected event ID (from git ref name) | 395 | /// * `event_id` - The expected event ID (from git ref name) |
| @@ -340,6 +401,7 @@ impl Purgatory { | |||
| 340 | commit, | 401 | commit, |
| 341 | created_at: now, | 402 | created_at: now, |
| 342 | expires_at: now + DEFAULT_EXPIRY, | 403 | expires_at: now + DEFAULT_EXPIRY, |
| 404 | source: types::EventSource::Direct, // Git pushes are direct user actions | ||
| 343 | }; | 405 | }; |
| 344 | 406 | ||
| 345 | self.pr_events.insert(event_id, entry); | 407 | self.pr_events.insert(event_id, entry); |
| @@ -626,15 +688,29 @@ impl Purgatory { | |||
| 626 | for entry in entries.iter().filter(|e| e.expires_at <= now) { | 688 | for entry in entries.iter().filter(|e| e.expires_at <= now) { |
| 627 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); | 689 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); |
| 628 | let event_id_short = &entry.event.id.to_hex()[..12]; | 690 | let event_id_short = &entry.event.id.to_hex()[..12]; |
| 691 | let source_str = if entry.source.is_direct() { "direct" } else { "sync" }; | ||
| 629 | 692 | ||
| 630 | // Structured log for migration scripts | 693 | // Structured log for migration scripts |
| 631 | tracing::warn!( | 694 | // Direct submissions log at WARN, synced events at DEBUG |
| 632 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} reason=\"git data not received within 30 minutes\"", | 695 | if entry.source.is_direct() { |
| 633 | identifier, | 696 | tracing::warn!( |
| 634 | npub, | 697 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", |
| 635 | event_id_short, | 698 | identifier, |
| 636 | entry.event.kind.as_u16() | 699 | npub, |
| 637 | ); | 700 | event_id_short, |
| 701 | entry.event.kind.as_u16(), | ||
| 702 | source_str | ||
| 703 | ); | ||
| 704 | } else { | ||
| 705 | tracing::debug!( | ||
| 706 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 707 | identifier, | ||
| 708 | npub, | ||
| 709 | event_id_short, | ||
| 710 | entry.event.kind.as_u16(), | ||
| 711 | source_str | ||
| 712 | ); | ||
| 713 | } | ||
| 638 | 714 | ||
| 639 | self.mark_expired(entry.event.id); | 715 | self.mark_expired(entry.event.id); |
| 640 | } | 716 | } |
| @@ -655,16 +731,18 @@ impl Purgatory { | |||
| 655 | let event_id_str = entry.key().clone(); | 731 | let event_id_str = entry.key().clone(); |
| 656 | let event_opt = pr_entry.event.clone(); | 732 | let event_opt = pr_entry.event.clone(); |
| 657 | let commit = pr_entry.commit.clone(); | 733 | let commit = pr_entry.commit.clone(); |
| 658 | (event_id_str, event_opt, commit) | 734 | let source = pr_entry.source; |
| 735 | (event_id_str, event_opt, commit, source) | ||
| 659 | }) | 736 | }) |
| 660 | .collect(); | 737 | .collect(); |
| 661 | 738 | ||
| 662 | let pr_removed = expired_prs.len(); | 739 | let pr_removed = expired_prs.len(); |
| 663 | for (event_id_str, event_opt, commit) in expired_prs { | 740 | for (event_id_str, event_opt, commit, source) in expired_prs { |
| 664 | // Log structured entry for PR events (not placeholders) | 741 | // Log structured entry for PR events (not placeholders) |
| 665 | if let Some(ref event) = event_opt { | 742 | if let Some(ref event) = event_opt { |
| 666 | let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex()); | 743 | let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex()); |
| 667 | let event_id_short = &event.id.to_hex()[..12]; | 744 | let event_id_short = &event.id.to_hex()[..12]; |
| 745 | let source_str = if source.is_direct() { "direct" } else { "sync" }; | ||
| 668 | 746 | ||
| 669 | // Extract ALL repo identifiers from 'a' tags | 747 | // Extract ALL repo identifiers from 'a' tags |
| 670 | // (PR events can reference multiple repos when there are multiple maintainers) | 748 | // (PR events can reference multiple repos when there are multiple maintainers) |
| @@ -701,22 +779,37 @@ impl Purgatory { | |||
| 701 | }; | 779 | }; |
| 702 | 780 | ||
| 703 | // Structured log for migration scripts - log once per repo | 781 | // Structured log for migration scripts - log once per repo |
| 782 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 704 | for repo in &repos_to_log { | 783 | for repo in &repos_to_log { |
| 705 | tracing::warn!( | 784 | if source.is_direct() { |
| 706 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} reason=\"git data not received within 30 minutes\"", | 785 | tracing::warn!( |
| 707 | repo, | 786 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", |
| 708 | npub, | 787 | repo, |
| 709 | event_id_short, | 788 | npub, |
| 710 | event.kind.as_u16(), | 789 | event_id_short, |
| 711 | &commit[..commit.len().min(12)] | 790 | event.kind.as_u16(), |
| 712 | ); | 791 | &commit[..commit.len().min(12)], |
| 792 | source_str | ||
| 793 | ); | ||
| 794 | } else { | ||
| 795 | tracing::debug!( | ||
| 796 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 797 | repo, | ||
| 798 | npub, | ||
| 799 | event_id_short, | ||
| 800 | event.kind.as_u16(), | ||
| 801 | &commit[..commit.len().min(12)], | ||
| 802 | source_str | ||
| 803 | ); | ||
| 804 | } | ||
| 713 | } | 805 | } |
| 714 | 806 | ||
| 715 | self.mark_expired(event.id); | 807 | self.mark_expired(event.id); |
| 716 | } else { | 808 | } else { |
| 717 | // Placeholder (git data arrived first, but PR event never came) | 809 | // Placeholder (git data arrived first, but PR event never came) |
| 810 | // Placeholders are always Direct source (from git push) | ||
| 718 | tracing::debug!( | 811 | tracing::debug!( |
| 719 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} reason=\"PR event not received within 30 minutes\"", | 812 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"", |
| 720 | &event_id_str[..event_id_str.len().min(12)], | 813 | &event_id_str[..event_id_str.len().min(12)], |
| 721 | &commit[..commit.len().min(12)] | 814 | &commit[..commit.len().min(12)] |
| 722 | ); | 815 | ); |
| @@ -869,6 +962,7 @@ impl Purgatory { | |||
| 869 | author: e.author, | 962 | author: e.author, |
| 870 | created_at_offset_secs: created_offset.as_secs(), | 963 | created_at_offset_secs: created_offset.as_secs(), |
| 871 | expires_at_offset_secs: expires_offset.as_secs(), | 964 | expires_at_offset_secs: expires_offset.as_secs(), |
| 965 | source: e.source, | ||
| 872 | } | 966 | } |
| 873 | }) | 967 | }) |
| 874 | .collect(); | 968 | .collect(); |
| @@ -891,6 +985,7 @@ impl Purgatory { | |||
| 891 | commit: e.commit.clone(), | 985 | commit: e.commit.clone(), |
| 892 | created_at_offset_secs: created_offset.as_secs(), | 986 | created_at_offset_secs: created_offset.as_secs(), |
| 893 | expires_at_offset_secs: expires_offset.as_secs(), | 987 | expires_at_offset_secs: expires_offset.as_secs(), |
| 988 | source: e.source, | ||
| 894 | }; | 989 | }; |
| 895 | pr_events.insert(event_id, serializable); | 990 | pr_events.insert(event_id, serializable); |
| 896 | } | 991 | } |
| @@ -992,6 +1087,7 @@ impl Purgatory { | |||
| 992 | author: e.author, | 1087 | author: e.author, |
| 993 | created_at, | 1088 | created_at, |
| 994 | expires_at, | 1089 | expires_at, |
| 1090 | source: e.source, | ||
| 995 | } | 1091 | } |
| 996 | }) | 1092 | }) |
| 997 | .collect(); | 1093 | .collect(); |
| @@ -1017,6 +1113,7 @@ impl Purgatory { | |||
| 1017 | commit: e.commit, | 1113 | commit: e.commit, |
| 1018 | created_at, | 1114 | created_at, |
| 1019 | expires_at, | 1115 | expires_at, |
| 1116 | source: e.source, | ||
| 1020 | }; | 1117 | }; |
| 1021 | 1118 | ||
| 1022 | self.pr_events.insert(event_id, entry); | 1119 | self.pr_events.insert(event_id, entry); |
| @@ -1074,8 +1171,8 @@ mod tests { | |||
| 1074 | .sign_with_keys(&keys) | 1171 | .sign_with_keys(&keys) |
| 1075 | .unwrap(); | 1172 | .unwrap(); |
| 1076 | 1173 | ||
| 1077 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); | 1174 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key(), false); |
| 1078 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); | 1175 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string(), false); |
| 1079 | 1176 | ||
| 1080 | let (state_count, pr_count) = purgatory.count(); | 1177 | let (state_count, pr_count) = purgatory.count(); |
| 1081 | assert_eq!(state_count, 1); | 1178 | assert_eq!(state_count, 1); |
| @@ -1126,7 +1223,7 @@ mod tests { | |||
| 1126 | let event = EventBuilder::text_note("state") | 1223 | let event = EventBuilder::text_note("state") |
| 1127 | .sign_with_keys(&keys) | 1224 | .sign_with_keys(&keys) |
| 1128 | .unwrap(); | 1225 | .unwrap(); |
| 1129 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | 1226 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false); |
| 1130 | 1227 | ||
| 1131 | // Now should have pending events | 1228 | // Now should have pending events |
| 1132 | assert!(purgatory.has_pending_events("test-repo")); | 1229 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1156,7 +1253,7 @@ mod tests { | |||
| 1156 | .sign_with_keys(&keys) | 1253 | .sign_with_keys(&keys) |
| 1157 | .unwrap(); | 1254 | .unwrap(); |
| 1158 | 1255 | ||
| 1159 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | 1256 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string(), false); |
| 1160 | 1257 | ||
| 1161 | // Now should have pending events for test-repo | 1258 | // Now should have pending events for test-repo |
| 1162 | assert!(purgatory.has_pending_events("test-repo")); | 1259 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1221,6 +1318,7 @@ fn test_pr_event_vs_placeholder() { | |||
| 1221 | event.clone(), | 1318 | event.clone(), |
| 1222 | "event-id-1".to_string(), | 1319 | "event-id-1".to_string(), |
| 1223 | "commit-abc".to_string(), | 1320 | "commit-abc".to_string(), |
| 1321 | false, | ||
| 1224 | ); | 1322 | ); |
| 1225 | 1323 | ||
| 1226 | // Add a placeholder (no event) | 1324 | // Add a placeholder (no event) |
| @@ -1277,8 +1375,9 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1277 | state_event.clone(), | 1375 | state_event.clone(), |
| 1278 | "test-repo".to_string(), | 1376 | "test-repo".to_string(), |
| 1279 | keys.public_key(), | 1377 | keys.public_key(), |
| 1378 | false, | ||
| 1280 | ); | 1379 | ); |
| 1281 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1380 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string(), false); |
| 1282 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1381 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1283 | 1382 | ||
| 1284 | // Verify entries are there | 1383 | // Verify entries are there |
| @@ -1325,8 +1424,8 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1325 | .unwrap(); | 1424 | .unwrap(); |
| 1326 | 1425 | ||
| 1327 | // Add fresh entries | 1426 | // Add fresh entries |
| 1328 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); | 1427 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key(), false); |
| 1329 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1428 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string(), false); |
| 1330 | 1429 | ||
| 1331 | // Run cleanup | 1430 | // Run cleanup |
| 1332 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1431 | let (state_removed, pr_removed) = purgatory.cleanup(); |
| @@ -1356,8 +1455,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1356 | .sign_with_keys(&keys) | 1455 | .sign_with_keys(&keys) |
| 1357 | .unwrap(); | 1456 | .unwrap(); |
| 1358 | 1457 | ||
| 1359 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); | 1458 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false); |
| 1360 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); | 1459 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false); |
| 1361 | 1460 | ||
| 1362 | // Expire only the first one | 1461 | // Expire only the first one |
| 1363 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { | 1462 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { |
| @@ -1374,8 +1473,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1374 | .sign_with_keys(&keys) | 1473 | .sign_with_keys(&keys) |
| 1375 | .unwrap(); | 1474 | .unwrap(); |
| 1376 | 1475 | ||
| 1377 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); | 1476 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false); |
| 1378 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); | 1477 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false); |
| 1379 | 1478 | ||
| 1380 | // Expire only first PR | 1479 | // Expire only first PR |
| 1381 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { | 1480 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { |
| @@ -1407,8 +1506,8 @@ fn test_remove_expired_legacy_method() { | |||
| 1407 | .unwrap(); | 1506 | .unwrap(); |
| 1408 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); | 1507 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); |
| 1409 | 1508 | ||
| 1410 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1509 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1411 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1510 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1412 | 1511 | ||
| 1413 | // Expire both | 1512 | // Expire both |
| 1414 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1513 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1442,8 +1541,8 @@ fn test_expired_event_tracking() { | |||
| 1442 | let pr_event_id = pr_event.id; | 1541 | let pr_event_id = pr_event.id; |
| 1443 | 1542 | ||
| 1444 | // Add events to purgatory | 1543 | // Add events to purgatory |
| 1445 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1544 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1446 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1545 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1447 | 1546 | ||
| 1448 | // Events should not be marked as expired yet | 1547 | // Events should not be marked as expired yet |
| 1449 | assert!(!purgatory.is_expired(&state_event_id)); | 1548 | assert!(!purgatory.is_expired(&state_event_id)); |
| @@ -1495,7 +1594,7 @@ fn test_cleanup_expired_events() { | |||
| 1495 | let event2_id = event2.id; | 1594 | let event2_id = event2.id; |
| 1496 | 1595 | ||
| 1497 | // Add and immediately expire event1 | 1596 | // Add and immediately expire event1 |
| 1498 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); | 1597 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false); |
| 1499 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { | 1598 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { |
| 1500 | for entry in entries.iter_mut() { | 1599 | for entry in entries.iter_mut() { |
| 1501 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1600 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1504,7 +1603,7 @@ fn test_cleanup_expired_events() { | |||
| 1504 | purgatory.cleanup(); | 1603 | purgatory.cleanup(); |
| 1505 | 1604 | ||
| 1506 | // Add and expire event2 (will be more recent) | 1605 | // Add and expire event2 (will be more recent) |
| 1507 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); | 1606 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false); |
| 1508 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { | 1607 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { |
| 1509 | for entry in entries.iter_mut() { | 1608 | for entry in entries.iter_mut() { |
| 1510 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1609 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1546,7 +1645,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1546 | let event_id = event.id; | 1645 | let event_id = event.id; |
| 1547 | 1646 | ||
| 1548 | // Add event to purgatory | 1647 | // Add event to purgatory |
| 1549 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1648 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1550 | 1649 | ||
| 1551 | // Expire it | 1650 | // Expire it |
| 1552 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1651 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1566,7 +1665,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1566 | // This simulates what negentropy/REQ+EOSE should do: | 1665 | // This simulates what negentropy/REQ+EOSE should do: |
| 1567 | // Check if event is in event_ids() before adding | 1666 | // Check if event is in event_ids() before adding |
| 1568 | if !ids.contains(&event_id) { | 1667 | if !ids.contains(&event_id) { |
| 1569 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1668 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1570 | } | 1669 | } |
| 1571 | 1670 | ||
| 1572 | // Event should NOT be re-added | 1671 | // Event should NOT be re-added |
| @@ -1609,7 +1708,7 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1609 | let event_id = event.id; | 1708 | let event_id = event.id; |
| 1610 | 1709 | ||
| 1611 | // Add event to purgatory | 1710 | // Add event to purgatory |
| 1612 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1711 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1613 | 1712 | ||
| 1614 | // Expire it | 1713 | // Expire it |
| 1615 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1714 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1658,8 +1757,8 @@ async fn test_save_and_restore_state_events() { | |||
| 1658 | let event1_id = event1.id; | 1757 | let event1_id = event1.id; |
| 1659 | let event2_id = event2.id; | 1758 | let event2_id = event2.id; |
| 1660 | 1759 | ||
| 1661 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | 1760 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key(), false); |
| 1662 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | 1761 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key(), false); |
| 1663 | 1762 | ||
| 1664 | // Save to disk | 1763 | // Save to disk |
| 1665 | purgatory.save_to_disk(&state_file).unwrap(); | 1764 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -1721,6 +1820,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 1721 | pr_event.clone(), | 1820 | pr_event.clone(), |
| 1722 | "pr-event-id".to_string(), | 1821 | "pr-event-id".to_string(), |
| 1723 | "commit-abc".to_string(), | 1822 | "commit-abc".to_string(), |
| 1823 | false, | ||
| 1724 | ); | 1824 | ); |
| 1725 | 1825 | ||
| 1726 | // Save to disk | 1826 | // Save to disk |
| @@ -1790,7 +1890,7 @@ async fn test_save_and_restore_expired_events() { | |||
| 1790 | let event_id = event.id; | 1890 | let event_id = event.id; |
| 1791 | 1891 | ||
| 1792 | // Add and expire event | 1892 | // Add and expire event |
| 1793 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1893 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1794 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1894 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| 1795 | for entry in entries.iter_mut() { | 1895 | for entry in entries.iter_mut() { |
| 1796 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1896 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1929,7 +2029,7 @@ async fn test_downtime_calculation() { | |||
| 1929 | .sign_with_keys(&keys) | 2029 | .sign_with_keys(&keys) |
| 1930 | .unwrap(); | 2030 | .unwrap(); |
| 1931 | 2031 | ||
| 1932 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2032 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1933 | 2033 | ||
| 1934 | // Get original expiry time | 2034 | // Get original expiry time |
| 1935 | let original_entries = purgatory.find_state("repo"); | 2035 | let original_entries = purgatory.find_state("repo"); |
| @@ -1985,7 +2085,7 @@ async fn test_expiry_times_preserved() { | |||
| 1985 | .sign_with_keys(&keys) | 2085 | .sign_with_keys(&keys) |
| 1986 | .unwrap(); | 2086 | .unwrap(); |
| 1987 | 2087 | ||
| 1988 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2088 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1989 | 2089 | ||
| 1990 | // Manually set expiry to a specific time in the future | 2090 | // Manually set expiry to a specific time in the future |
| 1991 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | 2091 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes |
| @@ -2044,16 +2144,19 @@ async fn test_multiple_state_events_same_identifier() { | |||
| 2044 | event1.clone(), | 2144 | event1.clone(), |
| 2045 | "shared-repo".to_string(), | 2145 | "shared-repo".to_string(), |
| 2046 | keys1.public_key(), | 2146 | keys1.public_key(), |
| 2147 | false, | ||
| 2047 | ); | 2148 | ); |
| 2048 | purgatory.add_state( | 2149 | purgatory.add_state( |
| 2049 | event2.clone(), | 2150 | event2.clone(), |
| 2050 | "shared-repo".to_string(), | 2151 | "shared-repo".to_string(), |
| 2051 | keys2.public_key(), | 2152 | keys2.public_key(), |
| 2153 | false, | ||
| 2052 | ); | 2154 | ); |
| 2053 | purgatory.add_state( | 2155 | purgatory.add_state( |
| 2054 | event3.clone(), | 2156 | event3.clone(), |
| 2055 | "shared-repo".to_string(), | 2157 | "shared-repo".to_string(), |
| 2056 | keys3.public_key(), | 2158 | keys3.public_key(), |
| 2159 | false, | ||
| 2057 | ); | 2160 | ); |
| 2058 | 2161 | ||
| 2059 | // Save to disk | 2162 | // Save to disk |
| @@ -2100,6 +2203,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2100 | pr_event.clone(), | 2203 | pr_event.clone(), |
| 2101 | "pr-with-event".to_string(), | 2204 | "pr-with-event".to_string(), |
| 2102 | "commit-abc".to_string(), | 2205 | "commit-abc".to_string(), |
| 2206 | false, | ||
| 2103 | ); | 2207 | ); |
| 2104 | 2208 | ||
| 2105 | // Add PR placeholder | 2209 | // Add PR placeholder |
| @@ -2145,7 +2249,7 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2145 | let event = EventBuilder::text_note("test") | 2249 | let event = EventBuilder::text_note("test") |
| 2146 | .sign_with_keys(&keys) | 2250 | .sign_with_keys(&keys) |
| 2147 | .unwrap(); | 2251 | .unwrap(); |
| 2148 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2252 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2149 | 2253 | ||
| 2150 | // Save to disk | 2254 | // Save to disk |
| 2151 | purgatory.save_to_disk(&state_file).unwrap(); | 2255 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2179,8 +2283,8 @@ async fn test_comprehensive_roundtrip() { | |||
| 2179 | .sign_with_keys(&keys2) | 2283 | .sign_with_keys(&keys2) |
| 2180 | .unwrap(); | 2284 | .unwrap(); |
| 2181 | 2285 | ||
| 2182 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | 2286 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key(), false); |
| 2183 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | 2287 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key(), false); |
| 2184 | 2288 | ||
| 2185 | // Add PR event | 2289 | // Add PR event |
| 2186 | let tags = vec![Tag::custom( | 2290 | let tags = vec![Tag::custom( |
| @@ -2191,7 +2295,7 @@ async fn test_comprehensive_roundtrip() { | |||
| 2191 | .tags(tags) | 2295 | .tags(tags) |
| 2192 | .sign_with_keys(&keys1) | 2296 | .sign_with_keys(&keys1) |
| 2193 | .unwrap(); | 2297 | .unwrap(); |
| 2194 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | 2298 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string(), false); |
| 2195 | 2299 | ||
| 2196 | // Add PR placeholder | 2300 | // Add PR placeholder |
| 2197 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | 2301 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); |
| @@ -2201,7 +2305,7 @@ async fn test_comprehensive_roundtrip() { | |||
| 2201 | .sign_with_keys(&keys1) | 2305 | .sign_with_keys(&keys1) |
| 2202 | .unwrap(); | 2306 | .unwrap(); |
| 2203 | let expired_id = expired_event.id; | 2307 | let expired_id = expired_event.id; |
| 2204 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | 2308 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key(), false); |
| 2205 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | 2309 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { |
| 2206 | for entry in entries.iter_mut() { | 2310 | for entry in entries.iter_mut() { |
| 2207 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2311 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 919504b..e37a3e1 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -8,6 +8,28 @@ use nostr_sdk::prelude::*; | |||
| 8 | use serde::{Deserialize, Serialize}; | 8 | use serde::{Deserialize, Serialize}; |
| 9 | use std::time::Instant; | 9 | use std::time::Instant; |
| 10 | 10 | ||
| 11 | /// Source of an event entering purgatory. | ||
| 12 | /// | ||
| 13 | /// Tracks whether an event was submitted directly by a user or fetched via | ||
| 14 | /// proactive sync from another relay. This distinction is used for: | ||
| 15 | /// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG | ||
| 16 | /// - Operational monitoring: Helps identify user-facing issues vs sync noise | ||
| 17 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] | ||
| 18 | pub enum EventSource { | ||
| 19 | /// Event was published directly to this relay by a user | ||
| 20 | #[default] | ||
| 21 | Direct, | ||
| 22 | /// Event was fetched via proactive sync from another relay | ||
| 23 | Sync, | ||
| 24 | } | ||
| 25 | |||
| 26 | impl EventSource { | ||
| 27 | /// Returns true if this is a direct submission (not synced) | ||
| 28 | pub fn is_direct(&self) -> bool { | ||
| 29 | matches!(self, EventSource::Direct) | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 11 | /// Default value for Instant fields during deserialization | 33 | /// Default value for Instant fields during deserialization |
| 12 | fn instant_now() -> Instant { | 34 | fn instant_now() -> Instant { |
| 13 | Instant::now() | 35 | Instant::now() |
| @@ -86,6 +108,10 @@ pub struct StatePurgatoryEntry { | |||
| 86 | /// Expiry deadline (30 min from creation, may be extended) | 108 | /// Expiry deadline (30 min from creation, may be extended) |
| 87 | #[serde(skip, default = "instant_now")] | 109 | #[serde(skip, default = "instant_now")] |
| 88 | pub expires_at: Instant, | 110 | pub expires_at: Instant, |
| 111 | |||
| 112 | /// Source of this event (direct submission vs sync) | ||
| 113 | #[serde(default)] | ||
| 114 | pub source: EventSource, | ||
| 89 | } | 115 | } |
| 90 | 116 | ||
| 91 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. | 117 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. |
| @@ -112,4 +138,8 @@ pub struct PrPurgatoryEntry { | |||
| 112 | /// Expiry deadline (30 min from creation, may be extended) | 138 | /// Expiry deadline (30 min from creation, may be extended) |
| 113 | #[serde(skip, default = "instant_now")] | 139 | #[serde(skip, default = "instant_now")] |
| 114 | pub expires_at: Instant, | 140 | pub expires_at: Instant, |
| 141 | |||
| 142 | /// Source of this event (direct submission vs sync) | ||
| 143 | #[serde(default)] | ||
| 144 | pub source: EventSource, | ||
| 115 | } | 145 | } |