upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 22:26:42 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 22:26:42 +0000
commit82cc74ade1524edc096608795b4e13c3cb19c5eb (patch)
treef3ca7be2104e934b23e05acd9827992b022cb257 /src/sync
parent46b306dcfa4850a688367c04e9e06e8d9c2883dc (diff)
feat: create sync metrics module (Phase 1)
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/metrics.rs454
-rw-r--r--src/sync/mod.rs167
2 files changed, 458 insertions, 163 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
new file mode 100644
index 0000000..411ff63
--- /dev/null
+++ b/src/sync/metrics.rs
@@ -0,0 +1,454 @@
1//! Prometheus Metrics for Proactive Sync (GRASP-02)
2//!
3//! This module provides comprehensive sync monitoring metrics including:
4//! - Connection status and attempts per relay
5//! - Health state tracking (Healthy/Degraded/Dead)
6//! - Event sync tracking by source (live/startup/reconnect/daily catchup)
7//! - Gap events filled during catchup operations
8//!
9//! All metrics follow the `ngit_sync_` prefix convention.
10
11use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
12
13use super::health::HealthState;
14
15/// Prometheus metrics for the proactive sync system.
16///
17/// Tracks relay connections, sync progress, health states, and operational statistics.
18/// Designed for comprehensive monitoring of GRASP-02 proactive sync operations.
19#[derive(Clone)]
20pub struct SyncMetrics {
21 // === Connection metrics ===
22 /// Per-relay connection status (1=connected, 0=disconnected)
23 relay_connected: IntGaugeVec,
24 /// Connection attempts by relay and result (success/failure)
25 connection_attempts_total: IntCounterVec,
26
27 // === Health metrics ===
28 /// Per-relay health status (healthy=1, degraded=2, dead=3)
29 relay_status: IntGaugeVec,
30 /// Per-relay consecutive failure count
31 relay_failures: IntGaugeVec,
32
33 // === Event metrics ===
34 /// Events synced by source (live/startup/reconnect/daily)
35 events_total: IntCounterVec,
36 /// Gap events filled during catchup, by relay
37 gap_events_total: IntCounterVec,
38
39 // === Summary metrics ===
40 /// Total relays discovered and tracked
41 relays_tracked_total: IntGauge,
42 /// Currently connected relay count
43 relays_connected_total: IntGauge,
44 /// Relays marked as dead
45 relays_dead_total: IntGauge,
46}
47
48impl SyncMetrics {
49 /// Register all sync metrics with the provided Prometheus registry.
50 ///
51 /// # Errors
52 ///
53 /// Returns an error if metrics are already registered (e.g., in tests).
54 pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> {
55 // Connection metrics
56 let relay_connected = IntGaugeVec::new(
57 Opts::new(
58 "ngit_sync_relay_connected",
59 "Relay connection status (1=connected, 0=disconnected)",
60 ),
61 &["relay"],
62 )?;
63 registry.register(Box::new(relay_connected.clone()))?;
64
65 let connection_attempts_total = IntCounterVec::new(
66 Opts::new(
67 "ngit_sync_connection_attempts_total",
68 "Total connection attempts by relay and result",
69 ),
70 &["relay", "result"],
71 )?;
72 registry.register(Box::new(connection_attempts_total.clone()))?;
73
74 // Health metrics
75 let relay_status = IntGaugeVec::new(
76 Opts::new(
77 "ngit_sync_relay_status",
78 "Relay health status (1=healthy, 2=degraded, 3=dead)",
79 ),
80 &["relay"],
81 )?;
82 registry.register(Box::new(relay_status.clone()))?;
83
84 let relay_failures = IntGaugeVec::new(
85 Opts::new(
86 "ngit_sync_relay_failures",
87 "Consecutive failure count per relay",
88 ),
89 &["relay"],
90 )?;
91 registry.register(Box::new(relay_failures.clone()))?;
92
93 // Event metrics
94 let events_total = IntCounterVec::new(
95 Opts::new(
96 "ngit_sync_events_total",
97 "Total events synced by source type",
98 ),
99 &["source"],
100 )?;
101 registry.register(Box::new(events_total.clone()))?;
102
103 let gap_events_total = IntCounterVec::new(
104 Opts::new(
105 "ngit_sync_gap_events_total",
106 "Gap events filled during catchup by relay",
107 ),
108 &["relay"],
109 )?;
110 registry.register(Box::new(gap_events_total.clone()))?;
111
112 // Summary metrics
113 let relays_tracked_total = IntGauge::with_opts(Opts::new(
114 "ngit_sync_relays_tracked_total",
115 "Total number of relays discovered and tracked",
116 ))?;
117 registry.register(Box::new(relays_tracked_total.clone()))?;
118
119 let relays_connected_total = IntGauge::with_opts(Opts::new(
120 "ngit_sync_relays_connected_total",
121 "Number of currently connected relays",
122 ))?;
123 registry.register(Box::new(relays_connected_total.clone()))?;
124
125 let relays_dead_total = IntGauge::with_opts(Opts::new(
126 "ngit_sync_relays_dead_total",
127 "Number of relays marked as dead",
128 ))?;
129 registry.register(Box::new(relays_dead_total.clone()))?;
130
131 Ok(Self {
132 relay_connected,
133 connection_attempts_total,
134 relay_status,
135 relay_failures,
136 events_total,
137 gap_events_total,
138 relays_tracked_total,
139 relays_connected_total,
140 relays_dead_total,
141 })
142 }
143
144 // === Connection Recording Methods ===
145
146 /// Record a connection attempt (success or failure).
147 ///
148 /// # Arguments
149 ///
150 /// * `relay` - The relay URL
151 /// * `success` - Whether the connection attempt succeeded
152 pub fn record_connection_attempt(&self, relay: &str, success: bool) {
153 let result = if success { "success" } else { "failure" };
154 self.connection_attempts_total
155 .with_label_values(&[relay, result])
156 .inc();
157 }
158
159 /// Set relay connection status.
160 ///
161 /// # Arguments
162 ///
163 /// * `relay` - The relay URL
164 /// * `connected` - Whether the relay is currently connected
165 pub fn set_relay_connected(&self, relay: &str, connected: bool) {
166 self.relay_connected
167 .with_label_values(&[relay])
168 .set(if connected { 1 } else { 0 });
169
170 // Note: Connected count should be updated via update_connected_count() for accuracy
171 }
172
173 /// Update the total connected relay count.
174 ///
175 /// This directly sets the count rather than deriving it from individual relay states,
176 /// which is more accurate when relay connection states are managed elsewhere.
177 pub fn update_connected_count(&self, count: i64) {
178 self.relays_connected_total.set(count);
179 }
180
181 /// Increment connected count by one.
182 pub fn inc_connected_count(&self) {
183 self.relays_connected_total.inc();
184 }
185
186 /// Decrement connected count by one.
187 pub fn dec_connected_count(&self) {
188 self.relays_connected_total.dec();
189 }
190
191 // === Health Recording Methods ===
192
193 /// Record relay health state change.
194 ///
195 /// Maps health states to numeric values for Prometheus:
196 /// - Healthy = 1
197 /// - Degraded = 2
198 /// - Dead = 3
199 ///
200 /// # Arguments
201 ///
202 /// * `relay` - The relay URL
203 /// * `state` - The current health state
204 pub fn record_health_state(&self, relay: &str, state: HealthState) {
205 let state_value = match state {
206 HealthState::Healthy => 1,
207 HealthState::Degraded => 2,
208 HealthState::Dead => 3,
209 };
210 self.relay_status.with_label_values(&[relay]).set(state_value);
211 }
212
213 /// Record relay failure count.
214 ///
215 /// # Arguments
216 ///
217 /// * `relay` - The relay URL
218 /// * `count` - The number of consecutive failures
219 pub fn record_failure_count(&self, relay: &str, count: u32) {
220 self.relay_failures
221 .with_label_values(&[relay])
222 .set(count as i64);
223 }
224
225 /// Update dead relay count.
226 pub fn update_dead_count(&self, count: i64) {
227 self.relays_dead_total.set(count);
228 }
229
230 /// Increment dead relay count by one.
231 pub fn inc_dead_count(&self) {
232 self.relays_dead_total.inc();
233 }
234
235 /// Decrement dead relay count by one.
236 pub fn dec_dead_count(&self) {
237 self.relays_dead_total.dec();
238 }
239
240 // === Event Recording Methods ===
241
242 /// Record a synced event by source type.
243 ///
244 /// # Arguments
245 ///
246 /// * `source` - The event source type. Use constants from [`event_source`]:
247 /// - [`event_source::LIVE`] - Real-time subscription events
248 /// - [`event_source::STARTUP`] - Events from startup catchup
249 /// - [`event_source::RECONNECT`] - Events from reconnection catchup
250 /// - [`event_source::DAILY`] - Events from daily catchup
251 pub fn record_event(&self, source: &str) {
252 self.events_total.with_label_values(&[source]).inc();
253 }
254
255 /// Record multiple events synced by source type.
256 ///
257 /// # Arguments
258 ///
259 /// * `source` - The event source type (see [`record_event`](Self::record_event))
260 /// * `count` - Number of events to record
261 pub fn record_events(&self, source: &str, count: u64) {
262 self.events_total
263 .with_label_values(&[source])
264 .inc_by(count);
265 }
266
267 /// Record a gap event filled during catchup.
268 ///
269 /// Gap events are historical events discovered during catchup that weren't
270 /// received during live sync.
271 ///
272 /// # Arguments
273 ///
274 /// * `relay` - The relay URL from which the gap event was received
275 pub fn record_gap_event(&self, relay: &str) {
276 self.gap_events_total.with_label_values(&[relay]).inc();
277 }
278
279 /// Record multiple gap events filled during catchup.
280 ///
281 /// # Arguments
282 ///
283 /// * `relay` - The relay URL from which the gap events were received
284 /// * `count` - Number of gap events to record
285 pub fn record_gap_events(&self, relay: &str, count: u64) {
286 self.gap_events_total
287 .with_label_values(&[relay])
288 .inc_by(count);
289 }
290
291 // === Summary Recording Methods ===
292
293 /// Set the total tracked relay count.
294 pub fn set_tracked_count(&self, count: i64) {
295 self.relays_tracked_total.set(count);
296 }
297
298 /// Increment tracked relay count by one.
299 pub fn inc_tracked_count(&self) {
300 self.relays_tracked_total.inc();
301 }
302
303 /// Get current tracked relay count.
304 pub fn get_tracked_count(&self) -> i64 {
305 self.relays_tracked_total.get()
306 }
307
308 /// Get current connected relay count.
309 pub fn get_connected_count(&self) -> i64 {
310 self.relays_connected_total.get()
311 }
312
313 /// Get current dead relay count.
314 pub fn get_dead_count(&self) -> i64 {
315 self.relays_dead_total.get()
316 }
317}
318
/// Label values for the `source` dimension of `ngit_sync_events_total`.
///
/// Each constant names one way a synced event was discovered, so that events
/// can be broken down by origin.
pub mod event_source {
    /// Event delivered by a real-time subscription during live sync.
    pub const LIVE: &str = "live";

    /// Event picked up by the catchup that runs when the relay first starts.
    pub const STARTUP: &str = "startup";

    /// Event picked up by the catchup that runs after a relay reconnects.
    pub const RECONNECT: &str = "reconnect";

    /// Event picked up by the daily catchup used for drift detection.
    pub const DAILY: &str = "daily";
}
336
#[cfg(test)]
mod tests {
    use super::*;

    const RELAY_1: &str = "wss://relay1.example.com";
    const RELAY_2: &str = "wss://relay2.example.com";
    const RELAY_3: &str = "wss://relay3.example.com";

    /// Each test gets its own registry so registrations never collide.
    fn create_test_registry() -> Registry {
        Registry::new()
    }

    #[test]
    fn test_metrics_registration() {
        let registry = create_test_registry();
        let metrics = SyncMetrics::register(&registry);
        assert!(metrics.is_ok());
    }

    #[test]
    fn test_connection_metrics() {
        let registry = create_test_registry();
        let metrics = SyncMetrics::register(&registry).unwrap();

        // Record connection attempts
        metrics.record_connection_attempt(RELAY_1, true);
        metrics.record_connection_attempt(RELAY_1, false);
        metrics.record_connection_attempt(RELAY_2, true);

        // Each (relay, result) pair is its own series.
        let attempts = &metrics.connection_attempts_total;
        assert_eq!(attempts.with_label_values(&[RELAY_1, "success"]).get(), 1);
        assert_eq!(attempts.with_label_values(&[RELAY_1, "failure"]).get(), 1);
        assert_eq!(attempts.with_label_values(&[RELAY_2, "success"]).get(), 1);
        assert_eq!(attempts.with_label_values(&[RELAY_2, "failure"]).get(), 0);

        // Set relay connection status
        metrics.set_relay_connected(RELAY_1, true);
        assert_eq!(metrics.relay_connected.with_label_values(&[RELAY_1]).get(), 1);
        // The per-relay gauge must not touch the aggregate count.
        assert_eq!(metrics.get_connected_count(), 0);

        metrics.inc_connected_count();
        assert_eq!(metrics.get_connected_count(), 1);

        // Test decrement
        metrics.dec_connected_count();
        assert_eq!(metrics.get_connected_count(), 0);

        metrics.set_relay_connected(RELAY_1, false);
        assert_eq!(metrics.relay_connected.with_label_values(&[RELAY_1]).get(), 0);
    }

    #[test]
    fn test_health_metrics() {
        let registry = create_test_registry();
        let metrics = SyncMetrics::register(&registry).unwrap();

        // Record health states
        metrics.record_health_state(RELAY_1, HealthState::Healthy);
        metrics.record_health_state(RELAY_2, HealthState::Degraded);
        metrics.record_health_state(RELAY_3, HealthState::Dead);

        // Healthy/Degraded/Dead are published as 1/2/3.
        assert_eq!(metrics.relay_status.with_label_values(&[RELAY_1]).get(), 1);
        assert_eq!(metrics.relay_status.with_label_values(&[RELAY_2]).get(), 2);
        assert_eq!(metrics.relay_status.with_label_values(&[RELAY_3]).get(), 3);

        // Record failure count
        metrics.record_failure_count(RELAY_2, 5);
        assert_eq!(metrics.relay_failures.with_label_values(&[RELAY_2]).get(), 5);

        // Test dead count tracking
        metrics.update_dead_count(1);
        assert_eq!(metrics.get_dead_count(), 1);

        metrics.inc_dead_count();
        assert_eq!(metrics.get_dead_count(), 2);

        metrics.dec_dead_count();
        assert_eq!(metrics.get_dead_count(), 1);
    }

    #[test]
    fn test_event_metrics() {
        let registry = create_test_registry();
        let metrics = SyncMetrics::register(&registry).unwrap();

        // Record single events
        metrics.record_event(event_source::LIVE);
        metrics.record_event(event_source::STARTUP);
        metrics.record_event(event_source::RECONNECT);
        metrics.record_event(event_source::DAILY);

        // Record multiple events
        metrics.record_events(event_source::STARTUP, 10);

        let events = &metrics.events_total;
        assert_eq!(events.with_label_values(&[event_source::LIVE]).get(), 1);
        assert_eq!(events.with_label_values(&[event_source::STARTUP]).get(), 11);
        assert_eq!(events.with_label_values(&[event_source::RECONNECT]).get(), 1);
        assert_eq!(events.with_label_values(&[event_source::DAILY]).get(), 1);

        // Record gap events
        metrics.record_gap_event(RELAY_1);
        metrics.record_gap_events(RELAY_2, 5);

        let gaps = &metrics.gap_events_total;
        assert_eq!(gaps.with_label_values(&[RELAY_1]).get(), 1);
        assert_eq!(gaps.with_label_values(&[RELAY_2]).get(), 5);
    }

    #[test]
    fn test_summary_metrics() {
        let registry = create_test_registry();
        let metrics = SyncMetrics::register(&registry).unwrap();

        // Test tracked count
        metrics.set_tracked_count(5);
        assert_eq!(metrics.get_tracked_count(), 5);

        metrics.inc_tracked_count();
        assert_eq!(metrics.get_tracked_count(), 6);

        // Test connected count
        metrics.update_connected_count(3);
        assert_eq!(metrics.get_connected_count(), 3);
    }

    #[test]
    fn test_event_source_constants() {
        // Verify constants have expected values
        assert_eq!(event_source::LIVE, "live");
        assert_eq!(event_source::STARTUP, "startup");
        assert_eq!(event_source::RECONNECT, "reconnect");
        assert_eq!(event_source::DAILY, "daily");
    }

    #[test]
    fn test_duplicate_registration_fails() {
        let registry = create_test_registry();

        // First registration should succeed
        let metrics1 = SyncMetrics::register(&registry);
        assert!(metrics1.is_ok());

        // Second registration should fail (metrics already registered)
        let metrics2 = SyncMetrics::register(&registry);
        assert!(metrics2.is_err());
    }
}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 1e60e4a..dd0479c 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -15,12 +15,16 @@
15pub mod algorithms; 15pub mod algorithms;
16pub mod filters; 16pub mod filters;
17pub mod health; 17pub mod health;
18pub mod metrics;
18pub mod relay_connection; 19pub mod relay_connection;
19pub mod self_subscriber; 20pub mod self_subscriber;
20 21
21// Re-export core algorithm types 22// Re-export core algorithm types
22pub use algorithms::{AddFilters, RelaySyncNeeds}; 23pub use algorithms::{AddFilters, RelaySyncNeeds};
23 24
25// Re-export metrics types
26pub use metrics::{event_source, SyncMetrics};
27
24// Re-export relay connection types 28// Re-export relay connection types
25pub use relay_connection::{RelayConnection, RelayEvent}; 29pub use relay_connection::{RelayConnection, RelayEvent};
26 30
@@ -35,7 +39,6 @@ use std::sync::Arc;
35use std::time::Duration; 39use std::time::Duration;
36 40
37use nostr_sdk::prelude::*; 41use nostr_sdk::prelude::*;
38use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
39use tokio::sync::{broadcast, Mutex, RwLock}; 42use tokio::sync::{broadcast, Mutex, RwLock};
40 43
41use crate::config::Config; 44use crate::config::Config;
@@ -157,168 +160,6 @@ pub struct PendingItems {
157} 160}
158 161
159// ============================================================================= 162// =============================================================================
160// SyncMetrics - Prometheus Metrics for Sync System
161// =============================================================================
162
163/// Prometheus metrics for the proactive sync system.
164///
165/// Tracks relay connections, sync progress, and operational statistics.
166/// Following the comprehensive v3 metrics design.
167#[derive(Clone)]
168pub struct SyncMetrics {
169 // === Connection metrics ===
170 /// Per-relay connection status (1=connected, 0=disconnected)
171 relay_connected: IntGaugeVec,
172 /// Connection attempts by relay and result (success/failure)
173 connection_attempts_total: IntCounterVec,
174
175 // === Event metrics ===
176 /// Events synced by source (live/startup/reconnect/daily)
177 events_total: IntCounterVec,
178
179 // === Summary metrics ===
180 /// Total relays discovered and tracked
181 relays_tracked_total: IntGauge,
182 /// Currently connected relay count
183 relays_connected_total: IntGauge,
184}
185
186impl SyncMetrics {
187 /// Register sync metrics with a Prometheus registry.
188 ///
189 /// Returns an error if metrics are already registered (e.g., in tests).
190 pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> {
191 // Connection metrics
192 let relay_connected = IntGaugeVec::new(
193 Opts::new(
194 "ngit_sync_relay_connected",
195 "Relay connection status (1=connected, 0=disconnected)",
196 ),
197 &["relay"],
198 )?;
199 registry.register(Box::new(relay_connected.clone()))?;
200
201 let connection_attempts_total = IntCounterVec::new(
202 Opts::new(
203 "ngit_sync_connection_attempts_total",
204 "Total connection attempts by relay and result",
205 ),
206 &["relay", "result"],
207 )?;
208 registry.register(Box::new(connection_attempts_total.clone()))?;
209
210 // Event metrics
211 let events_total = IntCounterVec::new(
212 Opts::new(
213 "ngit_sync_events_total",
214 "Total events synced by source type",
215 ),
216 &["source"],
217 )?;
218 registry.register(Box::new(events_total.clone()))?;
219
220 // Summary metrics
221 let relays_tracked_total = IntGauge::with_opts(Opts::new(
222 "ngit_sync_relays_tracked_total",
223 "Total number of relays discovered and tracked",
224 ))?;
225 registry.register(Box::new(relays_tracked_total.clone()))?;
226
227 let relays_connected_total = IntGauge::with_opts(Opts::new(
228 "ngit_sync_relays_connected_total",
229 "Number of currently connected relays",
230 ))?;
231 registry.register(Box::new(relays_connected_total.clone()))?;
232
233 Ok(Self {
234 relay_connected,
235 connection_attempts_total,
236 events_total,
237 relays_tracked_total,
238 relays_connected_total,
239 })
240 }
241
242 // === Connection Recording Methods ===
243
244 /// Record a connection attempt (success or failure)
245 pub fn record_connection_attempt(&self, relay: &str, success: bool) {
246 let result = if success { "success" } else { "failure" };
247 self.connection_attempts_total
248 .with_label_values(&[relay, result])
249 .inc();
250 }
251
252 /// Set relay connection status
253 pub fn set_relay_connected(&self, relay: &str, connected: bool) {
254 self.relay_connected
255 .with_label_values(&[relay])
256 .set(if connected { 1 } else { 0 });
257 }
258
259 /// Increment connected count
260 pub fn inc_connected_count(&self) {
261 self.relays_connected_total.inc();
262 }
263
264 /// Decrement connected count
265 pub fn dec_connected_count(&self) {
266 self.relays_connected_total.dec();
267 }
268
269 // === Event Recording Methods ===
270
271 /// Record a synced event by source type
272 ///
273 /// Source types:
274 /// - "live" - Real-time subscription events
275 /// - "startup" - Events from startup catchup
276 /// - "reconnect" - Events from reconnection catchup
277 pub fn record_event(&self, source: &str) {
278 self.events_total.with_label_values(&[source]).inc();
279 }
280
281 /// Record multiple events synced by source type
282 pub fn record_events(&self, source: &str, count: u64) {
283 self.events_total
284 .with_label_values(&[source])
285 .inc_by(count);
286 }
287
288 // === Summary Recording Methods ===
289
290 /// Set the total tracked relay count
291 pub fn set_tracked_count(&self, count: i64) {
292 self.relays_tracked_total.set(count);
293 }
294
295 /// Increment tracked relay count
296 pub fn inc_tracked_count(&self) {
297 self.relays_tracked_total.inc();
298 }
299
300 /// Get current tracked relay count
301 pub fn get_tracked_count(&self) -> i64 {
302 self.relays_tracked_total.get()
303 }
304
305 /// Get current connected relay count
306 pub fn get_connected_count(&self) -> i64 {
307 self.relays_connected_total.get()
308 }
309}
310
311/// Event source types for metrics tracking
312pub mod event_source {
313 /// Real-time subscription events
314 pub const LIVE: &str = "live";
315 /// Events from startup catchup
316 pub const STARTUP: &str = "startup";
317 /// Events from reconnection catchup
318 pub const RECONNECT: &str = "reconnect";
319}
320
321// =============================================================================
322// SyncManager - Main Entry Point 163// SyncManager - Main Entry Point
323// ============================================================================= 164// =============================================================================
324 165