diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:39:44 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:39:44 +0000 |
| commit | 433d78640e887a5503afa3cda1840c8047fcb6d0 (patch) | |
| tree | 950e20eeda8ab3775547055725c7987732ac9866 /src/sync/mod.rs | |
| parent | b32772848f7325d4b3e1e15b05c5163df0b9671b (diff) | |
sync: implement daily timer for periodic fresh sync
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 153 |
1 files changed, 142 insertions, 11 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index b5133e6..a3189a3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -32,10 +32,11 @@ pub use health::RelayHealthTracker; | |||
| 32 | 32 | ||
| 33 | use std::collections::{HashMap, HashSet}; | 33 | use std::collections::{HashMap, HashSet}; |
| 34 | use std::sync::Arc; | 34 | use std::sync::Arc; |
| 35 | use std::time::Duration; | ||
| 35 | 36 | ||
| 36 | use nostr_sdk::prelude::*; | 37 | use nostr_sdk::prelude::*; |
| 37 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; | 38 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; |
| 38 | use tokio::sync::RwLock; | 39 | use tokio::sync::{Mutex, RwLock}; |
| 39 | 40 | ||
| 40 | use crate::config::Config; | 41 | use crate::config::Config; |
| 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 42 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| @@ -352,6 +353,60 @@ const CONSOLIDATION_THRESHOLD: usize = 70; | |||
| 352 | /// Maximum time to wait for pending batches (30 seconds) | 353 | /// Maximum time to wait for pending batches (30 seconds) |
| 353 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; | 354 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; |
| 354 | 355 | ||
| 356 | // ============================================================================= | ||
| 357 | // Daily Timer (Phase 7) | ||
| 358 | // ============================================================================= | ||
| 359 | |||
| 360 | /// Run the daily timer for periodic fresh syncs | ||
| 361 | /// | ||
| 362 | /// This function runs in a loop, sleeping for a random interval between | ||
| 363 | /// 23-25 hours, then triggering a daily sync for all relays. The random | ||
| 364 | /// interval prevents thundering herd effects across multiple ngit-grasp instances. | ||
| 365 | /// | ||
| 366 | /// The daily sync: | ||
| 367 | /// - Unsubscribes from all current subscriptions | ||
| 368 | /// - Clears pending batches and sync state | ||
| 369 | /// - Re-discovers all repos and events from scratch | ||
| 370 | /// | ||
| 371 | /// This detects state drift over time that might occur from missed events. | ||
| 372 | async fn run_daily_timer(sync_manager: Arc<Mutex<SyncManager>>) { | ||
| 373 | use rand::Rng; | ||
| 374 | |||
| 375 | loop { | ||
| 376 | // Random interval between 23-25 hours | ||
| 377 | let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0; | ||
| 378 | let seconds = (hours * 3600.0) as u64; | ||
| 379 | |||
| 380 | tracing::info!( | ||
| 381 | hours = format!("{:.1}", hours), | ||
| 382 | "Daily timer scheduled to fire in {:.1} hours", | ||
| 383 | hours | ||
| 384 | ); | ||
| 385 | |||
| 386 | tokio::time::sleep(Duration::from_secs(seconds)).await; | ||
| 387 | |||
| 388 | // Get list of relays | ||
| 389 | let relay_urls: Vec<String> = { | ||
| 390 | let manager = sync_manager.lock().await; | ||
| 391 | let index = manager.relay_sync_index.read().await; | ||
| 392 | let urls: Vec<String> = index.keys().cloned().collect(); | ||
| 393 | drop(index); | ||
| 394 | urls | ||
| 395 | }; | ||
| 396 | |||
| 397 | tracing::info!( | ||
| 398 | relay_count = relay_urls.len(), | ||
| 399 | "Daily timer fired, starting daily sync for all relays" | ||
| 400 | ); | ||
| 401 | |||
| 402 | // Trigger daily sync for each relay | ||
| 403 | for relay_url in relay_urls { | ||
| 404 | let mut manager = sync_manager.lock().await; | ||
| 405 | manager.daily_sync(&relay_url).await; | ||
| 406 | } | ||
| 407 | } | ||
| 408 | } | ||
| 409 | |||
| 355 | /// Manages proactive synchronization with external relays | 410 | /// Manages proactive synchronization with external relays |
| 356 | /// | 411 | /// |
| 357 | /// The SyncManager runs as a background task, subscribing to repository | 412 | /// The SyncManager runs as a background task, subscribing to repository |
| @@ -526,13 +581,69 @@ impl SyncManager { | |||
| 526 | } | 581 | } |
| 527 | } | 582 | } |
| 528 | 583 | ||
| 584 | /// Perform a daily sync for a specific relay | ||
| 585 | /// | ||
| 586 | /// This method: | ||
| 587 | /// - Unsubscribes from all current subscriptions on the relay | ||
| 588 | /// - Clears pending batches for this relay | ||
| 589 | /// - Clears sync state (repos and root_events) in RelayState | ||
| 590 | /// - Recomputes actions to re-discover all repos/events | ||
| 591 | /// | ||
| 592 | /// This is triggered by the daily timer to detect state drift over time. | ||
| 593 | async fn daily_sync(&mut self, relay_url: &str) { | ||
| 594 | tracing::info!(relay = %relay_url, "Starting daily sync"); | ||
| 595 | |||
| 596 | // Get connection | ||
| 597 | let connection = match self.connections.get(relay_url) { | ||
| 598 | Some(conn) => conn, | ||
| 599 | None => { | ||
| 600 | tracing::warn!( | ||
| 601 | relay = %relay_url, | ||
| 602 | "No connection for relay, skipping daily sync" | ||
| 603 | ); | ||
| 604 | return; | ||
| 605 | } | ||
| 606 | }; | ||
| 607 | |||
| 608 | // Unsubscribe all current subscriptions | ||
| 609 | connection.unsubscribe_all().await; | ||
| 610 | |||
| 611 | // Clear pending batches for this relay | ||
| 612 | { | ||
| 613 | let mut pending = self.pending_sync_index.write().await; | ||
| 614 | pending.remove(relay_url); | ||
| 615 | } | ||
| 616 | |||
| 617 | // Get relay state and clear sync state (repos and root_events) | ||
| 618 | { | ||
| 619 | let mut index = self.relay_sync_index.write().await; | ||
| 620 | if let Some(state) = index.get_mut(relay_url) { | ||
| 621 | let repos_cleared = state.repos.len(); | ||
| 622 | let events_cleared = state.root_events.len(); | ||
| 623 | state.clear_sync_state(); | ||
| 624 | tracing::debug!( | ||
| 625 | relay = %relay_url, | ||
| 626 | repos_cleared = repos_cleared, | ||
| 627 | events_cleared = events_cleared, | ||
| 628 | "Cleared sync state for daily sync" | ||
| 629 | ); | ||
| 630 | } | ||
| 631 | } | ||
| 632 | |||
| 633 | // Recompute actions - will discover all repos/events again | ||
| 634 | self.recompute_actions_for_relay(relay_url).await; | ||
| 635 | |||
| 636 | tracing::info!(relay = %relay_url, "Daily sync complete"); | ||
| 637 | } | ||
| 638 | |||
| 529 | /// Run the sync manager | 639 | /// Run the sync manager |
| 530 | /// | 640 | /// |
| 531 | /// Coordinates all sync components: | 641 | /// Coordinates all sync components: |
| 532 | /// 1. Spawns self-subscriber to monitor own relay for announcements | 642 | /// 1. Spawns self-subscriber to monitor own relay for announcements |
| 533 | /// 2. Connects to bootstrap relay if configured | 643 | /// 2. Spawns daily timer for periodic fresh syncs |
| 534 | /// 3. Handles relay actions from self-subscriber | 644 | /// 3. Connects to bootstrap relay if configured |
| 535 | /// 4. Handles disconnect notifications from spawned relay tasks | 645 | /// 4. Handles relay actions from self-subscriber |
| 646 | /// 5. Handles disconnect, EOSE, and connect notifications from spawned relay tasks | ||
| 536 | pub async fn run(mut self) { | 647 | pub async fn run(mut self) { |
| 537 | use tokio::sync::mpsc; | 648 | use tokio::sync::mpsc; |
| 538 | 649 | ||
| @@ -569,12 +680,22 @@ impl SyncManager { | |||
| 569 | self.connect_tx = Some(connect_tx.clone()); | 680 | self.connect_tx = Some(connect_tx.clone()); |
| 570 | 681 | ||
| 571 | // 6. Connect to bootstrap relay if configured | 682 | // 6. Connect to bootstrap relay if configured |
| 572 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | 683 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url.clone() { |
| 573 | self.spawn_relay_connection(bootstrap_url.clone()).await; | 684 | self.spawn_relay_connection(bootstrap_url.clone()).await; |
| 574 | } | 685 | } |
| 575 | 686 | ||
| 576 | // 7. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 687 | // 7. Wrap self in Arc<Mutex> for sharing with timer task |
| 688 | let sync_manager = Arc::new(Mutex::new(self)); | ||
| 689 | |||
| 690 | // 8. Spawn daily timer task | ||
| 691 | let timer_manager = Arc::clone(&sync_manager); | ||
| 692 | tokio::spawn(async move { | ||
| 693 | run_daily_timer(timer_manager).await; | ||
| 694 | }); | ||
| 695 | |||
| 696 | // 9. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | ||
| 577 | loop { | 697 | loop { |
| 698 | // Wait for an event without holding the lock | ||
| 578 | tokio::select! { | 699 | tokio::select! { |
| 579 | action = action_rx.recv() => { | 700 | action = action_rx.recv() => { |
| 580 | match action { | 701 | match action { |
| @@ -597,7 +718,9 @@ impl SyncManager { | |||
| 597 | filters, | 718 | filters, |
| 598 | }; | 719 | }; |
| 599 | 720 | ||
| 600 | self.handle_add_filters(action).await; | 721 | // Acquire lock to process action |
| 722 | let mut manager = sync_manager.lock().await; | ||
| 723 | manager.handle_add_filters(action).await; | ||
| 601 | } | 724 | } |
| 602 | Some(RelayAction::AddFilters { relay_url, repos }) => { | 725 | Some(RelayAction::AddFilters { relay_url, repos }) => { |
| 603 | // Convert to AddFilters format and use unified handler | 726 | // Convert to AddFilters format and use unified handler |
| @@ -618,7 +741,9 @@ impl SyncManager { | |||
| 618 | filters, | 741 | filters, |
| 619 | }; | 742 | }; |
| 620 | 743 | ||
| 621 | self.handle_add_filters(action).await; | 744 | // Acquire lock to process action |
| 745 | let mut manager = sync_manager.lock().await; | ||
| 746 | manager.handle_add_filters(action).await; | ||
| 622 | } | 747 | } |
| 623 | None => break, | 748 | None => break, |
| 624 | } | 749 | } |
| @@ -626,7 +751,9 @@ impl SyncManager { | |||
| 626 | disconnect = disconnect_rx.recv() => { | 751 | disconnect = disconnect_rx.recv() => { |
| 627 | match disconnect { | 752 | match disconnect { |
| 628 | Some(notification) => { | 753 | Some(notification) => { |
| 629 | self.handle_disconnect(¬ification.relay_url).await; | 754 | // Acquire lock to process disconnect |
| 755 | let mut manager = sync_manager.lock().await; | ||
| 756 | manager.handle_disconnect(¬ification.relay_url).await; | ||
| 630 | } | 757 | } |
| 631 | None => { | 758 | None => { |
| 632 | // All disconnect senders dropped - unlikely but handle gracefully | 759 | // All disconnect senders dropped - unlikely but handle gracefully |
| @@ -637,7 +764,9 @@ impl SyncManager { | |||
| 637 | eose = eose_rx.recv() => { | 764 | eose = eose_rx.recv() => { |
| 638 | match eose { | 765 | match eose { |
| 639 | Some(notification) => { | 766 | Some(notification) => { |
| 640 | self.handle_eose(¬ification.relay_url, notification.sub_id).await; | 767 | // Acquire lock to process EOSE |
| 768 | let mut manager = sync_manager.lock().await; | ||
| 769 | manager.handle_eose(¬ification.relay_url, notification.sub_id).await; | ||
| 641 | } | 770 | } |
| 642 | None => { | 771 | None => { |
| 643 | // All EOSE senders dropped - unlikely but handle gracefully | 772 | // All EOSE senders dropped - unlikely but handle gracefully |
| @@ -648,7 +777,9 @@ impl SyncManager { | |||
| 648 | connect = connect_rx.recv() => { | 777 | connect = connect_rx.recv() => { |
| 649 | match connect { | 778 | match connect { |
| 650 | Some(notification) => { | 779 | Some(notification) => { |
| 651 | self.handle_connect_or_reconnect(¬ification.relay_url).await; | 780 | // Acquire lock to process connect |
| 781 | let mut manager = sync_manager.lock().await; | ||
| 782 | manager.handle_connect_or_reconnect(¬ification.relay_url).await; | ||
| 652 | } | 783 | } |
| 653 | None => { | 784 | None => { |
| 654 | // All connect senders dropped - unlikely but handle gracefully | 785 | // All connect senders dropped - unlikely but handle gracefully |