From 2665811f54f62f147b7d773c76bd26d032b8f9cb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 7 Jan 2026 11:21:26 +0000 Subject: Add SyncQueueEntry with exponential backoff for purgatory sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the sync queue entry struct that tracks sync state per identifier: - next_attempt: when the next sync should be attempted - attempt_count: for backoff calculation (resets on new events) - in_progress: prevents concurrent syncs for same identifier Backoff schedule: 20s → 40s → 80s → 120s (capped at 2 minutes) This is the foundation for the identifier-based purgatory sync system that will replace the current per-event syncing approach. --- src/purgatory/sync/queue.rs | 206 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 src/purgatory/sync/queue.rs (limited to 'src/purgatory/sync/queue.rs') diff --git a/src/purgatory/sync/queue.rs b/src/purgatory/sync/queue.rs new file mode 100644 index 0000000..3226f47 --- /dev/null +++ b/src/purgatory/sync/queue.rs @@ -0,0 +1,206 @@ +//! Sync queue entry for tracking sync state per identifier. + +use std::time::{Duration, Instant}; + +/// Entry in the sync queue tracking when/how to sync an identifier. +/// +/// Each identifier in purgatory has at most one `SyncQueueEntry` that tracks: +/// - When the next sync attempt should occur +/// - How many attempts have been made (for backoff calculation) +/// - Whether a sync is currently in progress +#[derive(Debug, Clone)] +pub struct SyncQueueEntry { + /// Don't attempt sync before this time + pub next_attempt: Instant, + + /// Number of sync attempts (for backoff calculation). + /// Reset to 0 when new event arrives for this identifier. + pub attempt_count: u32, + + /// Whether a sync is currently in progress for this identifier. + /// Prevents concurrent sync operations for the same identifier. + pub in_progress: bool, +} + +impl SyncQueueEntry { + /// Create a new sync queue entry with the given initial delay. + /// + /// # Arguments + /// * `delay` - How long to wait before the first sync attempt + pub fn new(delay: Duration) -> Self { + Self { + next_attempt: Instant::now() + delay, + attempt_count: 0, + in_progress: false, + } + } + + /// Calculate backoff duration based on attempt count. + /// + /// Backoff schedule: + /// - Attempt 1: 20s + /// - Attempt 2: 40s + /// - Attempt 3: 80s + /// - Attempt 4+: 120s (capped at 2 minutes) + /// + /// The formula is: min(20s * 2^(attempt_count-1), 120s) + pub fn backoff(&self) -> Duration { + if self.attempt_count == 0 { + return Duration::from_secs(20); + } + + let base = Duration::from_secs(20); + let exponent = self.attempt_count.saturating_sub(1).min(3); + let multiplier = 2u32.saturating_pow(exponent); + (base * multiplier).min(Duration::from_secs(120)) + } + + /// Check if this entry is ready for a sync attempt. + /// + /// Returns true if: + /// - No sync is currently in progress + /// - The next_attempt time has passed + pub fn is_ready(&self) -> bool { + !self.in_progress && Instant::now() >= self.next_attempt + } + + /// Called when a new event arrives for this identifier. + /// + /// Resets the attempt count to 0 (fresh start) and updates + /// next_attempt if the new delay would be sooner. + /// + /// # Arguments + /// * `delay` - The delay for the new event + pub fn on_new_event(&mut self, delay: Duration) { + self.attempt_count = 0; + let new_attempt = Instant::now() + delay; + if new_attempt < self.next_attempt { + self.next_attempt = new_attempt; + } + } + + /// Called when a sync attempt completes (successfully or not). + /// + /// Marks the entry as not in progress, increments the attempt count, + /// and schedules the next attempt based on backoff. + /// + /// Only updates timing if the current next_attempt has passed + /// (prevents double-scheduling if called multiple times). + pub fn on_sync_complete(&mut self) { + self.in_progress = false; + if self.next_attempt <= Instant::now() { + self.attempt_count += 1; + self.next_attempt = Instant::now() + self.backoff(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_doubles_up_to_cap() { + // Test that backoff follows: 20s → 40s → 80s → 120s → 120s (capped) + let mut entry = SyncQueueEntry::new(Duration::from_secs(0)); + + // Attempt 0 (initial state): 20s + assert_eq!(entry.backoff(), Duration::from_secs(20)); + + // Simulate completing attempts and check backoff + entry.attempt_count = 1; + assert_eq!(entry.backoff(), Duration::from_secs(20)); // 20 * 2^0 = 20 + + entry.attempt_count = 2; + assert_eq!(entry.backoff(), Duration::from_secs(40)); // 20 * 2^1 = 40 + + entry.attempt_count = 3; + assert_eq!(entry.backoff(), Duration::from_secs(80)); // 20 * 2^2 = 80 + + entry.attempt_count = 4; + assert_eq!(entry.backoff(), Duration::from_secs(120)); // 20 * 2^3 = 160, capped to 120 + + entry.attempt_count = 5; + assert_eq!(entry.backoff(), Duration::from_secs(120)); // Still capped + + entry.attempt_count = 100; + assert_eq!(entry.backoff(), Duration::from_secs(120)); // Always capped + } + + #[test] + fn new_event_resets_attempt_count() { + let mut entry = SyncQueueEntry::new(Duration::from_secs(60)); + + // Simulate several sync attempts + entry.attempt_count = 5; + entry.next_attempt = Instant::now() + Duration::from_secs(120); + + // New event arrives with shorter delay + entry.on_new_event(Duration::from_secs(10)); + + // Attempt count should be reset + assert_eq!(entry.attempt_count, 0); + + // next_attempt should be updated to the sooner time + // (within a small tolerance for test timing) + let expected = Instant::now() + Duration::from_secs(10); + assert!(entry.next_attempt <= expected + Duration::from_millis(100)); + assert!(entry.next_attempt >= expected - Duration::from_millis(100)); + } + + #[test] + fn new_event_does_not_delay_if_already_sooner() { + let mut entry = SyncQueueEntry::new(Duration::from_secs(5)); + let original_next = entry.next_attempt; + + // New event arrives with longer delay - should not push back + entry.on_new_event(Duration::from_secs(60)); + + // Attempt count should still be reset + assert_eq!(entry.attempt_count, 0); + + // But next_attempt should not be pushed back + assert!(entry.next_attempt <= original_next + Duration::from_millis(100)); + } + + #[test] + fn is_ready_checks_both_conditions() { + let mut entry = SyncQueueEntry::new(Duration::from_secs(0)); + + // Should be ready initially (no delay, not in progress) + // Note: there might be a tiny delay, so we wait a moment + std::thread::sleep(Duration::from_millis(10)); + assert!(entry.is_ready()); + + // Mark as in progress - should not be ready + entry.in_progress = true; + assert!(!entry.is_ready()); + + // Not in progress but future next_attempt - should not be ready + entry.in_progress = false; + entry.next_attempt = Instant::now() + Duration::from_secs(60); + assert!(!entry.is_ready()); + } + + #[test] + fn on_sync_complete_increments_and_schedules() { + let mut entry = SyncQueueEntry::new(Duration::from_secs(0)); + std::thread::sleep(Duration::from_millis(10)); // Ensure next_attempt has passed + + entry.in_progress = true; + entry.attempt_count = 0; + + entry.on_sync_complete(); + + // Should no longer be in progress + assert!(!entry.in_progress); + + // Attempt count should be incremented + assert_eq!(entry.attempt_count, 1); + + // Next attempt should be scheduled with backoff (20s for attempt 1) + let expected = Instant::now() + Duration::from_secs(20); + assert!(entry.next_attempt >= expected - Duration::from_millis(100)); + assert!(entry.next_attempt <= expected + Duration::from_millis(100)); + } +} -- cgit v1.2.3