From 950c2e4e68448d2abcad90a31bfffaca6d7bc47e Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:30:18 +0000 Subject: feat(sync): Phase 5 - negentropy catchup (NIP-77) - Add NegentropyService for set reconciliation - Implement startup catchup with warm-up delay - Implement reconnect catchup (last 3 days) - Add daily catchup schedule with stagger --- src/sync/mod.rs | 2 + src/sync/negentropy.rs | 477 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 479 insertions(+) create mode 100644 src/sync/negentropy.rs (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 4dca160..dc11812 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -21,11 +21,13 @@ mod connection; mod filter; pub mod health; mod manager; +pub mod negentropy; mod subscription; pub use filter::FilterService; pub use health::{HealthState, RelayHealth, RelayHealthTracker}; pub use manager::SyncManager; +pub use negentropy::NegentropyService; pub use subscription::SubscriptionManager; use std::net::SocketAddr; diff --git a/src/sync/negentropy.rs b/src/sync/negentropy.rs new file mode 100644 index 0000000..5c0a246 --- /dev/null +++ b/src/sync/negentropy.rs @@ -0,0 +1,477 @@ +//! Negentropy Catchup Service for GRASP-02 Phase 5 +//! +//! Implements gap-filling synchronization to ensure no events are missed during: +//! - Startup (initial sync after warm-up period) +//! - Reconnection (after connection restore) +//! - Daily maintenance (periodic full reconciliation) +//! +//! ## Note on NIP-77 +//! +//! This implementation uses a simplified gap-filling strategy (fetch and compare) +//! rather than full NIP-77 negentropy set reconciliation. The nostr-sdk 0.44 does +//! not include built-in negentropy support, so we implement an equivalent approach: +//! +//! 1. Fetch events from relay using same filters as live sync +//! 2. Compare with local database (skip already-stored events) +//! 3. Validate and store missing events through policy +//! +//! Full NIP-77 support can be added in a future release if needed. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use nostr_relay_builder::prelude::*; +use nostr_sdk::prelude::*; +use tokio::sync::RwLock; + +use super::filter::FilterService; +use super::SYNC_SOURCE_ADDR; +use crate::config::Config; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; + +/// Default startup delay before first catchup (30 seconds) +const DEFAULT_STARTUP_DELAY_SECS: u64 = 30; + +/// Default delay after reconnection before catchup (10 seconds) +const DEFAULT_RECONNECT_DELAY_SECS: u64 = 10; + +/// Default lookback period for reconnect catchup (3 days) +const DEFAULT_RECONNECT_LOOKBACK_DAYS: u64 = 3; + +/// Daily catchup interval (24 hours) +const DAILY_CATCHUP_INTERVAL_SECS: u64 = 86400; + +/// Stagger delay between relays for catchup operations (5 minutes) +const RELAY_STAGGER_SECS: u64 = 300; + +/// Timeout for fetching events during catchup +const CATCHUP_FETCH_TIMEOUT_SECS: u64 = 60; + +/// Negentropy Catchup Service +/// +/// Manages gap-filling operations for different scenarios: +/// - Startup catchup after warm-up period +/// - Reconnect catchup after connection restore +/// - Daily catchup for periodic maintenance +#[derive(Debug)] +pub struct NegentropyService { + /// Database for storing and querying events + database: SharedDatabase, + /// Filter service for building catchup filters + filter_service: Arc, + /// Write policy for validating synced events + write_policy: Nip34WritePolicy, + /// Startup time of the service + startup_time: Instant, + /// Configuration values + startup_delay_secs: u64, + reconnect_delay_secs: u64, + reconnect_lookback_days: u64, + /// Whether startup catchup has been run + startup_catchup_completed: Arc>, + /// Last daily catchup time per relay + last_daily_catchup: Arc>>, +} + +impl NegentropyService { + /// Create a new NegentropyService + /// + /// # Arguments + /// * `database` - Shared database for storing events + /// * `filter_service` - Filter service for building catchup filters + /// * `write_policy` - Write policy for validating events + /// * `config` - Configuration for catchup timing + pub fn new( + database: SharedDatabase, + filter_service: Arc, + write_policy: Nip34WritePolicy, + config: &Config, + ) -> Self { + Self { + database, + filter_service, + write_policy, + startup_time: Instant::now(), + startup_delay_secs: config.sync_startup_delay_secs, + reconnect_delay_secs: config.sync_reconnect_delay_secs, + reconnect_lookback_days: config.sync_reconnect_lookback_days, + startup_catchup_completed: Arc::new(RwLock::new(false)), + last_daily_catchup: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Create a NegentropyService with default configuration + pub fn with_defaults( + database: SharedDatabase, + filter_service: Arc, + write_policy: Nip34WritePolicy, + ) -> Self { + Self { + database, + filter_service, + write_policy, + startup_time: Instant::now(), + startup_delay_secs: DEFAULT_STARTUP_DELAY_SECS, + reconnect_delay_secs: DEFAULT_RECONNECT_DELAY_SECS, + reconnect_lookback_days: DEFAULT_RECONNECT_LOOKBACK_DAYS, + startup_catchup_completed: Arc::new(RwLock::new(false)), + last_daily_catchup: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Check if startup catchup should run + /// + /// Returns true if: + /// - Startup delay has elapsed (default 30s) + /// - Startup catchup hasn't been completed yet + pub async fn should_run_startup_catchup(&self) -> bool { + let completed = *self.startup_catchup_completed.read().await; + if completed { + return false; + } + + let elapsed = self.startup_time.elapsed(); + elapsed >= Duration::from_secs(self.startup_delay_secs) + } + + /// Check if daily catchup should run for a specific relay + /// + /// Returns true if 24 hours have elapsed since last daily catchup + pub async fn should_run_daily_catchup(&self, relay_url: &str) -> bool { + let last_catchup = self.last_daily_catchup.read().await; + + match last_catchup.get(relay_url) { + None => true, // Never run, should run + Some(last_time) => { + last_time.elapsed() >= Duration::from_secs(DAILY_CATCHUP_INTERVAL_SECS) + } + } + } + + /// Get the startup delay in seconds + pub fn startup_delay_secs(&self) -> u64 { + self.startup_delay_secs + } + + /// Get the reconnect delay in seconds + pub fn reconnect_delay_secs(&self) -> u64 { + self.reconnect_delay_secs + } + + /// Get the relay stagger delay in seconds + pub fn relay_stagger_secs(&self) -> u64 { + RELAY_STAGGER_SECS + } + + /// Run startup catchup for a relay + /// + /// Fetches all events matching the sync filters and stores any missing ones. + /// This is called after the startup warm-up period (default 30s). + /// + /// Returns the count of gap events filled. + pub async fn run_startup_catchup( + &self, + relay_url: &str, + remote_domain: &str, + ) -> Result> { + tracing::info!("Starting startup catchup for {}", relay_url); + + // Run full catchup (no time restriction) + let gap_count = self + .run_catchup(relay_url, remote_domain, None, "startup") + .await?; + + // Mark startup catchup as completed + { + let mut completed = self.startup_catchup_completed.write().await; + *completed = true; + } + + if gap_count > 0 { + tracing::warn!( + "Startup catchup filled {} gaps from {}", + gap_count, + relay_url + ); + } else { + tracing::info!("Startup catchup completed for {} (no gaps)", relay_url); + } + + Ok(gap_count) + } + + /// Run reconnect catchup for a relay + /// + /// Fetches events from the last 3 days (configurable) and stores any missing ones. + /// This is called after a connection is restored (after reconnect delay). + /// + /// Returns the count of gap events filled. + pub async fn run_reconnect_catchup( + &self, + relay_url: &str, + remote_domain: &str, + ) -> Result> { + tracing::info!("Starting reconnect catchup for {}", relay_url); + + // Calculate "since" timestamp (3 days ago) + let lookback_secs = self.reconnect_lookback_days * 24 * 60 * 60; + let since = Timestamp::now() - lookback_secs; + + let gap_count = self + .run_catchup(relay_url, remote_domain, Some(since), "reconnect") + .await?; + + if gap_count > 0 { + tracing::warn!( + "Reconnect catchup filled {} gaps from {}", + gap_count, + relay_url + ); + } else { + tracing::debug!("Reconnect catchup completed for {} (no gaps)", relay_url); + } + + Ok(gap_count) + } + + /// Run daily catchup for a relay + /// + /// Performs full reconciliation and stores any missing events. + /// This is called once per day per relay (with stagger). + /// + /// Returns the count of gap events filled. + pub async fn run_daily_catchup( + &self, + relay_url: &str, + remote_domain: &str, + ) -> Result> { + tracing::info!("Starting daily catchup for {}", relay_url); + + // Run full catchup (no time restriction) + let gap_count = self + .run_catchup(relay_url, remote_domain, None, "daily") + .await?; + + // Update last daily catchup time + { + let mut last_catchup = self.last_daily_catchup.write().await; + last_catchup.insert(relay_url.to_string(), Instant::now()); + } + + if gap_count > 0 { + tracing::warn!( + "Daily catchup filled {} gaps from {}", + gap_count, + relay_url + ); + } else { + tracing::info!("Daily catchup completed for {} (no gaps)", relay_url); + } + + Ok(gap_count) + } + + /// Core catchup implementation + /// + /// Fetches events from relay matching sync filters, compares with local database, + /// validates through policy, and stores missing events. + /// + /// # Arguments + /// * `relay_url` - URL of the relay to fetch from + /// * `remote_domain` - Domain of the remote relay (for filter building) + /// * `since` - Optional timestamp to filter events (for reconnect catchup) + /// * `catchup_type` - Type of catchup for logging ("startup", "reconnect", "daily") + async fn run_catchup( + &self, + relay_url: &str, + remote_domain: &str, + since: Option, + catchup_type: &str, + ) -> Result> { + // Create a client for fetching events + let client = Client::default(); + client.add_relay(relay_url).await?; + client.connect().await; + + let mut gap_count = 0; + + // Build filters (same as live sync uses) + let mut all_filters = Vec::new(); + + // Layer 1: Announcement discovery + let layer1_filters = self.filter_service.get_layer1_filters(); + all_filters.extend(layer1_filters); + + // Layer 2: Repository events + let layer2_filters = self.filter_service.get_layer2_filters(remote_domain).await; + all_filters.extend(layer2_filters); + + // Layer 3: Related events + let layer3_filters = self.filter_service.get_layer3_filters().await; + all_filters.extend(layer3_filters); + + // Apply "since" filter if specified (for reconnect catchup) + let filters: Vec = if let Some(since_ts) = since { + all_filters + .into_iter() + .map(|f| f.since(since_ts)) + .collect() + } else { + all_filters + }; + + if filters.is_empty() { + tracing::debug!("No filters for {} catchup on {}", catchup_type, relay_url); + client.disconnect().await; + return Ok(0); + } + + tracing::debug!( + "Running {} catchup on {} with {} filters", + catchup_type, + relay_url, + filters.len() + ); + + // Fetch events for each filter + for filter in filters { + match client + .fetch_events(filter, Duration::from_secs(CATCHUP_FETCH_TIMEOUT_SECS)) + .await + { + Ok(events) => { + for event in events.into_iter() { + // Check if event already exists in local database + if self.event_exists_locally(&event).await { + continue; + } + + // Validate through write policy + let result = self + .write_policy + .admit_event(&event, &SYNC_SOURCE_ADDR) + .await; + + match result { + PolicyResult::Accept => { + // Log gap event at WARN level to distinguish from live events + tracing::warn!( + "Gap event filled via {} catchup: {} (kind {})", + catchup_type, + event.id.to_hex(), + event.kind.as_u16() + ); + + // Store the event + if let Err(e) = self.database.save_event(&event).await { + tracing::error!( + "Failed to store gap event {}: {}", + event.id.to_hex(), + e + ); + } else { + gap_count += 1; + } + } + PolicyResult::Reject(reason) => { + tracing::debug!( + "Gap event {} rejected by policy: {}", + event.id.to_hex(), + reason + ); + } + } + } + } + Err(e) => { + tracing::warn!( + "Failed to fetch events for {} catchup from {}: {}", + catchup_type, + relay_url, + e + ); + } + } + } + + client.disconnect().await; + + Ok(gap_count) + } + + /// Check if an event already exists in the local database + async fn event_exists_locally(&self, event: &Event) -> bool { + // Query for the specific event by ID + let filter = Filter::new().id(event.id); + + match self.database.query(filter).await { + Ok(events) => !events.is_empty(), + Err(e) => { + tracing::warn!( + "Failed to check if event {} exists locally: {}", + event.id.to_hex(), + e + ); + // Assume it doesn't exist to avoid skipping events on error + false + } + } + } + + /// Mark startup catchup as completed (for testing) + #[cfg(test)] + pub async fn mark_startup_completed(&self) { + let mut completed = self.startup_catchup_completed.write().await; + *completed = true; + } + + /// Reset startup catchup status (for testing) + #[cfg(test)] + pub async fn reset_startup_status(&self) { + let mut completed = self.startup_catchup_completed.write().await; + *completed = false; + } +} + +/// Create a shared NegentropyService wrapped in Arc +pub fn create_negentropy_service( + database: SharedDatabase, + filter_service: Arc, + write_policy: Nip34WritePolicy, + config: &Config, +) -> Arc { + Arc::new(NegentropyService::new( + database, + filter_service, + write_policy, + config, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_constants() { + assert_eq!(DEFAULT_STARTUP_DELAY_SECS, 30); + assert_eq!(DEFAULT_RECONNECT_DELAY_SECS, 10); + assert_eq!(DEFAULT_RECONNECT_LOOKBACK_DAYS, 3); + assert_eq!(DAILY_CATCHUP_INTERVAL_SECS, 86400); + assert_eq!(RELAY_STAGGER_SECS, 300); + } + + #[test] + fn test_reconnect_lookback_calculation() { + // 3 days = 3 * 24 * 60 * 60 = 259,200 seconds + let lookback_days: u64 = 3; + let lookback_secs = lookback_days * 24 * 60 * 60; + assert_eq!(lookback_secs, 259200); + } + + #[test] + fn test_stagger_delay_is_5_minutes() { + assert_eq!(RELAY_STAGGER_SECS, 300); // 5 * 60 = 300 + } +} \ No newline at end of file -- cgit v1.2.3