upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/relay_connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/relay_connection.rs')
-rw-r--r--src/sync/relay_connection.rs99
1 files changed, 67 insertions, 32 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 9a580d2..63e4247 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -124,48 +124,83 @@ impl RelayConnection {
124 /// # Arguments 124 /// # Arguments
125 /// * `event_sender` - Channel to send relay events through 125 /// * `event_sender` - Channel to send relay events through
126 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { 126 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
127 use std::time::Duration;
128 use tokio::time::interval;
129
127 let mut notifications = self.client.notifications(); 130 let mut notifications = self.client.notifications();
128 let url = self.url.clone(); 131 let url = self.url.clone();
132
133 // Check connection status every second to detect dead connections
134 let mut check_interval = interval(Duration::from_secs(1));
129 135
130 tracing::debug!(relay = %url, "Starting event loop"); 136 tracing::debug!(relay = %url, "Starting event loop");
131 137
132 while let Ok(notification) = notifications.recv().await { 138 loop {
133 match notification { 139 tokio::select! {
134 RelayPoolNotification::Event { event, .. } => { 140 // Check for new notifications
135 tracing::trace!(relay = %url, event_id = %event.id, "Received event"); 141 notification_result = notifications.recv() => {
136 if event_sender.send(RelayEvent::Event(*event)).await.is_err() { 142 match notification_result {
137 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 143 Ok(notification) => {
138 break; 144 match notification {
139 } 145 RelayPoolNotification::Event { event, .. } => {
140 } 146 tracing::trace!(relay = %url, event_id = %event.id, "Received event");
141 RelayPoolNotification::Message { message, .. } => { 147 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
142 match message { 148 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
143 RelayMessage::EndOfStoredEvents(sub_id) => { 149 break;
144 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); 150 }
145 // Convert Cow<SubscriptionId> to owned SubscriptionId 151 }
146 let owned_sub_id = sub_id.into_owned(); 152 RelayPoolNotification::Message { message, .. } => {
147 if event_sender 153 match message {
148 .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) 154 RelayMessage::EndOfStoredEvents(sub_id) => {
149 .await 155 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE");
150 .is_err() 156 // Convert Cow<SubscriptionId> to owned SubscriptionId
151 { 157 let owned_sub_id = sub_id.into_owned();
152 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 158 if event_sender
153 break; 159 .send(RelayEvent::EndOfStoredEvents(owned_sub_id))
160 .await
161 .is_err()
162 {
163 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
164 break;
165 }
166 }
167 RelayMessage::Closed { message: msg, .. } => {
168 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
169 let _ = event_sender
170 .send(RelayEvent::Closed(msg.to_string()))
171 .await;
172 break;
173 }
174 _ => {}
175 }
176 }
177 RelayPoolNotification::Shutdown => {
178 tracing::info!(relay = %url, "Relay pool shutdown");
179 let _ = event_sender.send(RelayEvent::Shutdown).await;
180 break;
181 }
154 } 182 }
155 } 183 }
156 RelayMessage::Closed { message: msg, .. } => { 184 Err(_) => {
157 tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); 185 // Notification channel closed - connection lost
158 let _ = event_sender 186 tracing::debug!(relay = %url, "Notification channel error, stopping event loop");
159 .send(RelayEvent::Closed(msg.to_string())) 187 break;
160 .await;
161 } 188 }
162 _ => {}
163 } 189 }
164 } 190 }
165 RelayPoolNotification::Shutdown => { 191 // Periodic connection health check
166 tracing::info!(relay = %url, "Relay pool shutdown"); 192 _ = check_interval.tick() => {
167 let _ = event_sender.send(RelayEvent::Shutdown).await; 193 // Check if relay is still connected via nostr-sdk
168 break; 194 if let Ok(relay) = self.client.relay(&self.url).await {
195 if !relay.is_connected() {
196 tracing::info!(relay = %url, "Relay disconnected (detected by health check)");
197 break;
198 }
199 } else {
200 // Relay not found in client - must be disconnected
201 tracing::info!(relay = %url, "Relay not found (detected by health check)");
202 break;
203 }
169 } 204 }
170 } 205 }
171 } 206 }