diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:58:31 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:58:31 +0000 |
| commit | f639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (patch) | |
| tree | cfcbf16a937a59048930ccaf8557f78ed5576bde /src/sync/connection.rs | |
| parent | bf558b0dc17e14f96eea624ea5591315a2909154 (diff) | |
feat(sync): Phase 3 - resilience and health tracking
- Add RelayHealthTracker with DashMap
- Implement exponential backoff (5s -> 1h max)
- Handle dead relays (24h failures -> daily retry)
- Add startup jitter to prevent thundering herd
- Add NGIT_SYNC_MAX_BACKOFF_SECS config
Diffstat (limited to 'src/sync/connection.rs')
| -rw-r--r-- | src/sync/connection.rs | 80 |
1 file changed, 68 insertions, 12 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 76cc8e8..319cbbd 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs | |||
| @@ -9,6 +9,12 @@ | |||
| 9 | //! 1. Layer 1: kinds 30617 + 30618 (announcements) | 9 | //! 1. Layer 1: kinds 30617 + 30618 (announcements) |
| 10 | //! 2. Layer 2: A/a tags for repository events | 10 | //! 2. Layer 2: A/a tags for repository events |
| 11 | //! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) | 11 | //! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.) |
| 12 | //! | ||
| 13 | //! ## Phase 3 Features | ||
| 14 | //! | ||
| 15 | //! - Health tracking with success/failure reporting | ||
| 16 | //! - Exponential backoff with health-aware delays | ||
| 17 | //! - Dead relay detection and minimal retry | ||
| 12 | 18 | ||
| 13 | use std::sync::Arc; | 19 | use std::sync::Arc; |
| 14 | use std::time::Duration; | 20 | use std::time::Duration; |
| @@ -17,6 +23,7 @@ use nostr_sdk::prelude::*; | |||
| 17 | use tokio::sync::mpsc; | 23 | use tokio::sync::mpsc; |
| 18 | 24 | ||
| 19 | use super::filter::FilterService; | 25 | use super::filter::FilterService; |
| 26 | use super::health::RelayHealthTracker; | ||
| 20 | 27 | ||
| 21 | /// Event received from the sync connection | 28 | /// Event received from the sync connection |
| 22 | #[derive(Debug, Clone)] | 29 | #[derive(Debug, Clone)] |
| @@ -169,47 +176,96 @@ impl SyncConnection { | |||
| 169 | } | 176 | } |
| 170 | } | 177 | } |
| 171 | 178 | ||
| 172 | /// Reconnect loop with exponential backoff | 179 | /// Reconnect loop with health-aware exponential backoff |
| 180 | /// | ||
| 181 | /// This function manages the connection lifecycle with health tracking: | ||
| 182 | /// - Checks health state before attempting connections | ||
| 183 | /// - Reports success/failure to the health tracker | ||
| 184 | /// - Respects backoff delays from the health tracker | ||
| 185 | /// - Handles dead relay detection (24h+ failures) | ||
| 173 | /// | 186 | /// |
| 174 | /// # Arguments | 187 | /// # Arguments |
| 175 | /// * `url` - The relay URL to connect to | 188 | /// * `url` - The relay URL to connect to |
| 176 | /// * `tx` - Channel sender for synced events | 189 | /// * `tx` - Channel sender for synced events |
| 177 | /// * `filter_service` - FilterService for building subscriptions | 190 | /// * `filter_service` - FilterService for building subscriptions |
| 178 | /// * `our_domain` - Our relay's domain (used to extract remote domain) | 191 | /// * `our_domain` - Our relay's domain (used to extract remote domain) |
| 192 | /// * `health_tracker` - Health tracker for managing connection state | ||
| 179 | pub async fn connect_with_retry( | 193 | pub async fn connect_with_retry( |
| 180 | url: &str, | 194 | url: &str, |
| 181 | tx: mpsc::Sender<SyncedEvent>, | 195 | tx: mpsc::Sender<SyncedEvent>, |
| 182 | filter_service: Arc<FilterService>, | 196 | filter_service: Arc<FilterService>, |
| 183 | _our_domain: &str, | 197 | _our_domain: &str, |
| 198 | health_tracker: Arc<RelayHealthTracker>, | ||
| 184 | ) { | 199 | ) { |
| 185 | let mut backoff = Duration::from_secs(1); | ||
| 186 | let max_backoff = Duration::from_secs(60); | ||
| 187 | |||
| 188 | // Extract remote domain from URL | 200 | // Extract remote domain from URL |
| 189 | let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); | 201 | let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); |
| 190 | 202 | ||
| 191 | loop { | 203 | loop { |
| 204 | // Check if we should attempt connection based on health state | ||
| 205 | if !health_tracker.should_attempt_connection(url) { | ||
| 206 | // Wait for remaining backoff | ||
| 207 | if let Some(remaining) = health_tracker.get_remaining_backoff(url) { | ||
| 208 | tracing::debug!( | ||
| 209 | "Relay {} in backoff, waiting {:?} before retry", | ||
| 210 | url, | ||
| 211 | remaining | ||
| 212 | ); | ||
| 213 | tokio::time::sleep(remaining).await; | ||
| 214 | continue; | ||
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 218 | // Log current health state for dead relays | ||
| 219 | if health_tracker.is_dead(url) { | ||
| 220 | tracing::info!( | ||
| 221 | "Attempting reconnection to dead relay {} (daily retry)", | ||
| 222 | url | ||
| 223 | ); | ||
| 224 | } | ||
| 225 | |||
| 192 | match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { | 226 | match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { |
| 193 | Ok(conn) => { | 227 | Ok(conn) => { |
| 194 | backoff = Duration::from_secs(1); // Reset backoff on successful connection | 228 | // Record successful connection |
| 229 | health_tracker.record_success(url); | ||
| 230 | tracing::info!("Sync connection established to {}", url); | ||
| 231 | |||
| 232 | // Run the connection (this blocks until disconnection) | ||
| 195 | conn.run(tx.clone()).await; | 233 | conn.run(tx.clone()).await; |
| 234 | |||
| 235 | // Connection ended - record as failure for reconnection backoff | ||
| 236 | // (The connection ending is considered a failure even if it worked for a while) | ||
| 237 | health_tracker.record_failure(url); | ||
| 196 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | 238 | tracing::warn!("Sync connection to {} ended, will reconnect", url); |
| 197 | } | 239 | } |
| 198 | Err(e) => { | 240 | Err(e) => { |
| 241 | // Record connection failure | ||
| 242 | health_tracker.record_failure(url); | ||
| 243 | |||
| 244 | let failure_count = health_tracker.get_failure_count(url); | ||
| 245 | let state = health_tracker.get_state(url); | ||
| 246 | |||
| 199 | tracing::error!( | 247 | tracing::error!( |
| 200 | "Failed to connect to sync relay {}: {} (retrying in {:?})", | 248 | "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", |
| 201 | url, | 249 | url, |
| 202 | e, | 250 | failure_count, |
| 203 | backoff | 251 | state, |
| 252 | e | ||
| 204 | ); | 253 | ); |
| 205 | } | 254 | } |
| 206 | } | 255 | } |
| 207 | 256 | ||
| 208 | // Wait before reconnecting | 257 | // Get the backoff duration from health tracker |
| 209 | tokio::time::sleep(backoff).await; | 258 | // If the health tracker has no backoff set (shouldn't happen), use a small default |
| 259 | let wait_duration = health_tracker | ||
| 260 | .get_remaining_backoff(url) | ||
| 261 | .unwrap_or(Duration::from_secs(5)); | ||
| 210 | 262 | ||
| 211 | // Exponential backoff | 263 | tracing::debug!( |
| 212 | backoff = std::cmp::min(backoff * 2, max_backoff); | 264 | "Waiting {:?} before reconnecting to {}", |
| 265 | wait_duration, | ||
| 266 | url | ||
| 267 | ); | ||
| 268 | tokio::time::sleep(wait_duration).await; | ||
| 213 | } | 269 | } |
| 214 | } | 270 | } |
| 215 | 271 | ||