diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 18:43:49 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 18:43:49 +0000 |
| commit | dd403b17e7c74db9443d0891a9de1f0f0f9f89eb (patch) | |
| tree | 177dd9f664dde3565492c1d11016dabfeda28bbc /src/sync/connection.rs | |
| parent | 950c2e4e68448d2abcad90a31bfffaca6d7bc47e (diff) | |
feat(sync): Phase 6 - observability and production readiness
- Add SyncMetrics with full Prometheus integration
- Track sync gaps via catchup events
- Update Grafana dashboard with sync panels
- Document all sync configuration options
- Update design doc with implementation notes
Diffstat (limited to 'src/sync/connection.rs')
| -rw-r--r-- | src/sync/connection.rs | 47 |
1 files changed, 46 insertions, 1 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs index cd7a603..e921185 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs | |||
| @@ -31,6 +31,7 @@ use tokio::sync::mpsc; | |||
| 31 | 31 | ||
| 32 | use super::filter::FilterService; | 32 | use super::filter::FilterService; |
| 33 | use super::health::RelayHealthTracker; | 33 | use super::health::RelayHealthTracker; |
| 34 | use super::metrics::{event_source, SyncMetrics}; | ||
| 34 | use super::subscription::SubscriptionManager; | 35 | use super::subscription::SubscriptionManager; |
| 35 | 36 | ||
| 36 | /// Event received from the sync connection | 37 | /// Event received from the sync connection |
| @@ -47,6 +48,7 @@ pub struct SyncConnection { | |||
| 47 | filter_service: Arc<FilterService>, | 48 | filter_service: Arc<FilterService>, |
| 48 | remote_domain: String, | 49 | remote_domain: String, |
| 49 | subscription_manager: SubscriptionManager, | 50 | subscription_manager: SubscriptionManager, |
| 51 | metrics: Option<SyncMetrics>, | ||
| 50 | } | 52 | } |
| 51 | 53 | ||
| 52 | impl SyncConnection { | 54 | impl SyncConnection { |
| @@ -55,6 +57,7 @@ impl SyncConnection { | |||
| 55 | url: &str, | 57 | url: &str, |
| 56 | filter_service: Arc<FilterService>, | 58 | filter_service: Arc<FilterService>, |
| 57 | remote_domain: &str, | 59 | remote_domain: &str, |
| 60 | metrics: Option<SyncMetrics>, | ||
| 58 | ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { | 61 | ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { |
| 59 | let client = Client::default(); | 62 | let client = Client::default(); |
| 60 | 63 | ||
| @@ -78,6 +81,7 @@ impl SyncConnection { | |||
| 78 | filter_service, | 81 | filter_service, |
| 79 | remote_domain: remote_domain.to_string(), | 82 | remote_domain: remote_domain.to_string(), |
| 80 | subscription_manager, | 83 | subscription_manager, |
| 84 | metrics, | ||
| 81 | }) | 85 | }) |
| 82 | } | 86 | } |
| 83 | 87 | ||
| @@ -152,10 +156,12 @@ impl SyncConnection { | |||
| 152 | 156 | ||
| 153 | // Handle incoming notifications | 157 | // Handle incoming notifications |
| 154 | let url = self.url.clone(); | 158 | let url = self.url.clone(); |
| 159 | let metrics = self.metrics.clone(); | ||
| 155 | self.client | 160 | self.client |
| 156 | .handle_notifications(|notification| { | 161 | .handle_notifications(|notification| { |
| 157 | let tx = tx.clone(); | 162 | let tx = tx.clone(); |
| 158 | let url = url.clone(); | 163 | let url = url.clone(); |
| 164 | let metrics = metrics.clone(); | ||
| 159 | async move { | 165 | async move { |
| 160 | match notification { | 166 | match notification { |
| 161 | RelayPoolNotification::Event { event, .. } => { | 167 | RelayPoolNotification::Event { event, .. } => { |
| @@ -166,6 +172,11 @@ impl SyncConnection { | |||
| 166 | event.kind.as_u16() | 172 | event.kind.as_u16() |
| 167 | ); | 173 | ); |
| 168 | 174 | ||
| 175 | // Record live event metric | ||
| 176 | if let Some(ref m) = metrics { | ||
| 177 | m.record_event(event_source::LIVE); | ||
| 178 | } | ||
| 179 | |||
| 169 | // Send the event to the manager for processing | 180 | // Send the event to the manager for processing |
| 170 | let synced = SyncedEvent { | 181 | let synced = SyncedEvent { |
| 171 | event: (*event).clone(), | 182 | event: (*event).clone(), |
| @@ -320,12 +331,14 @@ impl SyncConnection { | |||
| 320 | /// * `filter_service` - FilterService for building subscriptions | 331 | /// * `filter_service` - FilterService for building subscriptions |
| 321 | /// * `our_domain` - Our relay's domain (used to extract remote domain) | 332 | /// * `our_domain` - Our relay's domain (used to extract remote domain) |
| 322 | /// * `health_tracker` - Health tracker for managing connection state | 333 | /// * `health_tracker` - Health tracker for managing connection state |
| 334 | /// * `metrics` - Optional sync metrics for Prometheus | ||
| 323 | pub async fn connect_with_retry( | 335 | pub async fn connect_with_retry( |
| 324 | url: &str, | 336 | url: &str, |
| 325 | tx: mpsc::Sender<SyncedEvent>, | 337 | tx: mpsc::Sender<SyncedEvent>, |
| 326 | filter_service: Arc<FilterService>, | 338 | filter_service: Arc<FilterService>, |
| 327 | _our_domain: &str, | 339 | _our_domain: &str, |
| 328 | health_tracker: Arc<RelayHealthTracker>, | 340 | health_tracker: Arc<RelayHealthTracker>, |
| 341 | metrics: Option<SyncMetrics>, | ||
| 329 | ) { | 342 | ) { |
| 330 | // Extract remote domain from URL | 343 | // Extract remote domain from URL |
| 331 | let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); | 344 | let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); |
| @@ -353,10 +366,20 @@ pub async fn connect_with_retry( | |||
| 353 | ); | 366 | ); |
| 354 | } | 367 | } |
| 355 | 368 | ||
| 356 | match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { | 369 | match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { |
| 357 | Ok(conn) => { | 370 | Ok(conn) => { |
| 358 | // Record successful connection | 371 | // Record successful connection |
| 359 | health_tracker.record_success(url); | 372 | health_tracker.record_success(url); |
| 373 | |||
| 374 | // Record metrics | ||
| 375 | if let Some(ref m) = metrics { | ||
| 376 | m.record_connection_attempt(url, true); | ||
| 377 | m.set_relay_connected(url, true); | ||
| 378 | m.inc_connected_count(); | ||
| 379 | m.record_health_state(url, health_tracker.get_state(url)); | ||
| 380 | m.record_failure_count(url, 0); | ||
| 381 | } | ||
| 382 | |||
| 360 | tracing::info!("Sync connection established to {}", url); | 383 | tracing::info!("Sync connection established to {}", url); |
| 361 | 384 | ||
| 362 | // Run the connection (this blocks until disconnection) | 385 | // Run the connection (this blocks until disconnection) |
| @@ -365,6 +388,15 @@ pub async fn connect_with_retry( | |||
| 365 | // Connection ended - record as failure for reconnection backoff | 388 | // Connection ended - record as failure for reconnection backoff |
| 366 | // (The connection ending is considered a failure even if it worked for a while) | 389 | // (The connection ending is considered a failure even if it worked for a while) |
| 367 | health_tracker.record_failure(url); | 390 | health_tracker.record_failure(url); |
| 391 | |||
| 392 | // Update metrics for disconnection | ||
| 393 | if let Some(ref m) = metrics { | ||
| 394 | m.set_relay_connected(url, false); | ||
| 395 | m.dec_connected_count(); | ||
| 396 | m.record_health_state(url, health_tracker.get_state(url)); | ||
| 397 | m.record_failure_count(url, health_tracker.get_failure_count(url)); | ||
| 398 | } | ||
| 399 | |||
| 368 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | 400 | tracing::warn!("Sync connection to {} ended, will reconnect", url); |
| 369 | } | 401 | } |
| 370 | Err(e) => { | 402 | Err(e) => { |
| @@ -373,6 +405,19 @@ pub async fn connect_with_retry( | |||
| 373 | 405 | ||
| 374 | let failure_count = health_tracker.get_failure_count(url); | 406 | let failure_count = health_tracker.get_failure_count(url); |
| 375 | let state = health_tracker.get_state(url); | 407 | let state = health_tracker.get_state(url); |
| 408 | |||
| 409 | // Record metrics | ||
| 410 | if let Some(ref m) = metrics { | ||
| 411 | m.record_connection_attempt(url, false); | ||
| 412 | m.set_relay_connected(url, false); | ||
| 413 | m.record_health_state(url, state); | ||
| 414 | m.record_failure_count(url, failure_count); | ||
| 415 | |||
| 416 | // Track dead relays | ||
| 417 | if state == super::health::HealthState::Dead { | ||
| 418 | m.inc_dead_count(); | ||
| 419 | } | ||
| 420 | } | ||
| 376 | 421 | ||
| 377 | tracing::error!( | 422 | tracing::error!( |
| 378 | "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", | 423 | "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", |