upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r-- src/sync/connection.rs 80
1 files changed, 68 insertions, 12 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 76cc8e8..319cbbd 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -9,6 +9,12 @@
9//! 1. Layer 1: kinds 30617 + 30618 (announcements) 9//! 1. Layer 1: kinds 30617 + 30618 (announcements)
10//! 2. Layer 2: A/a tags for repository events 10//! 2. Layer 2: A/a tags for repository events
11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) 11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.)
12//!
13//! ## Phase 3 Features
14//!
15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry
12 18
13use std::sync::Arc; 19use std::sync::Arc;
14use std::time::Duration; 20use std::time::Duration;
@@ -17,6 +23,7 @@ use nostr_sdk::prelude::*;
17use tokio::sync::mpsc; 23use tokio::sync::mpsc;
18 24
19use super::filter::FilterService; 25use super::filter::FilterService;
26use super::health::RelayHealthTracker;
20 27
21/// Event received from the sync connection 28/// Event received from the sync connection
22#[derive(Debug, Clone)] 29#[derive(Debug, Clone)]
@@ -169,47 +176,96 @@ impl SyncConnection {
169 } 176 }
170} 177}
171 178
172/// Reconnect loop with exponential backoff 179/// Reconnect loop with health-aware exponential backoff
180///
181/// This function manages the connection lifecycle with health tracking:
182/// - Checks health state before attempting connections
183/// - Reports success/failure to the health tracker
184/// - Respects backoff delays from the health tracker
185/// - Handles dead relay detection (24h+ failures)
173/// 186///
174/// # Arguments 187/// # Arguments
175/// * `url` - The relay URL to connect to 188/// * `url` - The relay URL to connect to
176/// * `tx` - Channel sender for synced events 189/// * `tx` - Channel sender for synced events
177/// * `filter_service` - FilterService for building subscriptions 190/// * `filter_service` - FilterService for building subscriptions
178/// * `our_domain` - Our relay's domain (used to extract remote domain) 191/// * `our_domain` - Our relay's domain (used to extract remote domain)
192/// * `health_tracker` - Health tracker for managing connection state
179pub async fn connect_with_retry( 193pub async fn connect_with_retry(
180 url: &str, 194 url: &str,
181 tx: mpsc::Sender<SyncedEvent>, 195 tx: mpsc::Sender<SyncedEvent>,
182 filter_service: Arc<FilterService>, 196 filter_service: Arc<FilterService>,
183 _our_domain: &str, 197 _our_domain: &str,
198 health_tracker: Arc<RelayHealthTracker>,
184) { 199) {
185 let mut backoff = Duration::from_secs(1);
186 let max_backoff = Duration::from_secs(60);
187
188 // Extract remote domain from URL 200 // Extract remote domain from URL
189 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); 201 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
190 202
191 loop { 203 loop {
204 // Check if we should attempt connection based on health state
205 if !health_tracker.should_attempt_connection(url) {
206 // Wait for remaining backoff
207 if let Some(remaining) = health_tracker.get_remaining_backoff(url) {
208 tracing::debug!(
209 "Relay {} in backoff, waiting {:?} before retry",
210 url,
211 remaining
212 );
213 tokio::time::sleep(remaining).await;
214 continue;
215 }
216 }
217
218 // Log current health state for dead relays
219 if health_tracker.is_dead(url) {
220 tracing::info!(
221 "Attempting reconnection to dead relay {} (daily retry)",
222 url
223 );
224 }
225
192 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { 226 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await {
193 Ok(conn) => { 227 Ok(conn) => {
194 backoff = Duration::from_secs(1); // Reset backoff on successful connection 228 // Record successful connection
229 health_tracker.record_success(url);
230 tracing::info!("Sync connection established to {}", url);
231
232 // Run the connection (this blocks until disconnection)
195 conn.run(tx.clone()).await; 233 conn.run(tx.clone()).await;
234
235 // Connection ended - record as failure for reconnection backoff
236 // (The connection ending is considered a failure even if it worked for a while)
237 health_tracker.record_failure(url);
196 tracing::warn!("Sync connection to {} ended, will reconnect", url); 238 tracing::warn!("Sync connection to {} ended, will reconnect", url);
197 } 239 }
198 Err(e) => { 240 Err(e) => {
241 // Record connection failure
242 health_tracker.record_failure(url);
243
244 let failure_count = health_tracker.get_failure_count(url);
245 let state = health_tracker.get_state(url);
246
199 tracing::error!( 247 tracing::error!(
200 "Failed to connect to sync relay {}: {} (retrying in {:?})", 248 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}",
201 url, 249 url,
202 e, 250 failure_count,
203 backoff 251 state,
252 e
204 ); 253 );
205 } 254 }
206 } 255 }
207 256
208 // Wait before reconnecting 257 // Get the backoff duration from health tracker
209 tokio::time::sleep(backoff).await; 258 // If the health tracker has no backoff set (shouldn't happen), use a small default
259 let wait_duration = health_tracker
260 .get_remaining_backoff(url)
261 .unwrap_or(Duration::from_secs(5));
210 262
211 // Exponential backoff 263 tracing::debug!(
212 backoff = std::cmp::min(backoff * 2, max_backoff); 264 "Waiting {:?} before reconnecting to {}",
265 wait_duration,
266 url
267 );
268 tokio::time::sleep(wait_duration).await;
213 } 269 }
214} 270}
215 271