upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:39:44 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:39:44 +0000
commit433d78640e887a5503afa3cda1840c8047fcb6d0 (patch)
tree950e20eeda8ab3775547055725c7987732ac9866 /src
parentb32772848f7325d4b3e1e15b05c5163df0b9671b (diff)
sync: implement daily timer for periodic fresh sync
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs153
1 files changed, 142 insertions, 11 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index b5133e6..a3189a3 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -32,10 +32,11 @@ pub use health::RelayHealthTracker;
32 32
33use std::collections::{HashMap, HashSet}; 33use std::collections::{HashMap, HashSet};
34use std::sync::Arc; 34use std::sync::Arc;
35use std::time::Duration;
35 36
36use nostr_sdk::prelude::*; 37use nostr_sdk::prelude::*;
37use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; 38use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
38use tokio::sync::RwLock; 39use tokio::sync::{Mutex, RwLock};
39 40
40use crate::config::Config; 41use crate::config::Config;
41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
@@ -352,6 +353,60 @@ const CONSOLIDATION_THRESHOLD: usize = 70;
352/// Maximum time to wait for pending batches (30 seconds) 353/// Maximum time to wait for pending batches (30 seconds)
353const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; 354const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30;
354 355
356// =============================================================================
357// Daily Timer (Phase 7)
358// =============================================================================
359
360/// Run the daily timer for periodic fresh syncs
361///
362/// This function runs in a loop, sleeping for a random interval between
363/// 23-25 hours, then triggering a daily sync for all relays. The random
364/// interval prevents thundering herd effects across multiple ngit-grasp instances.
365///
366/// The daily sync:
367/// - Unsubscribes from all current subscriptions
368/// - Clears pending batches and sync state
369/// - Re-discovers all repos and events from scratch
370///
371/// This detects state drift over time that might occur from missed events.
372async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) {
373 use rand::Rng;
374
375 loop {
376 // Random interval between 23-25 hours
377 let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0;
378 let seconds = (hours * 3600.0) as u64;
379
380 tracing::info!(
381 hours = format!("{:.1}", hours),
382 "Daily timer scheduled to fire in {:.1} hours",
383 hours
384 );
385
386 tokio::time::sleep(Duration::from_secs(seconds)).await;
387
388 // Get list of relays
389 let relay_urls: Vec<String> = {
390 let manager = sync_manager.lock().await;
391 let index = manager.relay_sync_index.read().await;
392 let urls: Vec<String> = index.keys().cloned().collect();
393 drop(index);
394 urls
395 };
396
397 tracing::info!(
398 relay_count = relay_urls.len(),
399 "Daily timer fired, starting daily sync for all relays"
400 );
401
402 // Trigger daily sync for each relay
403 for relay_url in relay_urls {
404 let mut manager = sync_manager.lock().await;
405 manager.daily_sync(&relay_url).await;
406 }
407 }
408}
409
355/// Manages proactive synchronization with external relays 410/// Manages proactive synchronization with external relays
356/// 411///
357/// The SyncManager runs as a background task, subscribing to repository 412/// The SyncManager runs as a background task, subscribing to repository
@@ -526,13 +581,69 @@ impl SyncManager {
526 } 581 }
527 } 582 }
528 583
584 /// Perform a daily sync for a specific relay
585 ///
586 /// This method:
587 /// - Unsubscribes from all current subscriptions on the relay
588 /// - Clears pending batches for this relay
589 /// - Clears sync state (repos and root_events) in RelayState
590 /// - Recomputes actions to re-discover all repos/events
591 ///
592 /// This is triggered by the daily timer to detect state drift over time.
593 async fn daily_sync(&mut self, relay_url: &str) {
594 tracing::info!(relay = %relay_url, "Starting daily sync");
595
596 // Get connection
597 let connection = match self.connections.get(relay_url) {
598 Some(conn) => conn,
599 None => {
600 tracing::warn!(
601 relay = %relay_url,
602 "No connection for relay, skipping daily sync"
603 );
604 return;
605 }
606 };
607
608 // Unsubscribe all current subscriptions
609 connection.unsubscribe_all().await;
610
611 // Clear pending batches for this relay
612 {
613 let mut pending = self.pending_sync_index.write().await;
614 pending.remove(relay_url);
615 }
616
617 // Get relay state and clear sync state (repos and root_events)
618 {
619 let mut index = self.relay_sync_index.write().await;
620 if let Some(state) = index.get_mut(relay_url) {
621 let repos_cleared = state.repos.len();
622 let events_cleared = state.root_events.len();
623 state.clear_sync_state();
624 tracing::debug!(
625 relay = %relay_url,
626 repos_cleared = repos_cleared,
627 events_cleared = events_cleared,
628 "Cleared sync state for daily sync"
629 );
630 }
631 }
632
633 // Recompute actions - will discover all repos/events again
634 self.recompute_actions_for_relay(relay_url).await;
635
636 tracing::info!(relay = %relay_url, "Daily sync complete");
637 }
638
529 /// Run the sync manager 639 /// Run the sync manager
530 /// 640 ///
531 /// Coordinates all sync components: 641 /// Coordinates all sync components:
532 /// 1. Spawns self-subscriber to monitor own relay for announcements 642 /// 1. Spawns self-subscriber to monitor own relay for announcements
533 /// 2. Connects to bootstrap relay if configured 643 /// 2. Spawns daily timer for periodic fresh syncs
534 /// 3. Handles relay actions from self-subscriber 644 /// 3. Connects to bootstrap relay if configured
535 /// 4. Handles disconnect notifications from spawned relay tasks 645 /// 4. Handles relay actions from self-subscriber
646 /// 5. Handles disconnect, EOSE, and connect notifications from spawned relay tasks
536 pub async fn run(mut self) { 647 pub async fn run(mut self) {
537 use tokio::sync::mpsc; 648 use tokio::sync::mpsc;
538 649
@@ -569,12 +680,22 @@ impl SyncManager {
569 self.connect_tx = Some(connect_tx.clone()); 680 self.connect_tx = Some(connect_tx.clone());
570 681
571 // 6. Connect to bootstrap relay if configured 682 // 6. Connect to bootstrap relay if configured
572 if let Some(ref bootstrap_url) = self.bootstrap_relay_url { 683 if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() {
573 self.spawn_relay_connection(bootstrap_url.clone()).await; 684 self.spawn_relay_connection(bootstrap_url.clone()).await;
574 } 685 }
575 686
576 // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 687 // 7. Wrap self in Arc<Mutex> for sharing with timer task
688 let sync_manager = Arc::new(Mutex::new(self));
689
690 // 8. Spawn daily timer task
691 let timer_manager = Arc::clone(&sync_manager);
692 tokio::spawn(async move {
693 run_daily_timer(timer_manager).await;
694 });
695
696 // 9. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
577 loop { 697 loop {
698 // Wait for an event without holding the lock
578 tokio::select! { 699 tokio::select! {
579 action = action_rx.recv() => { 700 action = action_rx.recv() => {
580 match action { 701 match action {
@@ -597,7 +718,9 @@ impl SyncManager {
597 filters, 718 filters,
598 }; 719 };
599 720
600 self.handle_add_filters(action).await; 721 // Acquire lock to process action
722 let mut manager = sync_manager.lock().await;
723 manager.handle_add_filters(action).await;
601 } 724 }
602 Some(RelayAction::AddFilters { relay_url, repos }) => { 725 Some(RelayAction::AddFilters { relay_url, repos }) => {
603 // Convert to AddFilters format and use unified handler 726 // Convert to AddFilters format and use unified handler
@@ -618,7 +741,9 @@ impl SyncManager {
618 filters, 741 filters,
619 }; 742 };
620 743
621 self.handle_add_filters(action).await; 744 // Acquire lock to process action
745 let mut manager = sync_manager.lock().await;
746 manager.handle_add_filters(action).await;
622 } 747 }
623 None => break, 748 None => break,
624 } 749 }
@@ -626,7 +751,9 @@ impl SyncManager {
626 disconnect = disconnect_rx.recv() => { 751 disconnect = disconnect_rx.recv() => {
627 match disconnect { 752 match disconnect {
628 Some(notification) => { 753 Some(notification) => {
629 self.handle_disconnect(&notification.relay_url).await; 754 // Acquire lock to process disconnect
755 let mut manager = sync_manager.lock().await;
756 manager.handle_disconnect(&notification.relay_url).await;
630 } 757 }
631 None => { 758 None => {
632 // All disconnect senders dropped - unlikely but handle gracefully 759 // All disconnect senders dropped - unlikely but handle gracefully
@@ -637,7 +764,9 @@ impl SyncManager {
637 eose = eose_rx.recv() => { 764 eose = eose_rx.recv() => {
638 match eose { 765 match eose {
639 Some(notification) => { 766 Some(notification) => {
640 self.handle_eose(&notification.relay_url, notification.sub_id).await; 767 // Acquire lock to process EOSE
768 let mut manager = sync_manager.lock().await;
769 manager.handle_eose(&notification.relay_url, notification.sub_id).await;
641 } 770 }
642 None => { 771 None => {
643 // All EOSE senders dropped - unlikely but handle gracefully 772 // All EOSE senders dropped - unlikely but handle gracefully
@@ -648,7 +777,9 @@ impl SyncManager {
648 connect = connect_rx.recv() => { 777 connect = connect_rx.recv() => {
649 match connect { 778 match connect {
650 Some(notification) => { 779 Some(notification) => {
651 self.handle_connect_or_reconnect(&notification.relay_url).await; 780 // Acquire lock to process connect
781 let mut manager = sync_manager.lock().await;
782 manager.handle_connect_or_reconnect(&notification.relay_url).await;
652 } 783 }
653 None => { 784 None => {
654 // All connect senders dropped - unlikely but handle gracefully 785 // All connect senders dropped - unlikely but handle gracefully