From 02a90c109d4d08c6a54184f821c100f4eba92545 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 15:53:48 +0000 Subject: Simplify sync metrics to track only newly saved events Replace broken event counting that occurred before duplicate/policy checks with accurate tracking of events that are new, accepted, and saved. Changes: - Added ProcessResult enum to track event processing outcomes - Modified process_event_static() to return ProcessResult - Replaced events_total (with source labels) with events_synced_total - Removed gap_events_total and event_source module - Removed eose_received flag (EOSE is per-subscription, not suitable) - Updated all tests to use new simplified API The new ngit_sync_events_synced_total metric only counts events that: 1. Are new (not duplicates) 2. Pass write policy validation 3. Are successfully saved to database All 165 tests pass (124 lib + 41 integration) --- src/sync/metrics.rs | 128 ++++++++-------------------------------------------- src/sync/mod.rs | 61 ++++++++++++++++--------- 2 files changed, 59 insertions(+), 130 deletions(-) (limited to 'src/sync') diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index c3bebfc..22c9192 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs @@ -3,12 +3,11 @@ //! This module provides comprehensive sync monitoring metrics including: //! - Connection status and attempts per relay //! - Health state tracking (Healthy/Degraded/Dead) -//! - Event sync tracking by source (live/startup/reconnect/daily catchup) -//! - Gap events filled during catchup operations +//! - Event sync tracking (only newly saved events) //! //! All metrics follow the `ngit_sync_` prefix convention. 
-use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; +use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; use super::health::HealthState; @@ -31,10 +30,8 @@ pub struct SyncMetrics { relay_failures: IntGaugeVec, // === Event metrics === - /// Events synced by source (live/startup/reconnect/daily) - events_total: IntCounterVec, - /// Gap events filled during catchup, by relay - gap_events_total: IntCounterVec, + /// Total events synced (newly saved events only) + events_synced_total: IntCounter, // === Summary metrics === /// Total relays discovered and tracked @@ -91,23 +88,11 @@ impl SyncMetrics { registry.register(Box::new(relay_failures.clone()))?; // Event metrics - let events_total = IntCounterVec::new( - Opts::new( - "ngit_sync_events_total", - "Total events synced by source type", - ), - &["source"], - )?; - registry.register(Box::new(events_total.clone()))?; - - let gap_events_total = IntCounterVec::new( - Opts::new( - "ngit_sync_gap_events_total", - "Gap events filled during catchup by relay", - ), - &["relay"], - )?; - registry.register(Box::new(gap_events_total.clone()))?; + let events_synced_total = IntCounter::with_opts(Opts::new( + "ngit_sync_events_synced_total", + "Total events synced (newly saved events only)", + ))?; + registry.register(Box::new(events_synced_total.clone()))?; // Summary metrics let relays_tracked_total = IntGauge::with_opts(Opts::new( @@ -133,8 +118,7 @@ impl SyncMetrics { connection_attempts_total, relay_status, relay_failures, - events_total, - gap_events_total, + events_synced_total, relays_tracked_total, relays_connected_total, relays_dead_total, @@ -242,51 +226,12 @@ impl SyncMetrics { // === Event Recording Methods === - /// Record a synced event by source type. - /// - /// # Arguments - /// - /// * `source` - The event source type. 
Use constants from [`event_source`]: - /// - [`event_source::LIVE`] - Real-time subscription events - /// - [`event_source::STARTUP`] - Events from startup catchup - /// - [`event_source::RECONNECT`] - Events from reconnection catchup - /// - [`event_source::DAILY`] - Events from daily catchup - pub fn record_event(&self, source: &str) { - self.events_total.with_label_values(&[source]).inc(); - } - - /// Record multiple events synced by source type. - /// - /// # Arguments + /// Record a successfully synced event (newly saved to database). /// - /// * `source` - The event source type (see [`record_event`](Self::record_event)) - /// * `count` - Number of events to record - pub fn record_events(&self, source: &str, count: u64) { - self.events_total.with_label_values(&[source]).inc_by(count); - } - - /// Record a gap event filled during catchup. - /// - /// Gap events are historical events discovered during catchup that weren't - /// received during live sync. - /// - /// # Arguments - /// - /// * `relay` - The relay URL from which the gap event was received - pub fn record_gap_event(&self, relay: &str) { - self.gap_events_total.with_label_values(&[relay]).inc(); - } - - /// Record multiple gap events filled during catchup. - /// - /// # Arguments - /// - /// * `relay` - The relay URL from which the gap events were received - /// * `count` - Number of gap events to record - pub fn record_gap_events(&self, relay: &str, count: u64) { - self.gap_events_total - .with_label_values(&[relay]) - .inc_by(count); + /// Only events that are new AND pass write policy should be counted. + /// Duplicates and rejected events are not counted. + pub fn record_synced_event(&self) { + self.events_synced_total.inc(); } // === Summary Recording Methods === @@ -317,24 +262,6 @@ impl SyncMetrics { } } -/// Event source types for metrics tracking. -/// -/// These constants are used as labels for the `ngit_sync_events_total` metric -/// to categorize events by how they were discovered. 
-pub mod event_source { - /// Real-time subscription events received during live sync. - pub const LIVE: &str = "live"; - - /// Events from startup catchup when the relay first starts. - pub const STARTUP: &str = "startup"; - - /// Events from reconnection catchup after a relay reconnects. - pub const RECONNECT: &str = "reconnect"; - - /// Events from daily catchup for drift detection. - pub const DAILY: &str = "daily"; -} - #[cfg(test)] mod tests { use super::*; @@ -400,18 +327,10 @@ mod tests { let registry = create_test_registry(); let metrics = SyncMetrics::register(&registry).unwrap(); - // Record single events - metrics.record_event(event_source::LIVE); - metrics.record_event(event_source::STARTUP); - metrics.record_event(event_source::RECONNECT); - metrics.record_event(event_source::DAILY); - - // Record multiple events - metrics.record_events(event_source::STARTUP, 10); - - // Record gap events - metrics.record_gap_event("wss://relay1.example.com"); - metrics.record_gap_events("wss://relay2.example.com", 5); + // Record synced events + metrics.record_synced_event(); + metrics.record_synced_event(); + metrics.record_synced_event(); } #[test] @@ -431,15 +350,6 @@ mod tests { assert_eq!(metrics.get_connected_count(), 3); } - #[test] - fn test_event_source_constants() { - // Verify constants have expected values - assert_eq!(event_source::LIVE, "live"); - assert_eq!(event_source::STARTUP, "startup"); - assert_eq!(event_source::RECONNECT, "reconnect"); - assert_eq!(event_source::DAILY, "daily"); - } - #[test] fn test_duplicate_registration_fails() { let registry = create_test_registry(); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 88608b1..8581fb6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -23,7 +23,7 @@ pub mod self_subscriber; pub use algorithms::{AddFilters, RelaySyncNeeds}; // Re-export metrics types -pub use metrics::{event_source, SyncMetrics}; +pub use metrics::SyncMetrics; // Re-export relay connection types pub use
relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; @@ -151,6 +151,17 @@ pub enum SyncMethod { Negentropy, } +/// Result of processing an event from sync +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProcessResult { + /// Event was new and saved to database + Saved, + /// Event already existed in database + Duplicate, + /// Event rejected by write policy + Rejected, +} + /// A batch of items pending confirmation #[derive(Debug, Clone)] pub struct PendingBatch { @@ -853,20 +864,11 @@ impl SyncManager { tokio::spawn(async move { let mut disconnect_sent = false; - let mut eose_received = false; while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { - if let Some(ref metrics) = metrics_clone { - let source = if eose_received { - event_source::LIVE - } else { - event_source::STARTUP - }; - metrics.record_event(source); - } - Self::process_event_static( + let result = Self::process_event_static( &event, &relay_url_clone, &database, @@ -874,9 +876,14 @@ impl SyncManager { &local_relay, ) .await; + // Only record metric when event is actually saved + if result == ProcessResult::Saved { + if let Some(ref metrics) = metrics_clone { + metrics.record_synced_event(); + } + } } RelayEvent::EndOfStoredEvents(sub_id) => { - eose_received = true; tracing::debug!( relay = %relay_url_clone, sub_id = %sub_id, @@ -1055,7 +1062,6 @@ impl SyncManager { // Record reconnect-specific metrics (not basic connection metrics) if let Some(ref metrics) = self.metrics { metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); - metrics.record_event(event_source::RECONNECT); } // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) @@ -1376,24 +1382,26 @@ impl SyncManager { /// - Write policy validation /// - Database save /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) + /// + /// Returns `ProcessResult` to 
indicate whether the event was saved, duplicate, or rejected. async fn process_event_static( event: &Event, relay_url: &str, database: &SharedDatabase, write_policy: &Nip34WritePolicy, local_relay: &LocalRelay, - ) { + ) -> ProcessResult { use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; // Check if event already exists match database.event_by_id(&event.id).await { Ok(Some(_)) => { tracing::trace!(event_id = %event.id, "Event already exists, skipping"); - return; + return ProcessResult::Duplicate; } Err(e) => { tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); - return; + return ProcessResult::Rejected; } Ok(None) => {} // Continue processing } @@ -1412,7 +1420,7 @@ impl SyncManager { error = %e, "Failed to save synced event" ); - return; + return ProcessResult::Rejected; } // Broadcast to WebSocket subscribers (enables recursive relay discovery) @@ -1426,6 +1434,7 @@ impl SyncManager { broadcast = broadcast_success, "Synced event saved and broadcast" ); + ProcessResult::Saved } PolicyResult::Reject(reason) => { tracing::debug!( @@ -1434,6 +1443,7 @@ impl SyncManager { reason = %reason, "Event rejected by write policy" ); + ProcessResult::Rejected } } } @@ -1778,7 +1788,14 @@ impl SyncManager { /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex /// - /// This method subscribes to filters with `limit: 0` for receiving ongoing events. + /// This method applies limit(0) to all filters to receive ONLY new events. + /// Per NIP-01, limit 0 means "send no stored events, only future events", which + /// ensures EOSE is received immediately and all subsequent events are tagged as "live" + /// in metrics (not "startup"). + /// + /// **Important**: Callers pass the SAME filters to both sync_live() and historic_sync(). + /// This method applies limit(0) to prevent fetching historic events. 
+ /// /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have /// a definite "completion" - they stay open indefinitely. /// @@ -1788,7 +1805,7 @@ impl SyncManager { /// /// # Arguments /// * `relay_url` - The relay URL to subscribe on - /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) + /// * `filters` - Filters to subscribe to (limit(0) will be applied) /// /// # Returns /// Vec of subscription IDs for the live subscriptions, or empty if connection not found @@ -1808,10 +1825,12 @@ impl SyncManager { let mut sub_ids = Vec::new(); for filter in filters.iter() { - // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) + // Live subscriptions MUST use limit(0) to receive ONLY new events + // This prevents fetching historic events that would be miscounted as "live" in metrics + // The caller passes the same filters to both sync_live() and historic_sync() // Live subscriptions do NOT auto-close - we want them to stay open for new events match connection - .subscribe_filter(filter.clone().limit(1), false) + .subscribe_filter(filter.clone().limit(0), false) .await { Ok(sub_id) => { -- cgit v1.2.3