diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/purgatory/sync/loop.rs | 173 | ||||
| -rw-r--r-- | src/purgatory/sync/mod.rs | 2 |
2 files changed, 175 insertions, 0 deletions
diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs new file mode 100644 index 0000000..aaf1300 --- /dev/null +++ b/src/purgatory/sync/loop.rs | |||
| @@ -0,0 +1,173 @@ | |||
| 1 | //! Background sync loop for purgatory synchronization. | ||
| 2 | //! | ||
| 3 | //! This module provides the main sync loop that runs in the background and | ||
| 4 | //! processes identifiers that are ready for sync. The loop: | ||
| 5 | //! | ||
| 6 | //! 1. Runs every 1 second (hardcoded interval) | ||
| 7 | //! 2. Finds all ready identifiers (where `!in_progress && next_attempt <= now`) | ||
| 8 | //! 3. Spawns parallel tasks for each ready identifier | ||
| 9 | //! 4. Applies backoff when sync completes (if events remain) | ||
| 10 | //! 5. Removes identifiers from queue when sync completes or no events remain | ||
| 11 | |||
| 12 | use std::sync::Arc; | ||
| 13 | use std::time::Duration; | ||
| 14 | use tokio::task::JoinHandle; | ||
| 15 | use tracing::{debug, info, warn}; | ||
| 16 | |||
| 17 | use crate::purgatory::Purgatory; | ||
| 18 | |||
| 19 | use super::context::SyncContext; | ||
| 20 | use super::functions::sync_identifier; | ||
| 21 | use super::throttle::ThrottleManager; | ||
| 22 | |||
| 23 | /// Interval between sync loop iterations (hardcoded, not configurable). | ||
| 24 | const SYNC_LOOP_INTERVAL: Duration = Duration::from_secs(1); | ||
| 25 | |||
| 26 | impl Purgatory { | ||
| 27 | /// Start the background sync loop. | ||
| 28 | /// | ||
| 29 | /// This spawns a background task that periodically checks for identifiers | ||
| 30 | /// ready for sync and processes them. The loop runs every 1 second and: | ||
| 31 | /// | ||
| 32 | /// 1. Finds all ready identifiers (where `!in_progress && next_attempt <= now`) | ||
| 33 | /// 2. Spawns parallel tasks for each ready identifier | ||
| 34 | /// 3. Each task calls `sync_identifier` to try fetching git data | ||
| 35 | /// 4. On completion, applies backoff if events remain, or removes from queue | ||
| 36 | /// | ||
| 37 | /// # Arguments | ||
| 38 | /// * `ctx` - The sync context providing repository data and fetch capabilities | ||
| 39 | /// * `throttle_manager` - Used for rate limiting and domain queue management | ||
| 40 | /// | ||
| 41 | /// # Returns | ||
| 42 | /// A `JoinHandle` for the background task (can be used to cancel the loop) | ||
| 43 | /// | ||
| 44 | /// # Example | ||
| 45 | /// | ||
| 46 | /// ```ignore | ||
| 47 | /// let purgatory = Arc::new(Purgatory::new("/data/git")); | ||
| 48 | /// let ctx = Arc::new(RealSyncContext::new(...)); | ||
| 49 | /// let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | ||
| 50 | /// | ||
| 51 | /// // Set context on throttle manager for queue processing | ||
| 52 | /// throttle_manager.set_context(ctx.clone()); | ||
| 53 | /// | ||
| 54 | /// // Start the sync loop | ||
| 55 | /// let handle = purgatory.start_sync_loop(ctx, throttle_manager); | ||
| 56 | /// | ||
| 57 | /// // Later, to stop the loop: | ||
| 58 | /// handle.abort(); | ||
| 59 | /// ``` | ||
| 60 | pub fn start_sync_loop( | ||
| 61 | self: Arc<Self>, | ||
| 62 | ctx: Arc<dyn SyncContext>, | ||
| 63 | throttle_manager: Arc<ThrottleManager>, | ||
| 64 | ) -> JoinHandle<()> { | ||
| 65 | info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL); | ||
| 66 | |||
| 67 | tokio::spawn(async move { | ||
| 68 | let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL); | ||
| 69 | |||
| 70 | loop { | ||
| 71 | interval.tick().await; | ||
| 72 | |||
| 73 | // Find all ready identifiers | ||
| 74 | let ready: Vec<String> = self | ||
| 75 | .sync_queue | ||
| 76 | .iter() | ||
| 77 | .filter(|entry| entry.value().is_ready()) | ||
| 78 | .map(|entry| entry.key().clone()) | ||
| 79 | .collect(); | ||
| 80 | |||
| 81 | if !ready.is_empty() { | ||
| 82 | debug!( | ||
| 83 | ready_count = ready.len(), | ||
| 84 | "Found identifiers ready for sync" | ||
| 85 | ); | ||
| 86 | } | ||
| 87 | |||
| 88 | for identifier in ready { | ||
| 89 | // Check if events still exist for this identifier | ||
| 90 | if !self.has_pending_events(&identifier) { | ||
| 91 | debug!( | ||
| 92 | identifier = %identifier, | ||
| 93 | "No pending events - removing from sync queue" | ||
| 94 | ); | ||
| 95 | self.sync_queue.remove(&identifier); | ||
| 96 | continue; | ||
| 97 | } | ||
| 98 | |||
| 99 | // Mark as in progress (skip if already in progress) | ||
| 100 | let should_process = { | ||
| 101 | if let Some(mut entry) = self.sync_queue.get_mut(&identifier) { | ||
| 102 | if entry.in_progress { | ||
| 103 | false | ||
| 104 | } else { | ||
| 105 | entry.in_progress = true; | ||
| 106 | true | ||
| 107 | } | ||
| 108 | } else { | ||
| 109 | false | ||
| 110 | } | ||
| 111 | }; | ||
| 112 | |||
| 113 | if !should_process { | ||
| 114 | continue; | ||
| 115 | } | ||
| 116 | |||
| 117 | // Spawn sync task | ||
| 118 | let purgatory = self.clone(); | ||
| 119 | let ctx = ctx.clone(); | ||
| 120 | let throttle_manager = throttle_manager.clone(); | ||
| 121 | let id = identifier.clone(); | ||
| 122 | |||
| 123 | tokio::spawn(async move { | ||
| 124 | debug!( | ||
| 125 | identifier = %id, | ||
| 126 | "Starting sync task for identifier" | ||
| 127 | ); | ||
| 128 | |||
| 129 | let complete = sync_identifier(ctx.as_ref(), &id, &throttle_manager).await; | ||
| 130 | |||
| 131 | // Check final state and update queue | ||
| 132 | if complete || !purgatory.has_pending_events(&id) { | ||
| 133 | purgatory.sync_queue.remove(&id); | ||
| 134 | info!( | ||
| 135 | identifier = %id, | ||
| 136 | complete = complete, | ||
| 137 | "Sync complete - removed from sync queue" | ||
| 138 | ); | ||
| 139 | } else { | ||
| 140 | // Apply backoff - will retry later | ||
| 141 | // (throttled domains are being processed independently by ThrottleManager) | ||
| 142 | if let Some(mut entry) = purgatory.sync_queue.get_mut(&id) { | ||
| 143 | entry.on_sync_complete(); | ||
| 144 | debug!( | ||
| 145 | identifier = %id, | ||
| 146 | attempt_count = entry.attempt_count, | ||
| 147 | next_backoff_secs = entry.backoff().as_secs(), | ||
| 148 | "Sync incomplete - applying backoff" | ||
| 149 | ); | ||
| 150 | } | ||
| 151 | } | ||
| 152 | }); | ||
| 153 | } | ||
| 154 | } | ||
| 155 | }) | ||
| 156 | } | ||
| 157 | } | ||
| 158 | |||
| 159 | #[cfg(test)] | ||
| 160 | mod tests { | ||
| 161 | // Note: The sync loop is tested via integration tests rather than unit tests | ||
| 162 | // because testing async loops with timing is fragile and prone to flakiness. | ||
| 163 | // | ||
| 164 | // Integration tests in tests/purgatory_sync.rs verify: | ||
| 165 | // - State events sync from remote | ||
| 166 | // - PR events sync from remote | ||
| 167 | // - Concurrent state and PR sync | ||
| 168 | // - Partial OID aggregation from multiple servers | ||
| 169 | // - Push triggers unified processing | ||
| 170 | // | ||
| 171 | // The individual components (SyncQueueEntry, ThrottleManager, sync_identifier) | ||
| 172 | // are thoroughly unit tested in their respective modules. | ||
| 173 | } | ||
diff --git a/src/purgatory/sync/mod.rs b/src/purgatory/sync/mod.rs index 8ac9216..be89130 100644 --- a/src/purgatory/sync/mod.rs +++ b/src/purgatory/sync/mod.rs | |||
| @@ -5,9 +5,11 @@ | |||
| 5 | //! - Domain-based throttling (configurable requests/minute per domain) | 5 | //! - Domain-based throttling (configurable requests/minute per domain) |
| 6 | //! - Exponential backoff per identifier (20s → 2m, then 2m intervals) | 6 | //! - Exponential backoff per identifier (20s → 2m, then 2m intervals) |
| 7 | //! - Debouncing for burst event arrivals | 7 | //! - Debouncing for burst event arrivals |
| 8 | //! - Background sync loop processing ready identifiers every 1 second | ||
| 8 | 9 | ||
| 9 | mod context; | 10 | mod context; |
| 10 | mod functions; | 11 | mod functions; |
| 12 | mod r#loop; | ||
| 11 | mod queue; | 13 | mod queue; |
| 12 | mod throttle; | 14 | mod throttle; |
| 13 | 15 | ||