upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:46:30 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-14 10:46:30 +0000
commit4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch)
treed42869f89e4916bb8dc36fd26c9ac5f888e042ac /src/sync/mod.rs
parent7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff)
parentb101afa00bc28e1b55286145cb81e32a5b3decc9 (diff)
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and rejected events cache. Events are now saved to disk on graceful shutdown and restored on startup, preventing data loss during relay restarts. Key features: - Purgatory state persisted to JSON (state events, PR events, expired events) - Rejected events cache persisted (hot cache + cold index) - Downtime adjustment preserves remaining TTL - Graceful degradation on missing/corrupted files - Automatic re-queueing of restored repositories - Comprehensive test coverage (45 tests)
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs71
1 files changed, 59 insertions, 12 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index e2d55bd..3213dfb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -43,6 +43,7 @@ pub use health::RelayHealthTracker;
43use tokio::time::sleep; 43use tokio::time::sleep;
44 44
45use std::collections::{HashMap, HashSet}; 45use std::collections::{HashMap, HashSet};
46use std::path::{Path, PathBuf};
46use std::sync::Arc; 47use std::sync::Arc;
47use std::time::Duration; 48use std::time::Duration;
48 49
@@ -581,6 +582,7 @@ impl SyncManager {
581 /// * `write_policy` - Policy for validating events before storage 582 /// * `write_policy` - Policy for validating events before storage
582 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) 583 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
583 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence)
584 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
585 pub fn new( 587 pub fn new(
586 bootstrap_relay_url: Option<String>, 588 bootstrap_relay_url: Option<String>,
@@ -589,11 +591,42 @@ impl SyncManager {
589 write_policy: Nip34WritePolicy, 591 write_policy: Nip34WritePolicy,
590 local_relay: LocalRelay, 592 local_relay: LocalRelay,
591 config: &Config, 593 config: &Config,
594 data_path: PathBuf,
592 sync_metrics: Option<SyncMetrics>, 595 sync_metrics: Option<SyncMetrics>,
593 ) -> Self { 596 ) -> Self {
594 // Extract purgatory from write_policy for read-only access 597 // Extract purgatory from write_policy for read-only access
595 let purgatory = write_policy.purgatory().clone(); 598 let purgatory = write_policy.purgatory().clone();
596 599
600 // Create rejected events index
601 let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics {
602 RejectedEventsIndex::with_metrics(
603 Duration::from_secs(config.rejected_hot_cache_duration_secs),
604 Duration::from_secs(config.rejected_cold_index_expiry_secs),
605 metrics.clone(),
606 )
607 } else {
608 RejectedEventsIndex::new(
609 Duration::from_secs(config.rejected_hot_cache_duration_secs),
610 Duration::from_secs(config.rejected_cold_index_expiry_secs),
611 )
612 });
613
614 // Attempt to restore rejected events index from disk
615 let rejected_index_path = data_path.join("rejected-events-cache.json");
616 if rejected_index_path.exists() {
617 match rejected_events_index.restore_from_disk(&rejected_index_path) {
618 Ok(()) => {
619 tracing::info!("Restored rejected events index from disk");
620 }
621 Err(e) => {
622 tracing::warn!(
623 "Failed to restore rejected events index: {}, starting empty",
624 e
625 );
626 }
627 }
628 }
629
597 Self { 630 Self {
598 bootstrap_relay_url, 631 bootstrap_relay_url,
599 service_domain, 632 service_domain,
@@ -605,18 +638,7 @@ impl SyncManager {
605 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 638 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
606 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 639 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
607 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 640 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
608 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { 641 rejected_events_index,
609 RejectedEventsIndex::with_metrics(
610 Duration::from_secs(config.rejected_hot_cache_duration_secs),
611 Duration::from_secs(config.rejected_cold_index_expiry_secs),
612 metrics.clone(),
613 )
614 } else {
615 RejectedEventsIndex::new(
616 Duration::from_secs(config.rejected_hot_cache_duration_secs),
617 Duration::from_secs(config.rejected_cold_index_expiry_secs),
618 )
619 }),
620 connections: HashMap::new(), 642 connections: HashMap::new(),
621 health_tracker: Arc::new(RelayHealthTracker::new(config)), 643 health_tracker: Arc::new(RelayHealthTracker::new(config)),
622 next_batch_id: 0, 644 next_batch_id: 0,
@@ -637,6 +659,31 @@ impl SyncManager {
637 self.next_batch_id 659 self.next_batch_id
638 } 660 }
639 661
662 /// Get a clone of the rejected events index Arc.
663 ///
664 /// This allows access to the rejected events index for persistence
665 /// even after the SyncManager has been moved into a task.
666 ///
667 /// # Returns
668 /// Arc clone of the rejected events index
669 pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> {
670 self.rejected_events_index.clone()
671 }
672
673 /// Save rejected events index to disk.
674 ///
675 /// This is called during shutdown to persist the rejected events cache,
676 /// allowing us to avoid re-downloading rejected events after restart.
677 ///
678 /// # Arguments
679 /// * `path` - Path to save the rejected index file
680 ///
681 /// # Returns
682 /// Ok(()) on success, Err if save fails
683 pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
684 self.rejected_events_index.save_to_disk(path)
685 }
686
640 /// Handle EOSE (End Of Stored Events) for a subscription 687 /// Handle EOSE (End Of Stored Events) for a subscription
641 /// 688 ///
642 /// This method: 689 /// This method: