diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:19:18 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:19:18 +0000 |
| commit | b101afa00bc28e1b55286145cb81e32a5b3decc9 (patch) | |
| tree | d42869f89e4916bb8dc36fd26c9ac5f888e042ac /src/main.rs | |
| parent | b6c70f765dd02fb0297888d671e455df33d6fcb4 (diff) | |
feat(sync): add rejected events cache persistence and integrate with shutdown/startup
Implement save/restore functionality for rejected events cache and
integrate persistence with relay shutdown/startup lifecycle. Both
purgatory and rejected cache now survive relay restarts.
Key features:
- Serialize rejected events cache to JSON (rejected-events-cache.json)
- Save both hot cache (2min, full events) and cold index (7day, metadata)
- Restore with downtime adjustment (preserves remaining TTL)
- Graceful degradation (missing/corrupted files don't crash)
- File cleanup after successful restore
- Automatic restoration in SyncManager::new()
Integration:
- Shutdown hook saves both purgatory and rejected cache
- Startup hook restores both and re-queues repositories
- Non-fatal errors (logs warnings, continues on failure)
Files:
- src/sync/rejected_index.rs: save_to_disk/restore_from_disk methods
- src/sync/mod.rs: SyncManager integration and auto-restore
- src/main.rs: Shutdown/startup hooks for both caches
- tests/purgatory_persistence.rs: 17 integration tests
Tests: 13 unit tests + 17 integration tests covering full lifecycle
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 50 |
1 files changed, 49 insertions, 1 deletions
diff --git a/src/main.rs b/src/main.rs index a6f1d9d..5e5b83a 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc}; | |||
| 3 | 3 | ||
| 4 | use anyhow::Result; | 4 | use anyhow::Result; |
| 5 | use tokio::signal; | 5 | use tokio::signal; |
| 6 | use tracing::{info, Level}; | 6 | use tracing::{error, info, warn, Level}; |
| 7 | use tracing_subscriber::FmtSubscriber; | 7 | use tracing_subscriber::FmtSubscriber; |
| 8 | 8 | ||
| 9 | use ngit_grasp::{ | 9 | use ngit_grasp::{ |
| @@ -64,6 +64,22 @@ async fn main() -> Result<()> { | |||
| 64 | ))); | 64 | ))); |
| 65 | info!("Purgatory initialized for event coordination"); | 65 | info!("Purgatory initialized for event coordination"); |
| 66 | 66 | ||
| 67 | // Restore purgatory state from disk if available | ||
| 68 | let purgatory_path = | ||
| 69 | PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json"); | ||
| 70 | |||
| 71 | if purgatory_path.exists() { | ||
| 72 | match purgatory.restore_from_disk(&purgatory_path) { | ||
| 73 | Ok(()) => { | ||
| 74 | info!("Restored purgatory state from disk"); | ||
| 75 | // Re-queueing will happen later after sync system is created | ||
| 76 | } | ||
| 77 | Err(e) => { | ||
| 78 | warn!("Failed to restore purgatory state: {}, starting empty", e); | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 67 | // Create Nostr relay with NIP-34 validation | 83 | // Create Nostr relay with NIP-34 validation |
| 68 | // Returns both the relay and database for direct queries in handlers | 84 | // Returns both the relay and database for direct queries in handlers |
| 69 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { | 85 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { |
| @@ -88,6 +104,7 @@ async fn main() -> Result<()> { | |||
| 88 | relay_with_db.write_policy.clone(), | 104 | relay_with_db.write_policy.clone(), |
| 89 | relay_with_db.relay.clone(), | 105 | relay_with_db.relay.clone(), |
| 90 | &config, | 106 | &config, |
| 107 | PathBuf::from(config.effective_git_data_path()), | ||
| 91 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), | 108 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), |
| 92 | ); | 109 | ); |
| 93 | 110 | ||
| @@ -100,6 +117,21 @@ async fn main() -> Result<()> { | |||
| 100 | info!("Proactive sync enabled (will discover relays from stored announcements)"); | 117 | info!("Proactive sync enabled (will discover relays from stored announcements)"); |
| 101 | } | 118 | } |
| 102 | 119 | ||
| 120 | // Re-queue all restored purgatory repos for sync | ||
| 121 | let restored_identifiers = purgatory.get_all_identifiers(); | ||
| 122 | if !restored_identifiers.is_empty() { | ||
| 123 | info!( | ||
| 124 | "Re-queueing {} restored repositories for sync", | ||
| 125 | restored_identifiers.len() | ||
| 126 | ); | ||
| 127 | for identifier in restored_identifiers { | ||
| 128 | purgatory.enqueue_sync_immediate(&identifier); | ||
| 129 | } | ||
| 130 | } | ||
| 131 | |||
| 132 | // Get a reference to the rejected events index for shutdown persistence | ||
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | ||
| 134 | |||
| 103 | tokio::spawn(async move { | 135 | tokio::spawn(async move { |
| 104 | sync_manager.run().await; | 136 | sync_manager.run().await; |
| 105 | }); | 137 | }); |
| @@ -190,6 +222,22 @@ async fn main() -> Result<()> { | |||
| 190 | } | 222 | } |
| 191 | } | 223 | } |
| 192 | 224 | ||
| 225 | // Save purgatory state to disk | ||
| 226 | let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json"); | ||
| 227 | if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) { | ||
| 228 | error!("Failed to save purgatory state: {}", e); | ||
| 229 | } else { | ||
| 230 | info!("Purgatory state saved to disk"); | ||
| 231 | } | ||
| 232 | |||
| 233 | // Save rejected events cache to disk | ||
| 234 | let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json"); | ||
| 235 | if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) { | ||
| 236 | error!("Failed to save rejected events cache: {}", e); | ||
| 237 | } else { | ||
| 238 | info!("Rejected events cache saved to disk"); | ||
| 239 | } | ||
| 240 | |||
| 193 | // Cleanup placeholder refs on shutdown | 241 | // Cleanup placeholder refs on shutdown |
| 194 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); | 242 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); |
| 195 | if !placeholder_ids.is_empty() { | 243 | if !placeholder_ids.is_empty() { |