diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:21:26 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 11:21:26 +0000 |
| commit | 2665811f54f62f147b7d773c76bd26d032b8f9cb (patch) | |
| tree | a716d2115002cfbc360d74ce695817bc46f8a074 /src/purgatory/sync/queue.rs | |
| parent | 852eddcc33b59ed027e06d30456f6b9e3b9a31cb (diff) | |
Add SyncQueueEntry with exponential backoff for purgatory sync
Implement the sync queue entry struct that tracks sync state per identifier:
- next_attempt: when the next sync should be attempted
- attempt_count: for backoff calculation (resets on new events)
- in_progress: prevents concurrent syncs for same identifier
Backoff schedule: 20s → 40s → 80s → 120s (capped at 2 minutes)
This is the foundation for the identifier-based purgatory sync system
that will replace the current per-event syncing approach.
Diffstat (limited to 'src/purgatory/sync/queue.rs')
| -rw-r--r-- | src/purgatory/sync/queue.rs | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/src/purgatory/sync/queue.rs b/src/purgatory/sync/queue.rs new file mode 100644 index 0000000..3226f47 --- /dev/null +++ b/src/purgatory/sync/queue.rs | |||
| @@ -0,0 +1,206 @@ | |||
| 1 | //! Sync queue entry for tracking sync state per identifier. | ||
| 2 | |||
| 3 | use std::time::{Duration, Instant}; | ||
| 4 | |||
| 5 | /// Entry in the sync queue tracking when/how to sync an identifier. | ||
| 6 | /// | ||
| 7 | /// Each identifier in purgatory has at most one `SyncQueueEntry` that tracks: | ||
| 8 | /// - When the next sync attempt should occur | ||
| 9 | /// - How many attempts have been made (for backoff calculation) | ||
| 10 | /// - Whether a sync is currently in progress | ||
| 11 | #[derive(Debug, Clone)] | ||
| 12 | pub struct SyncQueueEntry { | ||
| 13 | /// Don't attempt sync before this time | ||
| 14 | pub next_attempt: Instant, | ||
| 15 | |||
| 16 | /// Number of sync attempts (for backoff calculation). | ||
| 17 | /// Reset to 0 when new event arrives for this identifier. | ||
| 18 | pub attempt_count: u32, | ||
| 19 | |||
| 20 | /// Whether a sync is currently in progress for this identifier. | ||
| 21 | /// Prevents concurrent sync operations for the same identifier. | ||
| 22 | pub in_progress: bool, | ||
| 23 | } | ||
| 24 | |||
| 25 | impl SyncQueueEntry { | ||
| 26 | /// Create a new sync queue entry with the given initial delay. | ||
| 27 | /// | ||
| 28 | /// # Arguments | ||
| 29 | /// * `delay` - How long to wait before the first sync attempt | ||
| 30 | pub fn new(delay: Duration) -> Self { | ||
| 31 | Self { | ||
| 32 | next_attempt: Instant::now() + delay, | ||
| 33 | attempt_count: 0, | ||
| 34 | in_progress: false, | ||
| 35 | } | ||
| 36 | } | ||
| 37 | |||
| 38 | /// Calculate backoff duration based on attempt count. | ||
| 39 | /// | ||
| 40 | /// Backoff schedule: | ||
| 41 | /// - Attempt 1: 20s | ||
| 42 | /// - Attempt 2: 40s | ||
| 43 | /// - Attempt 3: 80s | ||
| 44 | /// - Attempt 4+: 120s (capped at 2 minutes) | ||
| 45 | /// | ||
| 46 | /// The formula is: min(20s * 2^(attempt_count-1), 120s) | ||
| 47 | pub fn backoff(&self) -> Duration { | ||
| 48 | if self.attempt_count == 0 { | ||
| 49 | return Duration::from_secs(20); | ||
| 50 | } | ||
| 51 | |||
| 52 | let base = Duration::from_secs(20); | ||
| 53 | let exponent = self.attempt_count.saturating_sub(1).min(3); | ||
| 54 | let multiplier = 2u32.saturating_pow(exponent); | ||
| 55 | (base * multiplier).min(Duration::from_secs(120)) | ||
| 56 | } | ||
| 57 | |||
| 58 | /// Check if this entry is ready for a sync attempt. | ||
| 59 | /// | ||
| 60 | /// Returns true if: | ||
| 61 | /// - No sync is currently in progress | ||
| 62 | /// - The next_attempt time has passed | ||
| 63 | pub fn is_ready(&self) -> bool { | ||
| 64 | !self.in_progress && Instant::now() >= self.next_attempt | ||
| 65 | } | ||
| 66 | |||
| 67 | /// Called when a new event arrives for this identifier. | ||
| 68 | /// | ||
| 69 | /// Resets the attempt count to 0 (fresh start) and updates | ||
| 70 | /// next_attempt if the new delay would be sooner. | ||
| 71 | /// | ||
| 72 | /// # Arguments | ||
| 73 | /// * `delay` - The delay for the new event | ||
| 74 | pub fn on_new_event(&mut self, delay: Duration) { | ||
| 75 | self.attempt_count = 0; | ||
| 76 | let new_attempt = Instant::now() + delay; | ||
| 77 | if new_attempt < self.next_attempt { | ||
| 78 | self.next_attempt = new_attempt; | ||
| 79 | } | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Called when a sync attempt completes (successfully or not). | ||
| 83 | /// | ||
| 84 | /// Marks the entry as not in progress, increments the attempt count, | ||
| 85 | /// and schedules the next attempt based on backoff. | ||
| 86 | /// | ||
| 87 | /// Only updates timing if the current next_attempt has passed | ||
| 88 | /// (prevents double-scheduling if called multiple times). | ||
| 89 | pub fn on_sync_complete(&mut self) { | ||
| 90 | self.in_progress = false; | ||
| 91 | if self.next_attempt <= Instant::now() { | ||
| 92 | self.attempt_count += 1; | ||
| 93 | self.next_attempt = Instant::now() + self.backoff(); | ||
| 94 | } | ||
| 95 | } | ||
| 96 | } | ||
| 97 | |||
| 98 | #[cfg(test)] | ||
| 99 | mod tests { | ||
| 100 | use super::*; | ||
| 101 | |||
| 102 | #[test] | ||
| 103 | fn backoff_doubles_up_to_cap() { | ||
| 104 | // Test that backoff follows: 20s → 40s → 80s → 120s → 120s (capped) | ||
| 105 | let mut entry = SyncQueueEntry::new(Duration::from_secs(0)); | ||
| 106 | |||
| 107 | // Attempt 0 (initial state): 20s | ||
| 108 | assert_eq!(entry.backoff(), Duration::from_secs(20)); | ||
| 109 | |||
| 110 | // Simulate completing attempts and check backoff | ||
| 111 | entry.attempt_count = 1; | ||
| 112 | assert_eq!(entry.backoff(), Duration::from_secs(20)); // 20 * 2^0 = 20 | ||
| 113 | |||
| 114 | entry.attempt_count = 2; | ||
| 115 | assert_eq!(entry.backoff(), Duration::from_secs(40)); // 20 * 2^1 = 40 | ||
| 116 | |||
| 117 | entry.attempt_count = 3; | ||
| 118 | assert_eq!(entry.backoff(), Duration::from_secs(80)); // 20 * 2^2 = 80 | ||
| 119 | |||
| 120 | entry.attempt_count = 4; | ||
| 121 | assert_eq!(entry.backoff(), Duration::from_secs(120)); // 20 * 2^3 = 160, capped to 120 | ||
| 122 | |||
| 123 | entry.attempt_count = 5; | ||
| 124 | assert_eq!(entry.backoff(), Duration::from_secs(120)); // Still capped | ||
| 125 | |||
| 126 | entry.attempt_count = 100; | ||
| 127 | assert_eq!(entry.backoff(), Duration::from_secs(120)); // Always capped | ||
| 128 | } | ||
| 129 | |||
| 130 | #[test] | ||
| 131 | fn new_event_resets_attempt_count() { | ||
| 132 | let mut entry = SyncQueueEntry::new(Duration::from_secs(60)); | ||
| 133 | |||
| 134 | // Simulate several sync attempts | ||
| 135 | entry.attempt_count = 5; | ||
| 136 | entry.next_attempt = Instant::now() + Duration::from_secs(120); | ||
| 137 | |||
| 138 | // New event arrives with shorter delay | ||
| 139 | entry.on_new_event(Duration::from_secs(10)); | ||
| 140 | |||
| 141 | // Attempt count should be reset | ||
| 142 | assert_eq!(entry.attempt_count, 0); | ||
| 143 | |||
| 144 | // next_attempt should be updated to the sooner time | ||
| 145 | // (within a small tolerance for test timing) | ||
| 146 | let expected = Instant::now() + Duration::from_secs(10); | ||
| 147 | assert!(entry.next_attempt <= expected + Duration::from_millis(100)); | ||
| 148 | assert!(entry.next_attempt >= expected - Duration::from_millis(100)); | ||
| 149 | } | ||
| 150 | |||
| 151 | #[test] | ||
| 152 | fn new_event_does_not_delay_if_already_sooner() { | ||
| 153 | let mut entry = SyncQueueEntry::new(Duration::from_secs(5)); | ||
| 154 | let original_next = entry.next_attempt; | ||
| 155 | |||
| 156 | // New event arrives with longer delay - should not push back | ||
| 157 | entry.on_new_event(Duration::from_secs(60)); | ||
| 158 | |||
| 159 | // Attempt count should still be reset | ||
| 160 | assert_eq!(entry.attempt_count, 0); | ||
| 161 | |||
| 162 | // But next_attempt should not be pushed back | ||
| 163 | assert!(entry.next_attempt <= original_next + Duration::from_millis(100)); | ||
| 164 | } | ||
| 165 | |||
| 166 | #[test] | ||
| 167 | fn is_ready_checks_both_conditions() { | ||
| 168 | let mut entry = SyncQueueEntry::new(Duration::from_secs(0)); | ||
| 169 | |||
| 170 | // Should be ready initially (no delay, not in progress) | ||
| 171 | // Note: there might be a tiny delay, so we wait a moment | ||
| 172 | std::thread::sleep(Duration::from_millis(10)); | ||
| 173 | assert!(entry.is_ready()); | ||
| 174 | |||
| 175 | // Mark as in progress - should not be ready | ||
| 176 | entry.in_progress = true; | ||
| 177 | assert!(!entry.is_ready()); | ||
| 178 | |||
| 179 | // Not in progress but future next_attempt - should not be ready | ||
| 180 | entry.in_progress = false; | ||
| 181 | entry.next_attempt = Instant::now() + Duration::from_secs(60); | ||
| 182 | assert!(!entry.is_ready()); | ||
| 183 | } | ||
| 184 | |||
| 185 | #[test] | ||
| 186 | fn on_sync_complete_increments_and_schedules() { | ||
| 187 | let mut entry = SyncQueueEntry::new(Duration::from_secs(0)); | ||
| 188 | std::thread::sleep(Duration::from_millis(10)); // Ensure next_attempt has passed | ||
| 189 | |||
| 190 | entry.in_progress = true; | ||
| 191 | entry.attempt_count = 0; | ||
| 192 | |||
| 193 | entry.on_sync_complete(); | ||
| 194 | |||
| 195 | // Should no longer be in progress | ||
| 196 | assert!(!entry.in_progress); | ||
| 197 | |||
| 198 | // Attempt count should be incremented | ||
| 199 | assert_eq!(entry.attempt_count, 1); | ||
| 200 | |||
| 201 | // Next attempt should be scheduled with backoff (20s for attempt 1) | ||
| 202 | let expected = Instant::now() + Duration::from_secs(20); | ||
| 203 | assert!(entry.next_attempt >= expected - Duration::from_millis(100)); | ||
| 204 | assert!(entry.next_attempt <= expected + Duration::from_millis(100)); | ||
| 205 | } | ||
| 206 | } | ||