diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 12:05:49 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 12:05:49 +0000 |
| commit | fc9f1e282b16bc373c3913973879b43d3f254eb2 (patch) | |
| tree | ce8e3584921b154266c663b9ee3a353a14609265 /src | |
| parent | 8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 (diff) | |
Add sync queue to Purgatory with enqueue_sync and has_pending_events
- Add sync_queue field to Purgatory struct for tracking identifiers that need
background git data fetching
- Implement enqueue_sync() with debouncing - resets attempt_count and updates
next_attempt when new events arrive for an identifier already in queue
- Add enqueue_sync_default() for user-submitted events (3 minute delay to wait
for git push)
- Add enqueue_sync_immediate() for sync-triggered events (500ms delay for
batching burst arrivals)
- Implement has_pending_events() to check if an identifier has state events
or PR events in purgatory
- Add helper methods: sync_queue(), remove_from_sync_queue(), sync_queue_size()
- Add unit tests for debouncing behavior and pending event detection
Diffstat (limited to 'src')
| -rw-r--r-- | src/purgatory/mod.rs | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 34a8e7a..fcb812b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -33,14 +33,28 @@ use crate::git::sync::sync_to_owner_repos; | |||
| 33 | use crate::nostr::builder::SharedDatabase; | 33 | use crate::nostr::builder::SharedDatabase; |
| 34 | use crate::nostr::events::RepositoryState; | 34 | use crate::nostr::events::RepositoryState; |
| 35 | 35 | ||
| 36 | pub use sync::SyncQueueEntry; | ||
| 37 | |||
| 36 | /// Default expiry duration for purgatory entries (30 minutes) | 38 | /// Default expiry duration for purgatory entries (30 minutes) |
| 37 | const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); | 39 | const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); |
| 38 | 40 | ||
| 41 | /// Default delay before syncing user-submitted events (3 minutes). | ||
| 42 | /// This gives time for the git push to arrive after the nostr event. | ||
| 43 | const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); | ||
| 44 | |||
| 45 | /// Delay for sync-triggered events (500ms). | ||
| 46 | /// Used for batching burst arrivals during negentropy sync. | ||
| 47 | const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); | ||
| 48 | |||
| 39 | /// Main purgatory structure holding events awaiting git data. | 49 | /// Main purgatory structure holding events awaiting git data. |
| 40 | /// | 50 | /// |
| 41 | /// Provides thread-safe concurrent access to two separate stores: | 51 | /// Provides thread-safe concurrent access to two separate stores: |
| 42 | /// - State events indexed by repository identifier | 52 | /// - State events indexed by repository identifier |
| 43 | /// - PR events indexed by event ID | 53 | /// - PR events indexed by event ID |
| 54 | /// | ||
| 55 | /// Also manages a sync queue for background git data fetching: | ||
| 56 | /// - Tracks identifiers that need syncing with backoff/debouncing | ||
| 57 | /// - Supports both user-submitted events (3min delay) and sync-triggered (500ms delay) | ||
| 44 | #[derive(Clone)] | 58 | #[derive(Clone)] |
| 45 | pub struct Purgatory { | 59 | pub struct Purgatory { |
| 46 | /// State events (kind 30618) indexed by repository identifier. | 60 | /// State events (kind 30618) indexed by repository identifier. |
| @@ -51,6 +65,10 @@ pub struct Purgatory { | |||
| 51 | /// Event ID is from the 'e' tag in the PR event itself. | 65 | /// Event ID is from the 'e' tag in the PR event itself. |
| 52 | pr_events: Arc<DashMap<String, PrPurgatoryEntry>>, | 66 | pr_events: Arc<DashMap<String, PrPurgatoryEntry>>, |
| 53 | 67 | ||
| 68 | /// Sync queue for background git data fetching. | ||
| 69 | /// Maps repository identifier to sync queue entry with timing/backoff state. | ||
| 70 | sync_queue: Arc<DashMap<String, SyncQueueEntry>>, | ||
| 71 | |||
| 54 | git_data_path: PathBuf, | 72 | git_data_path: PathBuf, |
| 55 | } | 73 | } |
| 56 | 74 | ||
| @@ -60,10 +78,118 @@ impl Purgatory { | |||
| 60 | Self { | 78 | Self { |
| 61 | state_events: Arc::new(DashMap::new()), | 79 | state_events: Arc::new(DashMap::new()), |
| 62 | pr_events: Arc::new(DashMap::new()), | 80 | pr_events: Arc::new(DashMap::new()), |
| 81 | sync_queue: Arc::new(DashMap::new()), | ||
| 63 | git_data_path: git_data_path.into(), | 82 | git_data_path: git_data_path.into(), |
| 64 | } | 83 | } |
| 65 | } | 84 | } |
| 66 | 85 | ||
| 86 | /// Enqueue an identifier for background git data sync. | ||
| 87 | /// | ||
| 88 | /// This method is called when a state or PR event is added to purgatory. | ||
| 89 | /// It uses debouncing to handle burst arrivals efficiently: | ||
| 90 | /// - If the identifier is already queued, resets attempt_count and updates | ||
| 91 | /// next_attempt if the new delay would be sooner | ||
| 92 | /// - If not queued, creates a new entry with the given delay | ||
| 93 | /// | ||
| 94 | /// # Arguments | ||
| 95 | /// * `identifier` - The repository identifier to sync | ||
| 96 | /// * `delay` - How long to wait before the first sync attempt | ||
| 97 | pub fn enqueue_sync(&self, identifier: &str, delay: Duration) { | ||
| 98 | self.sync_queue | ||
| 99 | .entry(identifier.to_string()) | ||
| 100 | .and_modify(|entry| { | ||
| 101 | // Reset attempt count and potentially update next_attempt | ||
| 102 | entry.on_new_event(delay); | ||
| 103 | tracing::debug!( | ||
| 104 | identifier = %identifier, | ||
| 105 | "Updated existing sync queue entry" | ||
| 106 | ); | ||
| 107 | }) | ||
| 108 | .or_insert_with(|| { | ||
| 109 | tracing::debug!( | ||
| 110 | identifier = %identifier, | ||
| 111 | delay_secs = delay.as_secs(), | ||
| 112 | "Added new sync queue entry" | ||
| 113 | ); | ||
| 114 | SyncQueueEntry::new(delay) | ||
| 115 | }); | ||
| 116 | } | ||
| 117 | |||
| 118 | /// Enqueue an identifier for sync with the default delay (3 minutes). | ||
| 119 | /// | ||
| 120 | /// Used for user-submitted events where we expect a git push to follow. | ||
| 121 | pub fn enqueue_sync_default(&self, identifier: &str) { | ||
| 122 | self.enqueue_sync(identifier, DEFAULT_SYNC_DELAY); | ||
| 123 | } | ||
| 124 | |||
| 125 | /// Enqueue an identifier for immediate sync (500ms delay). | ||
| 126 | /// | ||
| 127 | /// Used for sync-triggered events (e.g., from negentropy) where we want | ||
| 128 | /// to batch burst arrivals but start syncing quickly. | ||
| 129 | pub fn enqueue_sync_immediate(&self, identifier: &str) { | ||
| 130 | self.enqueue_sync(identifier, IMMEDIATE_SYNC_DELAY); | ||
| 131 | } | ||
| 132 | |||
| 133 | /// Check if there are pending events for an identifier. | ||
| 134 | /// | ||
| 135 | /// Returns true if purgatory has state events or PR events for this identifier. | ||
| 136 | /// This is used by the sync loop to determine if an identifier should remain | ||
| 137 | /// in the sync queue. | ||
| 138 | /// | ||
| 139 | /// # Arguments | ||
| 140 | /// * `identifier` - The repository identifier to check | ||
| 141 | pub fn has_pending_events(&self, identifier: &str) -> bool { | ||
| 142 | // Check state events | ||
| 143 | if self | ||
| 144 | .state_events | ||
| 145 | .get(identifier) | ||
| 146 | .map_or(false, |entries| !entries.is_empty()) | ||
| 147 | { | ||
| 148 | return true; | ||
| 149 | } | ||
| 150 | |||
| 151 | // Check PR events - need to scan all entries since they're indexed by event_id | ||
| 152 | // PR events reference repositories via `a` tags with format `30617:<owner_pubkey>:<identifier>` | ||
| 153 | for entry in self.pr_events.iter() { | ||
| 154 | if let Some(ref event) = entry.value().event { | ||
| 155 | if Self::event_references_identifier(event, identifier) { | ||
| 156 | return true; | ||
| 157 | } | ||
| 158 | } | ||
| 159 | } | ||
| 160 | |||
| 161 | false | ||
| 162 | } | ||
| 163 | |||
| 164 | /// Check if an event references a specific repository identifier. | ||
| 165 | /// | ||
| 166 | /// Looks for `a` tags with format `30617:<owner_pubkey>:<identifier>`. | ||
| 167 | fn event_references_identifier(event: &Event, identifier: &str) -> bool { | ||
| 168 | for tag in event.tags.iter() { | ||
| 169 | let tag_vec = tag.clone().to_vec(); | ||
| 170 | if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { | ||
| 171 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 172 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 173 | if parts.len() >= 3 && parts[2] == identifier { | ||
| 174 | return true; | ||
| 175 | } | ||
| 176 | } | ||
| 177 | } | ||
| 178 | false | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Get a reference to the sync queue (for the sync loop). | ||
| 182 | pub fn sync_queue(&self) -> &Arc<DashMap<String, SyncQueueEntry>> { | ||
| 183 | &self.sync_queue | ||
| 184 | } | ||
| 185 | |||
| 186 | /// Remove an identifier from the sync queue. | ||
| 187 | /// | ||
| 188 | /// Called when sync completes or the identifier no longer has pending events. | ||
| 189 | pub fn remove_from_sync_queue(&self, identifier: &str) { | ||
| 190 | self.sync_queue.remove(identifier); | ||
| 191 | } | ||
| 192 | |||
| 67 | /// Add a state event to purgatory. | 193 | /// Add a state event to purgatory. |
| 68 | /// | 194 | /// |
| 69 | /// The event will expire after the default duration unless matched with git data. | 195 | /// The event will expire after the default duration unless matched with git data. |
| @@ -442,6 +568,12 @@ impl Purgatory { | |||
| 442 | pub fn clear(&self) { | 568 | pub fn clear(&self) { |
| 443 | self.state_events.clear(); | 569 | self.state_events.clear(); |
| 444 | self.pr_events.clear(); | 570 | self.pr_events.clear(); |
| 571 | self.sync_queue.clear(); | ||
| 572 | } | ||
| 573 | |||
| 574 | /// Get the current size of the sync queue (for testing/metrics). | ||
| 575 | pub fn sync_queue_size(&self) -> usize { | ||
| 576 | self.sync_queue.len() | ||
| 445 | } | 577 | } |
| 446 | } | 578 | } |
| 447 | 579 | ||
| @@ -862,6 +994,131 @@ mod tests { | |||
| 862 | assert_eq!(state_count, 1); | 994 | assert_eq!(state_count, 1); |
| 863 | assert_eq!(pr_count, 1); | 995 | assert_eq!(pr_count, 1); |
| 864 | } | 996 | } |
| 997 | |||
| 998 | #[test] | ||
| 999 | fn test_enqueue_sync_debounces_rapid_calls() { | ||
| 1000 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1001 | |||
| 1002 | // First call - creates entry | ||
| 1003 | purgatory.enqueue_sync("test-repo", Duration::from_secs(60)); | ||
| 1004 | assert_eq!(purgatory.sync_queue_size(), 1); | ||
| 1005 | |||
| 1006 | // Simulate some sync attempts | ||
| 1007 | if let Some(mut entry) = purgatory.sync_queue.get_mut("test-repo") { | ||
| 1008 | entry.attempt_count = 3; | ||
| 1009 | entry.next_attempt = Instant::now() + Duration::from_secs(120); | ||
| 1010 | } | ||
| 1011 | |||
| 1012 | // Second call with shorter delay - should reset attempt_count and update next_attempt | ||
| 1013 | purgatory.enqueue_sync("test-repo", Duration::from_secs(10)); | ||
| 1014 | |||
| 1015 | // Should still be only one entry (debounced) | ||
| 1016 | assert_eq!(purgatory.sync_queue_size(), 1); | ||
| 1017 | |||
| 1018 | // Attempt count should be reset | ||
| 1019 | let entry = purgatory.sync_queue.get("test-repo").unwrap(); | ||
| 1020 | assert_eq!(entry.attempt_count, 0, "attempt_count should be reset to 0"); | ||
| 1021 | |||
| 1022 | // next_attempt should be updated to the sooner time (within tolerance) | ||
| 1023 | let expected_max = Instant::now() + Duration::from_secs(10) + Duration::from_millis(100); | ||
| 1024 | assert!( | ||
| 1025 | entry.next_attempt <= expected_max, | ||
| 1026 | "next_attempt should be updated to sooner time" | ||
| 1027 | ); | ||
| 1028 | } | ||
| 1029 | |||
| 1030 | #[test] | ||
| 1031 | fn test_has_pending_events_with_state_events() { | ||
| 1032 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1033 | let keys = Keys::generate(); | ||
| 1034 | |||
| 1035 | // No events initially | ||
| 1036 | assert!(!purgatory.has_pending_events("test-repo")); | ||
| 1037 | |||
| 1038 | // Add a state event | ||
| 1039 | let event = EventBuilder::text_note("state") | ||
| 1040 | .sign_with_keys(&keys) | ||
| 1041 | .unwrap(); | ||
| 1042 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | ||
| 1043 | |||
| 1044 | // Now should have pending events | ||
| 1045 | assert!(purgatory.has_pending_events("test-repo")); | ||
| 1046 | |||
| 1047 | // Different identifier should not have pending events | ||
| 1048 | assert!(!purgatory.has_pending_events("other-repo")); | ||
| 1049 | } | ||
| 1050 | |||
| 1051 | #[test] | ||
| 1052 | fn test_has_pending_events_with_pr_events() { | ||
| 1053 | use nostr_sdk::{Kind, Tag, TagKind}; | ||
| 1054 | |||
| 1055 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1056 | let keys = Keys::generate(); | ||
| 1057 | |||
| 1058 | // No events initially | ||
| 1059 | assert!(!purgatory.has_pending_events("test-repo")); | ||
| 1060 | |||
| 1061 | // Add a PR event with `a` tag referencing the repository | ||
| 1062 | let tags = vec![Tag::custom( | ||
| 1063 | TagKind::Custom("a".into()), | ||
| 1064 | vec!["30617:abc123def456:test-repo".to_string()], | ||
| 1065 | )]; | ||
| 1066 | |||
| 1067 | let event = EventBuilder::new(Kind::from(1618), "PR content") | ||
| 1068 | .tags(tags) | ||
| 1069 | .sign_with_keys(&keys) | ||
| 1070 | .unwrap(); | ||
| 1071 | |||
| 1072 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | ||
| 1073 | |||
| 1074 | // Now should have pending events for test-repo | ||
| 1075 | assert!(purgatory.has_pending_events("test-repo")); | ||
| 1076 | |||
| 1077 | // Different identifier should not have pending events | ||
| 1078 | assert!(!purgatory.has_pending_events("other-repo")); | ||
| 1079 | } | ||
| 1080 | |||
| 1081 | #[test] | ||
| 1082 | fn test_remove_from_sync_queue() { | ||
| 1083 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1084 | |||
| 1085 | purgatory.enqueue_sync("repo-1", Duration::from_secs(60)); | ||
| 1086 | purgatory.enqueue_sync("repo-2", Duration::from_secs(60)); | ||
| 1087 | assert_eq!(purgatory.sync_queue_size(), 2); | ||
| 1088 | |||
| 1089 | purgatory.remove_from_sync_queue("repo-1"); | ||
| 1090 | assert_eq!(purgatory.sync_queue_size(), 1); | ||
| 1091 | |||
| 1092 | // repo-1 should be gone | ||
| 1093 | assert!(purgatory.sync_queue.get("repo-1").is_none()); | ||
| 1094 | // repo-2 should still be there | ||
| 1095 | assert!(purgatory.sync_queue.get("repo-2").is_some()); | ||
| 1096 | } | ||
| 1097 | |||
| 1098 | #[test] | ||
| 1099 | fn test_enqueue_sync_default_and_immediate() { | ||
| 1100 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 1101 | |||
| 1102 | // Test default delay (3 minutes) | ||
| 1103 | purgatory.enqueue_sync_default("repo-default"); | ||
| 1104 | let entry = purgatory.sync_queue.get("repo-default").unwrap(); | ||
| 1105 | let expected_min = Instant::now() + Duration::from_secs(170); // ~3min minus tolerance | ||
| 1106 | let expected_max = Instant::now() + Duration::from_secs(190); // ~3min plus tolerance | ||
| 1107 | assert!( | ||
| 1108 | entry.next_attempt >= expected_min && entry.next_attempt <= expected_max, | ||
| 1109 | "Default delay should be ~180 seconds" | ||
| 1110 | ); | ||
| 1111 | drop(entry); | ||
| 1112 | |||
| 1113 | // Test immediate delay (500ms) | ||
| 1114 | purgatory.enqueue_sync_immediate("repo-immediate"); | ||
| 1115 | let entry = purgatory.sync_queue.get("repo-immediate").unwrap(); | ||
| 1116 | let expected_max = Instant::now() + Duration::from_millis(600); | ||
| 1117 | assert!( | ||
| 1118 | entry.next_attempt <= expected_max, | ||
| 1119 | "Immediate delay should be ~500ms" | ||
| 1120 | ); | ||
| 1121 | } | ||
| 865 | } | 1122 | } |
| 866 | 1123 | ||
| 867 | #[test] | 1124 | #[test] |