diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 15:53:48 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 15:59:23 +0000 |
| commit | 02a90c109d4d08c6a54184f821c100f4eba92545 (patch) | |
| tree | a8c07978d5a7c58e2776cf057cc89e1233de1eee /src | |
| parent | 565715adf14cafd0f0155d553f583581334a8dac (diff) | |
Simplify sync metrics to track only newly saved events
Replace broken event counting that occurred before duplicate/policy checks
with accurate tracking of events that are new, accepted, and saved.
Changes:
- Added ProcessResult enum to track event processing outcomes
- Modified process_event_static() to return ProcessResult
- Replaced events_total (with source labels) with events_synced_total
- Removed gap_events_total and event_source module
- Removed eose_received flag (EOSE is per-subscription, not suitable)
- Updated all tests to use new simplified API
The new ngit_sync_events_synced_total metric only counts events that:
1. Are new (not duplicates)
2. Pass write policy validation
3. Are successfully saved to database
All 165 tests pass (124 lib + 41 integration)
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/metrics.rs | 128 | ||||
| -rw-r--r-- | src/sync/mod.rs | 61 |
2 files changed, 59 insertions, 130 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index c3bebfc..22c9192 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -3,12 +3,11 @@ | |||
| 3 | //! This module provides comprehensive sync monitoring metrics including: | 3 | //! This module provides comprehensive sync monitoring metrics including: |
| 4 | //! - Connection status and attempts per relay | 4 | //! - Connection status and attempts per relay |
| 5 | //! - Health state tracking (Healthy/Degraded/Dead) | 5 | //! - Health state tracking (Healthy/Degraded/Dead) |
| 6 | //! - Event sync tracking by source (live/startup/reconnect/daily catchup) | 6 | //! - Event sync tracking (only newly saved events) |
| 7 | //! - Gap events filled during catchup operations | ||
| 8 | //! | 7 | //! |
| 9 | //! All metrics follow the `ngit_sync_` prefix convention. | 8 | //! All metrics follow the `ngit_sync_` prefix convention. |
| 10 | 9 | ||
| 11 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; | 10 | use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; |
| 12 | 11 | ||
| 13 | use super::health::HealthState; | 12 | use super::health::HealthState; |
| 14 | 13 | ||
| @@ -31,10 +30,8 @@ pub struct SyncMetrics { | |||
| 31 | relay_failures: IntGaugeVec, | 30 | relay_failures: IntGaugeVec, |
| 32 | 31 | ||
| 33 | // === Event metrics === | 32 | // === Event metrics === |
| 34 | /// Events synced by source (live/startup/reconnect/daily) | 33 | /// Total events synced (newly saved events only) |
| 35 | events_total: IntCounterVec, | 34 | events_synced_total: IntCounter, |
| 36 | /// Gap events filled during catchup, by relay | ||
| 37 | gap_events_total: IntCounterVec, | ||
| 38 | 35 | ||
| 39 | // === Summary metrics === | 36 | // === Summary metrics === |
| 40 | /// Total relays discovered and tracked | 37 | /// Total relays discovered and tracked |
| @@ -91,23 +88,11 @@ impl SyncMetrics { | |||
| 91 | registry.register(Box::new(relay_failures.clone()))?; | 88 | registry.register(Box::new(relay_failures.clone()))?; |
| 92 | 89 | ||
| 93 | // Event metrics | 90 | // Event metrics |
| 94 | let events_total = IntCounterVec::new( | 91 | let events_synced_total = IntCounter::with_opts(Opts::new( |
| 95 | Opts::new( | 92 | "ngit_sync_events_synced_total", |
| 96 | "ngit_sync_events_total", | 93 | "Total events synced (newly saved events only)", |
| 97 | "Total events synced by source type", | 94 | ))?; |
| 98 | ), | 95 | registry.register(Box::new(events_synced_total.clone()))?; |
| 99 | &["source"], | ||
| 100 | )?; | ||
| 101 | registry.register(Box::new(events_total.clone()))?; | ||
| 102 | |||
| 103 | let gap_events_total = IntCounterVec::new( | ||
| 104 | Opts::new( | ||
| 105 | "ngit_sync_gap_events_total", | ||
| 106 | "Gap events filled during catchup by relay", | ||
| 107 | ), | ||
| 108 | &["relay"], | ||
| 109 | )?; | ||
| 110 | registry.register(Box::new(gap_events_total.clone()))?; | ||
| 111 | 96 | ||
| 112 | // Summary metrics | 97 | // Summary metrics |
| 113 | let relays_tracked_total = IntGauge::with_opts(Opts::new( | 98 | let relays_tracked_total = IntGauge::with_opts(Opts::new( |
| @@ -133,8 +118,7 @@ impl SyncMetrics { | |||
| 133 | connection_attempts_total, | 118 | connection_attempts_total, |
| 134 | relay_status, | 119 | relay_status, |
| 135 | relay_failures, | 120 | relay_failures, |
| 136 | events_total, | 121 | events_synced_total, |
| 137 | gap_events_total, | ||
| 138 | relays_tracked_total, | 122 | relays_tracked_total, |
| 139 | relays_connected_total, | 123 | relays_connected_total, |
| 140 | relays_dead_total, | 124 | relays_dead_total, |
| @@ -242,51 +226,12 @@ impl SyncMetrics { | |||
| 242 | 226 | ||
| 243 | // === Event Recording Methods === | 227 | // === Event Recording Methods === |
| 244 | 228 | ||
| 245 | /// Record a synced event by source type. | 229 | /// Record a successfully synced event (newly saved to database). |
| 246 | /// | ||
| 247 | /// # Arguments | ||
| 248 | /// | ||
| 249 | /// * `source` - The event source type. Use constants from [`event_source`]: | ||
| 250 | /// - [`event_source::LIVE`] - Real-time subscription events | ||
| 251 | /// - [`event_source::STARTUP`] - Events from startup catchup | ||
| 252 | /// - [`event_source::RECONNECT`] - Events from reconnection catchup | ||
| 253 | /// - [`event_source::DAILY`] - Events from daily catchup | ||
| 254 | pub fn record_event(&self, source: &str) { | ||
| 255 | self.events_total.with_label_values(&[source]).inc(); | ||
| 256 | } | ||
| 257 | |||
| 258 | /// Record multiple events synced by source type. | ||
| 259 | /// | ||
| 260 | /// # Arguments | ||
| 261 | /// | 230 | /// |
| 262 | /// * `source` - The event source type (see [`record_event`](Self::record_event)) | 231 | /// Only events that are new AND pass write policy should be counted. |
| 263 | /// * `count` - Number of events to record | 232 | /// Duplicates and rejected events are not counted. |
| 264 | pub fn record_events(&self, source: &str, count: u64) { | 233 | pub fn record_synced_event(&self) { |
| 265 | self.events_total.with_label_values(&[source]).inc_by(count); | 234 | self.events_synced_total.inc(); |
| 266 | } | ||
| 267 | |||
| 268 | /// Record a gap event filled during catchup. | ||
| 269 | /// | ||
| 270 | /// Gap events are historical events discovered during catchup that weren't | ||
| 271 | /// received during live sync. | ||
| 272 | /// | ||
| 273 | /// # Arguments | ||
| 274 | /// | ||
| 275 | /// * `relay` - The relay URL from which the gap event was received | ||
| 276 | pub fn record_gap_event(&self, relay: &str) { | ||
| 277 | self.gap_events_total.with_label_values(&[relay]).inc(); | ||
| 278 | } | ||
| 279 | |||
| 280 | /// Record multiple gap events filled during catchup. | ||
| 281 | /// | ||
| 282 | /// # Arguments | ||
| 283 | /// | ||
| 284 | /// * `relay` - The relay URL from which the gap events were received | ||
| 285 | /// * `count` - Number of gap events to record | ||
| 286 | pub fn record_gap_events(&self, relay: &str, count: u64) { | ||
| 287 | self.gap_events_total | ||
| 288 | .with_label_values(&[relay]) | ||
| 289 | .inc_by(count); | ||
| 290 | } | 235 | } |
| 291 | 236 | ||
| 292 | // === Summary Recording Methods === | 237 | // === Summary Recording Methods === |
| @@ -317,24 +262,6 @@ impl SyncMetrics { | |||
| 317 | } | 262 | } |
| 318 | } | 263 | } |
| 319 | 264 | ||
| 320 | /// Event source types for metrics tracking. | ||
| 321 | /// | ||
| 322 | /// These constants are used as labels for the `ngit_sync_events_total` metric | ||
| 323 | /// to categorize events by how they were discovered. | ||
| 324 | pub mod event_source { | ||
| 325 | /// Real-time subscription events received during live sync. | ||
| 326 | pub const LIVE: &str = "live"; | ||
| 327 | |||
| 328 | /// Events from startup catchup when the relay first starts. | ||
| 329 | pub const STARTUP: &str = "startup"; | ||
| 330 | |||
| 331 | /// Events from reconnection catchup after a relay reconnects. | ||
| 332 | pub const RECONNECT: &str = "reconnect"; | ||
| 333 | |||
| 334 | /// Events from daily catchup for drift detection. | ||
| 335 | pub const DAILY: &str = "daily"; | ||
| 336 | } | ||
| 337 | |||
| 338 | #[cfg(test)] | 265 | #[cfg(test)] |
| 339 | mod tests { | 266 | mod tests { |
| 340 | use super::*; | 267 | use super::*; |
| @@ -400,18 +327,10 @@ mod tests { | |||
| 400 | let registry = create_test_registry(); | 327 | let registry = create_test_registry(); |
| 401 | let metrics = SyncMetrics::register(&registry).unwrap(); | 328 | let metrics = SyncMetrics::register(&registry).unwrap(); |
| 402 | 329 | ||
| 403 | // Record single events | 330 | // Record synced events |
| 404 | metrics.record_event(event_source::LIVE); | 331 | metrics.record_synced_event(); |
| 405 | metrics.record_event(event_source::STARTUP); | 332 | metrics.record_synced_event(); |
| 406 | metrics.record_event(event_source::RECONNECT); | 333 | metrics.record_synced_event(); |
| 407 | metrics.record_event(event_source::DAILY); | ||
| 408 | |||
| 409 | // Record multiple events | ||
| 410 | metrics.record_events(event_source::STARTUP, 10); | ||
| 411 | |||
| 412 | // Record gap events | ||
| 413 | metrics.record_gap_event("wss://relay1.example.com"); | ||
| 414 | metrics.record_gap_events("wss://relay2.example.com", 5); | ||
| 415 | } | 334 | } |
| 416 | 335 | ||
| 417 | #[test] | 336 | #[test] |
| @@ -432,15 +351,6 @@ mod tests { | |||
| 432 | } | 351 | } |
| 433 | 352 | ||
| 434 | #[test] | 353 | #[test] |
| 435 | fn test_event_source_constants() { | ||
| 436 | // Verify constants have expected values | ||
| 437 | assert_eq!(event_source::LIVE, "live"); | ||
| 438 | assert_eq!(event_source::STARTUP, "startup"); | ||
| 439 | assert_eq!(event_source::RECONNECT, "reconnect"); | ||
| 440 | assert_eq!(event_source::DAILY, "daily"); | ||
| 441 | } | ||
| 442 | |||
| 443 | #[test] | ||
| 444 | fn test_duplicate_registration_fails() { | 354 | fn test_duplicate_registration_fails() { |
| 445 | let registry = create_test_registry(); | 355 | let registry = create_test_registry(); |
| 446 | 356 | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 88608b1..8581fb6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -23,7 +23,7 @@ pub mod self_subscriber; | |||
| 23 | pub use algorithms::{AddFilters, RelaySyncNeeds}; | 23 | pub use algorithms::{AddFilters, RelaySyncNeeds}; |
| 24 | 24 | ||
| 25 | // Re-export metrics types | 25 | // Re-export metrics types |
| 26 | pub use metrics::{event_source, SyncMetrics}; | 26 | pub use metrics::SyncMetrics; |
| 27 | 27 | ||
| 28 | // Re-export relay connection types | 28 | // Re-export relay connection types |
| 29 | pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; | 29 | pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; |
| @@ -151,6 +151,17 @@ pub enum SyncMethod { | |||
| 151 | Negentropy, | 151 | Negentropy, |
| 152 | } | 152 | } |
| 153 | 153 | ||
| 154 | /// Result of processing an event from sync | ||
| 155 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 156 | pub enum ProcessResult { | ||
| 157 | /// Event was new and saved to database | ||
| 158 | Saved, | ||
| 159 | /// Event already existed in database | ||
| 160 | Duplicate, | ||
| 161 | /// Event rejected by write policy | ||
| 162 | Rejected, | ||
| 163 | } | ||
| 164 | |||
| 154 | /// A batch of items pending confirmation | 165 | /// A batch of items pending confirmation |
| 155 | #[derive(Debug, Clone)] | 166 | #[derive(Debug, Clone)] |
| 156 | pub struct PendingBatch { | 167 | pub struct PendingBatch { |
| @@ -853,20 +864,11 @@ impl SyncManager { | |||
| 853 | 864 | ||
| 854 | tokio::spawn(async move { | 865 | tokio::spawn(async move { |
| 855 | let mut disconnect_sent = false; | 866 | let mut disconnect_sent = false; |
| 856 | let mut eose_received = false; | ||
| 857 | 867 | ||
| 858 | while let Some(relay_event) = event_rx.recv().await { | 868 | while let Some(relay_event) = event_rx.recv().await { |
| 859 | match relay_event { | 869 | match relay_event { |
| 860 | RelayEvent::Event(event) => { | 870 | RelayEvent::Event(event) => { |
| 861 | if let Some(ref metrics) = metrics_clone { | 871 | let result = Self::process_event_static( |
| 862 | let source = if eose_received { | ||
| 863 | event_source::LIVE | ||
| 864 | } else { | ||
| 865 | event_source::STARTUP | ||
| 866 | }; | ||
| 867 | metrics.record_event(source); | ||
| 868 | } | ||
| 869 | Self::process_event_static( | ||
| 870 | &event, | 872 | &event, |
| 871 | &relay_url_clone, | 873 | &relay_url_clone, |
| 872 | &database, | 874 | &database, |
| @@ -874,9 +876,14 @@ impl SyncManager { | |||
| 874 | &local_relay, | 876 | &local_relay, |
| 875 | ) | 877 | ) |
| 876 | .await; | 878 | .await; |
| 879 | // Only record metric when event is actually saved | ||
| 880 | if result == ProcessResult::Saved { | ||
| 881 | if let Some(ref metrics) = metrics_clone { | ||
| 882 | metrics.record_synced_event(); | ||
| 883 | } | ||
| 884 | } | ||
| 877 | } | 885 | } |
| 878 | RelayEvent::EndOfStoredEvents(sub_id) => { | 886 | RelayEvent::EndOfStoredEvents(sub_id) => { |
| 879 | eose_received = true; | ||
| 880 | tracing::debug!( | 887 | tracing::debug!( |
| 881 | relay = %relay_url_clone, | 888 | relay = %relay_url_clone, |
| 882 | sub_id = %sub_id, | 889 | sub_id = %sub_id, |
| @@ -1055,7 +1062,6 @@ impl SyncManager { | |||
| 1055 | // Record reconnect-specific metrics (not basic connection metrics) | 1062 | // Record reconnect-specific metrics (not basic connection metrics) |
| 1056 | if let Some(ref metrics) = self.metrics { | 1063 | if let Some(ref metrics) = self.metrics { |
| 1057 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1064 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); |
| 1058 | metrics.record_event(event_source::RECONNECT); | ||
| 1059 | } | 1065 | } |
| 1060 | 1066 | ||
| 1061 | // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) | 1067 | // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) |
| @@ -1376,24 +1382,26 @@ impl SyncManager { | |||
| 1376 | /// - Write policy validation | 1382 | /// - Write policy validation |
| 1377 | /// - Database save | 1383 | /// - Database save |
| 1378 | /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) | 1384 | /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) |
| 1385 | /// | ||
| 1386 | /// Returns `ProcessResult` to indicate whether the event was saved, duplicate, or rejected. | ||
| 1379 | async fn process_event_static( | 1387 | async fn process_event_static( |
| 1380 | event: &Event, | 1388 | event: &Event, |
| 1381 | relay_url: &str, | 1389 | relay_url: &str, |
| 1382 | database: &SharedDatabase, | 1390 | database: &SharedDatabase, |
| 1383 | write_policy: &Nip34WritePolicy, | 1391 | write_policy: &Nip34WritePolicy, |
| 1384 | local_relay: &LocalRelay, | 1392 | local_relay: &LocalRelay, |
| 1385 | ) { | 1393 | ) -> ProcessResult { |
| 1386 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; | 1394 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; |
| 1387 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 1395 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |
| 1388 | // Check if event already exists | 1396 | // Check if event already exists |
| 1389 | match database.event_by_id(&event.id).await { | 1397 | match database.event_by_id(&event.id).await { |
| 1390 | Ok(Some(_)) => { | 1398 | Ok(Some(_)) => { |
| 1391 | tracing::trace!(event_id = %event.id, "Event already exists, skipping"); | 1399 | tracing::trace!(event_id = %event.id, "Event already exists, skipping"); |
| 1392 | return; | 1400 | return ProcessResult::Duplicate; |
| 1393 | } | 1401 | } |
| 1394 | Err(e) => { | 1402 | Err(e) => { |
| 1395 | tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); | 1403 | tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); |
| 1396 | return; | 1404 | return ProcessResult::Rejected; |
| 1397 | } | 1405 | } |
| 1398 | Ok(None) => {} // Continue processing | 1406 | Ok(None) => {} // Continue processing |
| 1399 | } | 1407 | } |
| @@ -1412,7 +1420,7 @@ impl SyncManager { | |||
| 1412 | error = %e, | 1420 | error = %e, |
| 1413 | "Failed to save synced event" | 1421 | "Failed to save synced event" |
| 1414 | ); | 1422 | ); |
| 1415 | return; | 1423 | return ProcessResult::Rejected; |
| 1416 | } | 1424 | } |
| 1417 | 1425 | ||
| 1418 | // Broadcast to WebSocket subscribers (enables recursive relay discovery) | 1426 | // Broadcast to WebSocket subscribers (enables recursive relay discovery) |
| @@ -1426,6 +1434,7 @@ impl SyncManager { | |||
| 1426 | broadcast = broadcast_success, | 1434 | broadcast = broadcast_success, |
| 1427 | "Synced event saved and broadcast" | 1435 | "Synced event saved and broadcast" |
| 1428 | ); | 1436 | ); |
| 1437 | ProcessResult::Saved | ||
| 1429 | } | 1438 | } |
| 1430 | PolicyResult::Reject(reason) => { | 1439 | PolicyResult::Reject(reason) => { |
| 1431 | tracing::debug!( | 1440 | tracing::debug!( |
| @@ -1434,6 +1443,7 @@ impl SyncManager { | |||
| 1434 | reason = %reason, | 1443 | reason = %reason, |
| 1435 | "Event rejected by write policy" | 1444 | "Event rejected by write policy" |
| 1436 | ); | 1445 | ); |
| 1446 | ProcessResult::Rejected | ||
| 1437 | } | 1447 | } |
| 1438 | } | 1448 | } |
| 1439 | } | 1449 | } |
| @@ -1778,7 +1788,14 @@ impl SyncManager { | |||
| 1778 | 1788 | ||
| 1779 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex | 1789 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex |
| 1780 | /// | 1790 | /// |
| 1781 | /// This method subscribes to filters with `limit: 0` for receiving ongoing events. | 1791 | /// This method applies limit(0) to all filters to receive ONLY new events. |
| 1792 | /// Per NIP-01, limit 0 means "send no stored events, only future events", which | ||
| 1793 | /// ensures EOSE is received immediately and all subsequent events are tagged as "live" | ||
| 1794 | /// in metrics (not "startup"). | ||
| 1795 | /// | ||
| 1796 | /// **Important**: Callers pass the SAME filters to both sync_live() and historic_sync(). | ||
| 1797 | /// This method applies limit(0) to prevent fetching historic events. | ||
| 1798 | /// | ||
| 1782 | /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have | 1799 | /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have |
| 1783 | /// a definite "completion" - they stay open indefinitely. | 1800 | /// a definite "completion" - they stay open indefinitely. |
| 1784 | /// | 1801 | /// |
| @@ -1788,7 +1805,7 @@ impl SyncManager { | |||
| 1788 | /// | 1805 | /// |
| 1789 | /// # Arguments | 1806 | /// # Arguments |
| 1790 | /// * `relay_url` - The relay URL to subscribe on | 1807 | /// * `relay_url` - The relay URL to subscribe on |
| 1791 | /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) | 1808 | /// * `filters` - Filters to subscribe to (limit(0) will be applied) |
| 1792 | /// | 1809 | /// |
| 1793 | /// # Returns | 1810 | /// # Returns |
| 1794 | /// Vec of subscription IDs for the live subscriptions, or empty if connection not found | 1811 | /// Vec of subscription IDs for the live subscriptions, or empty if connection not found |
| @@ -1808,10 +1825,12 @@ impl SyncManager { | |||
| 1808 | let mut sub_ids = Vec::new(); | 1825 | let mut sub_ids = Vec::new(); |
| 1809 | 1826 | ||
| 1810 | for filter in filters.iter() { | 1827 | for filter in filters.iter() { |
| 1811 | // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) | 1828 | // Live subscriptions MUST use limit(0) to receive ONLY new events |
| 1829 | // This prevents fetching historic events that would be miscounted as "live" in metrics | ||
| 1830 | // The caller passes the same filters to both sync_live() and historic_sync() | ||
| 1812 | // Live subscriptions do NOT auto-close - we want them to stay open for new events | 1831 | // Live subscriptions do NOT auto-close - we want them to stay open for new events |
| 1813 | match connection | 1832 | match connection |
| 1814 | .subscribe_filter(filter.clone().limit(1), false) | 1833 | .subscribe_filter(filter.clone().limit(0), false) |
| 1815 | .await | 1834 | .await |
| 1816 | { | 1835 | { |
| 1817 | Ok(sub_id) => { | 1836 | Ok(sub_id) => { |