1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
//! Sync queue entry for tracking sync state per identifier.
use std::time::{Duration, Instant};
/// Entry in the sync queue tracking when/how to sync an identifier.
///
/// Each identifier in purgatory has at most one `SyncQueueEntry` that tracks:
/// - When the next sync attempt should occur
/// - How many attempts have been made (for backoff calculation)
/// - Whether a sync is currently in progress
#[derive(Debug, Clone)]
pub struct SyncQueueEntry {
    /// Earliest instant at which a sync may be attempted.
    pub next_attempt: Instant,
    /// Number of completed sync attempts (drives the backoff calculation).
    /// Reset to 0 when a new event arrives for this identifier.
    pub attempt_count: u32,
    /// Whether a sync is currently in progress for this identifier.
    /// While set, [`is_ready`](Self::is_ready) reports `false`.
    pub in_progress: bool,
}

impl SyncQueueEntry {
    /// Delay after the first attempt; doubled for every further attempt.
    const BASE_BACKOFF: Duration = Duration::from_secs(20);
    /// Upper bound on the delay returned by [`backoff`](Self::backoff).
    const MAX_BACKOFF: Duration = Duration::from_secs(120);
    /// Largest exponent that needs evaluating: 20s * 2^3 = 160s already
    /// exceeds the cap, so higher exponents cannot change the result.
    const MAX_EXPONENT: u32 = 3;

    /// Create a new sync queue entry with the given initial delay.
    ///
    /// The entry starts with zero attempts and no sync in progress.
    ///
    /// # Arguments
    /// * `delay` - How long to wait before the first sync attempt
    ///
    /// # Panics
    /// Panics if `Instant::now() + delay` cannot be represented
    /// (for example when `delay` is `Duration::MAX`).
    pub fn new(delay: Duration) -> Self {
        Self {
            next_attempt: Instant::now() + delay,
            attempt_count: 0,
            in_progress: false,
        }
    }

    /// Calculate backoff duration based on attempt count.
    ///
    /// Backoff schedule, by `attempt_count`:
    /// - 0 or 1: 20s
    /// - 2: 40s
    /// - 3: 80s
    /// - 4 and above: 120s (capped at 2 minutes)
    ///
    /// The formula is: min(20s * 2^(attempt_count-1), 120s), where the
    /// exponent is treated as 0 when `attempt_count` is 0.
    pub fn backoff(&self) -> Duration {
        // `saturating_sub` maps an attempt count of 0 onto exponent 0, and the
        // clamp keeps the shift far away from overflowing a `u32`.
        let exponent = self
            .attempt_count
            .saturating_sub(1)
            .min(Self::MAX_EXPONENT);
        (Self::BASE_BACKOFF * (1u32 << exponent)).min(Self::MAX_BACKOFF)
    }

    /// Check if this entry is ready for a sync attempt.
    ///
    /// Returns true if:
    /// - No sync is currently in progress
    /// - The next_attempt time has passed
    pub fn is_ready(&self) -> bool {
        !self.in_progress && Instant::now() >= self.next_attempt
    }

    /// Called when a new event arrives for this identifier.
    ///
    /// Resets the attempt count to 0 (fresh start) and moves
    /// `next_attempt` earlier if the new delay would fire sooner. The
    /// scheduled time is never pushed back.
    ///
    /// A `delay` so large that `Instant::now() + delay` is not representable
    /// can never be sooner than the current schedule, so it only resets the
    /// attempt count instead of panicking.
    ///
    /// # Arguments
    /// * `delay` - The delay for the new event
    pub fn on_new_event(&mut self, delay: Duration) {
        self.attempt_count = 0;
        if let Some(candidate) = Instant::now().checked_add(delay) {
            if candidate < self.next_attempt {
                self.next_attempt = candidate;
            }
        }
    }

    /// Called when a sync attempt completes (successfully or not).
    ///
    /// Always clears `in_progress`. If the current `next_attempt` has already
    /// passed, it also increments the attempt count and schedules the next
    /// attempt `backoff()` from now. If `next_attempt` is still in the
    /// future, the schedule is left alone (prevents double-scheduling if
    /// called multiple times).
    ///
    /// The attempt count saturates at `u32::MAX` rather than overflowing, so
    /// the backoff stays at its cap.
    pub fn on_sync_complete(&mut self) {
        self.in_progress = false;
        // One clock reading serves both the check and the reschedule.
        let now = Instant::now();
        if self.next_attempt <= now {
            self.attempt_count = self.attempt_count.saturating_add(1);
            self.next_attempt = now + self.backoff();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Slack allowed when comparing instants that were computed a few
    /// statements apart.
    const TOLERANCE: Duration = Duration::from_millis(100);

    /// Asserts that `actual` lies within `TOLERANCE` of `expected`.
    fn assert_near(actual: Instant, expected: Instant) {
        assert!(actual <= expected + TOLERANCE);
        assert!(actual >= expected - TOLERANCE);
    }

    #[test]
    fn backoff_doubles_up_to_cap() {
        // Expected progression: 20s → 40s → 80s → 120s → 120s (capped).
        let mut entry = SyncQueueEntry::new(Duration::from_secs(0));

        // A fresh entry (zero attempts) uses the base delay.
        assert_eq!(entry.backoff(), Duration::from_secs(20));

        // (attempt_count, expected backoff in seconds)
        let schedule: [(u32, u64); 6] = [
            (1, 20),    // 20 * 2^0
            (2, 40),    // 20 * 2^1
            (3, 80),    // 20 * 2^2
            (4, 120),   // 20 * 2^3 = 160, capped to 120
            (5, 120),   // still capped
            (100, 120), // always capped
        ];
        for (attempts, secs) in schedule.iter().copied() {
            entry.attempt_count = attempts;
            assert_eq!(entry.backoff(), Duration::from_secs(secs));
        }
    }

    #[test]
    fn new_event_resets_attempt_count() {
        let mut entry = SyncQueueEntry::new(Duration::from_secs(60));

        // Pretend several syncs have already happened and the next one is far off.
        entry.attempt_count = 5;
        entry.next_attempt = Instant::now() + Duration::from_secs(120);

        // An event with a shorter delay arrives.
        let short_delay = Duration::from_secs(10);
        entry.on_new_event(short_delay);

        // The counter starts over...
        assert_eq!(entry.attempt_count, 0);

        // ...and the schedule moves to the sooner time (within test-timing slack).
        assert_near(entry.next_attempt, Instant::now() + short_delay);
    }

    #[test]
    fn new_event_does_not_delay_if_already_sooner() {
        let mut entry = SyncQueueEntry::new(Duration::from_secs(5));
        let scheduled_before = entry.next_attempt;

        // An event with a longer delay must not push the schedule back.
        entry.on_new_event(Duration::from_secs(60));

        // The counter is reset regardless.
        assert_eq!(entry.attempt_count, 0);

        // The schedule has not moved later.
        assert!(entry.next_attempt <= scheduled_before + Duration::from_millis(100));
    }

    #[test]
    fn is_ready_checks_both_conditions() {
        let mut entry = SyncQueueEntry::new(Duration::from_secs(0));

        // Zero delay and idle: ready once a brief moment has elapsed.
        std::thread::sleep(Duration::from_millis(10));
        assert!(entry.is_ready());

        // A sync in flight blocks readiness.
        entry.in_progress = true;
        assert!(!entry.is_ready());

        // Idle again, but scheduled in the future: still not ready.
        entry.in_progress = false;
        entry.next_attempt = Instant::now() + Duration::from_secs(60);
        assert!(!entry.is_ready());
    }

    #[test]
    fn on_sync_complete_increments_and_schedules() {
        let mut entry = SyncQueueEntry::new(Duration::from_secs(0));
        // Let the scheduled time elapse so completion reschedules.
        std::thread::sleep(Duration::from_millis(10));
        entry.in_progress = true;
        entry.attempt_count = 0;

        entry.on_sync_complete();

        // The in-progress flag is cleared.
        assert!(!entry.in_progress);
        // One attempt has been recorded.
        assert_eq!(entry.attempt_count, 1);
        // The next attempt sits one base backoff (20s) away.
        assert_near(entry.next_attempt, Instant::now() + Duration::from_secs(20));
    }
}
|