From e98a3850b6dcd3bbd5d251896ef56199cd49dc33 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 18 Dec 2025 12:06:57 +0000 Subject: sync: new connection logic --- src/sync/algorithms.rs | 4 + src/sync/health.rs | 15 ++ src/sync/mod.rs | 668 +++++++++++++++++++++++++------------------------ 3 files changed, 355 insertions(+), 332 deletions(-) (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 84248b1..a6e0787 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -348,6 +348,7 @@ mod tests { connection_status: ConnectionStatus::Disconnected, last_connected: None, disconnected_at: None, + announcements_synced: false, }, ); @@ -435,6 +436,7 @@ mod tests { connection_status: ConnectionStatus::Connected, last_connected: None, disconnected_at: None, + announcements_synced: false, }, ); @@ -468,6 +470,7 @@ mod tests { connection_status: ConnectionStatus::Connecting, last_connected: None, disconnected_at: None, + announcements_synced: false, }, ); @@ -523,6 +526,7 @@ mod tests { connection_status: ConnectionStatus::Connected, last_connected: None, disconnected_at: None, + announcements_synced: false, }, ); diff --git a/src/sync/health.rs b/src/sync/health.rs index 0ae7dee..d919a80 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs @@ -64,6 +64,8 @@ pub struct RelayHealth { pub last_failure_time: Option, /// Time of the last successful connection pub last_success_time: Option, + /// Time of the last connection attempt (success or failure) + pub last_attempt_time: Option, /// Next time a connection attempt should be made pub next_retry_at: Option, } @@ -76,6 +78,7 @@ impl Default for RelayHealth { first_failure_time: None, last_failure_time: None, last_success_time: None, + last_attempt_time: None, next_retry_at: None, } } @@ -132,6 +135,17 @@ impl RelayHealthTracker { self.base_backoff_secs } + /// Record a connection attempt (updates last_attempt_time) + /// + /// This should be called before trying to connect, to track when + /// attempts are made regardless of success or failure. + pub fn record_attempt(&self, relay_url: &str) { + let now = Instant::now(); + let mut entry = self.health.entry(relay_url.to_string()).or_default(); + let health = entry.value_mut(); + health.last_attempt_time = Some(now); + } + /// Record a successful connection to a relay /// /// Resets the relay to Healthy state and clears failure counters. @@ -148,6 +162,7 @@ impl RelayHealthTracker { health.first_failure_time = None; health.last_failure_time = None; health.last_success_time = Some(now); + health.last_attempt_time = Some(now); health.next_retry_at = None; if old_state != HealthState::Healthy { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3c50387..5bea701 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -103,6 +103,9 @@ pub struct RelayState { pub last_connected: Option, /// When we disconnected - for 15-minute state retention rule pub disconnected_at: Option, + /// Whether announcement filter historic sync has completed for this relay + /// Used to determine if we can use `since` filter on reconnect for Layer 1 + pub announcements_synced: bool, } impl Default for RelayState { @@ -114,6 +117,7 @@ impl Default for RelayState { connection_status: ConnectionStatus::Disconnected, last_connected: None, disconnected_at: None, + announcements_synced: false, } } } @@ -134,6 +138,7 @@ impl RelayState { pub fn clear_sync_state(&mut self) { self.repos.clear(); self.root_events.clear(); + self.announcements_synced = false; } } @@ -301,7 +306,7 @@ async fn run_disconnect_checker( let mut manager = sync_manager.lock().await; manager.check_disconnects().await; - manager.check_reconnects().await; + manager.retry_disconnected_relays().await; } _ = shutdown_rx.recv() => { tracing::info!("Disconnect checker received shutdown signal"); @@ -475,6 +480,9 @@ impl SyncManager { /// move repos and root_events from pending to confirmed state. This unified /// flow ensures consistent state tracking regardless of sync method. /// + /// For generic filter batches (identified by empty repos and root_events), + /// this sets the announcements_synced flag to enable incremental sync on reconnect. + /// /// # Arguments /// * `relay_url` - The relay URL the batch belongs to /// * `batch` - The completed batch to confirm @@ -483,6 +491,7 @@ impl SyncManager { let repos_count = batch.items.repos.len(); let events_count = batch.items.root_events.len(); let sync_method = batch.sync_method; + let is_generic_filter = repos_count == 0 && events_count == 0; let mut relay_index = self.relay_sync_index.write().await; @@ -492,6 +501,17 @@ impl SyncManager { // Move root_events to confirmed state.root_events.extend(batch.items.root_events.clone()); + // Set announcements_synced flag for generic filter batches + if is_generic_filter { + state.announcements_synced = true; + tracing::info!( + relay = %relay_url, + batch_id = batch_id, + sync_method = ?sync_method, + "Generic filter (announcements) historic sync complete - announcements_synced set to true" + ); + } + // DEBUG TRACING: Log the root events being confirmed tracing::info!( relay = %relay_url, @@ -503,6 +523,8 @@ impl SyncManager { total_repos = state.repos.len(), total_root_events = state.root_events.len(), all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::>(), + is_generic_filter = is_generic_filter, + announcements_synced = state.announcements_synced, "Batch confirmed - items moved from pending to confirmed" ); } else { @@ -623,7 +645,8 @@ impl SyncManager { // 6. Connect to bootstrap relay if configured if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { - self.spawn_relay_connection(bootstrap_url.clone()).await; + self.register_relay(bootstrap_url.clone()).await; + self.try_connect_relay(bootstrap_url).await; } // 7. Capture config values before moving self into Arc @@ -731,33 +754,16 @@ impl SyncManager { match connection_status { None => { - // New relay - create entry with Connecting status - { - let mut index = self.relay_sync_index.write().await; - let new_state = RelayState { - connection_status: ConnectionStatus::Connecting, - is_bootstrap: false, // Only bootstrap relays set this to true - last_connected: None, - disconnected_at: None, - repos: HashSet::new(), - root_events: HashSet::new(), - }; - index.insert(action.relay_url.clone(), new_state); - } - - // Track new relay in metrics - if let Some(ref metrics) = self.metrics { - metrics.inc_tracked_count(); - } - + // New relay - register and connect tracing::info!( relay = %action.relay_url, repos = action.items.repos.len(), - "Spawning connection for new relay" + "Registering and connecting to new relay" ); - // Spawn connection for new relay - self.spawn_relay_connection(action.relay_url.clone()).await; + // Register relay (creates RelayConnection, initializes RelayState, updates metrics) + self.register_relay(action.relay_url.clone()).await; + self.try_connect_relay(&action.relay_url).await; // Connection will trigger handle_connect_or_reconnect which will process items return; } @@ -795,11 +801,146 @@ impl SyncManager { /// Handle a connection success (called when a relay connects or reconnects) /// - /// This method dispatches to the appropriate reconnection strategy: - /// - `fresh_start()` if never connected before - /// - `quick_reconnect()` if disconnected < 15 minutes - /// - `long_reconnect()` if disconnected > 15 minutes + /// This method: + /// 1. Updates RelayState to Connected + /// 2. Spawns event loop (MUST happen on every connection/reconnect) + /// 3. Dispatches to appropriate reconnection strategy based on disconnect time async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { + use tokio::sync::mpsc; + + // 1. Update state to Connected + { + let mut index = self.relay_sync_index.write().await; + let state = index.entry(relay_url.to_string()).or_default(); + state.connection_status = ConnectionStatus::Connected; + state.last_connected = Some(Timestamp::now()); + state.disconnected_at = None; + } + + // Update metrics + if let Some(ref metrics) = self.metrics { + metrics.set_relay_connected(relay_url, true); + metrics.inc_connected_count(); + } + + // 2. SPAWN EVENT LOOP (moved from spawn_relay_connection) + // This MUST happen on every connection (initial or reconnect) + // because event loops die on disconnect and cannot be reused + let connection = match self.connections.get(relay_url) { + Some(c) => c.clone(), + None => { + tracing::error!(relay = %relay_url, "No RelayConnection found for connected relay"); + return; + } + }; + + let (event_tx, mut event_rx) = mpsc::channel::(1000); + + // Spawn event loop task + let relay_url_for_loop = relay_url.to_string(); + tokio::spawn(async move { + connection.run_event_loop(event_tx).await; + tracing::debug!(relay = %relay_url_for_loop, "Event loop terminated"); + }); + + // Spawn event processor task + let relay_url_clone = relay_url.to_string(); + let database = Arc::clone(&self.database); + let write_policy = self.write_policy.clone(); + let local_relay = self.local_relay.clone(); + let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); + let eose_tx = self.eose_tx.as_ref().unwrap().clone(); + let metrics_clone = self.metrics.clone(); + + tokio::spawn(async move { + let mut disconnect_sent = false; + let mut eose_received = false; + + while let Some(relay_event) = event_rx.recv().await { + match relay_event { + RelayEvent::Event(event) => { + if let Some(ref metrics) = metrics_clone { + let source = if eose_received { + event_source::LIVE + } else { + event_source::STARTUP + }; + metrics.record_event(source); + } + Self::process_event_static( + &event, + &relay_url_clone, + &database, + &write_policy, + &local_relay, + ) + .await; + } + RelayEvent::EndOfStoredEvents(sub_id) => { + eose_received = true; + tracing::debug!( + relay = %relay_url_clone, + sub_id = %sub_id, + "EOSE received, notifying SyncManager" + ); + let _ = eose_tx + .send(EoseNotification { + relay_url: relay_url_clone.clone(), + sub_id, + }) + .await; + } + RelayEvent::Closed(reason) => { + tracing::info!( + relay = %relay_url_clone, + reason = %reason, + "Relay connection closed" + ); + if !disconnect_sent { + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; + disconnect_sent = true; + } + break; + } + RelayEvent::Shutdown => { + tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); + if !disconnect_sent { + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone.clone(), + }) + .await; + disconnect_sent = true; + } + break; + } + } + } + + // If the event channel closed without a Closed/Shutdown event + if !disconnect_sent { + tracing::info!( + relay = %relay_url_clone, + "Event channel closed, notifying SyncManager of disconnect" + ); + let _ = disconnect_tx + .send(DisconnectNotification { + relay_url: relay_url_clone, + }) + .await; + } + }); + + tracing::info!( + relay = %relay_url, + "Event loop and processor spawned for connected relay" + ); + + // 3. Decide reconnection strategy based on last_connected time let last_connected = { let index = self.relay_sync_index.read().await; index.get(relay_url).and_then(|s| s.last_connected) @@ -808,7 +949,7 @@ impl SyncManager { if let Some(last) = last_connected { let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); if elapsed < QUICK_RECONNECT_WINDOW_SECS { - // short disconnect + // Short disconnect - quick reconnect tracing::info!( relay = %relay_url, disconnect_secs = elapsed, @@ -817,7 +958,7 @@ impl SyncManager { self.quick_reconnect(relay_url, Timestamp::from(elapsed)) .await; } else { - // long disconnect + // Long disconnect - fresh start tracing::info!( relay = %relay_url, disconnect_secs = elapsed, @@ -826,13 +967,13 @@ impl SyncManager { self.fresh_start(relay_url).await; } } else { - // not successfully connected before (since launching binary) + // First connection - fresh start tracing::info!( relay = %relay_url, "First connection - initiating fresh_start" ); self.fresh_start(relay_url).await; - }; + } } /// Fresh start - clears state and does full sync @@ -846,7 +987,7 @@ impl SyncManager { /// 4. L1 live + L1 historic (negentropy if available) /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 async fn fresh_start(&mut self, relay_url: &str) { - let now = Timestamp::now(); + let _now = Timestamp::now(); tracing::info!(relay = %relay_url, "Starting fresh_start"); @@ -877,28 +1018,38 @@ impl SyncManager { ); } if state.connection_status == ConnectionStatus::Connected { - // TODO start layer 1 drop(index); + self.sync_generic_filters(relay_url, None).await; + // Step 5: compute_actions for L2+L3 (will be triggered by EOSE) self.recompute_new_sync_filters_for_relay(relay_url).await; } } else { drop(index); - return self.spawn_relay_connection(relay_url.to_string()).await; } } } + async fn sync_generic_filters(&mut self, relay_url: &str, since: Option) { + let filters = vec![filters::build_announcement_filter(None)]; + self.sync_live(relay_url, &filters).await; + + // Use historic_sync with empty PendingItems for generic filters + // Generic filters (announcements) don't have associated repos or root_events + let items = PendingItems::default(); + self.historic_sync(relay_url, filters, items, since).await; + } + /// Quick reconnect - for disconnections < 15 minutes /// - /// Flow: - /// 1. Clear PendingSyncIndex for this relay - /// 2. Update connection state to Connected - /// 3. L1 live + L1 historic(since) - /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since) - /// 5. compute_actions for any new items discovered during catchup + /// Re-establishes subscriptions after a brief disconnection by: + /// 1. Clearing stale PendingSyncIndex entries + /// 2. Syncing L1 filters with since timestamp (announcements) + /// 3. Rebuilding L2+L3 from preserved RelaySyncIndex state + /// 4. Computing actions for new items discovered during catchup + /// + /// Basic connection state and metrics are managed by handle_connect_or_reconnect. + /// This method handles reconnect-specific concerns (health tracking, reconnect metrics). async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { - let now = Timestamp::now(); - tracing::info!( relay = %relay_url, since = %since, @@ -917,56 +1068,36 @@ impl SyncManager { } } - // Step 2: Update connection state (preserve repos/root_events - that's the point!) - { - let mut index = self.relay_sync_index.write().await; - let state = index.entry(relay_url.to_string()).or_default(); - state.connection_status = ConnectionStatus::Connected; - state.last_connected = Some(now); - state.disconnected_at = None; - } - - // Record success in health tracker + // Record successful reconnection in health tracker self.health_tracker.record_success(relay_url); - // Update metrics + // Record reconnect-specific metrics (not basic connection metrics) if let Some(ref metrics) = self.metrics { - metrics.set_relay_connected(relay_url, true); - metrics.inc_connected_count(); metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); metrics.record_event(event_source::RECONNECT); } - // Step 3: L1 live + L1 historic with since filter - // L1 live subscription (since=now for ongoing events) - let live_filter = filters::build_announcement_filter(Some(now)); - if let Some(connection) = self.connections.get(relay_url) { - if let Err(e) = connection.subscribe_filter(live_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to set up L1 live subscription in quick_reconnect" - ); - } - } - - // L1 historic with since filter (catch up on missed announcements) - let layer1_filter = filters::build_announcement_filter(Some(since)); - if let Some(connection) = self.connections.get(relay_url) { - if let Err(e) = connection.subscribe_filter(layer1_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to subscribe to L1 historic filter in quick_reconnect" - ); + // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) + let announcement_since = { + let index = self.relay_sync_index.read().await; + if let Some(state) = index.get(relay_url) { + if state.announcements_synced { + Some(since) // Can use incremental sync + } else { + None // Need full sync - announcements never completed + } + } else { + None } - } + }; + self.sync_generic_filters(relay_url, announcement_since) + .await; - // Step 4: Rebuild L2+L3 from confirmed state with since filter + // Step 3: Rebuild L2+L3 from confirmed state with since filter // This uses the preserved repos/root_events from RelaySyncIndex self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; - // Step 5: compute_actions for any NEW items discovered while disconnected + // Step 4: compute_actions for any NEW items discovered while disconnected self.recompute_new_sync_filters_for_relay(relay_url).await; tracing::info!(relay = %relay_url, "quick_reconnect complete"); @@ -1038,6 +1169,119 @@ impl SyncManager { } } + /// Register a relay for managed connection/reconnection + /// + /// Creates a RelayConnection object and stores it in the connections HashMap. + /// Also initializes RelayState if it doesn't exist. + /// Does NOT connect - connection happens via try_connect_relay or retry_disconnected_relays. + /// The RelayConnection persists forever and is reused on reconnects. + async fn register_relay(&mut self, relay_url: String) { + // Create RelayConnection if not exists + if !self.connections.contains_key(&relay_url) { + let connection = + RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database)); + self.connections.insert(relay_url.clone(), connection); + tracing::debug!(relay = %relay_url, "Registered new relay connection"); + } + + // Initialize RelayState if not exists + let is_new = { + let mut index = self.relay_sync_index.write().await; + if !index.contains_key(&relay_url) { + let new_state = RelayState { + connection_status: ConnectionStatus::Disconnected, + is_bootstrap: false, + last_connected: None, + disconnected_at: None, + repos: HashSet::new(), + root_events: HashSet::new(), + announcements_synced: false, + }; + index.insert(relay_url.clone(), new_state); + true + } else { + false + } + }; + + // Track new relay in metrics + if is_new { + if let Some(ref metrics) = self.metrics { + metrics.inc_tracked_count(); + } + tracing::info!(relay = %relay_url, "Registered new relay for tracking"); + } + } + + /// Attempt a single connection to a registered relay + /// + /// Uses the existing RelayConnection from the HashMap and attempts to connect. + /// On success, sends ConnectNotification which triggers handle_connect_or_reconnect. + /// On failure, updates state and health tracker. + async fn try_connect_relay(&mut self, relay_url: &str) { + // 1. Mark attempting (optional, helpful for debugging) + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + state.connection_status = ConnectionStatus::Connecting; + } + } + + // 2. Record attempt in health tracker + self.health_tracker.record_attempt(relay_url); + + // 3. Get connection and attempt + let connection = match self.connections.get(relay_url) { + Some(c) => c, + None => { + tracing::error!(relay = %relay_url, "No RelayConnection registered"); + return; + } + }; + + let timeout = self.health_tracker.base_backoff_secs(); + + match connection.connect_and_subscribe(None, timeout).await { + Ok(_) => { + // Success - record and send notification + self.health_tracker.record_success(relay_url); + + if let Some(ref metrics) = self.metrics { + metrics.record_connection_attempt(relay_url, true); + } + + if let Some(ref connect_tx) = self.connect_tx { + let _ = connect_tx + .send(ConnectNotification { + relay_url: relay_url.to_string(), + }) + .await; + } + } + Err(e) => { + tracing::error!(relay = %relay_url, error = %e, "Connection failed"); + + // 4. Update state back to Disconnected on failure + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + state.connection_status = ConnectionStatus::Disconnected; + } + } + + // 5. Record failure in health tracker + self.health_tracker.record_failure(relay_url); + + // 6. Update metrics + if let Some(ref metrics) = self.metrics { + metrics.record_connection_attempt(relay_url, false); + metrics + .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + } + } + } + } + /// Recompute sync actions for a specific relay /// /// Uses derive_relay_targets and compute_actions to find new items @@ -1136,13 +1380,13 @@ impl SyncManager { } } - // 3. Remove from active connections - if self.connections.remove(relay_url).is_some() { - tracing::debug!( - relay = %relay_url, - "Removed relay from active connections" - ); - } + // 3. Keep RelayConnection in HashMap for reuse on reconnect + // The connection object persists and will be reused when retry_disconnected_relays + // calls try_connect_relay -> connection.connect_and_subscribe() + tracing::debug!( + relay = %relay_url, + "Keeping RelayConnection in HashMap for reconnection" + ); // 4. Record failure in health tracker self.health_tracker.record_failure(relay_url); @@ -1161,246 +1405,6 @@ impl SyncManager { ); } - /// Spawn a relay connection and start its event loop - /// - /// Creates a new RelayConnection, connects to Layer 1, stores the connection, - /// and spawns event processing tasks. Uses stored channel senders for notifications. - async fn spawn_relay_connection(&mut self, relay_url: String) { - use tokio::sync::mpsc; - - // Get channel senders (must exist during run) - let disconnect_tx = match &self.disconnect_tx { - Some(tx) => tx.clone(), - None => { - tracing::error!( - relay = %relay_url, - "Cannot spawn connection - channels not initialized" - ); - return; - } - }; - let eose_tx = match &self.eose_tx { - Some(tx) => tx.clone(), - None => { - tracing::error!( - relay = %relay_url, - "Cannot spawn connection - channels not initialized" - ); - return; - } - }; - let connect_tx = match &self.connect_tx { - Some(tx) => tx.clone(), - None => { - tracing::error!( - relay = %relay_url, - "Cannot spawn connection - channels not initialized" - ); - return; - } - }; - - let database = Arc::clone(&self.database); - let write_policy = self.write_policy.clone(); - let local_relay = self.local_relay.clone(); - let relay_sync_index = Arc::clone(&self.relay_sync_index); - - // Check if this is a bootstrap relay - let is_bootstrap = self.bootstrap_relay_url.as_ref() == Some(&relay_url); - - // Create relay connection with database for negentropy sync support - let connection = - RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database)); - - // Get connection timeout from health tracker (capped at base backoff) - // This ensures the connection attempt completes before the next retry would be scheduled - let connection_timeout_secs = self.health_tracker.base_backoff_secs(); - - // Connect and subscribe to Layer 1 - match connection - .connect_and_subscribe(None, connection_timeout_secs) - .await - { - Ok(_) => { - // Record successful connection attempt - if let Some(ref metrics) = self.metrics { - metrics.record_connection_attempt(&relay_url, true); - } - } - Err(e) => { - tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); - - // Record failed connection attempt - if let Some(ref metrics) = self.metrics { - metrics.record_connection_attempt(&relay_url, false); - } - - // Record failure in health tracker - self.health_tracker.record_failure(&relay_url); - - // Record health state in metrics - if let Some(ref metrics) = self.metrics { - metrics - .record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); - } - - // Update state to disconnected on failure - { - let mut index = relay_sync_index.write().await; - if let Some(state) = index.get_mut(&relay_url) { - state.connection_status = ConnectionStatus::Disconnected; - } - } - return; - } - } - - // Mark as connected in relay sync index - // Track whether this is a new relay for metrics - let is_new_relay = { - let mut index = relay_sync_index.write().await; - let is_new = !index.contains_key(&relay_url); - let state = index.entry(relay_url.clone()).or_default(); - state.connection_status = ConnectionStatus::Connected; - state.is_bootstrap = is_bootstrap; - state.last_connected = Some(Timestamp::now()); - state.disconnected_at = None; - is_new - }; - - // Increment tracked count for new relays - if is_new_relay { - if let Some(ref metrics) = self.metrics { - metrics.inc_tracked_count(); - } - } - - // Store connection in HashMap BEFORE sending notification - // This ensures it's available when handle_connect_or_reconnect is called - self.connections.insert(relay_url.clone(), connection); - - tracing::info!( - relay = %relay_url, - is_bootstrap = is_bootstrap, - "Spawned relay connection" - ); - - // Notify SyncManager of successful connection - let _ = connect_tx - .send(ConnectNotification { - relay_url: relay_url.clone(), - }) - .await; - - // Clone the connection for the event loop spawn - // The stored connection is used for subscription management - let connection_for_loop = self.connections.get(&relay_url).unwrap().clone(); - - // Create event channel - let (event_tx, mut event_rx) = mpsc::channel::(1000); - - // Spawn event loop with cloned connection - tokio::spawn(async move { - connection_for_loop.run_event_loop(event_tx).await; - }); - - // Spawn event processor - let relay_url_clone = relay_url.clone(); - let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task - tokio::spawn(async move { - // Track whether we've already sent a disconnect notification - let mut disconnect_sent = false; - // Track whether EOSE has been received - events before EOSE are "startup", after are "live" - let mut eose_received = false; - - while let Some(relay_event) = event_rx.recv().await { - match relay_event { - RelayEvent::Event(event) => { - if let Some(ref metrics) = metrics_clone { - // Events before EOSE are "startup", events after EOSE are "live" - let source = if eose_received { - event_source::LIVE - } else { - event_source::STARTUP - }; - metrics.record_event(source); - } - Self::process_event_static( - &event, - &relay_url_clone, - &database, - &write_policy, - &local_relay, - ) - .await; - } - RelayEvent::EndOfStoredEvents(sub_id) => { - // Mark EOSE as received - subsequent events are "live" - eose_received = true; - tracing::debug!( - relay = %relay_url_clone, - sub_id = %sub_id, - "EOSE received, notifying SyncManager" - ); - // Notify SyncManager of EOSE - let _ = eose_tx - .send(EoseNotification { - relay_url: relay_url_clone.clone(), - sub_id, - }) - .await; - } - RelayEvent::Closed(reason) => { - tracing::info!( - relay = %relay_url_clone, - reason = %reason, - "Relay connection closed" - ); - // Notify SyncManager of disconnect - let _ = disconnect_tx - .send(DisconnectNotification { - relay_url: relay_url_clone.clone(), - }) - .await; - disconnect_sent = true; - break; - } - RelayEvent::Shutdown => { - tracing::info!(relay = %relay_url_clone, "Relay shutdown detected"); - // Notify SyncManager of disconnect - let _ = disconnect_tx - .send(DisconnectNotification { - relay_url: relay_url_clone.clone(), - }) - .await; - disconnect_sent = true; - break; - } - } - } - - // If the event channel closed without a Closed/Shutdown event - // (e.g., connection dropped unexpectedly), still notify SyncManager - if !disconnect_sent { - tracing::info!( - relay = %relay_url_clone, - "Event channel closed, notifying SyncManager of disconnect" - ); - let _ = disconnect_tx - .send(DisconnectNotification { - relay_url: relay_url_clone.clone(), - }) - .await; - } - }); - - tracing::info!( - relay = %relay_url, - is_bootstrap = is_bootstrap, - "Spawned relay connection" - ); - } - /// Process a single event from a relay (static version for spawned tasks) /// /// Processes events with dedup, policy check, database save, and broadcast: @@ -1759,7 +1763,7 @@ impl SyncManager { tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); } - /// Check for disconnected relays that should be reconnected + /// Retry disconnected relays that are ready for reconnection /// /// This method is called periodically by run_disconnect_checker. /// It identifies relays that: @@ -1767,8 +1771,8 @@ impl SyncManager { /// - Have repos or root events to sync (not empty) /// - Have passed the exponential backoff period (respects health tracker) /// - /// For each eligible relay, a reconnection is attempted via spawn_relay_connection. - async fn check_reconnects(&mut self) { + /// For each eligible relay, a reconnection is attempted via try_connect_relay. + async fn retry_disconnected_relays(&mut self) { // Collect relays to reconnect let to_reconnect: Vec = { let index = self.relay_sync_index.read().await; @@ -1813,7 +1817,7 @@ impl SyncManager { health_state = %self.health_tracker.get_state(&relay_url), "Attempting reconnection" ); - self.spawn_relay_connection(relay_url).await; + self.try_connect_relay(&relay_url).await; } } -- cgit v1.2.3