upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/purgatory
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 12:12:49 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 12:12:49 +0000
commit18bfb246029a848a0b307e7c8a8e4df57addabb2 (patch)
tree1ca8977edecdab10973415b222d2d7e1a2d12e7c /src/purgatory
parentfc9f1e282b16bc373c3913973879b43d3f254eb2 (diff)
Add background sync loop for purgatory identifier processing
Implement the main sync loop that runs in the background and processes identifiers that are ready for git data synchronization: - Runs every 1 second (hardcoded interval, not configurable) - Finds all ready identifiers where !in_progress && next_attempt <= now - Spawns parallel tasks for each ready identifier - Each task calls sync_identifier to try fetching git data from remotes - Applies backoff when sync completes but events remain in purgatory - Removes identifiers from queue when sync completes or no events remain The loop integrates with the existing sync infrastructure: - Uses SyncContext trait for testability - Uses ThrottleManager for domain-based rate limiting - Uses sync_identifier for the actual fetch orchestration This enables automatic background fetching of git data for events in purgatory, complementing the existing push-triggered sync path.
Diffstat (limited to 'src/purgatory')
-rw-r--r--src/purgatory/sync/loop.rs173
-rw-r--r--src/purgatory/sync/mod.rs2
2 files changed, 175 insertions, 0 deletions
diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs
new file mode 100644
index 0000000..aaf1300
--- /dev/null
+++ b/src/purgatory/sync/loop.rs
@@ -0,0 +1,173 @@
1//! Background sync loop for purgatory synchronization.
2//!
3//! This module provides the main sync loop that runs in the background and
4//! processes identifiers that are ready for sync. The loop:
5//!
6//! 1. Runs every 1 second (hardcoded interval)
7//! 2. Finds all ready identifiers (where `!in_progress && next_attempt <= now`)
8//! 3. Spawns parallel tasks for each ready identifier
9//! 4. Applies backoff when sync completes (if events remain)
10//! 5. Removes identifiers from queue when sync completes or no events remain
11
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::task::JoinHandle;
15use tracing::{debug, info, warn};
16
17use crate::purgatory::Purgatory;
18
19use super::context::SyncContext;
20use super::functions::sync_identifier;
21use super::throttle::ThrottleManager;
22
23/// Interval between sync loop iterations (hardcoded, not configurable).
24const SYNC_LOOP_INTERVAL: Duration = Duration::from_secs(1);
25
26impl Purgatory {
27 /// Start the background sync loop.
28 ///
29 /// This spawns a background task that periodically checks for identifiers
30 /// ready for sync and processes them. The loop runs every 1 second and:
31 ///
32 /// 1. Finds all ready identifiers (where `!in_progress && next_attempt <= now`)
33 /// 2. Spawns parallel tasks for each ready identifier
34 /// 3. Each task calls `sync_identifier` to try fetching git data
35 /// 4. On completion, applies backoff if events remain, or removes from queue
36 ///
37 /// # Arguments
38 /// * `ctx` - The sync context providing repository data and fetch capabilities
39 /// * `throttle_manager` - Used for rate limiting and domain queue management
40 ///
41 /// # Returns
42 /// A `JoinHandle` for the background task (can be used to cancel the loop)
43 ///
44 /// # Example
45 ///
46 /// ```ignore
47 /// let purgatory = Arc::new(Purgatory::new("/data/git"));
48 /// let ctx = Arc::new(RealSyncContext::new(...));
49 /// let throttle_manager = Arc::new(ThrottleManager::new(5, 30));
50 ///
51 /// // Set context on throttle manager for queue processing
52 /// throttle_manager.set_context(ctx.clone());
53 ///
54 /// // Start the sync loop
55 /// let handle = purgatory.start_sync_loop(ctx, throttle_manager);
56 ///
57 /// // Later, to stop the loop:
58 /// handle.abort();
59 /// ```
60 pub fn start_sync_loop(
61 self: Arc<Self>,
62 ctx: Arc<dyn SyncContext>,
63 throttle_manager: Arc<ThrottleManager>,
64 ) -> JoinHandle<()> {
65 info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL);
66
67 tokio::spawn(async move {
68 let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL);
69
70 loop {
71 interval.tick().await;
72
73 // Find all ready identifiers
74 let ready: Vec<String> = self
75 .sync_queue
76 .iter()
77 .filter(|entry| entry.value().is_ready())
78 .map(|entry| entry.key().clone())
79 .collect();
80
81 if !ready.is_empty() {
82 debug!(
83 ready_count = ready.len(),
84 "Found identifiers ready for sync"
85 );
86 }
87
88 for identifier in ready {
89 // Check if events still exist for this identifier
90 if !self.has_pending_events(&identifier) {
91 debug!(
92 identifier = %identifier,
93 "No pending events - removing from sync queue"
94 );
95 self.sync_queue.remove(&identifier);
96 continue;
97 }
98
99 // Mark as in progress (skip if already in progress)
100 let should_process = {
101 if let Some(mut entry) = self.sync_queue.get_mut(&identifier) {
102 if entry.in_progress {
103 false
104 } else {
105 entry.in_progress = true;
106 true
107 }
108 } else {
109 false
110 }
111 };
112
113 if !should_process {
114 continue;
115 }
116
117 // Spawn sync task
118 let purgatory = self.clone();
119 let ctx = ctx.clone();
120 let throttle_manager = throttle_manager.clone();
121 let id = identifier.clone();
122
123 tokio::spawn(async move {
124 debug!(
125 identifier = %id,
126 "Starting sync task for identifier"
127 );
128
129 let complete = sync_identifier(ctx.as_ref(), &id, &throttle_manager).await;
130
131 // Check final state and update queue
132 if complete || !purgatory.has_pending_events(&id) {
133 purgatory.sync_queue.remove(&id);
134 info!(
135 identifier = %id,
136 complete = complete,
137 "Sync complete - removed from sync queue"
138 );
139 } else {
140 // Apply backoff - will retry later
141 // (throttled domains are being processed independently by ThrottleManager)
142 if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) {
143 entry.on_sync_complete();
144 debug!(
145 identifier = %id,
146 attempt_count = entry.attempt_count,
147 next_backoff_secs = entry.backoff().as_secs(),
148 "Sync incomplete - applying backoff"
149 );
150 }
151 }
152 });
153 }
154 }
155 })
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 // Note: The sync loop is tested via integration tests rather than unit tests
162 // because testing async loops with timing is fragile and prone to flakiness.
163 //
164 // Integration tests in tests/purgatory_sync.rs verify:
165 // - State events sync from remote
166 // - PR events sync from remote
167 // - Concurrent state and PR sync
168 // - Partial OID aggregation from multiple servers
169 // - Push triggers unified processing
170 //
171 // The individual components (SyncQueueEntry, ThrottleManager, sync_identifier)
172 // are thoroughly unit tested in their respective modules.
173}
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs
index 8ac9216..be89130 100644
--- a/src/purgatory/sync/mod.rs
+++ b/src/purgatory/sync/mod.rs
@@ -5,9 +5,11 @@
5//! - Domain-based throttling (configurable requests/minute per domain) 5//! - Domain-based throttling (configurable requests/minute per domain)
6//! - Exponential backoff per identifier (20s → 2m, then 2m intervals) 6//! - Exponential backoff per identifier (20s → 2m, then 2m intervals)
7//! - Debouncing for burst event arrivals 7//! - Debouncing for burst event arrivals
8//! - Background sync loop processing ready identifiers every 1 second
8 9
9mod context; 10mod context;
10mod functions; 11mod functions;
12mod r#loop;
11mod queue; 13mod queue;
12mod throttle; 14mod throttle;
13 15