upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 12:50:05 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 12:50:05 +0000
commit82c5783a4d40c4273cb12317ec9bf88a2e281a04 (patch)
tree508623c92193573c98c0c476b18c36ca77207316 /src
parent83844a528365e657cd5f4d2cda51d72ced9900da (diff)
refactor: use Relay::notifications() for event-driven disconnect detection
Replace the 1-second polling loop with nostr-sdk's relay-level notification system that provides immediate disconnect detection via RelayNotification::RelayStatus. Key changes: - Use relay.notifications() instead of client.notifications() - Handle RelayNotification::RelayStatus { Disconnected | Terminated } to detect connection loss immediately without polling - Remove tokio::select! with interval timer - now uses simple match loop - Handle additional notification types (Authenticated, AuthenticationFailed) Why this is better: - Event-driven vs polling: no wasted CPU cycles checking every second - Immediate detection: disconnect triggers notification instantly - Uses nostr-sdk's built-in mechanism that was previously inaccessible at pool level (RelayStatus notifications are filtered out in RelayPoolNotification) Technical note: RelayNotification::RelayStatus is only available via Relay::notifications(), not Client::notifications(), because the pool-level broadcast filters out status change events. Future refactoring opportunity: Consider restructuring RelayConnection to hold a Relay directly instead of wrapping a Client, since we only manage one relay per connection anyway.
Diffstat (limited to 'src')
-rw-r--r--src/sync/relay_connection.rs178
1 files changed, 111 insertions, 67 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 63e4247..91825f2 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -111,97 +111,141 @@ impl RelayConnection {
111 111
112 /// Run the event loop, sending events through the provided channel 112 /// Run the event loop, sending events through the provided channel
113 /// 113 ///
114 /// This method blocks and processes notifications from the relay: 114 /// This method blocks and processes notifications from the relay using
115 /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event` 115 /// nostr-sdk's `Relay::notifications()` channel, which provides event-driven
116 /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` 116 /// disconnect detection via `RelayNotification::RelayStatus`.
117 /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown` 117 ///
118 /// Notification types handled:
119 /// - `RelayNotification::Event` -> sends `RelayEvent::Event`
120 /// - `RelayNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents`
121 /// - `RelayNotification::RelayStatus { Disconnected }` -> terminates loop (disconnect detected)
122 /// - `RelayNotification::Shutdown` -> sends `RelayEvent::Shutdown`
118 /// 123 ///
119 /// The loop terminates when: 124 /// The loop terminates when:
120 /// - The sender channel is closed (receiver dropped) 125 /// - The sender channel is closed (receiver dropped)
121 /// - A shutdown notification is received 126 /// - A shutdown notification is received
127 /// - Relay status changes to Disconnected or Terminated
122 /// - An error occurs receiving notifications 128 /// - An error occurs receiving notifications
123 /// 129 ///
124 /// # Arguments 130 /// # Arguments
125 /// * `event_sender` - Channel to send relay events through 131 /// * `event_sender` - Channel to send relay events through
132 ///
133 /// # Note
134 /// This uses `Relay::notifications()` instead of `Client::notifications()` because
135 /// `RelayNotification::RelayStatus` events are not forwarded to the pool-level channel.
136 /// This enables immediate, event-driven disconnect detection without polling.
137 ///
138 /// # Future Refactoring
139 /// Consider refactoring `RelayConnection` to hold a `Relay` directly instead of
140 /// wrapping a `Client`. This would simplify the architecture since we only manage
141 /// one relay per connection.
126 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { 142 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
127 use std::time::Duration;
128 use tokio::time::interval;
129
130 let mut notifications = self.client.notifications();
131 let url = self.url.clone(); 143 let url = self.url.clone();
132
133 // Check connection status every second to detect dead connections
134 let mut check_interval = interval(Duration::from_secs(1));
135 144
136 tracing::debug!(relay = %url, "Starting event loop"); 145 // Get the Relay from the client to access relay-level notifications
146 // which include RelayStatus changes (not available at pool level)
147 let relay = match self.client.relay(&self.url).await {
148 Ok(r) => r,
149 Err(e) => {
150 tracing::error!(relay = %url, error = %e, "Failed to get relay from client");
151 return;
152 }
153 };
154
155 // Subscribe to relay-level notifications (includes RelayStatus)
156 let mut notifications = relay.notifications();
157
158 tracing::debug!(relay = %url, "Starting event loop with relay-level notifications");
137 159
138 loop { 160 loop {
139 tokio::select! { 161 match notifications.recv().await {
140 // Check for new notifications 162 Ok(notification) => {
141 notification_result = notifications.recv() => { 163 match notification {
142 match notification_result { 164 RelayNotification::Event {
143 Ok(notification) => { 165 event,
144 match notification { 166 subscription_id,
145 RelayPoolNotification::Event { event, .. } => { 167 } => {
146 tracing::trace!(relay = %url, event_id = %event.id, "Received event"); 168 tracing::trace!(
147 if event_sender.send(RelayEvent::Event(*event)).await.is_err() { 169 relay = %url,
148 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 170 event_id = %event.id,
149 break; 171 sub_id = %subscription_id,
150 } 172 "Received event"
173 );
174 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
175 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
176 break;
177 }
178 }
179 RelayNotification::Message { message } => match message {
180 RelayMessage::EndOfStoredEvents(sub_id) => {
181 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE");
182 // Convert Cow<SubscriptionId> to owned SubscriptionId
183 let owned_sub_id = sub_id.into_owned();
184 if event_sender
185 .send(RelayEvent::EndOfStoredEvents(owned_sub_id))
186 .await
187 .is_err()
188 {
189 tracing::debug!(
190 relay = %url,
191 "Event sender closed, stopping event loop"
192 );
193 break;
151 } 194 }
152 RelayPoolNotification::Message { message, .. } => { 195 }
153 match message { 196 RelayMessage::Closed { message: msg, .. } => {
154 RelayMessage::EndOfStoredEvents(sub_id) => { 197 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
155 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); 198 let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await;
156 // Convert Cow<SubscriptionId> to owned SubscriptionId 199 break;
157 let owned_sub_id = sub_id.into_owned(); 200 }
158 if event_sender 201 _ => {}
159 .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) 202 },
160 .await 203 RelayNotification::RelayStatus { status } => {
161 .is_err() 204 // Event-driven disconnect detection - no polling needed!
162 { 205 match status {
163 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 206 RelayStatus::Disconnected => {
164 break; 207 tracing::info!(
165 } 208 relay = %url,
166 } 209 "Relay disconnected (detected via RelayNotification)"
167 RelayMessage::Closed { message: msg, .. } => { 210 );
168 tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); 211 break;
169 let _ = event_sender
170 .send(RelayEvent::Closed(msg.to_string()))
171 .await;
172 break;
173 }
174 _ => {}
175 }
176 } 212 }
177 RelayPoolNotification::Shutdown => { 213 RelayStatus::Terminated => {
178 tracing::info!(relay = %url, "Relay pool shutdown"); 214 tracing::info!(
179 let _ = event_sender.send(RelayEvent::Shutdown).await; 215 relay = %url,
216 "Relay terminated (detected via RelayNotification)"
217 );
180 break; 218 break;
181 } 219 }
220 _ => {
221 // Log other status changes for debugging
222 tracing::trace!(
223 relay = %url,
224 status = ?status,
225 "Relay status changed"
226 );
227 }
182 } 228 }
183 } 229 }
184 Err(_) => { 230 RelayNotification::Shutdown => {
185 // Notification channel closed - connection lost 231 tracing::info!(relay = %url, "Relay shutdown notification");
186 tracing::debug!(relay = %url, "Notification channel error, stopping event loop"); 232 let _ = event_sender.send(RelayEvent::Shutdown).await;
187 break; 233 break;
188 } 234 }
189 } 235 RelayNotification::Authenticated => {
190 } 236 tracing::debug!(relay = %url, "Authenticated to relay (NIP-42)");
191 // Periodic connection health check 237 }
192 _ = check_interval.tick() => { 238 RelayNotification::AuthenticationFailed => {
193 // Check if relay is still connected via nostr-sdk 239 tracing::warn!(relay = %url, "Authentication failed to relay (NIP-42)");
194 if let Ok(relay) = self.client.relay(&self.url).await { 240 // Don't break - relay may still work for public data
195 if !relay.is_connected() {
196 tracing::info!(relay = %url, "Relay disconnected (detected by health check)");
197 break;
198 } 241 }
199 } else {
200 // Relay not found in client - must be disconnected
201 tracing::info!(relay = %url, "Relay not found (detected by health check)");
202 break;
203 } 242 }
204 } 243 }
244 Err(_) => {
245 // Notification channel closed - connection lost
246 tracing::debug!(relay = %url, "Notification channel error, stopping event loop");
247 break;
248 }
205 } 249 }
206 } 250 }
207 251