upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs71
1 files changed, 59 insertions, 12 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e2d55bd..3213dfb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -43,6 +43,7 @@ pub use health::RelayHealthTracker;
43use tokio::time::sleep; 43use tokio::time::sleep;
44 44
45use std::collections::{HashMap, HashSet}; 45use std::collections::{HashMap, HashSet};
46use std::path::{Path, PathBuf};
46use std::sync::Arc; 47use std::sync::Arc;
47use std::time::Duration; 48use std::time::Duration;
48 49
@@ -581,6 +582,7 @@ impl SyncManager {
581 /// * `write_policy` - Policy for validating events before storage 582 /// * `write_policy` - Policy for validating events before storage
582 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) 583 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
583 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence)
584 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
585 pub fn new( 587 pub fn new(
586 bootstrap_relay_url: Option<String>, 588 bootstrap_relay_url: Option<String>,
@@ -589,11 +591,42 @@ impl SyncManager {
589 write_policy: Nip34WritePolicy, 591 write_policy: Nip34WritePolicy,
590 local_relay: LocalRelay, 592 local_relay: LocalRelay,
591 config: &Config, 593 config: &Config,
594 data_path: PathBuf,
592 sync_metrics: Option<SyncMetrics>, 595 sync_metrics: Option<SyncMetrics>,
593 ) -> Self { 596 ) -> Self {
594 // Extract purgatory from write_policy for read-only access 597 // Extract purgatory from write_policy for read-only access
595 let purgatory = write_policy.purgatory().clone(); 598 let purgatory = write_policy.purgatory().clone();
596 599
600 // Create rejected events index
601 let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics {
602 RejectedEventsIndex::with_metrics(
603 Duration::from_secs(config.rejected_hot_cache_duration_secs),
604 Duration::from_secs(config.rejected_cold_index_expiry_secs),
605 metrics.clone(),
606 )
607 } else {
608 RejectedEventsIndex::new(
609 Duration::from_secs(config.rejected_hot_cache_duration_secs),
610 Duration::from_secs(config.rejected_cold_index_expiry_secs),
611 )
612 });
613
614 // Attempt to restore rejected events index from disk
615 let rejected_index_path = data_path.join("rejected-events-cache.json");
616 if rejected_index_path.exists() {
617 match rejected_events_index.restore_from_disk(&rejected_index_path) {
618 Ok(()) => {
619 tracing::info!("Restored rejected events index from disk");
620 }
621 Err(e) => {
622 tracing::warn!(
623 "Failed to restore rejected events index: {}, starting empty",
624 e
625 );
626 }
627 }
628 }
629
597 Self { 630 Self {
598 bootstrap_relay_url, 631 bootstrap_relay_url,
599 service_domain, 632 service_domain,
@@ -605,18 +638,7 @@ impl SyncManager {
605 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 638 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
606 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 639 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
607 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 640 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
608 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { 641 rejected_events_index,
609 RejectedEventsIndex::with_metrics(
610 Duration::from_secs(config.rejected_hot_cache_duration_secs),
611 Duration::from_secs(config.rejected_cold_index_expiry_secs),
612 metrics.clone(),
613 )
614 } else {
615 RejectedEventsIndex::new(
616 Duration::from_secs(config.rejected_hot_cache_duration_secs),
617 Duration::from_secs(config.rejected_cold_index_expiry_secs),
618 )
619 }),
620 connections: HashMap::new(), 642 connections: HashMap::new(),
621 health_tracker: Arc::new(RelayHealthTracker::new(config)), 643 health_tracker: Arc::new(RelayHealthTracker::new(config)),
622 next_batch_id: 0, 644 next_batch_id: 0,
@@ -637,6 +659,31 @@ impl SyncManager {
637 self.next_batch_id 659 self.next_batch_id
638 } 660 }
639 661
662 /// Get a clone of the rejected events index Arc.
663 ///
664 /// This allows access to the rejected events index for persistence
665 /// even after the SyncManager has been moved into a task.
666 ///
667 /// # Returns
668 /// Arc clone of the rejected events index
669 pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> {
670 self.rejected_events_index.clone()
671 }
672
673 /// Save rejected events index to disk.
674 ///
675 /// This is called during shutdown to persist the rejected events cache,
676 /// allowing us to avoid re-downloading rejected events after restart.
677 ///
678 /// # Arguments
679 /// * `path` - Path to save the rejected index file
680 ///
681 /// # Returns
682 /// Ok(()) on success, Err if save fails
683 pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
684 self.rejected_events_index.save_to_disk(path)
685 }
686
640 /// Handle EOSE (End Of Stored Events) for a subscription 687 /// Handle EOSE (End Of Stored Events) for a subscription
641 /// 688 ///
642 /// This method: 689 /// This method: