From 02a90c109d4d08c6a54184f821c100f4eba92545 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 15:53:48 +0000 Subject: Simplify sync metrics to track only newly saved events Replace broken event counting that occurred before duplicate/policy checks with accurate tracking of events that are new, accepted, and saved. Changes: - Added ProcessResult enum to track event processing outcomes - Modified process_event_static() to return ProcessResult - Replaced events_total (with source labels) with events_synced_total - Removed gap_events_total and event_source module - Removed eose_received flag (EOSE is per-subscription, not suitable) - Updated all tests to use new simplified API The new ngit_sync_events_synced_total metric only counts events that: 1. Are new (not duplicates) 2. Pass write policy validation 3. Are successfully saved to database All 165 tests pass (124 lib + 41 integration) --- src/sync/mod.rs | 61 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 21 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 88608b1..8581fb6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -23,7 +23,7 @@ pub mod self_subscriber; pub use algorithms::{AddFilters, RelaySyncNeeds}; // Re-export metrics types -pub use metrics::{event_source, SyncMetrics}; +pub use metrics::SyncMetrics; // Re-export relay connection types pub use relay_connection::{NegentropySyncResult, RelayConnection, RelayEvent}; @@ -151,6 +151,17 @@ pub enum SyncMethod { Negentropy, } +/// Result of processing an event from sync +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProcessResult { + /// Event was new and saved to database + Saved, + /// Event already existed in database + Duplicate, + /// Event rejected by write policy + Rejected, +} + /// A batch of items pending confirmation #[derive(Debug, Clone)] pub struct PendingBatch { @@ -853,20 +864,11 @@ impl SyncManager { tokio::spawn(async move { let mut disconnect_sent = false; - let mut eose_received = false; while let Some(relay_event) = event_rx.recv().await { match relay_event { RelayEvent::Event(event) => { - if let Some(ref metrics) = metrics_clone { - let source = if eose_received { - event_source::LIVE - } else { - event_source::STARTUP - }; - metrics.record_event(source); - } - Self::process_event_static( + let result = Self::process_event_static( &event, &relay_url_clone, &database, @@ -874,9 +876,14 @@ impl SyncManager { &local_relay, ) .await; + // Only record metric when event is actually saved + if result == ProcessResult::Saved { + if let Some(ref metrics) = metrics_clone { + metrics.record_synced_event(); + } + } } RelayEvent::EndOfStoredEvents(sub_id) => { - eose_received = true; tracing::debug!( relay = %relay_url_clone, sub_id = %sub_id, @@ -1055,7 +1062,6 @@ impl SyncManager { // Record reconnect-specific metrics (not basic connection metrics) if let Some(ref metrics) = self.metrics { metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); - metrics.record_event(event_source::RECONNECT); } // Step 2: L1 live + L1 historic with since filter (or full sync if announcements never completed) @@ -1376,24 +1382,26 @@ impl SyncManager { /// - Write policy validation /// - Database save /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) + /// + /// Returns `ProcessResult` to indicate whether the event was saved, duplicate, or rejected. async fn process_event_static( event: &Event, relay_url: &str, database: &SharedDatabase, write_policy: &Nip34WritePolicy, local_relay: &LocalRelay, - ) { + ) -> ProcessResult { use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; // Check if event already exists match database.event_by_id(&event.id).await { Ok(Some(_)) => { tracing::trace!(event_id = %event.id, "Event already exists, skipping"); - return; + return ProcessResult::Duplicate; } Err(e) => { tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); - return; + return ProcessResult::Rejected; } Ok(None) => {} // Continue processing } @@ -1412,7 +1420,7 @@ impl SyncManager { error = %e, "Failed to save synced event" ); - return; + return ProcessResult::Rejected; } // Broadcast to WebSocket subscribers (enables recursive relay discovery) @@ -1426,6 +1434,7 @@ impl SyncManager { broadcast = broadcast_success, "Synced event saved and broadcast" ); + ProcessResult::Saved } PolicyResult::Reject(reason) => { tracing::debug!( @@ -1434,6 +1443,7 @@ impl SyncManager { reason = %reason, "Event rejected by write policy" ); + ProcessResult::Rejected } } } @@ -1778,7 +1788,14 @@ impl SyncManager { /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex /// - /// This method subscribes to filters with `limit: 0` for receiving ongoing events. + /// This method applies limit(0) to all filters to receive ONLY new events. + /// Per NIP-01, limit 0 means "send no stored events, only future events", which + /// ensures EOSE is received immediately and all subsequent events are tagged as "live" + /// in metrics (not "startup"). + /// + /// **Important**: Callers pass the SAME filters to both sync_live() and historic_sync(). + /// This method applies limit(0) to prevent fetching historic events. + /// /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have /// a definite "completion" - they stay open indefinitely. /// @@ -1788,7 +1805,7 @@ impl SyncManager { /// /// # Arguments /// * `relay_url` - The relay URL to subscribe on - /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) + /// * `filters` - Filters to subscribe to (limit(0) will be applied) /// /// # Returns /// Vec of subscription IDs for the live subscriptions, or empty if connection not found @@ -1808,10 +1825,12 @@ impl SyncManager { let mut sub_ids = Vec::new(); for filter in filters.iter() { - // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) + // Live subscriptions MUST use limit(0) to receive ONLY new events + // This prevents fetching historic events that would be miscounted as "live" in metrics + // The caller passes the same filters to both sync_live() and historic_sync() // Live subscriptions do NOT auto-close - we want them to stay open for new events match connection - .subscribe_filter(filter.clone().limit(1), false) + .subscribe_filter(filter.clone().limit(0), false) .await { Ok(sub_id) => { -- cgit v1.2.3