From 433d78640e887a5503afa3cda1840c8047fcb6d0 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 12:39:44 +0000 Subject: sync: implement daily timer for periodic fresh sync --- src/sync/mod.rs | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 142 insertions(+), 11 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index b5133e6..a3189a3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -32,10 +32,11 @@ pub use health::RelayHealthTracker; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use nostr_sdk::prelude::*; use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use crate::config::Config; use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; @@ -352,6 +353,60 @@ const CONSOLIDATION_THRESHOLD: usize = 70; /// Maximum time to wait for pending batches (30 seconds) const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; +// ============================================================================= +// Daily Timer (Phase 7) +// ============================================================================= + +/// Run the daily timer for periodic fresh syncs +/// +/// This function runs in a loop, sleeping for a random interval between +/// 23-25 hours, then triggering a daily sync for all relays. The random +/// interval prevents thundering herd effects across multiple ngit-grasp instances. +/// +/// The daily sync: +/// - Unsubscribes from all current subscriptions +/// - Clears pending batches and sync state +/// - Re-discovers all repos and events from scratch +/// +/// This detects state drift over time that might occur from missed events. +async fn run_daily_timer(sync_manager: Arc>) { + use rand::Rng; + + loop { + // Random interval between 23-25 hours + let hours = 23.0 + rand::thread_rng().gen::() * 2.0; + let seconds = (hours * 3600.0) as u64; + + tracing::info!( + hours = format!("{:.1}", hours), + "Daily timer scheduled to fire in {:.1} hours", + hours + ); + + tokio::time::sleep(Duration::from_secs(seconds)).await; + + // Get list of relays + let relay_urls: Vec = { + let manager = sync_manager.lock().await; + let index = manager.relay_sync_index.read().await; + let urls: Vec = index.keys().cloned().collect(); + drop(index); + urls + }; + + tracing::info!( + relay_count = relay_urls.len(), + "Daily timer fired, starting daily sync for all relays" + ); + + // Trigger daily sync for each relay + for relay_url in relay_urls { + let mut manager = sync_manager.lock().await; + manager.daily_sync(&relay_url).await; + } + } +} + /// Manages proactive synchronization with external relays /// /// The SyncManager runs as a background task, subscribing to repository @@ -526,13 +581,69 @@ impl SyncManager { } } + /// Perform a daily sync for a specific relay + /// + /// This method: + /// - Unsubscribes from all current subscriptions on the relay + /// - Clears pending batches for this relay + /// - Clears sync state (repos and root_events) in RelayState + /// - Recomputes actions to re-discover all repos/events + /// + /// This is triggered by the daily timer to detect state drift over time. + async fn daily_sync(&mut self, relay_url: &str) { + tracing::info!(relay = %relay_url, "Starting daily sync"); + + // Get connection + let connection = match self.connections.get(relay_url) { + Some(conn) => conn, + None => { + tracing::warn!( + relay = %relay_url, + "No connection for relay, skipping daily sync" + ); + return; + } + }; + + // Unsubscribe all current subscriptions + connection.unsubscribe_all().await; + + // Clear pending batches for this relay + { + let mut pending = self.pending_sync_index.write().await; + pending.remove(relay_url); + } + + // Get relay state and clear sync state (repos and root_events) + { + let mut index = self.relay_sync_index.write().await; + if let Some(state) = index.get_mut(relay_url) { + let repos_cleared = state.repos.len(); + let events_cleared = state.root_events.len(); + state.clear_sync_state(); + tracing::debug!( + relay = %relay_url, + repos_cleared = repos_cleared, + events_cleared = events_cleared, + "Cleared sync state for daily sync" + ); + } + } + + // Recompute actions - will discover all repos/events again + self.recompute_actions_for_relay(relay_url).await; + + tracing::info!(relay = %relay_url, "Daily sync complete"); + } + /// Run the sync manager /// /// Coordinates all sync components: /// 1. Spawns self-subscriber to monitor own relay for announcements - /// 2. Connects to bootstrap relay if configured - /// 3. Handles relay actions from self-subscriber - /// 4. Handles disconnect notifications from spawned relay tasks + /// 2. Spawns daily timer for periodic fresh syncs + /// 3. Connects to bootstrap relay if configured + /// 4. Handles relay actions from self-subscriber + /// 5. Handles disconnect, EOSE, and connect notifications from spawned relay tasks pub async fn run(mut self) { use tokio::sync::mpsc; @@ -569,12 +680,22 @@ impl SyncManager { self.connect_tx = Some(connect_tx.clone()); // 6. Connect to bootstrap relay if configured - if let Some(ref bootstrap_url) = self.bootstrap_relay_url { + if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { self.spawn_relay_connection(bootstrap_url.clone()).await; } - // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 7. Wrap self in Arc for sharing with timer task + let sync_manager = Arc::new(Mutex::new(self)); + + // 8. Spawn daily timer task + let timer_manager = Arc::clone(&sync_manager); + tokio::spawn(async move { + run_daily_timer(timer_manager).await; + }); + + // 9. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { + // Wait for an event without holding the lock tokio::select! { action = action_rx.recv() => { match action { @@ -597,7 +718,9 @@ impl SyncManager { filters, }; - self.handle_add_filters(action).await; + // Acquire lock to process action + let mut manager = sync_manager.lock().await; + manager.handle_add_filters(action).await; } Some(RelayAction::AddFilters { relay_url, repos }) => { // Convert to AddFilters format and use unified handler @@ -618,7 +741,9 @@ impl SyncManager { filters, }; - self.handle_add_filters(action).await; + // Acquire lock to process action + let mut manager = sync_manager.lock().await; + manager.handle_add_filters(action).await; } None => break, } @@ -626,7 +751,9 @@ impl SyncManager { disconnect = disconnect_rx.recv() => { match disconnect { Some(notification) => { - self.handle_disconnect(¬ification.relay_url).await; + // Acquire lock to process disconnect + let mut manager = sync_manager.lock().await; + manager.handle_disconnect(¬ification.relay_url).await; } None => { // All disconnect senders dropped - unlikely but handle gracefully @@ -637,7 +764,9 @@ impl SyncManager { eose = eose_rx.recv() => { match eose { Some(notification) => { - self.handle_eose(¬ification.relay_url, notification.sub_id).await; + // Acquire lock to process EOSE + let mut manager = sync_manager.lock().await; + manager.handle_eose(¬ification.relay_url, notification.sub_id).await; } None => { // All EOSE senders dropped - unlikely but handle gracefully @@ -648,7 +777,9 @@ impl SyncManager { connect = connect_rx.recv() => { match connect { Some(notification) => { - self.handle_connect_or_reconnect(¬ification.relay_url).await; + // Acquire lock to process connect + let mut manager = sync_manager.lock().await; + manager.handle_connect_or_reconnect(¬ification.relay_url).await; } None => { // All connect senders dropped - unlikely but handle gracefully -- cgit v1.2.3