diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 08:36:48 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 08:36:48 +0000 |
| commit | ffcf8a7bc679f0aff9135063d343be3161b3b439 (patch) | |
| tree | 0256c08ac29c881eb44df89ba7e6602196a19b9f /src/sync/mod.rs | |
| parent | da6187fece22b7f1284dff8d804c22ad5f2eb40e (diff) | |
feat: add event metrics tracking throughout sync (Phase 5)
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index a58406b..5039c04 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -549,6 +549,10 @@ impl SyncManager { | |||
| 549 | // Recompute actions - will discover all repos/events again | 549 | // Recompute actions - will discover all repos/events again |
| 550 | self.recompute_actions_for_relay(relay_url).await; | 550 | self.recompute_actions_for_relay(relay_url).await; |
| 551 | 551 | ||
| 552 | if let Some(ref metrics) = self.metrics { | ||
| 553 | metrics.record_event(event_source::DAILY); | ||
| 554 | } | ||
| 555 | |||
| 552 | tracing::info!(relay = %relay_url, "Daily sync complete"); | 556 | tracing::info!(relay = %relay_url, "Daily sync complete"); |
| 553 | } | 557 | } |
| 554 | 558 | ||
| @@ -716,6 +720,11 @@ impl SyncManager { | |||
| 716 | index.insert(action.relay_url.clone(), new_state); | 720 | index.insert(action.relay_url.clone(), new_state); |
| 717 | } | 721 | } |
| 718 | 722 | ||
| 723 | // Track new relay in metrics | ||
| 724 | if let Some(ref metrics) = self.metrics { | ||
| 725 | metrics.inc_tracked_count(); | ||
| 726 | } | ||
| 727 | |||
| 719 | tracing::info!( | 728 | tracing::info!( |
| 720 | relay = %action.relay_url, | 729 | relay = %action.relay_url, |
| 721 | repos = action.repos.len(), | 730 | repos = action.repos.len(), |
| @@ -872,6 +881,13 @@ impl SyncManager { | |||
| 872 | // Record success in health tracker | 881 | // Record success in health tracker |
| 873 | self.health_tracker.record_success(relay_url); | 882 | self.health_tracker.record_success(relay_url); |
| 874 | 883 | ||
| 884 | // Update metrics | ||
| 885 | if let Some(ref metrics) = self.metrics { | ||
| 886 | metrics.set_relay_connected(relay_url, true); | ||
| 887 | metrics.inc_connected_count(); | ||
| 888 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 889 | } | ||
| 890 | |||
| 875 | // Subscribe based on reconnect type | 891 | // Subscribe based on reconnect type |
| 876 | if is_fresh_sync { | 892 | if is_fresh_sync { |
| 877 | tracing::info!( | 893 | tracing::info!( |
| @@ -916,6 +932,10 @@ impl SyncManager { | |||
| 916 | 932 | ||
| 917 | // Recompute actions for any new items discovered while disconnected | 933 | // Recompute actions for any new items discovered while disconnected |
| 918 | self.recompute_actions_for_relay(relay_url).await; | 934 | self.recompute_actions_for_relay(relay_url).await; |
| 935 | |||
| 936 | if let Some(ref metrics) = self.metrics { | ||
| 937 | metrics.record_event(event_source::RECONNECT); | ||
| 938 | } | ||
| 919 | } | 939 | } |
| 920 | } | 940 | } |
| 921 | 941 | ||
| @@ -1089,6 +1109,14 @@ impl SyncManager { | |||
| 1089 | 1109 | ||
| 1090 | // 4. Record failure in health tracker | 1110 | // 4. Record failure in health tracker |
| 1091 | self.health_tracker.record_failure(relay_url); | 1111 | self.health_tracker.record_failure(relay_url); |
| 1112 | |||
| 1113 | // Update metrics | ||
| 1114 | if let Some(ref metrics) = self.metrics { | ||
| 1115 | metrics.set_relay_connected(relay_url, false); | ||
| 1116 | metrics.dec_connected_count(); | ||
| 1117 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1118 | } | ||
| 1119 | |||
| 1092 | tracing::info!( | 1120 | tracing::info!( |
| 1093 | relay = %relay_url, | 1121 | relay = %relay_url, |
| 1094 | health_state = %self.health_tracker.get_state(relay_url), | 1122 | health_state = %self.health_tracker.get_state(relay_url), |
| @@ -1146,6 +1174,11 @@ impl SyncManager { | |||
| 1146 | // Create relay connection | 1174 | // Create relay connection |
| 1147 | let connection = RelayConnection::new(relay_url.clone()); | 1175 | let connection = RelayConnection::new(relay_url.clone()); |
| 1148 | 1176 | ||
| 1177 | // Record connection attempt | ||
| 1178 | if let Some(ref metrics) = self.metrics { | ||
| 1179 | metrics.record_connection_attempt(&relay_url, false); | ||
| 1180 | } | ||
| 1181 | |||
| 1149 | // Connect and subscribe to Layer 1 | 1182 | // Connect and subscribe to Layer 1 |
| 1150 | if let Err(e) = connection.connect_and_subscribe(None).await { | 1183 | if let Err(e) = connection.connect_and_subscribe(None).await { |
| 1151 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | 1184 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); |
| @@ -1159,6 +1192,11 @@ impl SyncManager { | |||
| 1159 | return; | 1192 | return; |
| 1160 | } | 1193 | } |
| 1161 | 1194 | ||
| 1195 | // If successful, update connection attempt metric to success | ||
| 1196 | if let Some(ref metrics) = self.metrics { | ||
| 1197 | metrics.record_connection_attempt(&relay_url, true); | ||
| 1198 | } | ||
| 1199 | |||
| 1162 | // Mark as connected in relay sync index | 1200 | // Mark as connected in relay sync index |
| 1163 | { | 1201 | { |
| 1164 | let mut index = relay_sync_index.write().await; | 1202 | let mut index = relay_sync_index.write().await; |
| @@ -1200,10 +1238,20 @@ impl SyncManager { | |||
| 1200 | 1238 | ||
| 1201 | // Spawn event processor | 1239 | // Spawn event processor |
| 1202 | let relay_url_clone = relay_url.clone(); | 1240 | let relay_url_clone = relay_url.clone(); |
| 1241 | let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task | ||
| 1242 | let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task | ||
| 1203 | tokio::spawn(async move { | 1243 | tokio::spawn(async move { |
| 1204 | while let Some(relay_event) = event_rx.recv().await { | 1244 | while let Some(relay_event) = event_rx.recv().await { |
| 1205 | match relay_event { | 1245 | match relay_event { |
| 1206 | RelayEvent::Event(event) => { | 1246 | RelayEvent::Event(event) => { |
| 1247 | if let Some(ref metrics) = metrics_clone { | ||
| 1248 | let source = if is_bootstrap_clone { | ||
| 1249 | event_source::STARTUP | ||
| 1250 | } else { | ||
| 1251 | event_source::LIVE | ||
| 1252 | }; | ||
| 1253 | metrics.record_event(source); | ||
| 1254 | } | ||
| 1207 | Self::process_event_static( | 1255 | Self::process_event_static( |
| 1208 | &event, | 1256 | &event, |
| 1209 | &relay_url_clone, | 1257 | &relay_url_clone, |