upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory
diff options
context:
space:
mode:
Diffstat (limited to 'src/purgatory')
-rw-r--r--src/purgatory/sync/loop.rs173
-rw-r--r--src/purgatory/sync/mod.rs2
2 files changed, 175 insertions, 0 deletions
diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs
new file mode 100644
index 0000000..aaf1300
--- /dev/null
+++ b/src/purgatory/sync/loop.rs
@@ -0,0 +1,173 @@
1//! Background sync loop for purgatory synchronization.
2//!
3//! This module provides the main sync loop that runs in the background and
4//! processes identifiers that are ready for sync. The loop:
5//!
6//! 1. Runs every 1 second (hardcoded interval)
7//! 2. Finds all ready identifiers (where `!in_progress && next_attempt <= now`)
8//! 3. Spawns parallel tasks for each ready identifier
9//! 4. Applies backoff when sync completes (if events remain)
10//! 5. Removes identifiers from queue when sync completes or no events remain
11
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::task::JoinHandle;
15use tracing::{debug, info, warn};
16
17use crate::purgatory::Purgatory;
18
19use super::context::SyncContext;
20use super::functions::sync_identifier;
21use super::throttle::ThrottleManager;
22
23/// Interval between sync loop iterations (hardcoded, not configurable).
24const SYNC_LOOP_INTERVAL: Duration = Duration::from_secs(1);
25
26impl Purgatory {
27 /// Start the background sync loop.
28 ///
29 /// This spawns a background task that periodically checks for identifiers
30 /// ready for sync and processes them. The loop runs every 1 second and:
31 ///
32 /// 1. Finds all ready identifiers (where `!in_progress && next_attempt <= now`)
33 /// 2. Spawns parallel tasks for each ready identifier
34 /// 3. Each task calls `sync_identifier` to try fetching git data
35 /// 4. On completion, applies backoff if events remain, or removes from queue
36 ///
37 /// # Arguments
38 /// * `ctx` - The sync context providing repository data and fetch capabilities
39 /// * `throttle_manager` - Used for rate limiting and domain queue management
40 ///
41 /// # Returns
42 /// A `JoinHandle` for the background task (can be used to cancel the loop)
43 ///
44 /// # Example
45 ///
46 /// ```ignore
47 /// let purgatory = Arc::new(Purgatory::new("/data/git"));
48 /// let ctx = Arc::new(RealSyncContext::new(...));
49 /// let throttle_manager = Arc::new(ThrottleManager::new(5, 30));
50 ///
51 /// // Set context on throttle manager for queue processing
52 /// throttle_manager.set_context(ctx.clone());
53 ///
54 /// // Start the sync loop
55 /// let handle = purgatory.start_sync_loop(ctx, throttle_manager);
56 ///
57 /// // Later, to stop the loop:
58 /// handle.abort();
59 /// ```
60 pub fn start_sync_loop(
61 self: Arc<Self>,
62 ctx: Arc<dyn SyncContext>,
63 throttle_manager: Arc<ThrottleManager>,
64 ) -> JoinHandle<()> {
65 info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL);
66
67 tokio::spawn(async move {
68 let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL);
69
70 loop {
71 interval.tick().await;
72
73 // Find all ready identifiers
74 let ready: Vec<String> = self
75 .sync_queue
76 .iter()
77 .filter(|entry| entry.value().is_ready())
78 .map(|entry| entry.key().clone())
79 .collect();
80
81 if !ready.is_empty() {
82 debug!(
83 ready_count = ready.len(),
84 "Found identifiers ready for sync"
85 );
86 }
87
88 for identifier in ready {
89 // Check if events still exist for this identifier
90 if !self.has_pending_events(&identifier) {
91 debug!(
92 identifier = %identifier,
93 "No pending events - removing from sync queue"
94 );
95 self.sync_queue.remove(&identifier);
96 continue;
97 }
98
99 // Mark as in progress (skip if already in progress)
100 let should_process = {
101 if let Some(mut entry) = self.sync_queue.get_mut(&identifier) {
102 if entry.in_progress {
103 false
104 } else {
105 entry.in_progress = true;
106 true
107 }
108 } else {
109 false
110 }
111 };
112
113 if !should_process {
114 continue;
115 }
116
117 // Spawn sync task
118 let purgatory = self.clone();
119 let ctx = ctx.clone();
120 let throttle_manager = throttle_manager.clone();
121 let id = identifier.clone();
122
123 tokio::spawn(async move {
124 debug!(
125 identifier = %id,
126 "Starting sync task for identifier"
127 );
128
129 let complete = sync_identifier(ctx.as_ref(), &id, &throttle_manager).await;
130
131 // Check final state and update queue
132 if complete || !purgatory.has_pending_events(&id) {
133 purgatory.sync_queue.remove(&id);
134 info!(
135 identifier = %id,
136 complete = complete,
137 "Sync complete - removed from sync queue"
138 );
139 } else {
140 // Apply backoff - will retry later
141 // (throttled domains are being processed independently by ThrottleManager)
142 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) {
143 entry.on_sync_complete();
144 debug!(
145 identifier = %id,
146 attempt_count = entry.attempt_count,
147 next_backoff_secs = entry.backoff().as_secs(),
148 "Sync incomplete - applying backoff"
149 );
150 }
151 }
152 });
153 }
154 }
155 })
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 // Note: The sync loop is tested via integration tests rather than unit tests
162 // because testing async loops with timing is fragile and prone to flakiness.
163 //
164 // Integration tests in tests/purgatory_sync.rs verify:
165 // - State events sync from remote
166 // - PR events sync from remote
167 // - Concurrent state and PR sync
168 // - Partial OID aggregation from multiple servers
169 // - Push triggers unified processing
170 //
171 // The individual components (SyncQueueEntry, ThrottleManager, sync_identifier)
172 // are thoroughly unit tested in their respective modules.
173}
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
index 8ac9216..be89130 100644
--- a/src/purgatory/sync/mod.rs
+++ b/src/purgatory/sync/mod.rs
@@ -5,9 +5,11 @@
5//! - Domain-based throttling (configurable requests/minute per domain) 5//! - Domain-based throttling (configurable requests/minute per domain)
6//! - Exponential backoff per identifier (20s → 2m, then 2m intervals) 6//! - Exponential backoff per identifier (20s → 2m, then 2m intervals)
7//! - Debouncing for burst event arrivals 7//! - Debouncing for burst event arrivals
8//! - Background sync loop processing ready identifiers every 1 second
8 9
9mod context; 10mod context;
10mod functions; 11mod functions;
12mod r#loop;
11mod queue; 13mod queue;
12mod throttle; 14mod throttle;
13 15