upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-02 13:53:47 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-02 13:55:38 +0000
commit11b1655638b5a328662187a27f85a84df60fc759 (patch)
treec26adc3bb7b5fcb0b2fe51b1e7bf9823bd970c4e /src
parent6e70994a30a0d0e537099094df2a8369b559586d (diff)
sync: use purgatory
don't save new events destined for purgatory events directly to db or serve on websockets don't download events already in purgatory via negentropy sync
Diffstat (limited to 'src')
-rw-r--r--src/nostr/builder.rs5
-rw-r--r--src/purgatory/mod.rs29
-rw-r--r--src/sync/mod.rs52
3 files changed, 73 insertions, 13 deletions
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 3d7a0d8..da81e64 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -69,6 +69,11 @@ impl Nip34WritePolicy {
69 } 69 }
70 } 70 }
71 71
72 /// Get a reference to the purgatory for read-only access
73 pub fn purgatory(&self) -> &std::sync::Arc<crate::purgatory::Purgatory> {
74 &self.ctx.purgatory
75 }
76
72 /// Handle repository announcement event 77 /// Handle repository announcement event
73 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { 78 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult {
74 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 79 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 18a55d5..2987f15 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -19,6 +19,7 @@ pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
19 19
20use dashmap::DashMap; 20use dashmap::DashMap;
21use nostr_sdk::prelude::*; 21use nostr_sdk::prelude::*;
22use std::collections::HashSet;
22use std::sync::Arc; 23use std::sync::Arc;
23use std::time::{Duration, Instant}; 24use std::time::{Duration, Instant};
24 25
@@ -253,6 +254,34 @@ impl Purgatory {
253 self.pr_events.remove(event_id); 254 self.pr_events.remove(event_id);
254 } 255 }
255 256
257 /// Get all event IDs currently stored in purgatory.
258 ///
259 /// Returns a HashSet of all event IDs for both state events and PR events
260 /// held in purgatory. Useful for negentropy sync to avoid fetching events
261 /// that are already in purgatory awaiting git data.
262 ///
263 /// # Returns
264 /// HashSet of event IDs (as EventId) for all events in purgatory
265 pub fn event_ids(&self) -> HashSet<EventId> {
266 let mut ids = HashSet::new();
267
268 // Collect state event IDs
269 for entry in self.state_events.iter() {
270 for state_entry in entry.value().iter() {
271 ids.insert(state_entry.event.id);
272 }
273 }
274
275 // Collect PR event IDs (only actual events, not placeholders)
276 for entry in self.pr_events.iter() {
277 if let Some(ref event) = entry.value().event {
278 ids.insert(event.id);
279 }
280 }
281
282 ids
283 }
284
256 /// Get all PR placeholder event IDs (git-data-first entries without events). 285 /// Get all PR placeholder event IDs (git-data-first entries without events).
257 /// 286 ///
258 /// Returns event IDs for entries where git data arrived before the PR event. 287 /// Returns event IDs for entries where git data arrived before the PR event.
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 65affc6..dcdbe3a 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -159,6 +159,8 @@ pub enum ProcessResult {
159 Saved, 159 Saved,
160 /// Event already existed in database 160 /// Event already existed in database
161 Duplicate, 161 Duplicate,
162 /// Event added to Purgatory
163 Purgatory,
162 /// Event rejected by write policy 164 /// Event rejected by write policy
163 Rejected, 165 Rejected,
164} 166}
@@ -377,6 +379,8 @@ pub struct SyncManager {
377 database: SharedDatabase, 379 database: SharedDatabase,
378 /// Write policy for validating incoming events 380 /// Write policy for validating incoming events
379 write_policy: Nip34WritePolicy, 381 write_policy: Nip34WritePolicy,
382 /// Purgatory for read-only access to events awaiting git data
383 purgatory: Arc<crate::purgatory::Purgatory>,
380 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) 384 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers)
381 local_relay: LocalRelay, 385 local_relay: LocalRelay,
382 /// Configuration reference for sync settings 386 /// Configuration reference for sync settings
@@ -425,11 +429,15 @@ impl SyncManager {
425 config: &Config, 429 config: &Config,
426 sync_metrics: Option<SyncMetrics>, 430 sync_metrics: Option<SyncMetrics>,
427 ) -> Self { 431 ) -> Self {
432 // Extract purgatory from write_policy for read-only access
433 let purgatory = write_policy.purgatory().clone();
434
428 Self { 435 Self {
429 bootstrap_relay_url, 436 bootstrap_relay_url,
430 service_domain, 437 service_domain,
431 database, 438 database,
432 write_policy, 439 write_policy,
440 purgatory,
433 local_relay, 441 local_relay,
434 config: config.clone(), 442 config: config.clone(),
435 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 443 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
@@ -1621,6 +1629,7 @@ impl SyncManager {
1621 match result { 1629 match result {
1622 WritePolicyResult::Accept => { 1630 WritePolicyResult::Accept => {
1623 // Save event to database 1631 // Save event to database
1632
1624 if let Err(e) = database.save_event(event).await { 1633 if let Err(e) = database.save_event(event).await {
1625 tracing::error!( 1634 tracing::error!(
1626 event_id = %event.id, 1635 event_id = %event.id,
@@ -1644,14 +1653,24 @@ impl SyncManager {
1644 ); 1653 );
1645 ProcessResult::Saved 1654 ProcessResult::Saved
1646 } 1655 }
1647 WritePolicyResult::Reject { message, .. } => { 1656 WritePolicyResult::Reject { message, status } => {
1648 tracing::debug!( 1657 if status {
1649 event_id = %event.id, 1658 tracing::debug!(
1650 relay = %relay_url, 1659 event_id = %event.id,
1651 reason = %message, 1660 kind = %event.kind.as_u16(),
1652 "Event rejected by write policy" 1661 reason = %message,
1653 ); 1662 "Event added to purgatory"
1654 ProcessResult::Rejected 1663 );
1664 ProcessResult::Purgatory
1665 } else {
1666 tracing::debug!(
1667 event_id = %event.id,
1668 relay = %relay_url,
1669 reason = %message,
1670 "Event rejected by write policy"
1671 );
1672 ProcessResult::Rejected
1673 }
1655 } 1674 }
1656 } 1675 }
1657 } 1676 }
@@ -2172,21 +2191,28 @@ impl SyncManager {
2172 let mut all_remote_ids = Vec::new(); 2191 let mut all_remote_ids = Vec::new();
2173 let mut failed_count = 0; 2192 let mut failed_count = 0;
2174 2193
2194 // Get event IDs currently in purgatory to avoid fetching them
2195 let purgatory_ids = self.purgatory.event_ids();
2196
2175 for (idx, result) in diff_results { 2197 for (idx, result) in diff_results {
2176 match result { 2198 match result {
2177 Ok(reconciliation) => { 2199 Ok(reconciliation) => {
2178 let remote_count = reconciliation.remote.len(); 2200 let remote_excluding_purgatory_ids: HashSet<EventId> = reconciliation
2179 let local_count = reconciliation.local.len(); 2201 .remote
2202 .difference(&purgatory_ids)
2203 .cloned()
2204 .collect();
2205 let remote_count = remote_excluding_purgatory_ids.len();
2180 tracing::debug!( 2206 tracing::debug!(
2181 relay = %relay_url, 2207 relay = %relay_url,
2182 filter_idx = idx, 2208 filter_idx = idx,
2183 remote_count = remote_count, 2209 remote_count = remote_count,
2184 local_count = local_count, 2210 local_count = reconciliation.local.len(),
2185 remote_ids = ?reconciliation.remote, 2211 remote_ids = ?remote_excluding_purgatory_ids,
2186 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx 2212 "[DIAG TRACE] ✓ Negentropy diff results for filter {}", idx
2187 ); 2213 );
2188 if remote_count > 0 { 2214 if remote_count > 0 {
2189 all_remote_ids.extend(reconciliation.remote.into_iter()); 2215 all_remote_ids.extend(remote_excluding_purgatory_ids.into_iter());
2190 } 2216 }
2191 } 2217 }
2192 Err(e) => { 2218 Err(e) => {