From fc9f1e282b16bc373c3913973879b43d3f254eb2 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 12:05:49 +0000 Subject: Add sync queue to Purgatory with enqueue_sync and has_pending_events - Add sync_queue field to Purgatory struct for tracking identifiers that need background git data fetching - Implement enqueue_sync() with debouncing - resets attempt_count and updates next_attempt when new events arrive for an identifier already in queue - Add enqueue_sync_default() for user-submitted events (3 minute delay to wait for git push) - Add enqueue_sync_immediate() for sync-triggered events (500ms delay for batching burst arrivals) - Implement has_pending_events() to check if an identifier has state events or PR events in purgatory - Add helper methods: sync_queue(), remove_from_sync_queue(), sync_queue_size() - Add unit tests for debouncing behavior and pending event detection --- src/purgatory/mod.rs | 257 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) (limited to 'src') diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 34a8e7a..fcb812b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -33,14 +33,28 @@ use crate::git::sync::sync_to_owner_repos; use crate::nostr::builder::SharedDatabase; use crate::nostr::events::RepositoryState; +pub use sync::SyncQueueEntry; + /// Default expiry duration for purgatory entries (30 minutes) const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); +/// Default delay before syncing user-submitted events (3 minutes). +/// This gives time for the git push to arrive after the nostr event. +const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); + +/// Delay for sync-triggered events (500ms). +/// Used for batching burst arrivals during negentropy sync. +const IMMEDIATE_SYNC_DELAY: Duration = Duration::from_millis(500); + /// Main purgatory structure holding events awaiting git data. /// /// Provides thread-safe concurrent access to two separate stores: /// - State events indexed by repository identifier /// - PR events indexed by event ID +/// +/// Also manages a sync queue for background git data fetching: +/// - Tracks identifiers that need syncing with backoff/debouncing +/// - Supports both user-submitted events (3min delay) and sync-triggered (500ms delay) #[derive(Clone)] pub struct Purgatory { /// State events (kind 30618) indexed by repository identifier. @@ -51,6 +65,10 @@ pub struct Purgatory { /// Event ID is from the 'e' tag in the PR event itself. pr_events: Arc>, + /// Sync queue for background git data fetching. + /// Maps repository identifier to sync queue entry with timing/backoff state. + sync_queue: Arc>, + git_data_path: PathBuf, } @@ -60,10 +78,118 @@ impl Purgatory { Self { state_events: Arc::new(DashMap::new()), pr_events: Arc::new(DashMap::new()), + sync_queue: Arc::new(DashMap::new()), git_data_path: git_data_path.into(), } } + /// Enqueue an identifier for background git data sync. + /// + /// This method is called when a state or PR event is added to purgatory. + /// It uses debouncing to handle burst arrivals efficiently: + /// - If the identifier is already queued, resets attempt_count and updates + /// next_attempt if the new delay would be sooner + /// - If not queued, creates a new entry with the given delay + /// + /// # Arguments + /// * `identifier` - The repository identifier to sync + /// * `delay` - How long to wait before the first sync attempt + pub fn enqueue_sync(&self, identifier: &str, delay: Duration) { + self.sync_queue + .entry(identifier.to_string()) + .and_modify(|entry| { + // Reset attempt count and potentially update next_attempt + entry.on_new_event(delay); + tracing::debug!( + identifier = %identifier, + "Updated existing sync queue entry" + ); + }) + .or_insert_with(|| { + tracing::debug!( + identifier = %identifier, + delay_secs = delay.as_secs(), + "Added new sync queue entry" + ); + SyncQueueEntry::new(delay) + }); + } + + /// Enqueue an identifier for sync with the default delay (3 minutes). + /// + /// Used for user-submitted events where we expect a git push to follow. + pub fn enqueue_sync_default(&self, identifier: &str) { + self.enqueue_sync(identifier, DEFAULT_SYNC_DELAY); + } + + /// Enqueue an identifier for immediate sync (500ms delay). + /// + /// Used for sync-triggered events (e.g., from negentropy) where we want + /// to batch burst arrivals but start syncing quickly. + pub fn enqueue_sync_immediate(&self, identifier: &str) { + self.enqueue_sync(identifier, IMMEDIATE_SYNC_DELAY); + } + + /// Check if there are pending events for an identifier. + /// + /// Returns true if purgatory has state events or PR events for this identifier. + /// This is used by the sync loop to determine if an identifier should remain + /// in the sync queue. + /// + /// # Arguments + /// * `identifier` - The repository identifier to check + pub fn has_pending_events(&self, identifier: &str) -> bool { + // Check state events + if self + .state_events + .get(identifier) + .map_or(false, |entries| !entries.is_empty()) + { + return true; + } + + // Check PR events - need to scan all entries since they're indexed by event_id + // PR events reference repositories via `a` tags with format `30617::` + for entry in self.pr_events.iter() { + if let Some(ref event) = entry.value().event { + if Self::event_references_identifier(event, identifier) { + return true; + } + } + } + + false + } + + /// Check if an event references a specific repository identifier. + /// + /// Looks for `a` tags with format `30617::`. + fn event_references_identifier(event: &Event, identifier: &str) -> bool { + for tag in event.tags.iter() { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { + // Format: 30617:: + let parts: Vec<&str> = tag_vec[1].split(':').collect(); + if parts.len() >= 3 && parts[2] == identifier { + return true; + } + } + } + false + } + + /// Get a reference to the sync queue (for the sync loop). + pub fn sync_queue(&self) -> &Arc> { + &self.sync_queue + } + + /// Remove an identifier from the sync queue. + /// + /// Called when sync completes or the identifier no longer has pending events. + pub fn remove_from_sync_queue(&self, identifier: &str) { + self.sync_queue.remove(identifier); + } + /// Add a state event to purgatory. /// /// The event will expire after the default duration unless matched with git data. @@ -442,6 +568,12 @@ impl Purgatory { pub fn clear(&self) { self.state_events.clear(); self.pr_events.clear(); + self.sync_queue.clear(); + } + + /// Get the current size of the sync queue (for testing/metrics). + pub fn sync_queue_size(&self) -> usize { + self.sync_queue.len() } } @@ -862,6 +994,131 @@ mod tests { assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } + + #[test] + fn test_enqueue_sync_debounces_rapid_calls() { + let purgatory = Purgatory::new(PathBuf::new()); + + // First call - creates entry + purgatory.enqueue_sync("test-repo", Duration::from_secs(60)); + assert_eq!(purgatory.sync_queue_size(), 1); + + // Simulate some sync attempts + if let Some(mut entry) = purgatory.sync_queue.get_mut("test-repo") { + entry.attempt_count = 3; + entry.next_attempt = Instant::now() + Duration::from_secs(120); + } + + // Second call with shorter delay - should reset attempt_count and update next_attempt + purgatory.enqueue_sync("test-repo", Duration::from_secs(10)); + + // Should still be only one entry (debounced) + assert_eq!(purgatory.sync_queue_size(), 1); + + // Attempt count should be reset + let entry = purgatory.sync_queue.get("test-repo").unwrap(); + assert_eq!(entry.attempt_count, 0, "attempt_count should be reset to 0"); + + // next_attempt should be updated to the sooner time (within tolerance) + let expected_max = Instant::now() + Duration::from_secs(10) + Duration::from_millis(100); + assert!( + entry.next_attempt <= expected_max, + "next_attempt should be updated to sooner time" + ); + } + + #[test] + fn test_has_pending_events_with_state_events() { + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // No events initially + assert!(!purgatory.has_pending_events("test-repo")); + + // Add a state event + let event = EventBuilder::text_note("state") + .sign_with_keys(&keys) + .unwrap(); + purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); + + // Now should have pending events + assert!(purgatory.has_pending_events("test-repo")); + + // Different identifier should not have pending events + assert!(!purgatory.has_pending_events("other-repo")); + } + + #[test] + fn test_has_pending_events_with_pr_events() { + use nostr_sdk::{Kind, Tag, TagKind}; + + let purgatory = Purgatory::new(PathBuf::new()); + let keys = Keys::generate(); + + // No events initially + assert!(!purgatory.has_pending_events("test-repo")); + + // Add a PR event with `a` tag referencing the repository + let tags = vec![Tag::custom( + TagKind::Custom("a".into()), + vec!["30617:abc123def456:test-repo".to_string()], + )]; + + let event = EventBuilder::new(Kind::from(1618), "PR content") + .tags(tags) + .sign_with_keys(&keys) + .unwrap(); + + purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); + + // Now should have pending events for test-repo + assert!(purgatory.has_pending_events("test-repo")); + + // Different identifier should not have pending events + assert!(!purgatory.has_pending_events("other-repo")); + } + + #[test] + fn test_remove_from_sync_queue() { + let purgatory = Purgatory::new(PathBuf::new()); + + purgatory.enqueue_sync("repo-1", Duration::from_secs(60)); + purgatory.enqueue_sync("repo-2", Duration::from_secs(60)); + assert_eq!(purgatory.sync_queue_size(), 2); + + purgatory.remove_from_sync_queue("repo-1"); + assert_eq!(purgatory.sync_queue_size(), 1); + + // repo-1 should be gone + assert!(purgatory.sync_queue.get("repo-1").is_none()); + // repo-2 should still be there + assert!(purgatory.sync_queue.get("repo-2").is_some()); + } + + #[test] + fn test_enqueue_sync_default_and_immediate() { + let purgatory = Purgatory::new(PathBuf::new()); + + // Test default delay (3 minutes) + purgatory.enqueue_sync_default("repo-default"); + let entry = purgatory.sync_queue.get("repo-default").unwrap(); + let expected_min = Instant::now() + Duration::from_secs(170); // ~3min minus tolerance + let expected_max = Instant::now() + Duration::from_secs(190); // ~3min plus tolerance + assert!( + entry.next_attempt >= expected_min && entry.next_attempt <= expected_max, + "Default delay should be ~180 seconds" + ); + drop(entry); + + // Test immediate delay (500ms) + purgatory.enqueue_sync_immediate("repo-immediate"); + let entry = purgatory.sync_queue.get("repo-immediate").unwrap(); + let expected_max = Instant::now() + Duration::from_millis(600); + assert!( + entry.next_attempt <= expected_max, + "Immediate delay should be ~500ms" + ); + } } #[test] -- cgit v1.2.3