diff options
Diffstat (limited to 'src/purgatory/mod.rs')
| -rw-r--r-- | src/purgatory/mod.rs | 339 |
1 files changed, 289 insertions, 50 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 47798a6..8094450 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -17,10 +17,11 @@ pub mod sync; | |||
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use nostr_sdk::ToBech32; | ||
| 24 | use serde::{Deserialize, Serialize}; | 25 | use serde::{Deserialize, Serialize}; |
| 25 | use std::collections::HashMap; | 26 | use std::collections::HashMap; |
| 26 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| @@ -57,6 +58,9 @@ struct SerializableStatePurgatoryEntry { | |||
| 57 | created_at_offset_secs: u64, | 58 | created_at_offset_secs: u64, |
| 58 | /// Duration offset from saved_at for expires_at | 59 | /// Duration offset from saved_at for expires_at |
| 59 | expires_at_offset_secs: u64, | 60 | expires_at_offset_secs: u64, |
| 61 | /// Source of this event (direct submission vs sync) | ||
| 62 | #[serde(default)] | ||
| 63 | source: types::EventSource, | ||
| 60 | } | 64 | } |
| 61 | 65 | ||
| 62 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | 66 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. |
| @@ -74,6 +78,9 @@ struct SerializablePrPurgatoryEntry { | |||
| 74 | created_at_offset_secs: u64, | 78 | created_at_offset_secs: u64, |
| 75 | /// Duration offset from saved_at for expires_at | 79 | /// Duration offset from saved_at for expires_at |
| 76 | expires_at_offset_secs: u64, | 80 | expires_at_offset_secs: u64, |
| 81 | /// Source of this event (direct submission vs sync) | ||
| 82 | #[serde(default)] | ||
| 83 | source: types::EventSource, | ||
| 77 | } | 84 | } |
| 78 | 85 | ||
| 79 | /// Serializable purgatory state for disk persistence. | 86 | /// Serializable purgatory state for disk persistence. |
| @@ -270,11 +277,38 @@ impl Purgatory { | |||
| 270 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately | 277 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately |
| 271 | /// to override this delay. | 278 | /// to override this delay. |
| 272 | /// | 279 | /// |
| 280 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 281 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 282 | /// | ||
| 273 | /// # Arguments | 283 | /// # Arguments |
| 274 | /// * `event` - The state event (kind 30618) to hold | 284 | /// * `event` - The state event (kind 30618) to hold |
| 275 | /// * `identifier` - The repository identifier from the 'd' tag | 285 | /// * `identifier` - The repository identifier from the 'd' tag |
| 276 | /// * `author` - The event author's public key | 286 | /// * `author` - The event author's public key |
| 277 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 287 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 288 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) { | ||
| 289 | let source = if from_sync { | ||
| 290 | types::EventSource::Sync | ||
| 291 | } else { | ||
| 292 | types::EventSource::Direct | ||
| 293 | }; | ||
| 294 | |||
| 295 | // Check if event already exists - if so, potentially upgrade source | ||
| 296 | if let Some(mut entries) = self.state_events.get_mut(&identifier) { | ||
| 297 | if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) { | ||
| 298 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 299 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 300 | existing.source = types::EventSource::Direct; | ||
| 301 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 302 | tracing::debug!( | ||
| 303 | event_id = %event.id, | ||
| 304 | identifier = %identifier, | ||
| 305 | "Upgraded purgatory entry source from Sync to Direct, reset expiry" | ||
| 306 | ); | ||
| 307 | } | ||
| 308 | return; // Event already exists, don't add duplicate | ||
| 309 | } | ||
| 310 | } | ||
| 311 | |||
| 278 | let now = Instant::now(); | 312 | let now = Instant::now(); |
| 279 | let entry = StatePurgatoryEntry { | 313 | let entry = StatePurgatoryEntry { |
| 280 | event, | 314 | event, |
| @@ -282,6 +316,7 @@ impl Purgatory { | |||
| 282 | author, | 316 | author, |
| 283 | created_at: now, | 317 | created_at: now, |
| 284 | expires_at: now + DEFAULT_EXPIRY, | 318 | expires_at: now + DEFAULT_EXPIRY, |
| 319 | source, | ||
| 285 | }; | 320 | }; |
| 286 | 321 | ||
| 287 | self.state_events | 322 | self.state_events |
| @@ -301,11 +336,35 @@ impl Purgatory { | |||
| 301 | /// Automatically enqueues the referenced repository identifier for background sync | 336 | /// Automatically enqueues the referenced repository identifier for background sync |
| 302 | /// with the default delay (3 minutes), giving time for a git push to arrive. | 337 | /// with the default delay (3 minutes), giving time for a git push to arrive. |
| 303 | /// | 338 | /// |
| 339 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 340 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 341 | /// | ||
| 304 | /// # Arguments | 342 | /// # Arguments |
| 305 | /// * `event` - The PR event (kind 1617/1618) to hold | 343 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 306 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 344 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 307 | /// * `commit` - The commit SHA from the 'c' tag | 345 | /// * `commit` - The commit SHA from the 'c' tag |
| 308 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 346 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 347 | pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) { | ||
| 348 | let source = if from_sync { | ||
| 349 | types::EventSource::Sync | ||
| 350 | } else { | ||
| 351 | types::EventSource::Direct | ||
| 352 | }; | ||
| 353 | |||
| 354 | // Check if event already exists - if so, potentially upgrade source | ||
| 355 | if let Some(mut existing) = self.pr_events.get_mut(&event_id) { | ||
| 356 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 357 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 358 | existing.source = types::EventSource::Direct; | ||
| 359 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 360 | tracing::debug!( | ||
| 361 | event_id = %event_id, | ||
| 362 | "Upgraded PR purgatory entry source from Sync to Direct, reset expiry" | ||
| 363 | ); | ||
| 364 | } | ||
| 365 | return; // Event already exists, don't add duplicate | ||
| 366 | } | ||
| 367 | |||
| 309 | // Extract identifier from the event's `a` tag for sync enqueueing | 368 | // Extract identifier from the event's `a` tag for sync enqueueing |
| 310 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | 369 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); |
| 311 | 370 | ||
| @@ -315,6 +374,7 @@ impl Purgatory { | |||
| 315 | commit, | 374 | commit, |
| 316 | created_at: now, | 375 | created_at: now, |
| 317 | expires_at: now + DEFAULT_EXPIRY, | 376 | expires_at: now + DEFAULT_EXPIRY, |
| 377 | source, | ||
| 318 | }; | 378 | }; |
| 319 | 379 | ||
| 320 | self.pr_events.insert(event_id, entry); | 380 | self.pr_events.insert(event_id, entry); |
| @@ -328,6 +388,8 @@ impl Purgatory { | |||
| 328 | /// Add a PR placeholder (git data arrived before PR event). | 388 | /// Add a PR placeholder (git data arrived before PR event). |
| 329 | /// | 389 | /// |
| 330 | /// Creates a placeholder entry waiting for the corresponding PR event. | 390 | /// Creates a placeholder entry waiting for the corresponding PR event. |
| 391 | /// Placeholders are always marked as `Direct` source since they originate | ||
| 392 | /// from git pushes (direct user action). | ||
| 331 | /// | 393 | /// |
| 332 | /// # Arguments | 394 | /// # Arguments |
| 333 | /// * `event_id` - The expected event ID (from git ref name) | 395 | /// * `event_id` - The expected event ID (from git ref name) |
| @@ -339,6 +401,7 @@ impl Purgatory { | |||
| 339 | commit, | 401 | commit, |
| 340 | created_at: now, | 402 | created_at: now, |
| 341 | expires_at: now + DEFAULT_EXPIRY, | 403 | expires_at: now + DEFAULT_EXPIRY, |
| 404 | source: types::EventSource::Direct, // Git pushes are direct user actions | ||
| 342 | }; | 405 | }; |
| 343 | 406 | ||
| 344 | self.pr_events.insert(event_id, entry); | 407 | self.pr_events.insert(event_id, entry); |
| @@ -608,6 +671,9 @@ impl Purgatory { | |||
| 608 | /// prevent infinite re-sync loops. Events that expire without finding git data | 671 | /// prevent infinite re-sync loops. Events that expire without finding git data |
| 609 | /// will be filtered out during future negentropy/REQ sync operations. | 672 | /// will be filtered out during future negentropy/REQ sync operations. |
| 610 | /// | 673 | /// |
| 674 | /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event | ||
| 675 | /// to support migration scripts and operational monitoring. | ||
| 676 | /// | ||
| 611 | /// # Returns | 677 | /// # Returns |
| 612 | /// Tuple of (num_state_removed, num_pr_removed) | 678 | /// Tuple of (num_state_removed, num_pr_removed) |
| 613 | pub fn cleanup(&self) -> (usize, usize) { | 679 | pub fn cleanup(&self) -> (usize, usize) { |
| @@ -615,18 +681,38 @@ impl Purgatory { | |||
| 615 | let mut state_removed = 0; | 681 | let mut state_removed = 0; |
| 616 | 682 | ||
| 617 | // Remove expired state events and mark them as expired | 683 | // Remove expired state events and mark them as expired |
| 618 | self.state_events.retain(|_, entries| { | 684 | self.state_events.retain(|identifier, entries| { |
| 619 | let original_len = entries.len(); | 685 | let original_len = entries.len(); |
| 620 | // Collect event IDs before removing | ||
| 621 | let expired_ids: Vec<EventId> = entries | ||
| 622 | .iter() | ||
| 623 | .filter(|entry| entry.expires_at <= now) | ||
| 624 | .map(|entry| entry.event.id) | ||
| 625 | .collect(); | ||
| 626 | 686 | ||
| 627 | // Mark as expired to prevent re-sync | 687 | // Log and collect expired entries before removing |
| 628 | for event_id in expired_ids { | 688 | for entry in entries.iter().filter(|e| e.expires_at <= now) { |
| 629 | self.mark_expired(event_id); | 689 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); |
| 690 | let event_id_short = &entry.event.id.to_hex()[..12]; | ||
| 691 | let source_str = if entry.source.is_direct() { "direct" } else { "sync" }; | ||
| 692 | |||
| 693 | // Structured log for migration scripts | ||
| 694 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 695 | if entry.source.is_direct() { | ||
| 696 | tracing::warn!( | ||
| 697 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 698 | identifier, | ||
| 699 | npub, | ||
| 700 | event_id_short, | ||
| 701 | entry.event.kind.as_u16(), | ||
| 702 | source_str | ||
| 703 | ); | ||
| 704 | } else { | ||
| 705 | tracing::debug!( | ||
| 706 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 707 | identifier, | ||
| 708 | npub, | ||
| 709 | event_id_short, | ||
| 710 | entry.event.kind.as_u16(), | ||
| 711 | source_str | ||
| 712 | ); | ||
| 713 | } | ||
| 714 | |||
| 715 | self.mark_expired(entry.event.id); | ||
| 630 | } | 716 | } |
| 631 | 717 | ||
| 632 | // Remove expired entries | 718 | // Remove expired entries |
| @@ -636,21 +722,103 @@ impl Purgatory { | |||
| 636 | }); | 722 | }); |
| 637 | 723 | ||
| 638 | // Remove expired PR events and mark them as expired | 724 | // Remove expired PR events and mark them as expired |
| 639 | let expired_prs: Vec<(String, Option<EventId>)> = self | 725 | let expired_prs: Vec<_> = self |
| 640 | .pr_events | 726 | .pr_events |
| 641 | .iter() | 727 | .iter() |
| 642 | .filter(|entry| entry.value().expires_at <= now) | 728 | .filter(|entry| entry.value().expires_at <= now) |
| 643 | .map(|entry| { | 729 | .map(|entry| { |
| 644 | let event_id = entry.value().event.as_ref().map(|e| e.id); | 730 | let pr_entry = entry.value(); |
| 645 | (entry.key().clone(), event_id) | 731 | let event_id_str = entry.key().clone(); |
| 732 | let event_opt = pr_entry.event.clone(); | ||
| 733 | let commit = pr_entry.commit.clone(); | ||
| 734 | let source = pr_entry.source; | ||
| 735 | (event_id_str, event_opt, commit, source) | ||
| 646 | }) | 736 | }) |
| 647 | .collect(); | 737 | .collect(); |
| 648 | 738 | ||
| 649 | let pr_removed = expired_prs.len(); | 739 | let pr_removed = expired_prs.len(); |
| 650 | for (event_id_str, event_id_opt) in expired_prs { | 740 | for (event_id_str, event_opt, commit, source) in expired_prs { |
| 651 | // Mark actual PR events as expired (not placeholders) | 741 | // Log structured entry for PR events (not placeholders) |
| 652 | if let Some(event_id) = event_id_opt { | 742 | if let Some(ref event) = event_opt { |
| 653 | self.mark_expired(event_id); | 743 | let npub = event |
| 744 | .pubkey | ||
| 745 | .to_bech32() | ||
| 746 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 747 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 748 | let source_str = if source.is_direct() { "direct" } else { "sync" }; | ||
| 749 | |||
| 750 | // Extract ALL repo identifiers from 'a' tags | ||
| 751 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 752 | let repos: Vec<String> = event | ||
| 753 | .tags | ||
| 754 | .iter() | ||
| 755 | .filter_map(|tag| { | ||
| 756 | let tag_vec = tag.clone().to_vec(); | ||
| 757 | if tag_vec.len() >= 2 | ||
| 758 | && tag_vec[0] == "a" | ||
| 759 | && tag_vec[1].starts_with("30617:") | ||
| 760 | { | ||
| 761 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 762 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 763 | if parts.len() >= 3 { | ||
| 764 | Some(parts[2].to_string()) | ||
| 765 | } else { | ||
| 766 | None | ||
| 767 | } | ||
| 768 | } else { | ||
| 769 | None | ||
| 770 | } | ||
| 771 | }) | ||
| 772 | .collect(); | ||
| 773 | |||
| 774 | // Deduplicate while preserving order | ||
| 775 | let mut seen = std::collections::HashSet::new(); | ||
| 776 | let unique_repos: Vec<String> = repos | ||
| 777 | .into_iter() | ||
| 778 | .filter(|r| seen.insert(r.clone())) | ||
| 779 | .collect(); | ||
| 780 | |||
| 781 | let repos_to_log = if unique_repos.is_empty() { | ||
| 782 | vec!["unknown".to_string()] | ||
| 783 | } else { | ||
| 784 | unique_repos | ||
| 785 | }; | ||
| 786 | |||
| 787 | // Structured log for migration scripts - log once per repo | ||
| 788 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 789 | for repo in &repos_to_log { | ||
| 790 | if source.is_direct() { | ||
| 791 | tracing::warn!( | ||
| 792 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 793 | repo, | ||
| 794 | npub, | ||
| 795 | event_id_short, | ||
| 796 | event.kind.as_u16(), | ||
| 797 | &commit[..commit.len().min(12)], | ||
| 798 | source_str | ||
| 799 | ); | ||
| 800 | } else { | ||
| 801 | tracing::debug!( | ||
| 802 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 803 | repo, | ||
| 804 | npub, | ||
| 805 | event_id_short, | ||
| 806 | event.kind.as_u16(), | ||
| 807 | &commit[..commit.len().min(12)], | ||
| 808 | source_str | ||
| 809 | ); | ||
| 810 | } | ||
| 811 | } | ||
| 812 | |||
| 813 | self.mark_expired(event.id); | ||
| 814 | } else { | ||
| 815 | // Placeholder (git data arrived first, but PR event never came) | ||
| 816 | // Placeholders are always Direct source (from git push) | ||
| 817 | tracing::debug!( | ||
| 818 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"", | ||
| 819 | &event_id_str[..event_id_str.len().min(12)], | ||
| 820 | &commit[..commit.len().min(12)] | ||
| 821 | ); | ||
| 654 | } | 822 | } |
| 655 | self.pr_events.remove(&event_id_str); | 823 | self.pr_events.remove(&event_id_str); |
| 656 | } | 824 | } |
| @@ -800,6 +968,7 @@ impl Purgatory { | |||
| 800 | author: e.author, | 968 | author: e.author, |
| 801 | created_at_offset_secs: created_offset.as_secs(), | 969 | created_at_offset_secs: created_offset.as_secs(), |
| 802 | expires_at_offset_secs: expires_offset.as_secs(), | 970 | expires_at_offset_secs: expires_offset.as_secs(), |
| 971 | source: e.source, | ||
| 803 | } | 972 | } |
| 804 | }) | 973 | }) |
| 805 | .collect(); | 974 | .collect(); |
| @@ -822,6 +991,7 @@ impl Purgatory { | |||
| 822 | commit: e.commit.clone(), | 991 | commit: e.commit.clone(), |
| 823 | created_at_offset_secs: created_offset.as_secs(), | 992 | created_at_offset_secs: created_offset.as_secs(), |
| 824 | expires_at_offset_secs: expires_offset.as_secs(), | 993 | expires_at_offset_secs: expires_offset.as_secs(), |
| 994 | source: e.source, | ||
| 825 | }; | 995 | }; |
| 826 | pr_events.insert(event_id, serializable); | 996 | pr_events.insert(event_id, serializable); |
| 827 | } | 997 | } |
| @@ -923,6 +1093,7 @@ impl Purgatory { | |||
| 923 | author: e.author, | 1093 | author: e.author, |
| 924 | created_at, | 1094 | created_at, |
| 925 | expires_at, | 1095 | expires_at, |
| 1096 | source: e.source, | ||
| 926 | } | 1097 | } |
| 927 | }) | 1098 | }) |
| 928 | .collect(); | 1099 | .collect(); |
| @@ -948,6 +1119,7 @@ impl Purgatory { | |||
| 948 | commit: e.commit, | 1119 | commit: e.commit, |
| 949 | created_at, | 1120 | created_at, |
| 950 | expires_at, | 1121 | expires_at, |
| 1122 | source: e.source, | ||
| 951 | }; | 1123 | }; |
| 952 | 1124 | ||
| 953 | self.pr_events.insert(event_id, entry); | 1125 | self.pr_events.insert(event_id, entry); |
| @@ -1005,8 +1177,18 @@ mod tests { | |||
| 1005 | .sign_with_keys(&keys) | 1177 | .sign_with_keys(&keys) |
| 1006 | .unwrap(); | 1178 | .unwrap(); |
| 1007 | 1179 | ||
| 1008 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); | 1180 | purgatory.add_state( |
| 1009 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); | 1181 | event.clone(), |
| 1182 | "test-repo".to_string(), | ||
| 1183 | keys.public_key(), | ||
| 1184 | false, | ||
| 1185 | ); | ||
| 1186 | purgatory.add_pr( | ||
| 1187 | event, | ||
| 1188 | "test-event-id".to_string(), | ||
| 1189 | "abc123".to_string(), | ||
| 1190 | false, | ||
| 1191 | ); | ||
| 1010 | 1192 | ||
| 1011 | let (state_count, pr_count) = purgatory.count(); | 1193 | let (state_count, pr_count) = purgatory.count(); |
| 1012 | assert_eq!(state_count, 1); | 1194 | assert_eq!(state_count, 1); |
| @@ -1057,7 +1239,7 @@ mod tests { | |||
| 1057 | let event = EventBuilder::text_note("state") | 1239 | let event = EventBuilder::text_note("state") |
| 1058 | .sign_with_keys(&keys) | 1240 | .sign_with_keys(&keys) |
| 1059 | .unwrap(); | 1241 | .unwrap(); |
| 1060 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | 1242 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false); |
| 1061 | 1243 | ||
| 1062 | // Now should have pending events | 1244 | // Now should have pending events |
| 1063 | assert!(purgatory.has_pending_events("test-repo")); | 1245 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1087,7 +1269,12 @@ mod tests { | |||
| 1087 | .sign_with_keys(&keys) | 1269 | .sign_with_keys(&keys) |
| 1088 | .unwrap(); | 1270 | .unwrap(); |
| 1089 | 1271 | ||
| 1090 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | 1272 | purgatory.add_pr( |
| 1273 | event, | ||
| 1274 | "pr-event-id".to_string(), | ||
| 1275 | "commit123".to_string(), | ||
| 1276 | false, | ||
| 1277 | ); | ||
| 1091 | 1278 | ||
| 1092 | // Now should have pending events for test-repo | 1279 | // Now should have pending events for test-repo |
| 1093 | assert!(purgatory.has_pending_events("test-repo")); | 1280 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1152,6 +1339,7 @@ fn test_pr_event_vs_placeholder() { | |||
| 1152 | event.clone(), | 1339 | event.clone(), |
| 1153 | "event-id-1".to_string(), | 1340 | "event-id-1".to_string(), |
| 1154 | "commit-abc".to_string(), | 1341 | "commit-abc".to_string(), |
| 1342 | false, | ||
| 1155 | ); | 1343 | ); |
| 1156 | 1344 | ||
| 1157 | // Add a placeholder (no event) | 1345 | // Add a placeholder (no event) |
| @@ -1208,8 +1396,14 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1208 | state_event.clone(), | 1396 | state_event.clone(), |
| 1209 | "test-repo".to_string(), | 1397 | "test-repo".to_string(), |
| 1210 | keys.public_key(), | 1398 | keys.public_key(), |
| 1399 | false, | ||
| 1400 | ); | ||
| 1401 | purgatory.add_pr( | ||
| 1402 | pr_event, | ||
| 1403 | "pr-123".to_string(), | ||
| 1404 | "commit-abc".to_string(), | ||
| 1405 | false, | ||
| 1211 | ); | 1406 | ); |
| 1212 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | ||
| 1213 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1407 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1214 | 1408 | ||
| 1215 | // Verify entries are there | 1409 | // Verify entries are there |
| @@ -1256,8 +1450,18 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1256 | .unwrap(); | 1450 | .unwrap(); |
| 1257 | 1451 | ||
| 1258 | // Add fresh entries | 1452 | // Add fresh entries |
| 1259 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); | 1453 | purgatory.add_state( |
| 1260 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1454 | state_event, |
| 1455 | "test-repo".to_string(), | ||
| 1456 | keys.public_key(), | ||
| 1457 | false, | ||
| 1458 | ); | ||
| 1459 | purgatory.add_pr( | ||
| 1460 | pr_event, | ||
| 1461 | "pr-123".to_string(), | ||
| 1462 | "commit-abc".to_string(), | ||
| 1463 | false, | ||
| 1464 | ); | ||
| 1261 | 1465 | ||
| 1262 | // Run cleanup | 1466 | // Run cleanup |
| 1263 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1467 | let (state_removed, pr_removed) = purgatory.cleanup(); |
| @@ -1287,8 +1491,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1287 | .sign_with_keys(&keys) | 1491 | .sign_with_keys(&keys) |
| 1288 | .unwrap(); | 1492 | .unwrap(); |
| 1289 | 1493 | ||
| 1290 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); | 1494 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false); |
| 1291 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); | 1495 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false); |
| 1292 | 1496 | ||
| 1293 | // Expire only the first one | 1497 | // Expire only the first one |
| 1294 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { | 1498 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { |
| @@ -1305,8 +1509,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1305 | .sign_with_keys(&keys) | 1509 | .sign_with_keys(&keys) |
| 1306 | .unwrap(); | 1510 | .unwrap(); |
| 1307 | 1511 | ||
| 1308 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); | 1512 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false); |
| 1309 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); | 1513 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false); |
| 1310 | 1514 | ||
| 1311 | // Expire only first PR | 1515 | // Expire only first PR |
| 1312 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { | 1516 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { |
| @@ -1338,8 +1542,8 @@ fn test_remove_expired_legacy_method() { | |||
| 1338 | .unwrap(); | 1542 | .unwrap(); |
| 1339 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); | 1543 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); |
| 1340 | 1544 | ||
| 1341 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1545 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1342 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1546 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1343 | 1547 | ||
| 1344 | // Expire both | 1548 | // Expire both |
| 1345 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1549 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1373,8 +1577,8 @@ fn test_expired_event_tracking() { | |||
| 1373 | let pr_event_id = pr_event.id; | 1577 | let pr_event_id = pr_event.id; |
| 1374 | 1578 | ||
| 1375 | // Add events to purgatory | 1579 | // Add events to purgatory |
| 1376 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1580 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1377 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1581 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1378 | 1582 | ||
| 1379 | // Events should not be marked as expired yet | 1583 | // Events should not be marked as expired yet |
| 1380 | assert!(!purgatory.is_expired(&state_event_id)); | 1584 | assert!(!purgatory.is_expired(&state_event_id)); |
| @@ -1426,7 +1630,7 @@ fn test_cleanup_expired_events() { | |||
| 1426 | let event2_id = event2.id; | 1630 | let event2_id = event2.id; |
| 1427 | 1631 | ||
| 1428 | // Add and immediately expire event1 | 1632 | // Add and immediately expire event1 |
| 1429 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); | 1633 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false); |
| 1430 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { | 1634 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { |
| 1431 | for entry in entries.iter_mut() { | 1635 | for entry in entries.iter_mut() { |
| 1432 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1636 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1435,7 +1639,7 @@ fn test_cleanup_expired_events() { | |||
| 1435 | purgatory.cleanup(); | 1639 | purgatory.cleanup(); |
| 1436 | 1640 | ||
| 1437 | // Add and expire event2 (will be more recent) | 1641 | // Add and expire event2 (will be more recent) |
| 1438 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); | 1642 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false); |
| 1439 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { | 1643 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { |
| 1440 | for entry in entries.iter_mut() { | 1644 | for entry in entries.iter_mut() { |
| 1441 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1645 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1477,7 +1681,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1477 | let event_id = event.id; | 1681 | let event_id = event.id; |
| 1478 | 1682 | ||
| 1479 | // Add event to purgatory | 1683 | // Add event to purgatory |
| 1480 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1684 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1481 | 1685 | ||
| 1482 | // Expire it | 1686 | // Expire it |
| 1483 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1687 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1497,7 +1701,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1497 | // This simulates what negentropy/REQ+EOSE should do: | 1701 | // This simulates what negentropy/REQ+EOSE should do: |
| 1498 | // Check if event is in event_ids() before adding | 1702 | // Check if event is in event_ids() before adding |
| 1499 | if !ids.contains(&event_id) { | 1703 | if !ids.contains(&event_id) { |
| 1500 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1704 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1501 | } | 1705 | } |
| 1502 | 1706 | ||
| 1503 | // Event should NOT be re-added | 1707 | // Event should NOT be re-added |
| @@ -1540,7 +1744,7 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1540 | let event_id = event.id; | 1744 | let event_id = event.id; |
| 1541 | 1745 | ||
| 1542 | // Add event to purgatory | 1746 | // Add event to purgatory |
| 1543 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1747 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1544 | 1748 | ||
| 1545 | // Expire it | 1749 | // Expire it |
| 1546 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1750 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1589,8 +1793,18 @@ async fn test_save_and_restore_state_events() { | |||
| 1589 | let event1_id = event1.id; | 1793 | let event1_id = event1.id; |
| 1590 | let event2_id = event2.id; | 1794 | let event2_id = event2.id; |
| 1591 | 1795 | ||
| 1592 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | 1796 | purgatory.add_state( |
| 1593 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | 1797 | event1.clone(), |
| 1798 | "test-repo".to_string(), | ||
| 1799 | keys.public_key(), | ||
| 1800 | false, | ||
| 1801 | ); | ||
| 1802 | purgatory.add_state( | ||
| 1803 | event2.clone(), | ||
| 1804 | "test-repo".to_string(), | ||
| 1805 | keys.public_key(), | ||
| 1806 | false, | ||
| 1807 | ); | ||
| 1594 | 1808 | ||
| 1595 | // Save to disk | 1809 | // Save to disk |
| 1596 | purgatory.save_to_disk(&state_file).unwrap(); | 1810 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -1652,6 +1866,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 1652 | pr_event.clone(), | 1866 | pr_event.clone(), |
| 1653 | "pr-event-id".to_string(), | 1867 | "pr-event-id".to_string(), |
| 1654 | "commit-abc".to_string(), | 1868 | "commit-abc".to_string(), |
| 1869 | false, | ||
| 1655 | ); | 1870 | ); |
| 1656 | 1871 | ||
| 1657 | // Save to disk | 1872 | // Save to disk |
| @@ -1721,7 +1936,7 @@ async fn test_save_and_restore_expired_events() { | |||
| 1721 | let event_id = event.id; | 1936 | let event_id = event.id; |
| 1722 | 1937 | ||
| 1723 | // Add and expire event | 1938 | // Add and expire event |
| 1724 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1939 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1725 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1940 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| 1726 | for entry in entries.iter_mut() { | 1941 | for entry in entries.iter_mut() { |
| 1727 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1942 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1860,7 +2075,7 @@ async fn test_downtime_calculation() { | |||
| 1860 | .sign_with_keys(&keys) | 2075 | .sign_with_keys(&keys) |
| 1861 | .unwrap(); | 2076 | .unwrap(); |
| 1862 | 2077 | ||
| 1863 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2078 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1864 | 2079 | ||
| 1865 | // Get original expiry time | 2080 | // Get original expiry time |
| 1866 | let original_entries = purgatory.find_state("repo"); | 2081 | let original_entries = purgatory.find_state("repo"); |
| @@ -1916,7 +2131,7 @@ async fn test_expiry_times_preserved() { | |||
| 1916 | .sign_with_keys(&keys) | 2131 | .sign_with_keys(&keys) |
| 1917 | .unwrap(); | 2132 | .unwrap(); |
| 1918 | 2133 | ||
| 1919 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2134 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1920 | 2135 | ||
| 1921 | // Manually set expiry to a specific time in the future | 2136 | // Manually set expiry to a specific time in the future |
| 1922 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | 2137 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes |
| @@ -1975,16 +2190,19 @@ async fn test_multiple_state_events_same_identifier() { | |||
| 1975 | event1.clone(), | 2190 | event1.clone(), |
| 1976 | "shared-repo".to_string(), | 2191 | "shared-repo".to_string(), |
| 1977 | keys1.public_key(), | 2192 | keys1.public_key(), |
| 2193 | false, | ||
| 1978 | ); | 2194 | ); |
| 1979 | purgatory.add_state( | 2195 | purgatory.add_state( |
| 1980 | event2.clone(), | 2196 | event2.clone(), |
| 1981 | "shared-repo".to_string(), | 2197 | "shared-repo".to_string(), |
| 1982 | keys2.public_key(), | 2198 | keys2.public_key(), |
| 2199 | false, | ||
| 1983 | ); | 2200 | ); |
| 1984 | purgatory.add_state( | 2201 | purgatory.add_state( |
| 1985 | event3.clone(), | 2202 | event3.clone(), |
| 1986 | "shared-repo".to_string(), | 2203 | "shared-repo".to_string(), |
| 1987 | keys3.public_key(), | 2204 | keys3.public_key(), |
| 2205 | false, | ||
| 1988 | ); | 2206 | ); |
| 1989 | 2207 | ||
| 1990 | // Save to disk | 2208 | // Save to disk |
| @@ -2031,6 +2249,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2031 | pr_event.clone(), | 2249 | pr_event.clone(), |
| 2032 | "pr-with-event".to_string(), | 2250 | "pr-with-event".to_string(), |
| 2033 | "commit-abc".to_string(), | 2251 | "commit-abc".to_string(), |
| 2252 | false, | ||
| 2034 | ); | 2253 | ); |
| 2035 | 2254 | ||
| 2036 | // Add PR placeholder | 2255 | // Add PR placeholder |
| @@ -2076,7 +2295,7 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2076 | let event = EventBuilder::text_note("test") | 2295 | let event = EventBuilder::text_note("test") |
| 2077 | .sign_with_keys(&keys) | 2296 | .sign_with_keys(&keys) |
| 2078 | .unwrap(); | 2297 | .unwrap(); |
| 2079 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2298 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2080 | 2299 | ||
| 2081 | // Save to disk | 2300 | // Save to disk |
| 2082 | purgatory.save_to_disk(&state_file).unwrap(); | 2301 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2110,8 +2329,18 @@ async fn test_comprehensive_roundtrip() { | |||
| 2110 | .sign_with_keys(&keys2) | 2329 | .sign_with_keys(&keys2) |
| 2111 | .unwrap(); | 2330 | .unwrap(); |
| 2112 | 2331 | ||
| 2113 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | 2332 | purgatory.add_state( |
| 2114 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | 2333 | state1.clone(), |
| 2334 | "repo1".to_string(), | ||
| 2335 | keys1.public_key(), | ||
| 2336 | false, | ||
| 2337 | ); | ||
| 2338 | purgatory.add_state( | ||
| 2339 | state2.clone(), | ||
| 2340 | "repo2".to_string(), | ||
| 2341 | keys2.public_key(), | ||
| 2342 | false, | ||
| 2343 | ); | ||
| 2115 | 2344 | ||
| 2116 | // Add PR event | 2345 | // Add PR event |
| 2117 | let tags = vec![Tag::custom( | 2346 | let tags = vec![Tag::custom( |
| @@ -2122,7 +2351,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2122 | .tags(tags) | 2351 | .tags(tags) |
| 2123 | .sign_with_keys(&keys1) | 2352 | .sign_with_keys(&keys1) |
| 2124 | .unwrap(); | 2353 | .unwrap(); |
| 2125 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | 2354 | purgatory.add_pr( |
| 2355 | pr_event.clone(), | ||
| 2356 | "pr-1".to_string(), | ||
| 2357 | "commit-1".to_string(), | ||
| 2358 | false, | ||
| 2359 | ); | ||
| 2126 | 2360 | ||
| 2127 | // Add PR placeholder | 2361 | // Add PR placeholder |
| 2128 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | 2362 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); |
| @@ -2132,7 +2366,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2132 | .sign_with_keys(&keys1) | 2366 | .sign_with_keys(&keys1) |
| 2133 | .unwrap(); | 2367 | .unwrap(); |
| 2134 | let expired_id = expired_event.id; | 2368 | let expired_id = expired_event.id; |
| 2135 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | 2369 | purgatory.add_state( |
| 2370 | expired_event, | ||
| 2371 | "repo3".to_string(), | ||
| 2372 | keys1.public_key(), | ||
| 2373 | false, | ||
| 2374 | ); | ||
| 2136 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | 2375 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { |
| 2137 | for entry in entries.iter_mut() { | 2376 | for entry in entries.iter_mut() { |
| 2138 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2377 | entry.expires_at = Instant::now() - Duration::from_secs(1); |