diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-14 10:46:30 +0000 |
| commit | 4c8f1813fada9ce2bfd371095b0721bff68173e3 (patch) | |
| tree | d42869f89e4916bb8dc36fd26c9ac5f888e042ac /src/main.rs | |
| parent | 7dba18eb9ae64d429fef1a1f5437981efefb86b6 (diff) | |
| parent | b101afa00bc28e1b55286145cb81e32a5b3decc9 (diff) | |
Add purgatory persistence to survive relay restarts
Implement save/restore functionality for both purgatory state and
rejected events cache. Events are now saved to disk on graceful
shutdown and restored on startup, preventing data loss during
relay restarts.
Key features:
- Purgatory state persisted to JSON (state events, PR events, expired events)
- Rejected events cache persisted (hot cache + cold index)
- Downtime adjustment preserves remaining TTL
- Graceful degradation on missing/corrupted files
- Automatic re-queueing of restored repositories
- Comprehensive test coverage (45 tests)
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 50 |
1 files changed, 49 insertions, 1 deletions
diff --git a/src/main.rs b/src/main.rs index a6f1d9d..5e5b83a 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc}; | |||
| 3 | 3 | ||
| 4 | use anyhow::Result; | 4 | use anyhow::Result; |
| 5 | use tokio::signal; | 5 | use tokio::signal; |
| 6 | use tracing::{info, Level}; | 6 | use tracing::{error, info, warn, Level}; |
| 7 | use tracing_subscriber::FmtSubscriber; | 7 | use tracing_subscriber::FmtSubscriber; |
| 8 | 8 | ||
| 9 | use ngit_grasp::{ | 9 | use ngit_grasp::{ |
| @@ -64,6 +64,22 @@ async fn main() -> Result<()> { | |||
| 64 | ))); | 64 | ))); |
| 65 | info!("Purgatory initialized for event coordination"); | 65 | info!("Purgatory initialized for event coordination"); |
| 66 | 66 | ||
| 67 | // Restore purgatory state from disk if available | ||
| 68 | let purgatory_path = | ||
| 69 | PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json"); | ||
| 70 | |||
| 71 | if purgatory_path.exists() { | ||
| 72 | match purgatory.restore_from_disk(&purgatory_path) { | ||
| 73 | Ok(()) => { | ||
| 74 | info!("Restored purgatory state from disk"); | ||
| 75 | // Re-queueing will happen later after sync system is created | ||
| 76 | } | ||
| 77 | Err(e) => { | ||
| 78 | warn!("Failed to restore purgatory state: {}, starting empty", e); | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 67 | // Create Nostr relay with NIP-34 validation | 83 | // Create Nostr relay with NIP-34 validation |
| 68 | // Returns both the relay and database for direct queries in handlers | 84 | // Returns both the relay and database for direct queries in handlers |
| 69 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { | 85 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { |
| @@ -88,6 +104,7 @@ async fn main() -> Result<()> { | |||
| 88 | relay_with_db.write_policy.clone(), | 104 | relay_with_db.write_policy.clone(), |
| 89 | relay_with_db.relay.clone(), | 105 | relay_with_db.relay.clone(), |
| 90 | &config, | 106 | &config, |
| 107 | PathBuf::from(config.effective_git_data_path()), | ||
| 91 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), | 108 | metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), |
| 92 | ); | 109 | ); |
| 93 | 110 | ||
| @@ -100,6 +117,21 @@ async fn main() -> Result<()> { | |||
| 100 | info!("Proactive sync enabled (will discover relays from stored announcements)"); | 117 | info!("Proactive sync enabled (will discover relays from stored announcements)"); |
| 101 | } | 118 | } |
| 102 | 119 | ||
| 120 | // Re-queue all restored purgatory repos for sync | ||
| 121 | let restored_identifiers = purgatory.get_all_identifiers(); | ||
| 122 | if !restored_identifiers.is_empty() { | ||
| 123 | info!( | ||
| 124 | "Re-queueing {} restored repositories for sync", | ||
| 125 | restored_identifiers.len() | ||
| 126 | ); | ||
| 127 | for identifier in restored_identifiers { | ||
| 128 | purgatory.enqueue_sync_immediate(&identifier); | ||
| 129 | } | ||
| 130 | } | ||
| 131 | |||
| 132 | // Get a reference to the rejected events index for shutdown persistence | ||
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | ||
| 134 | |||
| 103 | tokio::spawn(async move { | 135 | tokio::spawn(async move { |
| 104 | sync_manager.run().await; | 136 | sync_manager.run().await; |
| 105 | }); | 137 | }); |
| @@ -190,6 +222,22 @@ async fn main() -> Result<()> { | |||
| 190 | } | 222 | } |
| 191 | } | 223 | } |
| 192 | 224 | ||
| 225 | // Save purgatory state to disk | ||
| 226 | let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json"); | ||
| 227 | if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) { | ||
| 228 | error!("Failed to save purgatory state: {}", e); | ||
| 229 | } else { | ||
| 230 | info!("Purgatory state saved to disk"); | ||
| 231 | } | ||
| 232 | |||
| 233 | // Save rejected events cache to disk | ||
| 234 | let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json"); | ||
| 235 | if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) { | ||
| 236 | error!("Failed to save rejected events cache: {}", e); | ||
| 237 | } else { | ||
| 238 | info!("Rejected events cache saved to disk"); | ||
| 239 | } | ||
| 240 | |||
| 193 | // Cleanup placeholder refs on shutdown | 241 | // Cleanup placeholder refs on shutdown |
| 194 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); | 242 | let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); |
| 195 | if !placeholder_ids.is_empty() { | 243 | if !placeholder_ids.is_empty() { |