From 82c5783a4d40c4273cb12317ec9bf88a2e281a04 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 12:50:05 +0000 Subject: refactor: use Relay::notifications() for event-driven disconnect detection Replace the 1-second polling loop with nostr-sdk's relay-level notification system that provides immediate disconnect detection via RelayNotification::RelayStatus. Key changes: - Use relay.notifications() instead of client.notifications() - Handle RelayNotification::RelayStatus { Disconnected | Terminated } to detect connection loss immediately without polling - Remove tokio::select! with interval timer - now uses simple match loop - Handle additional notification types (Authenticated, AuthenticationFailed) Why this is better: - Event-driven vs polling: no wasted CPU cycles checking every second - Immediate detection: disconnect triggers notification instantly - Uses nostr-sdk's built-in mechanism that was previously inaccessible at pool level (RelayStatus notifications are filtered out in RelayPoolNotification) Technical note: RelayNotification::RelayStatus is only available via Relay::notifications(), not Client::notifications(), because the pool-level broadcast filters out status change events. Future refactoring opportunity: Consider restructuring RelayConnection to hold a Relay directly instead of wrapping a Client, since we only manage one relay per connection anyway. --- src/sync/relay_connection.rs | 178 +++++++++++++++++++++++++++---------------- 1 file changed, 111 insertions(+), 67 deletions(-) (limited to 'src/sync') diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 63e4247..91825f2 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -111,97 +111,141 @@ impl RelayConnection { /// Run the event loop, sending events through the provided channel /// - /// This method blocks and processes notifications from the relay: - /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event` - /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` - /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown` + /// This method blocks and processes notifications from the relay using + /// nostr-sdk's `Relay::notifications()` channel, which provides event-driven + /// disconnect detection via `RelayNotification::RelayStatus`. + /// + /// Notification types handled: + /// - `RelayNotification::Event` -> sends `RelayEvent::Event` + /// - `RelayNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` + /// - `RelayNotification::RelayStatus { Disconnected }` -> terminates loop (disconnect detected) + /// - `RelayNotification::Shutdown` -> sends `RelayEvent::Shutdown` /// /// The loop terminates when: /// - The sender channel is closed (receiver dropped) /// - A shutdown notification is received + /// - Relay status changes to Disconnected or Terminated /// - An error occurs receiving notifications /// /// # Arguments /// * `event_sender` - Channel to send relay events through + /// + /// # Note + /// This uses `Relay::notifications()` instead of `Client::notifications()` because + /// `RelayNotification::RelayStatus` events are not forwarded to the pool-level channel. + /// This enables immediate, event-driven disconnect detection without polling. + /// + /// # Future Refactoring + /// Consider refactoring `RelayConnection` to hold a `Relay` directly instead of + /// wrapping a `Client`. This would simplify the architecture since we only manage + /// one relay per connection. pub async fn run_event_loop(self, event_sender: mpsc::Sender) { - use std::time::Duration; - use tokio::time::interval; - - let mut notifications = self.client.notifications(); let url = self.url.clone(); - - // Check connection status every second to detect dead connections - let mut check_interval = interval(Duration::from_secs(1)); - tracing::debug!(relay = %url, "Starting event loop"); + // Get the Relay from the client to access relay-level notifications + // which include RelayStatus changes (not available at pool level) + let relay = match self.client.relay(&self.url).await { + Ok(r) => r, + Err(e) => { + tracing::error!(relay = %url, error = %e, "Failed to get relay from client"); + return; + } + }; + + // Subscribe to relay-level notifications (includes RelayStatus) + let mut notifications = relay.notifications(); + + tracing::debug!(relay = %url, "Starting event loop with relay-level notifications"); loop { - tokio::select! { - // Check for new notifications - notification_result = notifications.recv() => { - match notification_result { - Ok(notification) => { - match notification { - RelayPoolNotification::Event { event, .. } => { - tracing::trace!(relay = %url, event_id = %event.id, "Received event"); - if event_sender.send(RelayEvent::Event(*event)).await.is_err() { - tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); - break; - } + match notifications.recv().await { + Ok(notification) => { + match notification { + RelayNotification::Event { + event, + subscription_id, + } => { + tracing::trace!( + relay = %url, + event_id = %event.id, + sub_id = %subscription_id, + "Received event" + ); + if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); + break; + } + } + RelayNotification::Message { message } => match message { + RelayMessage::EndOfStoredEvents(sub_id) => { + tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); + // Convert Cow to owned SubscriptionId + let owned_sub_id = sub_id.into_owned(); + if event_sender + .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) + .await + .is_err() + { + tracing::debug!( + relay = %url, + "Event sender closed, stopping event loop" + ); + break; } - RelayPoolNotification::Message { message, .. } => { - match message { - RelayMessage::EndOfStoredEvents(sub_id) => { - tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); - // Convert Cow to owned SubscriptionId - let owned_sub_id = sub_id.into_owned(); - if event_sender - .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) - .await - .is_err() - { - tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); - break; - } - } - RelayMessage::Closed { message: msg, .. } => { - tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); - let _ = event_sender - .send(RelayEvent::Closed(msg.to_string())) - .await; - break; - } - _ => {} - } + } + RelayMessage::Closed { message: msg, .. } => { + tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); + let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; + break; + } + _ => {} + }, + RelayNotification::RelayStatus { status } => { + // Event-driven disconnect detection - no polling needed! + match status { + RelayStatus::Disconnected => { + tracing::info!( + relay = %url, + "Relay disconnected (detected via RelayNotification)" + ); + break; } - RelayPoolNotification::Shutdown => { - tracing::info!(relay = %url, "Relay pool shutdown"); - let _ = event_sender.send(RelayEvent::Shutdown).await; + RelayStatus::Terminated => { + tracing::info!( + relay = %url, + "Relay terminated (detected via RelayNotification)" + ); break; } + _ => { + // Log other status changes for debugging + tracing::trace!( + relay = %url, + status = ?status, + "Relay status changed" + ); + } } } - Err(_) => { - // Notification channel closed - connection lost - tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); + RelayNotification::Shutdown => { + tracing::info!(relay = %url, "Relay shutdown notification"); + let _ = event_sender.send(RelayEvent::Shutdown).await; break; } - } - } - // Periodic connection health check - _ = check_interval.tick() => { - // Check if relay is still connected via nostr-sdk - if let Ok(relay) = self.client.relay(&self.url).await { - if !relay.is_connected() { - tracing::info!(relay = %url, "Relay disconnected (detected by health check)"); - break; + RelayNotification::Authenticated => { + tracing::debug!(relay = %url, "Authenticated to relay (NIP-42)"); + } + RelayNotification::AuthenticationFailed => { + tracing::warn!(relay = %url, "Authentication failed to relay (NIP-42)"); + // Don't break - relay may still work for public data } - } else { - // Relay not found in client - must be disconnected - tracing::info!(relay = %url, "Relay not found (detected by health check)"); - break; } } + Err(_) => { + // Notification channel closed - connection lost + tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); + break; + } } } -- cgit v1.2.3