From ffcf8a7bc679f0aff9135063d343be3161b3b439 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 08:36:48 +0000 Subject: feat: add event metrics tracking throughout sync (Phase 5) --- src/sync/mod.rs | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index a58406b..5039c04 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -549,6 +549,10 @@ impl SyncManager { // Recompute actions - will discover all repos/events again self.recompute_actions_for_relay(relay_url).await; + if let Some(ref metrics) = self.metrics { + metrics.record_event(event_source::DAILY); + } + tracing::info!(relay = %relay_url, "Daily sync complete"); } @@ -716,6 +720,11 @@ impl SyncManager { index.insert(action.relay_url.clone(), new_state); } + // Track new relay in metrics + if let Some(ref metrics) = self.metrics { + metrics.inc_tracked_count(); + } + tracing::info!( relay = %action.relay_url, repos = action.repos.len(), @@ -872,6 +881,13 @@ impl SyncManager { // Record success in health tracker self.health_tracker.record_success(relay_url); + // Update metrics + if let Some(ref metrics) = self.metrics { + metrics.set_relay_connected(relay_url, true); + metrics.inc_connected_count(); + metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + } + // Subscribe based on reconnect type if is_fresh_sync { tracing::info!( @@ -916,6 +932,10 @@ impl SyncManager { // Recompute actions for any new items discovered while disconnected self.recompute_actions_for_relay(relay_url).await; + + if let Some(ref metrics) = self.metrics { + metrics.record_event(event_source::RECONNECT); + } } } @@ -1089,6 +1109,14 @@ impl SyncManager { // 4. Record failure in health tracker self.health_tracker.record_failure(relay_url); + + // Update metrics + if let Some(ref metrics) = self.metrics { + metrics.set_relay_connected(relay_url, false); + metrics.dec_connected_count(); + metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); + } + tracing::info!( relay = %relay_url, health_state = %self.health_tracker.get_state(relay_url), @@ -1146,6 +1174,11 @@ impl SyncManager { // Create relay connection let connection = RelayConnection::new(relay_url.clone()); + // Record connection attempt + if let Some(ref metrics) = self.metrics { + metrics.record_connection_attempt(&relay_url, false); + } + // Connect and subscribe to Layer 1 if let Err(e) = connection.connect_and_subscribe(None).await { tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); @@ -1159,6 +1192,11 @@ impl SyncManager { return; } + // If successful, update connection attempt metric to success + if let Some(ref metrics) = self.metrics { + metrics.record_connection_attempt(&relay_url, true); + } + // Mark as connected in relay sync index { let mut index = relay_sync_index.write().await; @@ -1200,10 +1238,20 @@ impl SyncManager { // Spawn event processor let relay_url_clone = relay_url.clone(); + let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task + let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task tokio::spawn(async move { while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { + if let Some(ref metrics) = metrics_clone { + let source = if is_bootstrap_clone { + event_source::STARTUP + } else { + event_source::LIVE + }; + metrics.record_event(source); + } Self::process_event_static( &event, &relay_url_clone, -- cgit v1.2.3