From 11b1655638b5a328662187a27f85a84df60fc759 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 2 Jan 2026 13:53:47 +0000 Subject: sync: use purgatory don't save new events destined for purgatory events directly to db or serve on websockets don't download events already in purgatory via negentropy sync --- src/sync/mod.rs | 52 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 13 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 65affc6..dcdbe3a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -159,6 +159,8 @@ pub enum ProcessResult { Saved, /// Event already existed in database Duplicate, + /// Event added to Purgatory + Purgatory, /// Event rejected by write policy Rejected, } @@ -377,6 +379,8 @@ pub struct SyncManager { database: SharedDatabase, /// Write policy for validating incoming events write_policy: Nip34WritePolicy, + /// Purgatory for read-only access to events awaiting git data + purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) local_relay: LocalRelay, /// Configuration reference for sync settings @@ -425,11 +429,15 @@ impl SyncManager { config: &Config, sync_metrics: Option, ) -> Self { + // Extract purgatory from write_policy for read-only access + let purgatory = write_policy.purgatory().clone(); + Self { bootstrap_relay_url, service_domain, database, write_policy, + purgatory, local_relay, config: config.clone(), repo_sync_index: Arc::new(RwLock::new(HashMap::new())), @@ -1621,6 +1629,7 @@ impl SyncManager { match result { WritePolicyResult::Accept => { // Save event to database + if let Err(e) = database.save_event(event).await { tracing::error!( event_id = %event.id, @@ -1644,14 +1653,24 @@ impl SyncManager { ); ProcessResult::Saved } - WritePolicyResult::Reject { message, .. } => { - tracing::debug!( - event_id = %event.id, - relay = %relay_url, - reason = %message, - "Event rejected by write policy" - ); - ProcessResult::Rejected + WritePolicyResult::Reject { message, status } => { + if status { + tracing::debug!( + event_id = %event.id, + kind = %event.kind.as_u16(), + reason = %message, + "Event added to purgatory" + ); + ProcessResult::Purgatory + } else { + tracing::debug!( + event_id = %event.id, + relay = %relay_url, + reason = %message, + "Event rejected by write policy" + ); + ProcessResult::Rejected + } } } } @@ -2172,21 +2191,28 @@ impl SyncManager { let mut all_remote_ids = Vec::new(); let mut failed_count = 0; + // Get event IDs currently in purgatory to avoid fetching them + let purgatory_ids = self.purgatory.event_ids(); + for (idx, result) in diff_results { match result { Ok(reconciliation) => { - let remote_count = reconciliation.remote.len(); - let local_count = reconciliation.local.len(); + let remote_excluding_purgatory_ids: HashSet = reconciliation + .remote + .difference(&purgatory_ids) + .cloned() + .collect(); + let remote_count = remote_excluding_purgatory_ids.len(); tracing::debug!( relay = %relay_url, filter_idx = idx, remote_count = remote_count, - local_count = local_count, - remote_ids = ?reconciliation.remote, + local_count = reconciliation.local.len(), + remote_ids = ?remote_excluding_purgatory_ids, "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx ); if remote_count > 0 { - all_remote_ids.extend(reconciliation.remote.into_iter()); + all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter()); } } Err(e) => { -- cgit v1.2.3