diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
| commit | 4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch) | |
| tree | d42869f89e4916bb8dc36fd26c9ac5f888e042ac /src/sync/mod.rs | |
| parent | 7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff) | |
| parent | b101afa00bc28e1b55286145cb81e32a5b3decc9 (diff) | |
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and
rejected events cache. Events are now saved to disk on graceful
shutdown and restored on startup, preventing data loss during
relay restarts.
Key features:
- Purgatory state persisted to JSON (state events, PR events, expired events)
- Rejected events cache persisted (hot cache + cold index)
- Downtime adjustment preserves remaining TTL
- Graceful degradation on missing/corrupted files
- Automatic re-queueing of restored repositories
- Comprehensive test coverage (45 tests)
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 71 |
1 file changed, 59 insertions, 12 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; | |||
| 43 | use tokio::time::sleep; | 43 | use tokio::time::sleep; |
| 44 | 44 | ||
| 45 | use std::collections::{HashMap, HashSet}; | 45 | use std::collections::{HashMap, HashSet}; |
| 46 | use std::path::{Path, PathBuf}; | ||
| 46 | use std::sync::Arc; | 47 | use std::sync::Arc; |
| 47 | use std::time::Duration; | 48 | use std::time::Duration; |
| 48 | 49 | ||
| @@ -581,6 +582,7 @@ impl SyncManager { | |||
| 581 | /// * `write_policy` - Policy for validating events before storage | 582 | /// * `write_policy` - Policy for validating events before storage |
| 582 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) | 583 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) |
| 583 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | ||
| 584 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 585 | pub fn new( | 587 | pub fn new( |
| 586 | bootstrap_relay_url: Option<String>, | 588 | bootstrap_relay_url: Option<String>, |
| @@ -589,11 +591,42 @@ impl SyncManager { | |||
| 589 | write_policy: Nip34WritePolicy, | 591 | write_policy: Nip34WritePolicy, |
| 590 | local_relay: LocalRelay, | 592 | local_relay: LocalRelay, |
| 591 | config: &Config, | 593 | config: &Config, |
| 594 | data_path: PathBuf, | ||
| 592 | sync_metrics: Option<SyncMetrics>, | 595 | sync_metrics: Option<SyncMetrics>, |
| 593 | ) -> Self { | 596 | ) -> Self { |
| 594 | // Extract purgatory from write_policy for read-only access | 597 | // Extract purgatory from write_policy for read-only access |
| 595 | let purgatory = write_policy.purgatory().clone(); | 598 | let purgatory = write_policy.purgatory().clone(); |
| 596 | 599 | ||
| 600 | // Create rejected events index | ||
| 601 | let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 602 | RejectedEventsIndex::with_metrics( | ||
| 603 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 604 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 605 | metrics.clone(), | ||
| 606 | ) | ||
| 607 | } else { | ||
| 608 | RejectedEventsIndex::new( | ||
| 609 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 610 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 611 | ) | ||
| 612 | }); | ||
| 613 | |||
| 614 | // Attempt to restore rejected events index from disk | ||
| 615 | let rejected_index_path = data_path.join("rejected-events-cache.json"); | ||
| 616 | if rejected_index_path.exists() { | ||
| 617 | match rejected_events_index.restore_from_disk(&rejected_index_path) { | ||
| 618 | Ok(()) => { | ||
| 619 | tracing::info!("Restored rejected events index from disk"); | ||
| 620 | } | ||
| 621 | Err(e) => { | ||
| 622 | tracing::warn!( | ||
| 623 | "Failed to restore rejected events index: {}, starting empty", | ||
| 624 | e | ||
| 625 | ); | ||
| 626 | } | ||
| 627 | } | ||
| 628 | } | ||
| 629 | |||
| 597 | Self { | 630 | Self { |
| 598 | bootstrap_relay_url, | 631 | bootstrap_relay_url, |
| 599 | service_domain, | 632 | service_domain, |
| @@ -605,18 +638,7 @@ impl SyncManager { | |||
| 605 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 638 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 606 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 639 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 607 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 640 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 608 | rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { | 641 | rejected_events_index, |
| 609 | RejectedEventsIndex::with_metrics( | ||
| 610 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 611 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 612 | metrics.clone(), | ||
| 613 | ) | ||
| 614 | } else { | ||
| 615 | RejectedEventsIndex::new( | ||
| 616 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 617 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 618 | ) | ||
| 619 | }), | ||
| 620 | connections: HashMap::new(), | 642 | connections: HashMap::new(), |
| 621 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 643 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 622 | next_batch_id: 0, | 644 | next_batch_id: 0, |
| @@ -637,6 +659,31 @@ impl SyncManager { | |||
| 637 | self.next_batch_id | 659 | self.next_batch_id |
| 638 | } | 660 | } |
| 639 | 661 | ||
| 662 | /// Get a clone of the rejected events index Arc. | ||
| 663 | /// | ||
| 664 | /// This allows access to the rejected events index for persistence | ||
| 665 | /// even after the SyncManager has been moved into a task. | ||
| 666 | /// | ||
| 667 | /// # Returns | ||
| 668 | /// Arc clone of the rejected events index | ||
| 669 | pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> { | ||
| 670 | self.rejected_events_index.clone() | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Save rejected events index to disk. | ||
| 674 | /// | ||
| 675 | /// This is called during shutdown to persist the rejected events cache, | ||
| 676 | /// allowing us to avoid re-downloading rejected events after restart. | ||
| 677 | /// | ||
| 678 | /// # Arguments | ||
| 679 | /// * `path` - Path to save the rejected index file | ||
| 680 | /// | ||
| 681 | /// # Returns | ||
| 682 | /// Ok(()) on success, Err if save fails | ||
| 683 | pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 684 | self.rejected_events_index.save_to_disk(path) | ||
| 685 | } | ||
| 686 | |||
| 640 | /// Handle EOSE (End Of Stored Events) for a subscription | 687 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 641 | /// | 688 | /// |
| 642 | /// This method: | 689 | /// This method: |