upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/relay_connection.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 16:20:23 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 16:20:23 +0000
commit6d0447f31eb9f9282e60ac3c90c665a8b3781331 (patch)
tree52a15001bda47c1096f82eb0598c8320df0b637c /src/sync/relay_connection.rs
parent497df415749039236126140193af0ea612358cc7 (diff)
feat: implement NIP-77 negentropy sync for historical data
Replace EOSE-based sync completion with negentropy reconciliation for: - Initial connect (fresh sync) - Daily sync (Layer 1 announcements) - Stale reconnect (>15 min) Key changes: - Add NegentropySyncResult struct with remote_only, local_only, received fields - Add supports_negentropy() using try-and-fallback approach - Add negentropy_sync_filter() using nostr-sdk client.sync() API - Modify handle_connect_or_reconnect() to use negentropy for fresh/stale sync - Modify daily_sync() to use negentropy for Layer 1 - Single-warning logging per relay when negentropy fails Quick reconnects (<15 min) unchanged - still use REQ with since filter. If negentropy unsupported, gracefully falls back to REQ+EOSE flow.
Diffstat (limited to 'src/sync/relay_connection.rs')
-rw-r--r--src/sync/relay_connection.rs161
1 files changed, 159 insertions, 2 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 32071e5..fae179b 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -4,12 +4,22 @@
4//! Each RelayConnection manages a single connection to an external relay and handles 4//! Each RelayConnection manages a single connection to an external relay and handles
5//! subscriptions using the three-layer sync strategy. 5//! subscriptions using the three-layer sync strategy.
6//! 6//!
7//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. 7//! ## NIP-77 Negentropy Support
8//!
9//! RelayConnection supports NIP-77 negentropy for efficient set reconciliation:
10//! - `supports_negentropy()` - Check if remote relay supports NIP-77
11//! - `negentropy_sync_filter()` - Perform negentropy sync for a filter
12//!
13//! When NIP-77 is supported, historical sync uses negentropy instead of REQ+EOSE,
14//! significantly reducing bandwidth for relays with overlapping event sets.
15//!
16//! See `docs/explanation/grasp-02-proactive-sync.md` for full design details.
8 17
9use nostr_sdk::prelude::*; 18use nostr_sdk::prelude::*;
10use tokio::sync::mpsc; 19use tokio::sync::mpsc;
11 20
12use super::filters::build_announcement_filter; 21use super::filters::build_announcement_filter;
22use crate::nostr::builder::SharedDatabase;
13 23
14/// Events from a relay connection 24/// Events from a relay connection
15#[derive(Debug)] 25#[derive(Debug)]
@@ -24,6 +34,17 @@ pub enum RelayEvent {
24 Shutdown, 34 Shutdown,
25} 35}
26 36
37/// Result of a negentropy sync operation
38#[derive(Debug)]
39pub struct NegentropySyncResult {
40 /// Event IDs that exist on remote but not locally (discovered but not fetched)
41 pub remote_only: Vec<EventId>,
42 /// Event IDs that exist locally but not on remote (could push)
43 pub local_only: Vec<EventId>,
44 /// Event IDs that were fetched during sync
45 pub received: Vec<EventId>,
46}
47
27/// Manages connection to a single external relay 48/// Manages connection to a single external relay
28/// 49///
29/// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection 50/// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection
@@ -32,6 +53,7 @@ pub enum RelayEvent {
32/// - Layer 1 subscription (announcements) 53/// - Layer 1 subscription (announcements)
33/// - Additional filter subscriptions (Layers 2 & 3) 54/// - Additional filter subscriptions (Layers 2 & 3)
34/// - Event notification loop 55/// - Event notification loop
56/// - NIP-77 negentropy synchronization
35/// 57///
36/// # Why Client instead of Relay directly? 58/// # Why Client instead of Relay directly?
37/// 59///
@@ -49,6 +71,10 @@ pub struct RelayConnection {
49 url: String, 71 url: String,
50 /// The underlying nostr-sdk client 72 /// The underlying nostr-sdk client
51 client: Client, 73 client: Client,
74 /// Local database for negentropy comparison (used for NIP-77 sync)
75 database: Option<SharedDatabase>,
76 /// Whether we've logged NIP-77 not supported for this relay (log once)
77 nip77_warning_logged: std::sync::Arc<std::sync::atomic::AtomicBool>,
52} 78}
53 79
54impl RelayConnection { 80impl RelayConnection {
@@ -58,7 +84,27 @@ impl RelayConnection {
58 /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com") 84 /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com")
59 pub fn new(url: String) -> Self { 85 pub fn new(url: String) -> Self {
60 let client = Client::default(); 86 let client = Client::default();
61 Self { url, client } 87 Self {
88 url,
89 client,
90 database: None,
91 nip77_warning_logged: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
92 }
93 }
94
95 /// Create a new relay connection with database for negentropy sync
96 ///
97 /// # Arguments
98 /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com")
99 /// * `database` - Shared database for local event comparison during negentropy sync
100 pub fn new_with_database(url: String, database: SharedDatabase) -> Self {
101 let client = Client::default();
102 Self {
103 url,
104 client,
105 database: Some(database),
106 nip77_warning_logged: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
107 }
62 } 108 }
63 109
64 /// Connect to the relay and subscribe to Layer 1 (announcements) 110 /// Connect to the relay and subscribe to Layer 1 (announcements)
@@ -333,4 +379,115 @@ impl RelayConnection {
333 self.client.unsubscribe_all().await; 379 self.client.unsubscribe_all().await;
334 tracing::debug!(relay = %self.url, "Unsubscribed from all subscriptions"); 380 tracing::debug!(relay = %self.url, "Unsubscribed from all subscriptions");
335 } 381 }
382
383 // =========================================================================
384 // NIP-77 Negentropy Support
385 // =========================================================================
386
387 /// Check if negentropy sync should be attempted
388 ///
389 /// Rather than relying on NIP-11 document detection (which can be unreliable),
390 /// this returns true to indicate we should try negentropy sync. The actual
391 /// sync will handle failures gracefully with fallback to REQ+EOSE.
392 ///
393 /// # Note
394 /// This uses a "try and fallback" approach because:
395 /// - Some relays support NIP-77 but don't advertise it in NIP-11
396 /// - Some relays claim NIP-77 support but have bugs
397 /// - The nostr-sdk 0.44 API for relay document access varies
398 pub async fn supports_negentropy(&self) -> bool {
399 // Always return true to attempt negentropy - we handle failure gracefully
400 // in negentropy_sync_filter() which logs a warning and returns an error
401 // that the caller can use to fall back to REQ+EOSE
402 true
403 }
404
405 /// Perform negentropy synchronization for a filter
406 ///
407 /// Uses NIP-77 negentropy protocol to efficiently reconcile events matching
408 /// the filter between local database and remote relay. This is much more
409 /// efficient than REQ+EOSE for relays with overlapping event sets.
410 ///
411 /// # Arguments
412 /// * `filter` - The filter defining which events to sync
413 ///
414 /// # Returns
415 /// * `Ok(NegentropySyncResult)` - Sync completed successfully with reconciliation info
416 /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error)
417 ///
418 /// # Fallback Behavior
419 /// If this method fails, the caller should fall back to traditional REQ+EOSE sync.
420 /// Failure reasons include:
421 /// - Relay doesn't actually support NIP-77 (despite claiming to)
422 /// - Network errors during reconciliation
423 /// - Timeout during sync
424 pub async fn negentropy_sync_filter(&self, filter: Filter) -> Result<NegentropySyncResult, String> {
425 // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange
426 let sync_opts = SyncOptions::default();
427
428 match self.client.sync(filter.clone(), &sync_opts).await {
429 Ok(output) => {
430 let reconciliation = output.val;
431
432 tracing::debug!(
433 relay = %self.url,
434 local_count = reconciliation.local.len(),
435 remote_count = reconciliation.remote.len(),
436 sent_count = reconciliation.sent.len(),
437 received_count = reconciliation.received.len(),
438 "Negentropy sync completed"
439 );
440
441 // Check for any failures
442 if !output.failed.is_empty() {
443 tracing::warn!(
444 relay = %self.url,
445 failures = ?output.failed,
446 "Some relays failed during negentropy sync"
447 );
448 }
449
450 Ok(NegentropySyncResult {
451 remote_only: reconciliation.remote.into_iter().collect(),
452 local_only: reconciliation.local.into_iter().collect(),
453 received: reconciliation.received.into_iter().collect(),
454 })
455 }
456 Err(e) => {
457 // Log warning only once per relay to avoid spam
458 if !self
459 .nip77_warning_logged
460 .swap(true, std::sync::atomic::Ordering::Relaxed)
461 {
462 tracing::warn!(
463 relay = %self.url,
464 error = %e,
465 "Negentropy sync failed, will fall back to REQ+EOSE"
466 );
467 }
468 Err(format!("Negentropy sync failed: {}", e))
469 }
470 }
471 }
472
473 /// Perform negentropy sync and return received event IDs
474 ///
475 /// Convenience method that performs negentropy sync and returns the event IDs
476 /// that were received (i.e., events that exist on remote but not locally).
477 ///
478 /// # Arguments
479 /// * `filter` - The filter defining which events to sync
480 ///
481 /// # Returns
482 /// * `Ok(Vec<EventId>)` - Event IDs received from remote relay
483 /// * `Err(String)` - Sync failed
484 pub async fn negentropy_sync_and_fetch(&self, filter: Filter) -> Result<Vec<EventId>, String> {
485 let result = self.negentropy_sync_filter(filter).await?;
486 Ok(result.received)
487 }
488
489 /// Check if this connection has a database configured for negentropy
490 pub fn has_database(&self) -> bool {
491 self.database.is_some()
492 }
336} 493}