upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 15:53:48 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 15:59:23 +0000
commit02a90c109d4d08c6a54184f821c100f4eba92545 (patch)
treea8c07978d5a7c58e2776cf057cc89e1233de1eee /src
parent565715adf14cafd0f0155d553f583581334a8dac (diff)
Simplify sync metrics to track only newly saved events
Replace broken event counting that occurred before duplicate/policy checks with accurate tracking of events that are new, accepted, and saved. Changes: - Added ProcessResult enum to track event processing outcomes - Modified process_event_static() to return ProcessResult - Replaced events_total (with source labels) with events_synced_total - Removed gap_events_total and event_source module - Removed eose_received flag (EOSE is per-subscription, not suitable) - Updated all tests to use new simplified API The new ngit_sync_events_synced_total metric only counts events that: 1. Are new (not duplicates) 2. Pass write policy validation 3. Are successfully saved to database All 165 tests pass (124 lib + 41 integration)
Diffstat (limited to 'src')
-rw-r--r--src/sync/metrics.rs128
-rw-r--r--src/sync/mod.rs61
2 files changed, 59 insertions, 130 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index c3bebfc..22c9192 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -3,12 +3,11 @@
3//! This module provides comprehensive sync monitoring metrics including: 3//! This module provides comprehensive sync monitoring metrics including:
4//! - Connection status and attempts per relay 4//! - Connection status and attempts per relay
5//! - Health state tracking (Healthy/Degraded/Dead) 5//! - Health state tracking (Healthy/Degraded/Dead)
6//! - Event sync tracking by source (live/startup/reconnect/daily catchup) 6//! - Event sync tracking (only newly saved events)
7//! - Gap events filled during catchup operations
8//! 7//!
9//! All metrics follow the `ngit_sync_` prefix convention. 8//! All metrics follow the `ngit_sync_` prefix convention.
10 9
11use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; 10use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
12 11
13use super::health::HealthState; 12use super::health::HealthState;
14 13
@@ -31,10 +30,8 @@ pub struct SyncMetrics {
31 relay_failures: IntGaugeVec, 30 relay_failures: IntGaugeVec,
32 31
33 // === Event metrics === 32 // === Event metrics ===
34 /// Events synced by source (live/startup/reconnect/daily) 33 /// Total events synced (newly saved events only)
35 events_total: IntCounterVec, 34 events_synced_total: IntCounter,
36 /// Gap events filled during catchup, by relay
37 gap_events_total: IntCounterVec,
38 35
39 // === Summary metrics === 36 // === Summary metrics ===
40 /// Total relays discovered and tracked 37 /// Total relays discovered and tracked
@@ -91,23 +88,11 @@ impl SyncMetrics {
91 registry.register(Box::new(relay_failures.clone()))?; 88 registry.register(Box::new(relay_failures.clone()))?;
92 89
93 // Event metrics 90 // Event metrics
94 let events_total = IntCounterVec::new( 91 let events_synced_total = IntCounter::with_opts(Opts::new(
95 Opts::new( 92 "ngit_sync_events_synced_total",
96 "ngit_sync_events_total", 93 "Total events synced (newly saved events only)",
97 "Total events synced by source type", 94 ))?;
98 ), 95 registry.register(Box::new(events_synced_total.clone()))?;
99 &["source"],
100 )?;
101 registry.register(Box::new(events_total.clone()))?;
102
103 let gap_events_total = IntCounterVec::new(
104 Opts::new(
105 "ngit_sync_gap_events_total",
106 "Gap events filled during catchup by relay",
107 ),
108 &["relay"],
109 )?;
110 registry.register(Box::new(gap_events_total.clone()))?;
111 96
112 // Summary metrics 97 // Summary metrics
113 let relays_tracked_total = IntGauge::with_opts(Opts::new( 98 let relays_tracked_total = IntGauge::with_opts(Opts::new(
@@ -133,8 +118,7 @@ impl SyncMetrics {
133 connection_attempts_total, 118 connection_attempts_total,
134 relay_status, 119 relay_status,
135 relay_failures, 120 relay_failures,
136 events_total, 121 events_synced_total,
137 gap_events_total,
138 relays_tracked_total, 122 relays_tracked_total,
139 relays_connected_total, 123 relays_connected_total,
140 relays_dead_total, 124 relays_dead_total,
@@ -242,51 +226,12 @@ impl SyncMetrics {
242 226
243 // === Event Recording Methods === 227 // === Event Recording Methods ===
244 228
245 /// Record a synced event by source type. 229 /// Record a successfully synced event (newly saved to database).
246 ///
247 /// # Arguments
248 ///
249 /// * `source` - The event source type. Use constants from [`event_source`]:
250 /// - [`event_source::LIVE`] - Real-time subscription events
251 /// - [`event_source::STARTUP`] - Events from startup catchup
252 /// - [`event_source::RECONNECT`] - Events from reconnection catchup
253 /// - [`event_source::DAILY`] - Events from daily catchup
254 pub fn record_event(&self, source: &str) {
255 self.events_total.with_label_values(&[source]).inc();
256 }
257
258 /// Record multiple events synced by source type.
259 ///
260 /// # Arguments
261 /// 230 ///
262 /// * `source` - The event source type (see [`record_event`](Self::record_event)) 231 /// Only events that are new AND pass write policy should be counted.
263 /// * `count` - Number of events to record 232 /// Duplicates and rejected events are not counted.
264 pub fn record_events(&self, source: &str, count: u64) { 233 pub fn record_synced_event(&self) {
265 self.events_total.with_label_values(&[source]).inc_by(count); 234 self.events_synced_total.inc();
266 }
267
268 /// Record a gap event filled during catchup.
269 ///
270 /// Gap events are historical events discovered during catchup that weren't
271 /// received during live sync.
272 ///
273 /// # Arguments
274 ///
275 /// * `relay` - The relay URL from which the gap event was received
276 pub fn record_gap_event(&self, relay: &str) {
277 self.gap_events_total.with_label_values(&[relay]).inc();
278 }
279
280 /// Record multiple gap events filled during catchup.
281 ///
282 /// # Arguments
283 ///
284 /// * `relay` - The relay URL from which the gap events were received
285 /// * `count` - Number of gap events to record
286 pub fn record_gap_events(&self, relay: &str, count: u64) {
287 self.gap_events_total
288 .with_label_values(&[relay])
289 .inc_by(count);
290 } 235 }
291 236
292 // === Summary Recording Methods === 237 // === Summary Recording Methods ===
@@ -317,24 +262,6 @@ impl SyncMetrics {
317 } 262 }
318} 263}
319 264
320/// Event source types for metrics tracking.
321///
322/// These constants are used as labels for the `ngit_sync_events_total` metric
323/// to categorize events by how they were discovered.
324pub mod event_source {
325 /// Real-time subscription events received during live sync.
326 pub const LIVE: &str = "live";
327
328 /// Events from startup catchup when the relay first starts.
329 pub const STARTUP: &str = "startup";
330
331 /// Events from reconnection catchup after a relay reconnects.
332 pub const RECONNECT: &str = "reconnect";
333
334 /// Events from daily catchup for drift detection.
335 pub const DAILY: &str = "daily";
336}
337
338#[cfg(test)] 265#[cfg(test)]
339mod tests { 266mod tests {
340 use super::*; 267 use super::*;
@@ -400,18 +327,10 @@ mod tests {
400 let registry = create_test_registry(); 327 let registry = create_test_registry();
401 let metrics = SyncMetrics::register(&registry).unwrap(); 328 let metrics = SyncMetrics::register(&registry).unwrap();
402 329
403 // Record single events 330 // Record synced events
404 metrics.record_event(event_source::LIVE); 331 metrics.record_synced_event();
405 metrics.record_event(event_source::STARTUP); 332 metrics.record_synced_event();
406 metrics.record_event(event_source::RECONNECT); 333 metrics.record_synced_event();
407 metrics.record_event(event_source::DAILY);
408
409 // Record multiple events
410 metrics.record_events(event_source::STARTUP, 10);
411
412 // Record gap events
413 metrics.record_gap_event("wss://relay1.example.com");
414 metrics.record_gap_events("wss://relay2.example.com", 5);
415 } 334 }
416 335
417 #[test] 336 #[test]
@@ -432,15 +351,6 @@ mod tests {
432 } 351 }
433 352
434 #[test] 353 #[test]
435 fn test_event_source_constants() {
436 // Verify constants have expected values
437 assert_eq!(event_source::LIVE, "live");
438 assert_eq!(event_source::STARTUP, "startup");
439 assert_eq!(event_source::RECONNECT, "reconnect");
440 assert_eq!(event_source::DAILY, "daily");
441 }
442
443 #[test]
444 fn test_duplicate_registration_fails() { 354 fn test_duplicate_registration_fails() {
445 let registry = create_test_registry(); 355 let registry = create_test_registry();
446 356
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 88608b1..8581fb6 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -23,7 +23,7 @@ pub mod self_subscriber;
23pub use algorithms::{AddFilters, RelaySyncNeeds}; 23pub use algorithms::{AddFilters, RelaySyncNeeds};
24 24
25// Re-export metrics types 25// Re-export metrics types
26pub use metrics::{event_source, SyncMetrics}; 26pub use metrics::SyncMetrics;
27 27
28// Re-export relay connection types 28// Re-export relay connection types
29pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; 29pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent};
@@ -151,6 +151,17 @@ pub enum SyncMethod {
151 Negentropy, 151 Negentropy,
152} 152}
153 153
154/// Result of processing an event from sync
155#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub enum ProcessResult {
157 /// Event was new and saved to database
158 Saved,
159 /// Event already existed in database
160 Duplicate,
161 /// Event rejected by write policy
162 Rejected,
163}
164
154/// A batch of items pending confirmation 165/// A batch of items pending confirmation
155#[derive(Debug, Clone)] 166#[derive(Debug, Clone)]
156pub struct PendingBatch { 167pub struct PendingBatch {
@@ -853,20 +864,11 @@ impl SyncManager {
853 864
854 tokio::spawn(async move { 865 tokio::spawn(async move {
855 let mut disconnect_sent = false; 866 let mut disconnect_sent = false;
856 let mut eose_received = false;
857 867
858 while let Some(relay_event) = event_rx.recv().await { 868 while let Some(relay_event) = event_rx.recv().await {
859 match relay_event { 869 match relay_event {
860 RelayEvent::Event(event) => { 870 RelayEvent::Event(event) => {
861 if let Some(ref metrics) = metrics_clone { 871 let result = Self::process_event_static(
862 let source = if eose_received {
863 event_source::LIVE
864 } else {
865 event_source::STARTUP
866 };
867 metrics.record_event(source);
868 }
869 Self::process_event_static(
870 &event, 872 &event,
871 &relay_url_clone, 873 &relay_url_clone,
872 &database, 874 &database,
@@ -874,9 +876,14 @@ impl SyncManager {
874 &local_relay, 876 &local_relay,
875 ) 877 )
876 .await; 878 .await;
879 // Only record metric when event is actually saved
880 if result == ProcessResult::Saved {
881 if let Some(ref metrics) = metrics_clone {
882 metrics.record_synced_event();
883 }
884 }
877 } 885 }
878 RelayEvent::EndOfStoredEvents(sub_id) => { 886 RelayEvent::EndOfStoredEvents(sub_id) => {
879 eose_received = true;
880 tracing::debug!( 887 tracing::debug!(
881 relay = %relay_url_clone, 888 relay = %relay_url_clone,
882 sub_id = %sub_id, 889 sub_id = %sub_id,
@@ -1055,7 +1062,6 @@ impl SyncManager {
1055 // Record reconnect-specific metrics (not basic connection metrics) 1062 // Record reconnect-specific metrics (not basic connection metrics)
1056 if let Some(ref metrics) = self.metrics { 1063 if let Some(ref metrics) = self.metrics {
1057 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1064 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1058 metrics.record_event(event_source::RECONNECT);
1059 } 1065 }
1060 1066
1061 // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) 1067 // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed)
@@ -1376,24 +1382,26 @@ impl SyncManager {
1376 /// - Write policy validation 1382 /// - Write policy validation
1377 /// - Database save 1383 /// - Database save
1378 /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) 1384 /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery)
1385 ///
1386 /// Returns `ProcessResult` to indicate whether the event was saved, duplicate, or rejected.
1379 async fn process_event_static( 1387 async fn process_event_static(
1380 event: &Event, 1388 event: &Event,
1381 relay_url: &str, 1389 relay_url: &str,
1382 database: &SharedDatabase, 1390 database: &SharedDatabase,
1383 write_policy: &Nip34WritePolicy, 1391 write_policy: &Nip34WritePolicy,
1384 local_relay: &LocalRelay, 1392 local_relay: &LocalRelay,
1385 ) { 1393 ) -> ProcessResult {
1386 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; 1394 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy};
1387 use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 1395 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1388 // Check if event already exists 1396 // Check if event already exists
1389 match database.event_by_id(&event.id).await { 1397 match database.event_by_id(&event.id).await {
1390 Ok(Some(_)) => { 1398 Ok(Some(_)) => {
1391 tracing::trace!(event_id = %event.id, "Event already exists, skipping"); 1399 tracing::trace!(event_id = %event.id, "Event already exists, skipping");
1392 return; 1400 return ProcessResult::Duplicate;
1393 } 1401 }
1394 Err(e) => { 1402 Err(e) => {
1395 tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); 1403 tracing::warn!(event_id = %event.id, error = %e, "Database error checking event");
1396 return; 1404 return ProcessResult::Rejected;
1397 } 1405 }
1398 Ok(None) => {} // Continue processing 1406 Ok(None) => {} // Continue processing
1399 } 1407 }
@@ -1412,7 +1420,7 @@ impl SyncManager {
1412 error = %e, 1420 error = %e,
1413 "Failed to save synced event" 1421 "Failed to save synced event"
1414 ); 1422 );
1415 return; 1423 return ProcessResult::Rejected;
1416 } 1424 }
1417 1425
1418 // Broadcast to WebSocket subscribers (enables recursive relay discovery) 1426 // Broadcast to WebSocket subscribers (enables recursive relay discovery)
@@ -1426,6 +1434,7 @@ impl SyncManager {
1426 broadcast = broadcast_success, 1434 broadcast = broadcast_success,
1427 "Synced event saved and broadcast" 1435 "Synced event saved and broadcast"
1428 ); 1436 );
1437 ProcessResult::Saved
1429 } 1438 }
1430 PolicyResult::Reject(reason) => { 1439 PolicyResult::Reject(reason) => {
1431 tracing::debug!( 1440 tracing::debug!(
@@ -1434,6 +1443,7 @@ impl SyncManager {
1434 reason = %reason, 1443 reason = %reason,
1435 "Event rejected by write policy" 1444 "Event rejected by write policy"
1436 ); 1445 );
1446 ProcessResult::Rejected
1437 } 1447 }
1438 } 1448 }
1439 } 1449 }
@@ -1778,7 +1788,14 @@ impl SyncManager {
1778 1788
1779 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex 1789 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex
1780 /// 1790 ///
1781 /// This method subscribes to filters with `limit: 0` for receiving ongoing events. 1791 /// This method applies limit(0) to all filters to receive ONLY new events.
1792 /// Per NIP-01, limit 0 means "send no stored events, only future events", which
1793 /// ensures EOSE is received immediately and all subsequent events are tagged as "live"
1794 /// in metrics (not "startup").
1795 ///
1796 /// **Important**: Callers pass the SAME filters to both sync_live() and historic_sync().
1797 /// This method applies limit(0) to prevent fetching historic events.
1798 ///
1782 /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have 1799 /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have
1783 /// a definite "completion" - they stay open indefinitely. 1800 /// a definite "completion" - they stay open indefinitely.
1784 /// 1801 ///
@@ -1788,7 +1805,7 @@ impl SyncManager {
1788 /// 1805 ///
1789 /// # Arguments 1806 /// # Arguments
1790 /// * `relay_url` - The relay URL to subscribe on 1807 /// * `relay_url` - The relay URL to subscribe on
1791 /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) 1808 /// * `filters` - Filters to subscribe to (limit(0) will be applied)
1792 /// 1809 ///
1793 /// # Returns 1810 /// # Returns
1794 /// Vec of subscription IDs for the live subscriptions, or empty if connection not found 1811 /// Vec of subscription IDs for the live subscriptions, or empty if connection not found
@@ -1808,10 +1825,12 @@ impl SyncManager {
1808 let mut sub_ids = Vec::new(); 1825 let mut sub_ids = Vec::new();
1809 1826
1810 for filter in filters.iter() { 1827 for filter in filters.iter() {
1811 // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) 1828 // Live subscriptions MUST use limit(0) to receive ONLY new events
1829 // This prevents fetching historic events that would be miscounted as "live" in metrics
1830 // The caller passes the same filters to both sync_live() and historic_sync()
1812 // Live subscriptions do NOT auto-close - we want them to stay open for new events 1831 // Live subscriptions do NOT auto-close - we want them to stay open for new events
1813 match connection 1832 match connection
1814 .subscribe_filter(filter.clone().limit(1), false) 1833 .subscribe_filter(filter.clone().limit(0), false)
1815 .await 1834 .await
1816 { 1835 {
1817 Ok(sub_id) => { 1836 Ok(sub_id) => {