upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-02 13:53:47 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-02 13:55:38 +0000
commit11b1655638b5a328662187a27f85a84df60fc759 (patch)
treec26adc3bb7b5fcb0b2fe51b1e7bf9823bd970c4e /src/sync/mod.rs
parent6e70994a30a0d0e537099094df2a8369b559586d (diff)
sync: use purgatory
don't save new events destined for purgatory events directly to db or serve on websockets don't download events already in purgatory via negentropy sync
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs52
1 files changed, 39 insertions, 13 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 65affc6..dcdbe3a 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -159,6 +159,8 @@ pub enum ProcessResult {
159 Saved, 159 Saved,
160 /// Event already existed in database 160 /// Event already existed in database
161 Duplicate, 161 Duplicate,
162 /// Event added to Purgatory
163 Purgatory,
162 /// Event rejected by write policy 164 /// Event rejected by write policy
163 Rejected, 165 Rejected,
164} 166}
@@ -377,6 +379,8 @@ pub struct SyncManager {
377 database: SharedDatabase, 379 database: SharedDatabase,
378 /// Write policy for validating incoming events 380 /// Write policy for validating incoming events
379 write_policy: Nip34WritePolicy, 381 write_policy: Nip34WritePolicy,
382 /// Purgatory for read-only access to events awaiting git data
383 purgatory: Arc<crate::purgatory::Purgatory>,
380 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) 384 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers)
381 local_relay: LocalRelay, 385 local_relay: LocalRelay,
382 /// Configuration reference for sync settings 386 /// Configuration reference for sync settings
@@ -425,11 +429,15 @@ impl SyncManager {
425 config: &Config, 429 config: &Config,
426 sync_metrics: Option<SyncMetrics>, 430 sync_metrics: Option<SyncMetrics>,
427 ) -> Self { 431 ) -> Self {
432 // Extract purgatory from write_policy for read-only access
433 let purgatory = write_policy.purgatory().clone();
434
428 Self { 435 Self {
429 bootstrap_relay_url, 436 bootstrap_relay_url,
430 service_domain, 437 service_domain,
431 database, 438 database,
432 write_policy, 439 write_policy,
440 purgatory,
433 local_relay, 441 local_relay,
434 config: config.clone(), 442 config: config.clone(),
435 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 443 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
@@ -1621,6 +1629,7 @@ impl SyncManager {
1621 match result { 1629 match result {
1622 WritePolicyResult::Accept => { 1630 WritePolicyResult::Accept => {
1623 // Save event to database 1631 // Save event to database
1632
1624 if let Err(e) = database.save_event(event).await { 1633 if let Err(e) = database.save_event(event).await {
1625 tracing::error!( 1634 tracing::error!(
1626 event_id = %event.id, 1635 event_id = %event.id,
@@ -1644,14 +1653,24 @@ impl SyncManager {
1644 ); 1653 );
1645 ProcessResult::Saved 1654 ProcessResult::Saved
1646 } 1655 }
1647 WritePolicyResult::Reject { message, .. } => { 1656 WritePolicyResult::Reject { message, status } => {
1648 tracing::debug!( 1657 if status {
1649 event_id = %event.id, 1658 tracing::debug!(
1650 relay = %relay_url, 1659 event_id = %event.id,
1651 reason = %message, 1660 kind = %event.kind.as_u16(),
1652 "Event rejected by write policy" 1661 reason = %message,
1653 ); 1662 "Event added to purgatory"
1654 ProcessResult::Rejected 1663 );
1664 ProcessResult::Purgatory
1665 } else {
1666 tracing::debug!(
1667 event_id = %event.id,
1668 relay = %relay_url,
1669 reason = %message,
1670 "Event rejected by write policy"
1671 );
1672 ProcessResult::Rejected
1673 }
1655 } 1674 }
1656 } 1675 }
1657 } 1676 }
@@ -2172,21 +2191,28 @@ impl SyncManager {
2172 let mut all_remote_ids = Vec::new(); 2191 let mut all_remote_ids = Vec::new();
2173 let mut failed_count = 0; 2192 let mut failed_count = 0;
2174 2193
2194 // Get event IDs currently in purgatory to avoid fetching them
2195 let purgatory_ids = self.purgatory.event_ids();
2196
2175 for (idx, result) in diff_results { 2197 for (idx, result) in diff_results {
2176 match result { 2198 match result {
2177 Ok(reconciliation) => { 2199 Ok(reconciliation) => {
2178 let remote_count = reconciliation.remote.len(); 2200 let remote_excluding_purgatory_ids: HashSet<EventId> = reconciliation
2179 let local_count = reconciliation.local.len(); 2201 .remote
2202 .difference(&purgatory_ids)
2203 .cloned()
2204 .collect();
2205 let remote_count = remote_excluding_purgatory_ids.len();
2180 tracing::debug!( 2206 tracing::debug!(
2181 relay = %relay_url, 2207 relay = %relay_url,
2182 filter_idx = idx, 2208 filter_idx = idx,
2183 remote_count = remote_count, 2209 remote_count = remote_count,
2184 local_count = local_count, 2210 local_count = reconciliation.local.len(),
2185 remote_ids = ?reconciliation.remote, 2211 remote_ids = ?remote_excluding_purgatory_ids,
2186 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx 2212 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx
2187 ); 2213 );
2188 if remote_count > 0 { 2214 if remote_count > 0 {
2189 all_remote_ids.extend(reconciliation.remote.into_iter()); 2215 all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter());
2190 } 2216 }
2191 } 2217 }
2192 Err(e) => { 2218 Err(e) => {