From 4e5a937a4ef5288e702ba2bae3daf2a78398b690 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 14:18:05 +0000 Subject: fix docs --- src/sync/mod.rs | 64 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 27 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 16c8924..e17565b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -103,8 +103,6 @@ pub struct RelayState { pub last_connected: Option, /// When we disconnected - for 15-minute state retention rule pub disconnected_at: Option, - // The active connection - will be added in Phase 4 - // pub connection: Option, } impl Default for RelayState { @@ -283,7 +281,7 @@ async fn run_disconnect_checker( interval_secs = check_interval_secs, "Disconnect checker started with configured interval" ); - + loop { tokio::select! { _ = tokio::time::sleep(interval) => { @@ -406,7 +404,7 @@ impl SyncManager { async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { // 1. Find and update the pending batch let mut pending = self.pending_sync_index.write().await; - + let Some(batches) = pending.get_mut(relay_url) else { tracing::warn!( relay = %relay_url, @@ -417,8 +415,10 @@ impl SyncManager { }; // Find the batch containing this subscription - let batch_index = batches.iter().position(|b| b.outstanding_subs.contains(&sub_id)); - + let batch_index = batches + .iter() + .position(|b| b.outstanding_subs.contains(&sub_id)); + let Some(batch_idx) = batch_index else { tracing::warn!( relay = %relay_url, @@ -431,7 +431,7 @@ impl SyncManager { // Remove the subscription from outstanding_subs let batch = &mut batches[batch_idx]; batch.outstanding_subs.remove(&sub_id); - + tracing::debug!( relay = %relay_url, sub_id = %sub_id, @@ -450,25 +450,25 @@ impl SyncManager { let batch_id = completed_batch.batch_id; let repos_count = completed_batch.items.repos.len(); let events_count = completed_batch.items.root_events.len(); - + // Clean up empty relay entry if batches.is_empty() { pending.remove(relay_url); } - + // Drop the pending lock before acquiring relay_sync_index lock drop(pending); // 3. Move items to confirmed state in RelayState { let mut relay_index = self.relay_sync_index.write().await; - + if let Some(state) = relay_index.get_mut(relay_url) { // Move repos to confirmed state.repos.extend(completed_batch.items.repos); // Move root_events to confirmed state.root_events.extend(completed_batch.items.root_events); - + tracing::info!( relay = %relay_url, batch_id = batch_id, @@ -628,7 +628,12 @@ impl SyncManager { let checker_manager = Arc::clone(&sync_manager); let checker_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { - run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await; + run_disconnect_checker( + checker_manager, + checker_shutdown, + disconnect_check_interval_secs, + ) + .await; }); // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications @@ -702,7 +707,7 @@ impl SyncManager { filter_count = action.filters.len(), "[DIAG] handle_add_filters called" ); - + // Step 1: Check if relay exists in relay_sync_index let connection_status = { let index = self.relay_sync_index.read().await; @@ -756,7 +761,8 @@ impl SyncManager { } // Step 2: Check if consolidation is needed BEFORE adding new filters - self.maybe_consolidate(&action.relay_url, action.filters.len()).await; + self.maybe_consolidate(&action.relay_url, action.filters.len()) + .await; // Step 3: Get connection and subscribe to all filters let connection = match self.connections.get(&action.relay_url) { @@ -1185,7 +1191,10 @@ impl SyncManager { let connection_timeout_secs = self.health_tracker.base_backoff_secs(); // Connect and subscribe to Layer 1 - match connection.connect_and_subscribe(None, connection_timeout_secs).await { + match connection + .connect_and_subscribe(None, connection_timeout_secs) + .await + { Ok(_) => { // Record successful connection attempt if let Some(ref metrics) = self.metrics { @@ -1194,20 +1203,21 @@ impl SyncManager { } Err(e) => { tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); - + // Record failed connection attempt if let Some(ref metrics) = self.metrics { metrics.record_connection_attempt(&relay_url, false); } - + // Record failure in health tracker self.health_tracker.record_failure(&relay_url); - + // Record health state in metrics if let Some(ref metrics) = self.metrics { - metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); + metrics + .record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); } - + // Update state to disconnected on failure { let mut index = relay_sync_index.write().await; @@ -1276,7 +1286,7 @@ impl SyncManager { let mut disconnect_sent = false; // Track whether EOSE has been received - events before EOSE are "startup", after are "live" let mut eose_received = false; - + while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { @@ -1342,7 +1352,7 @@ impl SyncManager { } } } - + // If the event channel closed without a Closed/Shutdown event // (e.g., connection dropped unexpectedly), still notify SyncManager if !disconnect_sent { @@ -1468,7 +1478,7 @@ impl SyncManager { // Layer 3: 3 filters per 100-event chunk (ceiling division) let repo_count = state.repos.len(); let event_count = state.root_events.len(); - + let layer1_filters = 1; let layer2_filters = if repo_count > 0 { ((repo_count + 99) / 100) * 3 @@ -1480,7 +1490,7 @@ impl SyncManager { } else { 0 }; - + layer1_filters + layer2_filters + layer3_filters } else { 0 @@ -1564,7 +1574,7 @@ impl SyncManager { threshold = CONSOLIDATION_THRESHOLD, "Filter count exceeds threshold, consolidating" ); - + if let Err(e) = self.consolidate(relay_url).await { tracing::error!( relay = %relay_url, @@ -1603,7 +1613,7 @@ impl SyncManager { return Ok(()); // No connection, nothing to consolidate } }; - + connection.unsubscribe_all().await; // Step 3: Rebuild all subscriptions with since filter @@ -1824,4 +1834,4 @@ impl SyncManager { tracing::info!("SyncManager shutdown complete"); } -} \ No newline at end of file +} -- cgit v1.2.3