upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 12:05:49 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 12:05:49 +0000
commitfc9f1e282b16bc373c3913973879b43d3f254eb2 (patch)
treece8e3584921b154266c663b9ee3a353a14609265 /src
parent8babcee8fdfa5b0f460aa1e6d8057feb7d2fda49 (diff)
Add sync queue to Purgatory with enqueue_sync and has_pending_events
- Add sync_queue field to Purgatory struct for tracking identifiers that need background git data fetching - Implement enqueue_sync() with debouncing - resets attempt_count and updates next_attempt when new events arrive for an identifier already in queue - Add enqueue_sync_default() for user-submitted events (3 minute delay to wait for git push) - Add enqueue_sync_immediate() for sync-triggered events (500ms delay for batching burst arrivals) - Implement has_pending_events() to check if an identifier has state events or PR events in purgatory - Add helper methods: sync_queue(), remove_from_sync_queue(), sync_queue_size() - Add unit tests for debouncing behavior and pending event detection
Diffstat (limited to 'src')
-rw-r--r--src/purgatory/mod.rs257
1 files changed, 257 insertions, 0 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 34a8e7a..fcb812b 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -33,14 +33,28 @@ use crate::git::sync::sync_to_owner_repos;
33use crate::nostr::builder::SharedDatabase; 33use crate::nostr::builder::SharedDatabase;
34use crate::nostr::events::RepositoryState; 34use crate::nostr::events::RepositoryState;
35 35
36pub use sync::SyncQueueEntry;
37
36/// Default expiry duration for purgatory entries (30 minutes) 38/// Default expiry duration for purgatory entries (30 minutes)
37const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); 39const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800);
38 40
41/// Default delay before syncing user-submitted events (3 minutes).
42/// This gives time for the git push to arrive after the nostr event.
43const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180);
44
45/// Delay for sync-triggered events (500ms).
46/// Used for batching burst arrivals during negentropy sync.
47const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500);
48
39/// Main purgatory structure holding events awaiting git data. 49/// Main purgatory structure holding events awaiting git data.
40/// 50///
41/// Provides thread-safe concurrent access to two separate stores: 51/// Provides thread-safe concurrent access to two separate stores:
42/// - State events indexed by repository identifier 52/// - State events indexed by repository identifier
43/// - PR events indexed by event ID 53/// - PR events indexed by event ID
54///
55/// Also manages a sync queue for background git data fetching:
56/// - Tracks identifiers that need syncing with backoff/debouncing
57/// - Supports both user-submitted events (3min delay) and sync-triggered (500ms delay)
44#[derive(Clone)] 58#[derive(Clone)]
45pub struct Purgatory { 59pub struct Purgatory {
46 /// State events (kind 30618) indexed by repository identifier. 60 /// State events (kind 30618) indexed by repository identifier.
@@ -51,6 +65,10 @@ pub struct Purgatory {
51 /// Event ID is from the 'e' tag in the PR event itself. 65 /// Event ID is from the 'e' tag in the PR event itself.
52 pr_events: Arc<DashMap<String, PrPurgatoryEntry>>, 66 pr_events: Arc<DashMap<String, PrPurgatoryEntry>>,
53 67
68 /// Sync queue for background git data fetching.
69 /// Maps repository identifier to sync queue entry with timing/backoff state.
70 sync_queue: Arc<DashMap<String, SyncQueueEntry>>,
71
54 git_data_path: PathBuf, 72 git_data_path: PathBuf,
55} 73}
56 74
@@ -60,10 +78,118 @@ impl Purgatory {
60 Self { 78 Self {
61 state_events: Arc::new(DashMap::new()), 79 state_events: Arc::new(DashMap::new()),
62 pr_events: Arc::new(DashMap::new()), 80 pr_events: Arc::new(DashMap::new()),
81 sync_queue: Arc::new(DashMap::new()),
63 git_data_path: git_data_path.into(), 82 git_data_path: git_data_path.into(),
64 } 83 }
65 } 84 }
66 85
86 /// Enqueue an identifier for background git data sync.
87 ///
88 /// This method is called when a state or PR event is added to purgatory.
89 /// It uses debouncing to handle burst arrivals efficiently:
90 /// - If the identifier is already queued, resets attempt_count and updates
91 /// next_attempt if the new delay would be sooner
92 /// - If not queued, creates a new entry with the given delay
93 ///
94 /// # Arguments
95 /// * `identifier` - The repository identifier to sync
96 /// * `delay` - How long to wait before the first sync attempt
97 pub fn enqueue_sync(&self, identifier: &str, delay: Duration) {
98 self.sync_queue
99 .entry(identifier.to_string())
100 .and_modify(|entry| {
101 // Reset attempt count and potentially update next_attempt
102 entry.on_new_event(delay);
103 tracing::debug!(
104 identifier = %identifier,
105 "Updated existing sync queue entry"
106 );
107 })
108 .or_insert_with(|| {
109 tracing::debug!(
110 identifier = %identifier,
111 delay_secs = delay.as_secs(),
112 "Added new sync queue entry"
113 );
114 SyncQueueEntry::new(delay)
115 });
116 }
117
118 /// Enqueue an identifier for sync with the default delay (3 minutes).
119 ///
120 /// Used for user-submitted events where we expect a git push to follow.
121 pub fn enqueue_sync_default(&self, identifier: &str) {
122 self.enqueue_sync(identifier, DEFAULT_SYNC_DELAY);
123 }
124
125 /// Enqueue an identifier for immediate sync (500ms delay).
126 ///
127 /// Used for sync-triggered events (e.g., from negentropy) where we want
128 /// to batch burst arrivals but start syncing quickly.
129 pub fn enqueue_sync_immediate(&self, identifier: &str) {
130 self.enqueue_sync(identifier, IMMEDIATE_SYNC_DELAY);
131 }
132
133 /// Check if there are pending events for an identifier.
134 ///
135 /// Returns true if purgatory has state events or PR events for this identifier.
136 /// This is used by the sync loop to determine if an identifier should remain
137 /// in the sync queue.
138 ///
139 /// # Arguments
140 /// * `identifier` - The repository identifier to check
141 pub fn has_pending_events(&self, identifier: &str) -> bool {
142 // Check state events
143 if self
144 .state_events
145 .get(identifier)
146 .map_or(false, |entries| !entries.is_empty())
147 {
148 return true;
149 }
150
151 // Check PR events - need to scan all entries since they're indexed by event_id
152 // PR events reference repositories via `a` tags with format `30617:<owner_pubkey>:<identifier>`
153 for entry in self.pr_events.iter() {
154 if let Some(ref event) = entry.value().event {
155 if Self::event_references_identifier(event, identifier) {
156 return true;
157 }
158 }
159 }
160
161 false
162 }
163
164 /// Check if an event references a specific repository identifier.
165 ///
166 /// Looks for `a` tags with format `30617:<owner_pubkey>:<identifier>`.
167 fn event_references_identifier(event: &Event, identifier: &str) -> bool {
168 for tag in event.tags.iter() {
169 let tag_vec = tag.clone().to_vec();
170 if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") {
171 // Format: 30617:<owner_pubkey>:<identifier>
172 let parts: Vec<&str> = tag_vec[1].split(':').collect();
173 if parts.len() >= 3 && parts[2] == identifier {
174 return true;
175 }
176 }
177 }
178 false
179 }
180
181 /// Get a reference to the sync queue (for the sync loop).
182 pub fn sync_queue(&self) -> &Arc<DashMap<String, SyncQueueEntry>> {
183 &self.sync_queue
184 }
185
186 /// Remove an identifier from the sync queue.
187 ///
188 /// Called when sync completes or the identifier no longer has pending events.
189 pub fn remove_from_sync_queue(&self, identifier: &str) {
190 self.sync_queue.remove(identifier);
191 }
192
67 /// Add a state event to purgatory. 193 /// Add a state event to purgatory.
68 /// 194 ///
69 /// The event will expire after the default duration unless matched with git data. 195 /// The event will expire after the default duration unless matched with git data.
@@ -442,6 +568,12 @@ impl Purgatory {
442 pub fn clear(&self) { 568 pub fn clear(&self) {
443 self.state_events.clear(); 569 self.state_events.clear();
444 self.pr_events.clear(); 570 self.pr_events.clear();
571 self.sync_queue.clear();
572 }
573
574 /// Get the current size of the sync queue (for testing/metrics).
575 pub fn sync_queue_size(&self) -> usize {
576 self.sync_queue.len()
445 } 577 }
446} 578}
447 579
@@ -862,6 +994,131 @@ mod tests {
862 assert_eq!(state_count, 1); 994 assert_eq!(state_count, 1);
863 assert_eq!(pr_count, 1); 995 assert_eq!(pr_count, 1);
864 } 996 }
997
998 #[test]
999 fn test_enqueue_sync_debounces_rapid_calls() {
1000 let purgatory = Purgatory::new(PathBuf::new());
1001
1002 // First call - creates entry
1003 purgatory.enqueue_sync("test-repo", Duration::from_secs(60));
1004 assert_eq!(purgatory.sync_queue_size(), 1);
1005
1006 // Simulate some sync attempts
1007 if let Some(mut entry) = purgatory.sync_queue.get_mut("test-repo") {
1008 entry.attempt_count = 3;
1009 entry.next_attempt = Instant::now() + Duration::from_secs(120);
1010 }
1011
1012 // Second call with shorter delay - should reset attempt_count and update next_attempt
1013 purgatory.enqueue_sync("test-repo", Duration::from_secs(10));
1014
1015 // Should still be only one entry (debounced)
1016 assert_eq!(purgatory.sync_queue_size(), 1);
1017
1018 // Attempt count should be reset
1019 let entry = purgatory.sync_queue.get("test-repo").unwrap();
1020 assert_eq!(entry.attempt_count, 0, "attempt_count should be reset to 0");
1021
1022 // next_attempt should be updated to the sooner time (within tolerance)
1023 let expected_max = Instant::now() + Duration::from_secs(10) + Duration::from_millis(100);
1024 assert!(
1025 entry.next_attempt <= expected_max,
1026 "next_attempt should be updated to sooner time"
1027 );
1028 }
1029
1030 #[test]
1031 fn test_has_pending_events_with_state_events() {
1032 let purgatory = Purgatory::new(PathBuf::new());
1033 let keys = Keys::generate();
1034
1035 // No events initially
1036 assert!(!purgatory.has_pending_events("test-repo"));
1037
1038 // Add a state event
1039 let event = EventBuilder::text_note("state")
1040 .sign_with_keys(&keys)
1041 .unwrap();
1042 purgatory.add_state(event, "test-repo".to_string(), keys.public_key());
1043
1044 // Now should have pending events
1045 assert!(purgatory.has_pending_events("test-repo"));
1046
1047 // Different identifier should not have pending events
1048 assert!(!purgatory.has_pending_events("other-repo"));
1049 }
1050
1051 #[test]
1052 fn test_has_pending_events_with_pr_events() {
1053 use nostr_sdk::{Kind, Tag, TagKind};
1054
1055 let purgatory = Purgatory::new(PathBuf::new());
1056 let keys = Keys::generate();
1057
1058 // No events initially
1059 assert!(!purgatory.has_pending_events("test-repo"));
1060
1061 // Add a PR event with `a` tag referencing the repository
1062 let tags = vec![Tag::custom(
1063 TagKind::Custom("a".into()),
1064 vec!["30617:abc123def456:test-repo".to_string()],
1065 )];
1066
1067 let event = EventBuilder::new(Kind::from(1618), "PR content")
1068 .tags(tags)
1069 .sign_with_keys(&keys)
1070 .unwrap();
1071
1072 purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string());
1073
1074 // Now should have pending events for test-repo
1075 assert!(purgatory.has_pending_events("test-repo"));
1076
1077 // Different identifier should not have pending events
1078 assert!(!purgatory.has_pending_events("other-repo"));
1079 }
1080
1081 #[test]
1082 fn test_remove_from_sync_queue() {
1083 let purgatory = Purgatory::new(PathBuf::new());
1084
1085 purgatory.enqueue_sync("repo-1", Duration::from_secs(60));
1086 purgatory.enqueue_sync("repo-2", Duration::from_secs(60));
1087 assert_eq!(purgatory.sync_queue_size(), 2);
1088
1089 purgatory.remove_from_sync_queue("repo-1");
1090 assert_eq!(purgatory.sync_queue_size(), 1);
1091
1092 // repo-1 should be gone
1093 assert!(purgatory.sync_queue.get("repo-1").is_none());
1094 // repo-2 should still be there
1095 assert!(purgatory.sync_queue.get("repo-2").is_some());
1096 }
1097
1098 #[test]
1099 fn test_enqueue_sync_default_and_immediate() {
1100 let purgatory = Purgatory::new(PathBuf::new());
1101
1102 // Test default delay (3 minutes)
1103 purgatory.enqueue_sync_default("repo-default");
1104 let entry = purgatory.sync_queue.get("repo-default").unwrap();
1105 let expected_min = Instant::now() + Duration::from_secs(170); // ~3min minus tolerance
1106 let expected_max = Instant::now() + Duration::from_secs(190); // ~3min plus tolerance
1107 assert!(
1108 entry.next_attempt >= expected_min && entry.next_attempt <= expected_max,
1109 "Default delay should be ~180 seconds"
1110 );
1111 drop(entry);
1112
1113 // Test immediate delay (500ms)
1114 purgatory.enqueue_sync_immediate("repo-immediate");
1115 let entry = purgatory.sync_queue.get("repo-immediate").unwrap();
1116 let expected_max = Instant::now() + Duration::from_millis(600);
1117 assert!(
1118 entry.next_attempt <= expected_max,
1119 "Immediate delay should be ~500ms"
1120 );
1121 }
865} 1122}
866 1123
867#[test] 1124#[test]