diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-02 13:53:47 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-02 13:55:38 +0000 |
| commit | 11b1655638b5a328662187a27f85a84df60fc759 (patch) | |
| tree | c26adc3bb7b5fcb0b2fe51b1e7bf9823bd970c4e /src/sync | |
| parent | 6e70994a30a0d0e537099094df2a8369b559586d (diff) | |
sync: use purgatory
don't save new events destined for purgatory events directly to db
or serve on websockets
don't download events already in purgatory via negentropy sync
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 52 |
1 files changed, 39 insertions, 13 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 65affc6..dcdbe3a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -159,6 +159,8 @@ pub enum ProcessResult { | |||
| 159 | Saved, | 159 | Saved, |
| 160 | /// Event already existed in database | 160 | /// Event already existed in database |
| 161 | Duplicate, | 161 | Duplicate, |
| 162 | /// Event added to Purgatory | ||
| 163 | Purgatory, | ||
| 162 | /// Event rejected by write policy | 164 | /// Event rejected by write policy |
| 163 | Rejected, | 165 | Rejected, |
| 164 | } | 166 | } |
| @@ -377,6 +379,8 @@ pub struct SyncManager { | |||
| 377 | database: SharedDatabase, | 379 | database: SharedDatabase, |
| 378 | /// Write policy for validating incoming events | 380 | /// Write policy for validating incoming events |
| 379 | write_policy: Nip34WritePolicy, | 381 | write_policy: Nip34WritePolicy, |
| 382 | /// Purgatory for read-only access to events awaiting git data | ||
| 383 | purgatory: Arc<crate::purgatory::Purgatory>, | ||
| 380 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) | 384 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) |
| 381 | local_relay: LocalRelay, | 385 | local_relay: LocalRelay, |
| 382 | /// Configuration reference for sync settings | 386 | /// Configuration reference for sync settings |
| @@ -425,11 +429,15 @@ impl SyncManager { | |||
| 425 | config: &Config, | 429 | config: &Config, |
| 426 | sync_metrics: Option<SyncMetrics>, | 430 | sync_metrics: Option<SyncMetrics>, |
| 427 | ) -> Self { | 431 | ) -> Self { |
| 432 | // Extract purgatory from write_policy for read-only access | ||
| 433 | let purgatory = write_policy.purgatory().clone(); | ||
| 434 | |||
| 428 | Self { | 435 | Self { |
| 429 | bootstrap_relay_url, | 436 | bootstrap_relay_url, |
| 430 | service_domain, | 437 | service_domain, |
| 431 | database, | 438 | database, |
| 432 | write_policy, | 439 | write_policy, |
| 440 | purgatory, | ||
| 433 | local_relay, | 441 | local_relay, |
| 434 | config: config.clone(), | 442 | config: config.clone(), |
| 435 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 443 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| @@ -1621,6 +1629,7 @@ impl SyncManager { | |||
| 1621 | match result { | 1629 | match result { |
| 1622 | WritePolicyResult::Accept => { | 1630 | WritePolicyResult::Accept => { |
| 1623 | // Save event to database | 1631 | // Save event to database |
| 1632 | |||
| 1624 | if let Err(e) = database.save_event(event).await { | 1633 | if let Err(e) = database.save_event(event).await { |
| 1625 | tracing::error!( | 1634 | tracing::error!( |
| 1626 | event_id = %event.id, | 1635 | event_id = %event.id, |
| @@ -1644,14 +1653,24 @@ impl SyncManager { | |||
| 1644 | ); | 1653 | ); |
| 1645 | ProcessResult::Saved | 1654 | ProcessResult::Saved |
| 1646 | } | 1655 | } |
| 1647 | WritePolicyResult::Reject { message, .. } => { | 1656 | WritePolicyResult::Reject { message, status } => { |
| 1648 | tracing::debug!( | 1657 | if status { |
| 1649 | event_id = %event.id, | 1658 | tracing::debug!( |
| 1650 | relay = %relay_url, | 1659 | event_id = %event.id, |
| 1651 | reason = %message, | 1660 | kind = %event.kind.as_u16(), |
| 1652 | "Event rejected by write policy" | 1661 | reason = %message, |
| 1653 | ); | 1662 | "Event added to purgatory" |
| 1654 | ProcessResult::Rejected | 1663 | ); |
| 1664 | ProcessResult::Purgatory | ||
| 1665 | } else { | ||
| 1666 | tracing::debug!( | ||
| 1667 | event_id = %event.id, | ||
| 1668 | relay = %relay_url, | ||
| 1669 | reason = %message, | ||
| 1670 | "Event rejected by write policy" | ||
| 1671 | ); | ||
| 1672 | ProcessResult::Rejected | ||
| 1673 | } | ||
| 1655 | } | 1674 | } |
| 1656 | } | 1675 | } |
| 1657 | } | 1676 | } |
| @@ -2172,21 +2191,28 @@ impl SyncManager { | |||
| 2172 | let mut all_remote_ids = Vec::new(); | 2191 | let mut all_remote_ids = Vec::new(); |
| 2173 | let mut failed_count = 0; | 2192 | let mut failed_count = 0; |
| 2174 | 2193 | ||
| 2194 | // Get event IDs currently in purgatory to avoid fetching them | ||
| 2195 | let purgatory_ids = self.purgatory.event_ids(); | ||
| 2196 | |||
| 2175 | for (idx, result) in diff_results { | 2197 | for (idx, result) in diff_results { |
| 2176 | match result { | 2198 | match result { |
| 2177 | Ok(reconciliation) => { | 2199 | Ok(reconciliation) => { |
| 2178 | let remote_count = reconciliation.remote.len(); | 2200 | let remote_excluding_purgatory_ids: HashSet<EventId> = reconciliation |
| 2179 | let local_count = reconciliation.local.len(); | 2201 | .remote |
| 2202 | .difference(&purgatory_ids) | ||
| 2203 | .cloned() | ||
| 2204 | .collect(); | ||
| 2205 | let remote_count = remote_excluding_purgatory_ids.len(); | ||
| 2180 | tracing::debug!( | 2206 | tracing::debug!( |
| 2181 | relay = %relay_url, | 2207 | relay = %relay_url, |
| 2182 | filter_idx = idx, | 2208 | filter_idx = idx, |
| 2183 | remote_count = remote_count, | 2209 | remote_count = remote_count, |
| 2184 | local_count = local_count, | 2210 | local_count = reconciliation.local.len(), |
| 2185 | remote_ids = ?reconciliation.remote, | 2211 | remote_ids = ?remote_excluding_purgatory_ids, |
| 2186 | "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx | 2212 | "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx |
| 2187 | ); | 2213 | ); |
| 2188 | if remote_count > 0 { | 2214 | if remote_count > 0 { |
| 2189 | all_remote_ids.extend(reconciliation.remote.into_iter()); | 2215 | all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter()); |
| 2190 | } | 2216 | } |
| 2191 | } | 2217 | } |
| 2192 | Err(e) => { | 2218 | Err(e) => { |