diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:19:18 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:19:18 +0000 |
| commit | b101afa00bc28e1b55286145cb81e32a5b3decc9 (patch) | |
| tree | d42869f89e4916bb8dc36fd26c9ac5f888e042ac /src/sync/mod.rs | |
| parent | b6c70f765dd02fb0297888d671e455df33d6fcb4 (diff) | |
feat(sync): add rejected events cache persistence and integrate with shutdown/startup
Implement save/restore functionality for rejected events cache and
integrate persistence with relay shutdown/startup lifecycle. Both
purgatory and rejected cache now survive relay restarts.
Key features:
- Serialize rejected events cache to JSON (rejected-events-cache.json)
- Save both hot cache (2min, full events) and cold index (7day, metadata)
- Restore with downtime adjustment (preserves remaining TTL)
- Graceful degradation (missing/corrupted files don't crash)
- File cleanup after successful restore
- Automatic restoration in SyncManager::new()
Integration:
- Shutdown hook saves both purgatory and rejected cache
- Startup hook restores both and re-queues repositories
- Non-fatal errors (logs warnings, continues on failure)
Files:
- src/sync/rejected_index.rs: save_to_disk/restore_from_disk methods
- src/sync/mod.rs: SyncManager integration and auto-restore
- src/main.rs: Shutdown/startup hooks for both caches
- tests/purgatory_persistence.rs: 17 integration tests
Tests: 13 unit tests + 17 integration tests covering full lifecycle
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 71 |
1 files changed, 59 insertions, 12 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e2d55bd..3213dfb 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -43,6 +43,7 @@ pub use health::RelayHealthTracker; | |||
| 43 | use tokio::time::sleep; | 43 | use tokio::time::sleep; |
| 44 | 44 | ||
| 45 | use std::collections::{HashMap, HashSet}; | 45 | use std::collections::{HashMap, HashSet}; |
| 46 | use std::path::{Path, PathBuf}; | ||
| 46 | use std::sync::Arc; | 47 | use std::sync::Arc; |
| 47 | use std::time::Duration; | 48 | use std::time::Duration; |
| 48 | 49 | ||
| @@ -581,6 +582,7 @@ impl SyncManager { | |||
| 581 | /// * `write_policy` - Policy for validating events before storage | 582 | /// * `write_policy` - Policy for validating events before storage |
| 582 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) | 583 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) |
| 583 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | ||
| 584 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 585 | pub fn new( | 587 | pub fn new( |
| 586 | bootstrap_relay_url: Option<String>, | 588 | bootstrap_relay_url: Option<String>, |
| @@ -589,11 +591,42 @@ impl SyncManager { | |||
| 589 | write_policy: Nip34WritePolicy, | 591 | write_policy: Nip34WritePolicy, |
| 590 | local_relay: LocalRelay, | 592 | local_relay: LocalRelay, |
| 591 | config: &Config, | 593 | config: &Config, |
| 594 | data_path: PathBuf, | ||
| 592 | sync_metrics: Option<SyncMetrics>, | 595 | sync_metrics: Option<SyncMetrics>, |
| 593 | ) -> Self { | 596 | ) -> Self { |
| 594 | // Extract purgatory from write_policy for read-only access | 597 | // Extract purgatory from write_policy for read-only access |
| 595 | let purgatory = write_policy.purgatory().clone(); | 598 | let purgatory = write_policy.purgatory().clone(); |
| 596 | 599 | ||
| 600 | // Create rejected events index | ||
| 601 | let rejected_events_index = Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 602 | RejectedEventsIndex::with_metrics( | ||
| 603 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 604 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 605 | metrics.clone(), | ||
| 606 | ) | ||
| 607 | } else { | ||
| 608 | RejectedEventsIndex::new( | ||
| 609 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 610 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 611 | ) | ||
| 612 | }); | ||
| 613 | |||
| 614 | // Attempt to restore rejected events index from disk | ||
| 615 | let rejected_index_path = data_path.join("rejected-events-cache.json"); | ||
| 616 | if rejected_index_path.exists() { | ||
| 617 | match rejected_events_index.restore_from_disk(&rejected_index_path) { | ||
| 618 | Ok(()) => { | ||
| 619 | tracing::info!("Restored rejected events index from disk"); | ||
| 620 | } | ||
| 621 | Err(e) => { | ||
| 622 | tracing::warn!( | ||
| 623 | "Failed to restore rejected events index: {}, starting empty", | ||
| 624 | e | ||
| 625 | ); | ||
| 626 | } | ||
| 627 | } | ||
| 628 | } | ||
| 629 | |||
| 597 | Self { | 630 | Self { |
| 598 | bootstrap_relay_url, | 631 | bootstrap_relay_url, |
| 599 | service_domain, | 632 | service_domain, |
| @@ -605,18 +638,7 @@ impl SyncManager { | |||
| 605 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 638 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 606 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 639 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 607 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 640 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 608 | rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics { | 641 | rejected_events_index, |
| 609 | RejectedEventsIndex::with_metrics( | ||
| 610 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 611 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 612 | metrics.clone(), | ||
| 613 | ) | ||
| 614 | } else { | ||
| 615 | RejectedEventsIndex::new( | ||
| 616 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 617 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 618 | ) | ||
| 619 | }), | ||
| 620 | connections: HashMap::new(), | 642 | connections: HashMap::new(), |
| 621 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 643 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 622 | next_batch_id: 0, | 644 | next_batch_id: 0, |
| @@ -637,6 +659,31 @@ impl SyncManager { | |||
| 637 | self.next_batch_id | 659 | self.next_batch_id |
| 638 | } | 660 | } |
| 639 | 661 | ||
| 662 | /// Get a clone of the rejected events index Arc. | ||
| 663 | /// | ||
| 664 | /// This allows access to the rejected events index for persistence | ||
| 665 | /// even after the SyncManager has been moved into a task. | ||
| 666 | /// | ||
| 667 | /// # Returns | ||
| 668 | /// Arc clone of the rejected events index | ||
| 669 | pub fn rejected_events_index(&self) -> Arc<RejectedEventsIndex> { | ||
| 670 | self.rejected_events_index.clone() | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Save rejected events index to disk. | ||
| 674 | /// | ||
| 675 | /// This is called during shutdown to persist the rejected events cache, | ||
| 676 | /// allowing us to avoid re-downloading rejected events after restart. | ||
| 677 | /// | ||
| 678 | /// # Arguments | ||
| 679 | /// * `path` - Path to save the rejected index file | ||
| 680 | /// | ||
| 681 | /// # Returns | ||
| 682 | /// Ok(()) on success, Err if save fails | ||
| 683 | pub fn save_rejected_index(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> { | ||
| 684 | self.rejected_events_index.save_to_disk(path) | ||
| 685 | } | ||
| 686 | |||
| 640 | /// Handle EOSE (End Of Stored Events) for a subscription | 687 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 641 | /// | 688 | /// |
| 642 | /// This method: | 689 | /// This method: |