upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 08:36:48 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 08:36:48 +0000
commitffcf8a7bc679f0aff9135063d343be3161b3b439 (patch)
tree0256c08ac29c881eb44df89ba7e6602196a19b9f /src/sync
parentda6187fece22b7f1284dff8d804c22ad5f2eb40e (diff)
feat: add event metrics tracking throughout sync (Phase 5)
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs48
1 files changed, 48 insertions, 0 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index a58406b..5039c04 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -549,6 +549,10 @@ impl SyncManager {
549 // Recompute actions - will discover all repos/events again 549 // Recompute actions - will discover all repos/events again
550 self.recompute_actions_for_relay(relay_url).await; 550 self.recompute_actions_for_relay(relay_url).await;
551 551
552 if let Some(ref metrics) = self.metrics {
553 metrics.record_event(event_source::DAILY);
554 }
555
552 tracing::info!(relay = %relay_url, "Daily sync complete"); 556 tracing::info!(relay = %relay_url, "Daily sync complete");
553 } 557 }
554 558
@@ -716,6 +720,11 @@ impl SyncManager {
716 index.insert(action.relay_url.clone(), new_state); 720 index.insert(action.relay_url.clone(), new_state);
717 } 721 }
718 722
723 // Track new relay in metrics
724 if let Some(ref metrics) = self.metrics {
725 metrics.inc_tracked_count();
726 }
727
719 tracing::info!( 728 tracing::info!(
720 relay = %action.relay_url, 729 relay = %action.relay_url,
721 repos = action.repos.len(), 730 repos = action.repos.len(),
@@ -872,6 +881,13 @@ impl SyncManager {
872 // Record success in health tracker 881 // Record success in health tracker
873 self.health_tracker.record_success(relay_url); 882 self.health_tracker.record_success(relay_url);
874 883
884 // Update metrics
885 if let Some(ref metrics) = self.metrics {
886 metrics.set_relay_connected(relay_url, true);
887 metrics.inc_connected_count();
888 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
889 }
890
875 // Subscribe based on reconnect type 891 // Subscribe based on reconnect type
876 if is_fresh_sync { 892 if is_fresh_sync {
877 tracing::info!( 893 tracing::info!(
@@ -916,6 +932,10 @@ impl SyncManager {
916 932
917 // Recompute actions for any new items discovered while disconnected 933 // Recompute actions for any new items discovered while disconnected
918 self.recompute_actions_for_relay(relay_url).await; 934 self.recompute_actions_for_relay(relay_url).await;
935
936 if let Some(ref metrics) = self.metrics {
937 metrics.record_event(event_source::RECONNECT);
938 }
919 } 939 }
920 } 940 }
921 941
@@ -1089,6 +1109,14 @@ impl SyncManager {
1089 1109
1090 // 4. Record failure in health tracker 1110 // 4. Record failure in health tracker
1091 self.health_tracker.record_failure(relay_url); 1111 self.health_tracker.record_failure(relay_url);
1112
1113 // Update metrics
1114 if let Some(ref metrics) = self.metrics {
1115 metrics.set_relay_connected(relay_url, false);
1116 metrics.dec_connected_count();
1117 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1118 }
1119
1092 tracing::info!( 1120 tracing::info!(
1093 relay = %relay_url, 1121 relay = %relay_url,
1094 health_state = %self.health_tracker.get_state(relay_url), 1122 health_state = %self.health_tracker.get_state(relay_url),
@@ -1146,6 +1174,11 @@ impl SyncManager {
1146 // Create relay connection 1174 // Create relay connection
1147 let connection = RelayConnection::new(relay_url.clone()); 1175 let connection = RelayConnection::new(relay_url.clone());
1148 1176
1177 // Record connection attempt
1178 if let Some(ref metrics) = self.metrics {
1179 metrics.record_connection_attempt(&relay_url, false);
1180 }
1181
1149 // Connect and subscribe to Layer 1 1182 // Connect and subscribe to Layer 1
1150 if let Err(e) = connection.connect_and_subscribe(None).await { 1183 if let Err(e) = connection.connect_and_subscribe(None).await {
1151 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); 1184 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
@@ -1159,6 +1192,11 @@ impl SyncManager {
1159 return; 1192 return;
1160 } 1193 }
1161 1194
1195 // If successful, update connection attempt metric to success
1196 if let Some(ref metrics) = self.metrics {
1197 metrics.record_connection_attempt(&relay_url, true);
1198 }
1199
1162 // Mark as connected in relay sync index 1200 // Mark as connected in relay sync index
1163 { 1201 {
1164 let mut index = relay_sync_index.write().await; 1202 let mut index = relay_sync_index.write().await;
@@ -1200,10 +1238,20 @@ impl SyncManager {
1200 1238
1201 // Spawn event processor 1239 // Spawn event processor
1202 let relay_url_clone = relay_url.clone(); 1240 let relay_url_clone = relay_url.clone();
1241 let metrics_clone = self.metrics.clone(); // Clone metrics for the spawned task
1242 let is_bootstrap_clone = is_bootstrap; // Clone is_bootstrap for the spawned task
1203 tokio::spawn(async move { 1243 tokio::spawn(async move {
1204 while let Some(relay_event) = event_rx.recv().await { 1244 while let Some(relay_event) = event_rx.recv().await {
1205 match relay_event { 1245 match relay_event {
1206 RelayEvent::Event(event) => { 1246 RelayEvent::Event(event) => {
1247 if let Some(ref metrics) = metrics_clone {
1248 let source = if is_bootstrap_clone {
1249 event_source::STARTUP
1250 } else {
1251 event_source::LIVE
1252 };
1253 metrics.record_event(source);
1254 }
1207 Self::process_event_static( 1255 Self::process_event_static(
1208 &event, 1256 &event,
1209 &relay_url_clone, 1257 &relay_url_clone,