diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 12:50:05 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 12:50:05 +0000 |
| commit | 82c5783a4d40c4273cb12317ec9bf88a2e281a04 (patch) | |
| tree | 508623c92193573c98c0c476b18c36ca77207316 /src/sync/relay_connection.rs | |
| parent | 83844a528365e657cd5f4d2cda51d72ced9900da (diff) | |
refactor: use Relay::notifications() for event-driven disconnect detection
Replace the 1-second polling loop with nostr-sdk's relay-level notification
system that provides immediate disconnect detection via RelayNotification::RelayStatus.
Key changes:
- Use relay.notifications() instead of client.notifications()
- Handle RelayNotification::RelayStatus { Disconnected | Terminated } to detect
connection loss immediately without polling
- Remove tokio::select! with interval timer - now uses simple match loop
- Handle additional notification types (Authenticated, AuthenticationFailed)
Why this is better:
- Event-driven vs polling: no wasted CPU cycles checking every second
- Immediate detection: disconnect triggers notification instantly
- Uses nostr-sdk's built-in mechanism that was previously inaccessible at pool level
(RelayStatus notifications are filtered out in RelayPoolNotification)
Technical note: RelayNotification::RelayStatus is only available via
Relay::notifications(), not Client::notifications(), because the pool-level
broadcast filters out status change events.
Future refactoring opportunity: Consider restructuring RelayConnection to hold
a Relay directly instead of wrapping a Client, since we only manage one relay
per connection anyway.
Diffstat (limited to 'src/sync/relay_connection.rs')
| -rw-r--r-- | src/sync/relay_connection.rs | 178 |
1 files changed, 111 insertions, 67 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 63e4247..91825f2 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -111,97 +111,141 @@ impl RelayConnection { | |||
| 111 | 111 | ||
| 112 | /// Run the event loop, sending events through the provided channel | 112 | /// Run the event loop, sending events through the provided channel |
| 113 | /// | 113 | /// |
| 114 | /// This method blocks and processes notifications from the relay: | 114 | /// This method blocks and processes notifications from the relay using |
| 115 | /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event` | 115 | /// nostr-sdk's `Relay::notifications()` channel, which provides event-driven |
| 116 | /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` | 116 | /// disconnect detection via `RelayNotification::RelayStatus`. |
| 117 | /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown` | 117 | /// |
| 118 | /// Notification types handled: | ||
| 119 | /// - `RelayNotification::Event` -> sends `RelayEvent::Event` | ||
| 120 | /// - `RelayNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` | ||
| 121 | /// - `RelayNotification::RelayStatus { Disconnected }` -> terminates loop (disconnect detected) | ||
| 122 | /// - `RelayNotification::Shutdown` -> sends `RelayEvent::Shutdown` | ||
| 118 | /// | 123 | /// |
| 119 | /// The loop terminates when: | 124 | /// The loop terminates when: |
| 120 | /// - The sender channel is closed (receiver dropped) | 125 | /// - The sender channel is closed (receiver dropped) |
| 121 | /// - A shutdown notification is received | 126 | /// - A shutdown notification is received |
| 127 | /// - Relay status changes to Disconnected or Terminated | ||
| 122 | /// - An error occurs receiving notifications | 128 | /// - An error occurs receiving notifications |
| 123 | /// | 129 | /// |
| 124 | /// # Arguments | 130 | /// # Arguments |
| 125 | /// * `event_sender` - Channel to send relay events through | 131 | /// * `event_sender` - Channel to send relay events through |
| 132 | /// | ||
| 133 | /// # Note | ||
| 134 | /// This uses `Relay::notifications()` instead of `Client::notifications()` because | ||
| 135 | /// `RelayNotification::RelayStatus` events are not forwarded to the pool-level channel. | ||
| 136 | /// This enables immediate, event-driven disconnect detection without polling. | ||
| 137 | /// | ||
| 138 | /// # Future Refactoring | ||
| 139 | /// Consider refactoring `RelayConnection` to hold a `Relay` directly instead of | ||
| 140 | /// wrapping a `Client`. This would simplify the architecture since we only manage | ||
| 141 | /// one relay per connection. | ||
| 126 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { | 142 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { |
| 127 | use std::time::Duration; | ||
| 128 | use tokio::time::interval; | ||
| 129 | |||
| 130 | let mut notifications = self.client.notifications(); | ||
| 131 | let url = self.url.clone(); | 143 | let url = self.url.clone(); |
| 132 | |||
| 133 | // Check connection status every second to detect dead connections | ||
| 134 | let mut check_interval = interval(Duration::from_secs(1)); | ||
| 135 | 144 | ||
| 136 | tracing::debug!(relay = %url, "Starting event loop"); | 145 | // Get the Relay from the client to access relay-level notifications |
| 146 | // which include RelayStatus changes (not available at pool level) | ||
| 147 | let relay = match self.client.relay(&self.url).await { | ||
| 148 | Ok(r) => r, | ||
| 149 | Err(e) => { | ||
| 150 | tracing::error!(relay = %url, error = %e, "Failed to get relay from client"); | ||
| 151 | return; | ||
| 152 | } | ||
| 153 | }; | ||
| 154 | |||
| 155 | // Subscribe to relay-level notifications (includes RelayStatus) | ||
| 156 | let mut notifications = relay.notifications(); | ||
| 157 | |||
| 158 | tracing::debug!(relay = %url, "Starting event loop with relay-level notifications"); | ||
| 137 | 159 | ||
| 138 | loop { | 160 | loop { |
| 139 | tokio::select! { | 161 | match notifications.recv().await { |
| 140 | // Check for new notifications | 162 | Ok(notification) => { |
| 141 | notification_result = notifications.recv() => { | 163 | match notification { |
| 142 | match notification_result { | 164 | RelayNotification::Event { |
| 143 | Ok(notification) => { | 165 | event, |
| 144 | match notification { | 166 | subscription_id, |
| 145 | RelayPoolNotification::Event { event, .. } => { | 167 | } => { |
| 146 | tracing::trace!(relay = %url, event_id = %event.id, "Received event"); | 168 | tracing::trace!( |
| 147 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | 169 | relay = %url, |
| 148 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 170 | event_id = %event.id, |
| 149 | break; | 171 | sub_id = %subscription_id, |
| 150 | } | 172 | "Received event" |
| 173 | ); | ||
| 174 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | ||
| 175 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | ||
| 176 | break; | ||
| 177 | } | ||
| 178 | } | ||
| 179 | RelayNotification::Message { message } => match message { | ||
| 180 | RelayMessage::EndOfStoredEvents(sub_id) => { | ||
| 181 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); | ||
| 182 | // Convert Cow<SubscriptionId> to owned SubscriptionId | ||
| 183 | let owned_sub_id = sub_id.into_owned(); | ||
| 184 | if event_sender | ||
| 185 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) | ||
| 186 | .await | ||
| 187 | .is_err() | ||
| 188 | { | ||
| 189 | tracing::debug!( | ||
| 190 | relay = %url, | ||
| 191 | "Event sender closed, stopping event loop" | ||
| 192 | ); | ||
| 193 | break; | ||
| 151 | } | 194 | } |
| 152 | RelayPoolNotification::Message { message, .. } => { | 195 | } |
| 153 | match message { | 196 | RelayMessage::Closed { message: msg, .. } => { |
| 154 | RelayMessage::EndOfStoredEvents(sub_id) => { | 197 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); |
| 155 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); | 198 | let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; |
| 156 | // Convert Cow<SubscriptionId> to owned SubscriptionId | 199 | break; |
| 157 | let owned_sub_id = sub_id.into_owned(); | 200 | } |
| 158 | if event_sender | 201 | _ => {} |
| 159 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) | 202 | }, |
| 160 | .await | 203 | RelayNotification::RelayStatus { status } => { |
| 161 | .is_err() | 204 | // Event-driven disconnect detection - no polling needed! |
| 162 | { | 205 | match status { |
| 163 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 206 | RelayStatus::Disconnected => { |
| 164 | break; | 207 | tracing::info!( |
| 165 | } | 208 | relay = %url, |
| 166 | } | 209 | "Relay disconnected (detected via RelayNotification)" |
| 167 | RelayMessage::Closed { message: msg, .. } => { | 210 | ); |
| 168 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | 211 | break; |
| 169 | let _ = event_sender | ||
| 170 | .send(RelayEvent::Closed(msg.to_string())) | ||
| 171 | .await; | ||
| 172 | break; | ||
| 173 | } | ||
| 174 | _ => {} | ||
| 175 | } | ||
| 176 | } | 212 | } |
| 177 | RelayPoolNotification::Shutdown => { | 213 | RelayStatus::Terminated => { |
| 178 | tracing::info!(relay = %url, "Relay pool shutdown"); | 214 | tracing::info!( |
| 179 | let _ = event_sender.send(RelayEvent::Shutdown).await; | 215 | relay = %url, |
| 216 | "Relay terminated (detected via RelayNotification)" | ||
| 217 | ); | ||
| 180 | break; | 218 | break; |
| 181 | } | 219 | } |
| 220 | _ => { | ||
| 221 | // Log other status changes for debugging | ||
| 222 | tracing::trace!( | ||
| 223 | relay = %url, | ||
| 224 | status = ?status, | ||
| 225 | "Relay status changed" | ||
| 226 | ); | ||
| 227 | } | ||
| 182 | } | 228 | } |
| 183 | } | 229 | } |
| 184 | Err(_) => { | 230 | RelayNotification::Shutdown => { |
| 185 | // Notification channel closed - connection lost | 231 | tracing::info!(relay = %url, "Relay shutdown notification"); |
| 186 | tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); | 232 | let _ = event_sender.send(RelayEvent::Shutdown).await; |
| 187 | break; | 233 | break; |
| 188 | } | 234 | } |
| 189 | } | 235 | RelayNotification::Authenticated => { |
| 190 | } | 236 | tracing::debug!(relay = %url, "Authenticated to relay (NIP-42)"); |
| 191 | // Periodic connection health check | 237 | } |
| 192 | _ = check_interval.tick() => { | 238 | RelayNotification::AuthenticationFailed => { |
| 193 | // Check if relay is still connected via nostr-sdk | 239 | tracing::warn!(relay = %url, "Authentication failed to relay (NIP-42)"); |
| 194 | if let Ok(relay) = self.client.relay(&self.url).await { | 240 | // Don't break - relay may still work for public data |
| 195 | if !relay.is_connected() { | ||
| 196 | tracing::info!(relay = %url, "Relay disconnected (detected by health check)"); | ||
| 197 | break; | ||
| 198 | } | 241 | } |
| 199 | } else { | ||
| 200 | // Relay not found in client - must be disconnected | ||
| 201 | tracing::info!(relay = %url, "Relay not found (detected by health check)"); | ||
| 202 | break; | ||
| 203 | } | 242 | } |
| 204 | } | 243 | } |
| 244 | Err(_) => { | ||
| 245 | // Notification channel closed - connection lost | ||
| 246 | tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); | ||
| 247 | break; | ||
| 248 | } | ||
| 205 | } | 249 | } |
| 206 | } | 250 | } |
| 207 | 251 | ||