upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/purgatory/mod.rs1
-rw-r--r--src/purgatory/sync/mod.rs11
-rw-r--r--src/purgatory/sync/queue.rs206
3 files changed, 218 insertions, 0 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 88377fb..34a8e7a 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -12,6 +12,7 @@
12//! - **Separate stores**: State events and PR events use different indexing strategies 12//! - **Separate stores**: State events and PR events use different indexing strategies
13 13
14mod helpers; 14mod helpers;
15pub mod sync;
15mod types; 16mod types;
16 17
17use anyhow::{bail, Result}; 18use anyhow::{bail, Result};
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
new file mode 100644
index 0000000..7b6d64a
--- /dev/null
+++ b/src/purgatory/sync/mod.rs
@@ -0,0 +1,11 @@
1//! Purgatory sync module for background git data synchronization.
2//!
3//! This module implements identifier-based syncing with:
4//! - Batched OID fetching across all purgatory events for an identifier
5//! - Domain-based throttling (configurable requests/minute per domain)
6//! - Exponential backoff per identifier (20s → 2m, then 2m intervals)
7//! - Debouncing for burst event arrivals
8
9mod queue;
10
11pub use queue::SyncQueueEntry;
diff --git a/src/purgatory/sync/queue.rs b/src/purgatory/sync/queue.rs
new file mode 100644
index 0000000..3226f47
--- /dev/null
+++ b/src/purgatory/sync/queue.rs
@@ -0,0 +1,206 @@
1//! Sync queue entry for tracking sync state per identifier.
2
3use std::time::{Duration, Instant};
4
5/// Entry in the sync queue tracking when/how to sync an identifier.
6///
7/// Each identifier in purgatory has at most one `SyncQueueEntry` that tracks:
8/// - When the next sync attempt should occur
9/// - How many attempts have been made (for backoff calculation)
10/// - Whether a sync is currently in progress
11#[derive(Debug, Clone)]
12pub struct SyncQueueEntry {
13 /// Don't attempt sync before this time
14 pub next_attempt: Instant,
15
16 /// Number of sync attempts (for backoff calculation).
17 /// Reset to 0 when new event arrives for this identifier.
18 pub attempt_count: u32,
19
20 /// Whether a sync is currently in progress for this identifier.
21 /// Prevents concurrent sync operations for the same identifier.
22 pub in_progress: bool,
23}
24
25impl SyncQueueEntry {
26 /// Create a new sync queue entry with the given initial delay.
27 ///
28 /// # Arguments
29 /// * `delay` - How long to wait before the first sync attempt
30 pub fn new(delay: Duration) -> Self {
31 Self {
32 next_attempt: Instant::now() + delay,
33 attempt_count: 0,
34 in_progress: false,
35 }
36 }
37
38 /// Calculate backoff duration based on attempt count.
39 ///
40 /// Backoff schedule:
41 /// - Attempt 1: 20s
42 /// - Attempt 2: 40s
43 /// - Attempt 3: 80s
44 /// - Attempt 4+: 120s (capped at 2 minutes)
45 ///
46 /// The formula is: min(20s * 2^(attempt_count-1), 120s)
47 pub fn backoff(&self) -> Duration {
48 if self.attempt_count == 0 {
49 return Duration::from_secs(20);
50 }
51
52 let base = Duration::from_secs(20);
53 let exponent = self.attempt_count.saturating_sub(1).min(3);
54 let multiplier = 2u32.saturating_pow(exponent);
55 (base * multiplier).min(Duration::from_secs(120))
56 }
57
58 /// Check if this entry is ready for a sync attempt.
59 ///
60 /// Returns true if:
61 /// - No sync is currently in progress
62 /// - The next_attempt time has passed
63 pub fn is_ready(&self) -> bool {
64 !self.in_progress && Instant::now() >= self.next_attempt
65 }
66
67 /// Called when a new event arrives for this identifier.
68 ///
69 /// Resets the attempt count to 0 (fresh start) and updates
70 /// next_attempt if the new delay would be sooner.
71 ///
72 /// # Arguments
73 /// * `delay` - The delay for the new event
74 pub fn on_new_event(&mut self, delay: Duration) {
75 self.attempt_count = 0;
76 let new_attempt = Instant::now() + delay;
77 if new_attempt < self.next_attempt {
78 self.next_attempt = new_attempt;
79 }
80 }
81
82 /// Called when a sync attempt completes (successfully or not).
83 ///
84 /// Marks the entry as not in progress, increments the attempt count,
85 /// and schedules the next attempt based on backoff.
86 ///
87 /// Only updates timing if the current next_attempt has passed
88 /// (prevents double-scheduling if called multiple times).
89 pub fn on_sync_complete(&mut self) {
90 self.in_progress = false;
91 if self.next_attempt <= Instant::now() {
92 self.attempt_count += 1;
93 self.next_attempt = Instant::now() + self.backoff();
94 }
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101
102 #[test]
103 fn backoff_doubles_up_to_cap() {
104 // Test that backoff follows: 20s → 40s → 80s → 120s → 120s (capped)
105 let mut entry = SyncQueueEntry::new(Duration::from_secs(0));
106
107 // Attempt 0 (initial state): 20s
108 assert_eq!(entry.backoff(), Duration::from_secs(20));
109
110 // Simulate completing attempts and check backoff
111 entry.attempt_count = 1;
112 assert_eq!(entry.backoff(), Duration::from_secs(20)); // 20 * 2^0 = 20
113
114 entry.attempt_count = 2;
115 assert_eq!(entry.backoff(), Duration::from_secs(40)); // 20 * 2^1 = 40
116
117 entry.attempt_count = 3;
118 assert_eq!(entry.backoff(), Duration::from_secs(80)); // 20 * 2^2 = 80
119
120 entry.attempt_count = 4;
121 assert_eq!(entry.backoff(), Duration::from_secs(120)); // 20 * 2^3 = 160, capped to 120
122
123 entry.attempt_count = 5;
124 assert_eq!(entry.backoff(), Duration::from_secs(120)); // Still capped
125
126 entry.attempt_count = 100;
127 assert_eq!(entry.backoff(), Duration::from_secs(120)); // Always capped
128 }
129
130 #[test]
131 fn new_event_resets_attempt_count() {
132 let mut entry = SyncQueueEntry::new(Duration::from_secs(60));
133
134 // Simulate several sync attempts
135 entry.attempt_count = 5;
136 entry.next_attempt = Instant::now() + Duration::from_secs(120);
137
138 // New event arrives with shorter delay
139 entry.on_new_event(Duration::from_secs(10));
140
141 // Attempt count should be reset
142 assert_eq!(entry.attempt_count, 0);
143
144 // next_attempt should be updated to the sooner time
145 // (within a small tolerance for test timing)
146 let expected = Instant::now() + Duration::from_secs(10);
147 assert!(entry.next_attempt <= expected + Duration::from_millis(100));
148 assert!(entry.next_attempt >= expected - Duration::from_millis(100));
149 }
150
151 #[test]
152 fn new_event_does_not_delay_if_already_sooner() {
153 let mut entry = SyncQueueEntry::new(Duration::from_secs(5));
154 let original_next = entry.next_attempt;
155
156 // New event arrives with longer delay - should not push back
157 entry.on_new_event(Duration::from_secs(60));
158
159 // Attempt count should still be reset
160 assert_eq!(entry.attempt_count, 0);
161
162 // But next_attempt should not be pushed back
163 assert!(entry.next_attempt <= original_next + Duration::from_millis(100));
164 }
165
166 #[test]
167 fn is_ready_checks_both_conditions() {
168 let mut entry = SyncQueueEntry::new(Duration::from_secs(0));
169
170 // Should be ready initially (no delay, not in progress)
171 // Note: there might be a tiny delay, so we wait a moment
172 std::thread::sleep(Duration::from_millis(10));
173 assert!(entry.is_ready());
174
175 // Mark as in progress - should not be ready
176 entry.in_progress = true;
177 assert!(!entry.is_ready());
178
179 // Not in progress but future next_attempt - should not be ready
180 entry.in_progress = false;
181 entry.next_attempt = Instant::now() + Duration::from_secs(60);
182 assert!(!entry.is_ready());
183 }
184
185 #[test]
186 fn on_sync_complete_increments_and_schedules() {
187 let mut entry = SyncQueueEntry::new(Duration::from_secs(0));
188 std::thread::sleep(Duration::from_millis(10)); // Ensure next_attempt has passed
189
190 entry.in_progress = true;
191 entry.attempt_count = 0;
192
193 entry.on_sync_complete();
194
195 // Should no longer be in progress
196 assert!(!entry.in_progress);
197
198 // Attempt count should be incremented
199 assert_eq!(entry.attempt_count, 1);
200
201 // Next attempt should be scheduled with backoff (20s for attempt 1)
202 let expected = Instant::now() + Duration::from_secs(20);
203 assert!(entry.next_attempt >= expected - Duration::from_millis(100));
204 assert!(entry.next_attempt <= expected + Duration::from_millis(100));
205 }
206}