upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/metrics/mod.rs18
-rw-r--r--src/sync/connection.rs47
-rw-r--r--src/sync/manager.rs59
-rw-r--r--src/sync/metrics.rs348
-rw-r--r--src/sync/mod.rs2
5 files changed, 471 insertions, 3 deletions
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 4a4fe57..736414f 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -5,6 +5,7 @@
5//! - Git operation metrics (clone, fetch, push) 5//! - Git operation metrics (clone, fetch, push)
6//! - Repository bandwidth tracking (top-N only for cardinality control) 6//! - Repository bandwidth tracking (top-N only for cardinality control)
7//! - Nostr event metrics 7//! - Nostr event metrics
8//! - Sync metrics (GRASP-02 proactive sync)
8//! 9//!
9//! # Privacy 10//! # Privacy
10//! IP addresses are NEVER exposed in metrics. The `ConnectionTracker` maintains 11//! IP addresses are NEVER exposed in metrics. The `ConnectionTracker` maintains
@@ -14,6 +15,8 @@
14pub mod bandwidth; 15pub mod bandwidth;
15pub mod connection; 16pub mod connection;
16 17
18pub use crate::sync::SyncMetrics;
19
17use std::sync::Arc; 20use std::sync::Arc;
18use std::time::Instant; 21use std::time::Instant;
19 22
@@ -46,6 +49,9 @@ struct MetricsInner {
46 /// Repository bandwidth tracking (top-N only) 49 /// Repository bandwidth tracking (top-N only)
47 pub bandwidth_tracker: BandwidthTracker, 50 pub bandwidth_tracker: BandwidthTracker,
48 51
52 /// Sync metrics (GRASP-02 proactive sync)
53 pub sync_metrics: Option<crate::sync::SyncMetrics>,
54
49 // === WebSocket Metrics === 55 // === WebSocket Metrics ===
50 /// Total WebSocket connections since startup 56 /// Total WebSocket connections since startup
51 pub websocket_connections_total: Counter, 57 pub websocket_connections_total: Counter,
@@ -97,6 +103,11 @@ impl Metrics {
97 } 103 }
98 } 104 }
99 105
106 /// Returns the sync metrics if registered.
107 pub fn sync_metrics(&self) -> Option<&crate::sync::SyncMetrics> {
108 self.inner.sync_metrics.as_ref()
109 }
110
100 /// Returns the connection tracker for WebSocket connection management. 111 /// Returns the connection tracker for WebSocket connection management.
101 pub fn connection_tracker(&self) -> &ConnectionTracker { 112 pub fn connection_tracker(&self) -> &ConnectionTracker {
102 &self.inner.connection_tracker 113 &self.inner.connection_tracker
@@ -248,6 +259,12 @@ impl MetricsInner {
248 // Create bandwidth tracker 259 // Create bandwidth tracker
249 let bandwidth_tracker = BandwidthTracker::new(&REGISTRY); 260 let bandwidth_tracker = BandwidthTracker::new(&REGISTRY);
250 261
262 // Create sync metrics (may fail if already registered in tests)
263 let sync_metrics = crate::sync::SyncMetrics::register(&REGISTRY).ok();
264 if sync_metrics.is_some() {
265 tracing::info!("Sync metrics registered with Prometheus");
266 }
267
251 // WebSocket metrics 268 // WebSocket metrics
252 let websocket_connections_total = Counter::with_opts( 269 let websocket_connections_total = Counter::with_opts(
253 Opts::new( 270 Opts::new(
@@ -377,6 +394,7 @@ impl MetricsInner {
377 Self { 394 Self {
378 connection_tracker, 395 connection_tracker,
379 bandwidth_tracker, 396 bandwidth_tracker,
397 sync_metrics,
380 websocket_connections_total, 398 websocket_connections_total,
381 websocket_connection_duration, 399 websocket_connection_duration,
382 websocket_messages_received, 400 websocket_messages_received,
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index cd7a603..e921185 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -31,6 +31,7 @@ use tokio::sync::mpsc;
31 31
32use super::filter::FilterService; 32use super::filter::FilterService;
33use super::health::RelayHealthTracker; 33use super::health::RelayHealthTracker;
34use super::metrics::{event_source, SyncMetrics};
34use super::subscription::SubscriptionManager; 35use super::subscription::SubscriptionManager;
35 36
36/// Event received from the sync connection 37/// Event received from the sync connection
@@ -47,6 +48,7 @@ pub struct SyncConnection {
47 filter_service: Arc<FilterService>, 48 filter_service: Arc<FilterService>,
48 remote_domain: String, 49 remote_domain: String,
49 subscription_manager: SubscriptionManager, 50 subscription_manager: SubscriptionManager,
51 metrics: Option<SyncMetrics>,
50} 52}
51 53
52impl SyncConnection { 54impl SyncConnection {
@@ -55,6 +57,7 @@ impl SyncConnection {
55 url: &str, 57 url: &str,
56 filter_service: Arc<FilterService>, 58 filter_service: Arc<FilterService>,
57 remote_domain: &str, 59 remote_domain: &str,
60 metrics: Option<SyncMetrics>,
58 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { 61 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
59 let client = Client::default(); 62 let client = Client::default();
60 63
@@ -78,6 +81,7 @@ impl SyncConnection {
78 filter_service, 81 filter_service,
79 remote_domain: remote_domain.to_string(), 82 remote_domain: remote_domain.to_string(),
80 subscription_manager, 83 subscription_manager,
84 metrics,
81 }) 85 })
82 } 86 }
83 87
@@ -152,10 +156,12 @@ impl SyncConnection {
152 156
153 // Handle incoming notifications 157 // Handle incoming notifications
154 let url = self.url.clone(); 158 let url = self.url.clone();
159 let metrics = self.metrics.clone();
155 self.client 160 self.client
156 .handle_notifications(|notification| { 161 .handle_notifications(|notification| {
157 let tx = tx.clone(); 162 let tx = tx.clone();
158 let url = url.clone(); 163 let url = url.clone();
164 let metrics = metrics.clone();
159 async move { 165 async move {
160 match notification { 166 match notification {
161 RelayPoolNotification::Event { event, .. } => { 167 RelayPoolNotification::Event { event, .. } => {
@@ -166,6 +172,11 @@ impl SyncConnection {
166 event.kind.as_u16() 172 event.kind.as_u16()
167 ); 173 );
168 174
175 // Record live event metric
176 if let Some(ref m) = metrics {
177 m.record_event(event_source::LIVE);
178 }
179
169 // Send the event to the manager for processing 180 // Send the event to the manager for processing
170 let synced = SyncedEvent { 181 let synced = SyncedEvent {
171 event: (*event).clone(), 182 event: (*event).clone(),
@@ -320,12 +331,14 @@ impl SyncConnection {
320/// * `filter_service` - FilterService for building subscriptions 331/// * `filter_service` - FilterService for building subscriptions
321/// * `our_domain` - Our relay's domain (used to extract remote domain) 332/// * `our_domain` - Our relay's domain (used to extract remote domain)
322/// * `health_tracker` - Health tracker for managing connection state 333/// * `health_tracker` - Health tracker for managing connection state
334/// * `metrics` - Optional sync metrics for Prometheus
323pub async fn connect_with_retry( 335pub async fn connect_with_retry(
324 url: &str, 336 url: &str,
325 tx: mpsc::Sender<SyncedEvent>, 337 tx: mpsc::Sender<SyncedEvent>,
326 filter_service: Arc<FilterService>, 338 filter_service: Arc<FilterService>,
327 _our_domain: &str, 339 _our_domain: &str,
328 health_tracker: Arc<RelayHealthTracker>, 340 health_tracker: Arc<RelayHealthTracker>,
341 metrics: Option<SyncMetrics>,
329) { 342) {
330 // Extract remote domain from URL 343 // Extract remote domain from URL
331 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string()); 344 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
@@ -353,10 +366,20 @@ pub async fn connect_with_retry(
353 ); 366 );
354 } 367 }
355 368
356 match SyncConnection::new(url, filter_service.clone(), &remote_domain).await { 369 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await {
357 Ok(conn) => { 370 Ok(conn) => {
358 // Record successful connection 371 // Record successful connection
359 health_tracker.record_success(url); 372 health_tracker.record_success(url);
373
374 // Record metrics
375 if let Some(ref m) = metrics {
376 m.record_connection_attempt(url, true);
377 m.set_relay_connected(url, true);
378 m.inc_connected_count();
379 m.record_health_state(url, health_tracker.get_state(url));
380 m.record_failure_count(url, 0);
381 }
382
360 tracing::info!("Sync connection established to {}", url); 383 tracing::info!("Sync connection established to {}", url);
361 384
362 // Run the connection (this blocks until disconnection) 385 // Run the connection (this blocks until disconnection)
@@ -365,6 +388,15 @@ pub async fn connect_with_retry(
365 // Connection ended - record as failure for reconnection backoff 388 // Connection ended - record as failure for reconnection backoff
366 // (The connection ending is considered a failure even if it worked for a while) 389 // (The connection ending is considered a failure even if it worked for a while)
367 health_tracker.record_failure(url); 390 health_tracker.record_failure(url);
391
392 // Update metrics for disconnection
393 if let Some(ref m) = metrics {
394 m.set_relay_connected(url, false);
395 m.dec_connected_count();
396 m.record_health_state(url, health_tracker.get_state(url));
397 m.record_failure_count(url, health_tracker.get_failure_count(url));
398 }
399
368 tracing::warn!("Sync connection to {} ended, will reconnect", url); 400 tracing::warn!("Sync connection to {} ended, will reconnect", url);
369 } 401 }
370 Err(e) => { 402 Err(e) => {
@@ -373,6 +405,19 @@ pub async fn connect_with_retry(
373 405
374 let failure_count = health_tracker.get_failure_count(url); 406 let failure_count = health_tracker.get_failure_count(url);
375 let state = health_tracker.get_state(url); 407 let state = health_tracker.get_state(url);
408
409 // Record metrics
410 if let Some(ref m) = metrics {
411 m.record_connection_attempt(url, false);
412 m.set_relay_connected(url, false);
413 m.record_health_state(url, state);
414 m.record_failure_count(url, failure_count);
415
416 // Track dead relays
417 if state == super::health::HealthState::Dead {
418 m.inc_dead_count();
419 }
420 }
376 421
377 tracing::error!( 422 tracing::error!(
378 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}", 423 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}",
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index f594454..3bc190d 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -35,6 +35,7 @@ use tokio::sync::mpsc;
35use super::connection::{connect_with_retry, SyncedEvent}; 35use super::connection::{connect_with_retry, SyncedEvent};
36use super::filter::FilterService; 36use super::filter::FilterService;
37use super::health::RelayHealthTracker; 37use super::health::RelayHealthTracker;
38use super::metrics::SyncMetrics;
38use super::SYNC_SOURCE_ADDR; 39use super::SYNC_SOURCE_ADDR;
39use crate::config::Config; 40use crate::config::Config;
40use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
@@ -54,6 +55,8 @@ pub struct SyncManager {
54 write_policy: Nip34WritePolicy, 55 write_policy: Nip34WritePolicy,
55 /// Health tracker for relay connections 56 /// Health tracker for relay connections
56 health_tracker: Arc<RelayHealthTracker>, 57 health_tracker: Arc<RelayHealthTracker>,
58 /// Sync metrics for Prometheus
59 metrics: Option<SyncMetrics>,
57} 60}
58 61
59impl SyncManager { 62impl SyncManager {
@@ -78,6 +81,34 @@ impl SyncManager {
78 database, 81 database,
79 write_policy, 82 write_policy,
80 health_tracker: Arc::new(RelayHealthTracker::new(config)), 83 health_tracker: Arc::new(RelayHealthTracker::new(config)),
84 metrics: None,
85 }
86 }
87
88 /// Create a new SyncManager with metrics
89 ///
90 /// # Arguments
91 /// * `initial_relay_url` - Optional initial relay URL from config
92 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
93 /// * `database` - Shared database for storing events and querying announcements
94 /// * `write_policy` - Write policy for validating synced events
95 /// * `config` - Configuration for health tracking settings
96 /// * `metrics` - Sync metrics for Prometheus
97 pub fn with_metrics(
98 initial_relay_url: Option<String>,
99 relay_domain: String,
100 database: SharedDatabase,
101 write_policy: Nip34WritePolicy,
102 config: &Config,
103 metrics: SyncMetrics,
104 ) -> Self {
105 Self {
106 initial_relay_url,
107 relay_domain,
108 database,
109 write_policy,
110 health_tracker: Arc::new(RelayHealthTracker::new(config)),
111 metrics: Some(metrics),
81 } 112 }
82 } 113 }
83 114
@@ -95,9 +126,20 @@ impl SyncManager {
95 database, 126 database,
96 write_policy, 127 write_policy,
97 health_tracker: Arc::new(RelayHealthTracker::with_defaults()), 128 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
129 metrics: None,
98 } 130 }
99 } 131 }
100 132
133 /// Set metrics for the sync manager
134 pub fn set_metrics(&mut self, metrics: SyncMetrics) {
135 self.metrics = Some(metrics);
136 }
137
138 /// Get a reference to the metrics
139 pub fn metrics(&self) -> Option<&SyncMetrics> {
140 self.metrics.as_ref()
141 }
142
101 /// Get a reference to the health tracker 143 /// Get a reference to the health tracker
102 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { 144 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
103 self.health_tracker.clone() 145 self.health_tracker.clone()
@@ -148,6 +190,11 @@ impl SyncManager {
148 } 190 }
149 } 191 }
150 192
193 // Record initial tracked relay count
194 if let Some(ref metrics) = self.metrics {
195 metrics.set_tracked_count(active_relays.len() as i64);
196 }
197
151 // Spawn connections with startup jitter to prevent thundering herd 198 // Spawn connections with startup jitter to prevent thundering herd
152 for url in relays_to_connect { 199 for url in relays_to_connect {
153 tracing::info!("Scheduling connection to sync relay: {}", url); 200 tracing::info!("Scheduling connection to sync relay: {}", url);
@@ -172,6 +219,12 @@ impl SyncManager {
172 if !active_relays.contains(&url) && !self.is_own_relay(&url) { 219 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
173 tracing::info!("Discovered new relay from event, connecting: {}", url); 220 tracing::info!("Discovered new relay from event, connecting: {}", url);
174 active_relays.insert(url.clone()); 221 active_relays.insert(url.clone());
222
223 // Update tracked relay count
224 if let Some(ref metrics) = self.metrics {
225 metrics.inc_tracked_count();
226 }
227
175 // New relays discovered during runtime don't need jitter 228 // New relays discovered during runtime don't need jitter
176 self.spawn_connection(url, tx.clone(), filter_service.clone()); 229 self.spawn_connection(url, tx.clone(), filter_service.clone());
177 } 230 }
@@ -200,6 +253,7 @@ impl SyncManager {
200 ) { 253 ) {
201 let domain = self.relay_domain.clone(); 254 let domain = self.relay_domain.clone();
202 let health_tracker = self.health_tracker.clone(); 255 let health_tracker = self.health_tracker.clone();
256 let metrics = self.metrics.clone();
203 257
204 tokio::spawn(async move { 258 tokio::spawn(async move {
205 // Apply startup jitter 259 // Apply startup jitter
@@ -211,7 +265,7 @@ impl SyncManager {
211 ); 265 );
212 tokio::time::sleep(Duration::from_millis(jitter_ms)).await; 266 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
213 267
214 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; 268 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
215 }); 269 });
216 } 270 }
217 271
@@ -226,9 +280,10 @@ impl SyncManager {
226 ) { 280 ) {
227 let domain = self.relay_domain.clone(); 281 let domain = self.relay_domain.clone();
228 let health_tracker = self.health_tracker.clone(); 282 let health_tracker = self.health_tracker.clone();
283 let metrics = self.metrics.clone();
229 284
230 tokio::spawn(async move { 285 tokio::spawn(async move {
231 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; 286 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
232 }); 287 });
233 } 288 }
234 289
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
new file mode 100644
index 0000000..c93e583
--- /dev/null
+++ b/src/sync/metrics.rs
@@ -0,0 +1,348 @@
1//! Prometheus Metrics for Proactive Sync (GRASP-02 Phase 6)
2//!
3//! This module provides comprehensive sync monitoring metrics including:
4//! - Connection status and attempts per relay
5//! - Health state tracking (Healthy/Degraded/Dead)
6//! - Event sync tracking by source (live/startup/reconnect/daily catchup)
7//! - Gap events filled during catchup operations
8//!
9//! All metrics follow the `ngit_sync_` prefix convention.
10
11use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
12
13use super::health::HealthState;
14
15/// Prometheus metrics for the proactive sync system
16#[derive(Clone)]
17pub struct SyncMetrics {
18 // === Connection metrics ===
19 /// Per-relay connection status (1=connected, 0=disconnected)
20 relay_connected: IntGaugeVec,
21 /// Connection attempts by relay and result (success/failure)
22 connection_attempts_total: IntCounterVec,
23
24 // === Health metrics ===
25 /// Per-relay health status (healthy=1, degraded=2, dead=3)
26 relay_status: IntGaugeVec,
27 /// Per-relay consecutive failure count
28 relay_failures: IntGaugeVec,
29
30 // === Event metrics ===
31 /// Events synced by source (live/startup/reconnect/daily)
32 events_total: IntCounterVec,
33 /// Gap events filled during catchup, by relay
34 gap_events_total: IntCounterVec,
35
36 // === Summary metrics ===
37 /// Total relays discovered and tracked
38 relays_tracked_total: IntGauge,
39 /// Currently connected relay count
40 relays_connected_total: IntGauge,
41 /// Relays marked as dead
42 relays_dead_total: IntGauge,
43}
44
45impl SyncMetrics {
46 /// Register all sync metrics with the provided Prometheus registry
47 pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> {
48 // Connection metrics
49 let relay_connected = IntGaugeVec::new(
50 Opts::new(
51 "ngit_sync_relay_connected",
52 "Relay connection status (1=connected, 0=disconnected)",
53 ),
54 &["relay"],
55 )?;
56 registry.register(Box::new(relay_connected.clone()))?;
57
58 let connection_attempts_total = IntCounterVec::new(
59 Opts::new(
60 "ngit_sync_connection_attempts_total",
61 "Total connection attempts by relay and result",
62 ),
63 &["relay", "result"],
64 )?;
65 registry.register(Box::new(connection_attempts_total.clone()))?;
66
67 // Health metrics
68 let relay_status = IntGaugeVec::new(
69 Opts::new(
70 "ngit_sync_relay_status",
71 "Relay health status (1=healthy, 2=degraded, 3=dead)",
72 ),
73 &["relay"],
74 )?;
75 registry.register(Box::new(relay_status.clone()))?;
76
77 let relay_failures = IntGaugeVec::new(
78 Opts::new(
79 "ngit_sync_relay_failures",
80 "Consecutive failure count per relay",
81 ),
82 &["relay"],
83 )?;
84 registry.register(Box::new(relay_failures.clone()))?;
85
86 // Event metrics
87 let events_total = IntCounterVec::new(
88 Opts::new(
89 "ngit_sync_events_total",
90 "Total events synced by source type",
91 ),
92 &["source"],
93 )?;
94 registry.register(Box::new(events_total.clone()))?;
95
96 let gap_events_total = IntCounterVec::new(
97 Opts::new(
98 "ngit_sync_gap_events_total",
99 "Gap events filled during catchup by relay",
100 ),
101 &["relay"],
102 )?;
103 registry.register(Box::new(gap_events_total.clone()))?;
104
105 // Summary metrics
106 let relays_tracked_total = IntGauge::with_opts(Opts::new(
107 "ngit_sync_relays_tracked_total",
108 "Total number of relays discovered and tracked",
109 ))?;
110 registry.register(Box::new(relays_tracked_total.clone()))?;
111
112 let relays_connected_total = IntGauge::with_opts(Opts::new(
113 "ngit_sync_relays_connected_total",
114 "Number of currently connected relays",
115 ))?;
116 registry.register(Box::new(relays_connected_total.clone()))?;
117
118 let relays_dead_total = IntGauge::with_opts(Opts::new(
119 "ngit_sync_relays_dead_total",
120 "Number of relays marked as dead",
121 ))?;
122 registry.register(Box::new(relays_dead_total.clone()))?;
123
124 Ok(Self {
125 relay_connected,
126 connection_attempts_total,
127 relay_status,
128 relay_failures,
129 events_total,
130 gap_events_total,
131 relays_tracked_total,
132 relays_connected_total,
133 relays_dead_total,
134 })
135 }
136
137 // === Connection Recording Methods ===
138
139 /// Record a connection attempt (success or failure)
140 pub fn record_connection_attempt(&self, relay: &str, success: bool) {
141 let result = if success { "success" } else { "failure" };
142 self.connection_attempts_total
143 .with_label_values(&[relay, result])
144 .inc();
145 }
146
147 /// Set relay connection status
148 pub fn set_relay_connected(&self, relay: &str, connected: bool) {
149 self.relay_connected
150 .with_label_values(&[relay])
151 .set(if connected { 1 } else { 0 });
152
153 // Update connected count based on all relay values
154 // This is handled by update_connected_count() for accuracy
155 }
156
157 /// Update the total connected relay count
158 pub fn update_connected_count(&self, count: i64) {
159 self.relays_connected_total.set(count);
160 }
161
162 /// Increment connected count
163 pub fn inc_connected_count(&self) {
164 self.relays_connected_total.inc();
165 }
166
167 /// Decrement connected count
168 pub fn dec_connected_count(&self) {
169 self.relays_connected_total.dec();
170 }
171
172 // === Health Recording Methods ===
173
174 /// Record relay health state change
175 pub fn record_health_state(&self, relay: &str, state: HealthState) {
176 let state_value = match state {
177 HealthState::Healthy => 1,
178 HealthState::Degraded => 2,
179 HealthState::Dead => 3,
180 };
181 self.relay_status.with_label_values(&[relay]).set(state_value);
182 }
183
184 /// Record relay failure count
185 pub fn record_failure_count(&self, relay: &str, count: u32) {
186 self.relay_failures
187 .with_label_values(&[relay])
188 .set(count as i64);
189 }
190
191 /// Update dead relay count
192 pub fn update_dead_count(&self, count: i64) {
193 self.relays_dead_total.set(count);
194 }
195
196 /// Increment dead relay count
197 pub fn inc_dead_count(&self) {
198 self.relays_dead_total.inc();
199 }
200
201 /// Decrement dead relay count
202 pub fn dec_dead_count(&self) {
203 self.relays_dead_total.dec();
204 }
205
206 // === Event Recording Methods ===
207
208 /// Record a synced event by source type
209 ///
210 /// Source types:
211 /// - "live" - Real-time subscription events
212 /// - "startup" - Events from startup catchup
213 /// - "reconnect" - Events from reconnection catchup
214 /// - "daily" - Events from daily catchup
215 pub fn record_event(&self, source: &str) {
216 self.events_total.with_label_values(&[source]).inc();
217 }
218
219 /// Record multiple events synced by source type
220 pub fn record_events(&self, source: &str, count: u64) {
221 self.events_total
222 .with_label_values(&[source])
223 .inc_by(count);
224 }
225
226 /// Record a gap event filled during catchup
227 pub fn record_gap_event(&self, relay: &str) {
228 self.gap_events_total.with_label_values(&[relay]).inc();
229 }
230
231 /// Record multiple gap events filled during catchup
232 pub fn record_gap_events(&self, relay: &str, count: u64) {
233 self.gap_events_total
234 .with_label_values(&[relay])
235 .inc_by(count);
236 }
237
238 // === Summary Recording Methods ===
239
240 /// Set the total tracked relay count
241 pub fn set_tracked_count(&self, count: i64) {
242 self.relays_tracked_total.set(count);
243 }
244
245 /// Increment tracked relay count
246 pub fn inc_tracked_count(&self) {
247 self.relays_tracked_total.inc();
248 }
249
250 /// Get current tracked relay count
251 pub fn get_tracked_count(&self) -> i64 {
252 self.relays_tracked_total.get()
253 }
254
255 /// Get current connected relay count
256 pub fn get_connected_count(&self) -> i64 {
257 self.relays_connected_total.get()
258 }
259
260 /// Get current dead relay count
261 pub fn get_dead_count(&self) -> i64 {
262 self.relays_dead_total.get()
263 }
264}
265
266/// Event source types for metrics tracking
267pub mod event_source {
268 /// Real-time subscription events
269 pub const LIVE: &str = "live";
270 /// Events from startup catchup
271 pub const STARTUP: &str = "startup";
272 /// Events from reconnection catchup
273 pub const RECONNECT: &str = "reconnect";
274 /// Events from daily catchup
275 pub const DAILY: &str = "daily";
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 fn create_test_registry() -> Registry {
283 Registry::new()
284 }
285
286 #[test]
287 fn test_metrics_registration() {
288 let registry = create_test_registry();
289 let metrics = SyncMetrics::register(&registry);
290 assert!(metrics.is_ok());
291 }
292
293 #[test]
294 fn test_connection_metrics() {
295 let registry = create_test_registry();
296 let metrics = SyncMetrics::register(&registry).unwrap();
297
298 metrics.record_connection_attempt("wss://relay1.example.com", true);
299 metrics.record_connection_attempt("wss://relay1.example.com", false);
300 metrics.record_connection_attempt("wss://relay2.example.com", true);
301
302 metrics.set_relay_connected("wss://relay1.example.com", true);
303 metrics.inc_connected_count();
304
305 assert_eq!(metrics.get_connected_count(), 1);
306 }
307
308 #[test]
309 fn test_health_metrics() {
310 let registry = create_test_registry();
311 let metrics = SyncMetrics::register(&registry).unwrap();
312
313 metrics.record_health_state("wss://relay1.example.com", HealthState::Healthy);
314 metrics.record_health_state("wss://relay2.example.com", HealthState::Degraded);
315 metrics.record_health_state("wss://relay3.example.com", HealthState::Dead);
316
317 metrics.record_failure_count("wss://relay2.example.com", 5);
318 metrics.update_dead_count(1);
319
320 assert_eq!(metrics.get_dead_count(), 1);
321 }
322
323 #[test]
324 fn test_event_metrics() {
325 let registry = create_test_registry();
326 let metrics = SyncMetrics::register(&registry).unwrap();
327
328 metrics.record_event(event_source::LIVE);
329 metrics.record_events(event_source::STARTUP, 10);
330 metrics.record_gap_event("wss://relay1.example.com");
331 metrics.record_gap_events("wss://relay2.example.com", 5);
332 }
333
334 #[test]
335 fn test_summary_metrics() {
336 let registry = create_test_registry();
337 let metrics = SyncMetrics::register(&registry).unwrap();
338
339 metrics.set_tracked_count(5);
340 assert_eq!(metrics.get_tracked_count(), 5);
341
342 metrics.inc_tracked_count();
343 assert_eq!(metrics.get_tracked_count(), 6);
344
345 metrics.update_connected_count(3);
346 assert_eq!(metrics.get_connected_count(), 3);
347 }
348} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index dc11812..67d389e 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -21,12 +21,14 @@ mod connection;
21mod filter; 21mod filter;
22pub mod health; 22pub mod health;
23mod manager; 23mod manager;
24pub mod metrics;
24pub mod negentropy; 25pub mod negentropy;
25mod subscription; 26mod subscription;
26 27
27pub use filter::FilterService; 28pub use filter::FilterService;
28pub use health::{HealthState, RelayHealth, RelayHealthTracker}; 29pub use health::{HealthState, RelayHealth, RelayHealthTracker};
29pub use manager::SyncManager; 30pub use manager::SyncManager;
31pub use metrics::SyncMetrics;
30pub use negentropy::NegentropyService; 32pub use negentropy::NegentropyService;
31pub use subscription::SubscriptionManager; 33pub use subscription::SubscriptionManager;
32 34