diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 16:20:23 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 16:20:23 +0000 |
| commit | 6d0447f31eb9f9282e60ac3c90c665a8b3781331 (patch) | |
| tree | 52a15001bda47c1096f82eb0598c8320df0b637c /src/sync/relay_connection.rs | |
| parent | 497df415749039236126140193af0ea612358cc7 (diff) | |
feat: implement NIP-77 negentropy sync for historical data
Replace EOSE-based sync completion with negentropy reconciliation for:
- Initial connect (fresh sync)
- Daily sync (Layer 1 announcements)
- Stale reconnect (>15 min)
Key changes:
- Add NegentropySyncResult struct with remote_only, local_only, received fields
- Add supports_negentropy() using try-and-fallback approach
- Add negentropy_sync_filter() using nostr-sdk client.sync() API
- Modify handle_connect_or_reconnect() to use negentropy for fresh/stale sync
- Modify daily_sync() to use negentropy for Layer 1
- Single-warning logging per relay when negentropy fails
Quick reconnects (<15 min) unchanged - still use REQ with since filter.
If negentropy unsupported, gracefully falls back to REQ+EOSE flow.
Diffstat (limited to 'src/sync/relay_connection.rs')
| -rw-r--r-- | src/sync/relay_connection.rs | 161 |
1 files changed, 159 insertions, 2 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 32071e5..fae179b 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -4,12 +4,22 @@ | |||
| 4 | //! Each RelayConnection manages a single connection to an external relay and handles | 4 | //! Each RelayConnection manages a single connection to an external relay and handles |
| 5 | //! subscriptions using the three-layer sync strategy. | 5 | //! subscriptions using the three-layer sync strategy. |
| 6 | //! | 6 | //! |
| 7 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | 7 | //! ## NIP-77 Negentropy Support |
| 8 | //! | ||
| 9 | //! RelayConnection supports NIP-77 negentropy for efficient set reconciliation: | ||
| 10 | //! - `supports_negentropy()` - Check if remote relay supports NIP-77 | ||
| 11 | //! - `negentropy_sync_filter()` - Perform negentropy sync for a filter | ||
| 12 | //! | ||
| 13 | //! When NIP-77 is supported, historical sync uses negentropy instead of REQ+EOSE, | ||
| 14 | //! significantly reducing bandwidth for relays with overlapping event sets. | ||
| 15 | //! | ||
| 16 | //! See `docs/explanation/grasp-02-proactive-sync.md` for full design details. | ||
| 8 | 17 | ||
| 9 | use nostr_sdk::prelude::*; | 18 | use nostr_sdk::prelude::*; |
| 10 | use tokio::sync::mpsc; | 19 | use tokio::sync::mpsc; |
| 11 | 20 | ||
| 12 | use super::filters::build_announcement_filter; | 21 | use super::filters::build_announcement_filter; |
| 22 | use crate::nostr::builder::SharedDatabase; | ||
| 13 | 23 | ||
| 14 | /// Events from a relay connection | 24 | /// Events from a relay connection |
| 15 | #[derive(Debug)] | 25 | #[derive(Debug)] |
| @@ -24,6 +34,17 @@ pub enum RelayEvent { | |||
| 24 | Shutdown, | 34 | Shutdown, |
| 25 | } | 35 | } |
| 26 | 36 | ||
| 37 | /// Result of a negentropy sync operation | ||
| 38 | #[derive(Debug)] | ||
| 39 | pub struct NegentropySyncResult { | ||
| 40 | /// Event IDs that exist on remote but not locally (discovered but not fetched) | ||
| 41 | pub remote_only: Vec<EventId>, | ||
| 42 | /// Event IDs that exist locally but not on remote (could push) | ||
| 43 | pub local_only: Vec<EventId>, | ||
| 44 | /// Event IDs that were fetched during sync | ||
| 45 | pub received: Vec<EventId>, | ||
| 46 | } | ||
| 47 | |||
| 27 | /// Manages connection to a single external relay | 48 | /// Manages connection to a single external relay |
| 28 | /// | 49 | /// |
| 29 | /// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection | 50 | /// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection |
| @@ -32,6 +53,7 @@ pub enum RelayEvent { | |||
| 32 | /// - Layer 1 subscription (announcements) | 53 | /// - Layer 1 subscription (announcements) |
| 33 | /// - Additional filter subscriptions (Layers 2 & 3) | 54 | /// - Additional filter subscriptions (Layers 2 & 3) |
| 34 | /// - Event notification loop | 55 | /// - Event notification loop |
| 56 | /// - NIP-77 negentropy synchronization | ||
| 35 | /// | 57 | /// |
| 36 | /// # Why Client instead of Relay directly? | 58 | /// # Why Client instead of Relay directly? |
| 37 | /// | 59 | /// |
| @@ -49,6 +71,10 @@ pub struct RelayConnection { | |||
| 49 | url: String, | 71 | url: String, |
| 50 | /// The underlying nostr-sdk client | 72 | /// The underlying nostr-sdk client |
| 51 | client: Client, | 73 | client: Client, |
| 74 | /// Local database for negentropy comparison (used for NIP-77 sync) | ||
| 75 | database: Option<SharedDatabase>, | ||
| 76 | /// Whether we've logged NIP-77 not supported for this relay (log once) | ||
| 77 | nip77_warning_logged: std::sync::Arc<std::sync::atomic::AtomicBool>, | ||
| 52 | } | 78 | } |
| 53 | 79 | ||
| 54 | impl RelayConnection { | 80 | impl RelayConnection { |
| @@ -58,7 +84,27 @@ impl RelayConnection { | |||
| 58 | /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com") | 84 | /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com") |
| 59 | pub fn new(url: String) -> Self { | 85 | pub fn new(url: String) -> Self { |
| 60 | let client = Client::default(); | 86 | let client = Client::default(); |
| 61 | Self { url, client } | 87 | Self { |
| 88 | url, | ||
| 89 | client, | ||
| 90 | database: None, | ||
| 91 | nip77_warning_logged: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), | ||
| 92 | } | ||
| 93 | } | ||
| 94 | |||
| 95 | /// Create a new relay connection with database for negentropy sync | ||
| 96 | /// | ||
| 97 | /// # Arguments | ||
| 98 | /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com") | ||
| 99 | /// * `database` - Shared database for local event comparison during negentropy sync | ||
| 100 | pub fn new_with_database(url: String, database: SharedDatabase) -> Self { | ||
| 101 | let client = Client::default(); | ||
| 102 | Self { | ||
| 103 | url, | ||
| 104 | client, | ||
| 105 | database: Some(database), | ||
| 106 | nip77_warning_logged: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), | ||
| 107 | } | ||
| 62 | } | 108 | } |
| 63 | 109 | ||
| 64 | /// Connect to the relay and subscribe to Layer 1 (announcements) | 110 | /// Connect to the relay and subscribe to Layer 1 (announcements) |
| @@ -333,4 +379,115 @@ impl RelayConnection { | |||
| 333 | self.client.unsubscribe_all().await; | 379 | self.client.unsubscribe_all().await; |
| 334 | tracing::debug!(relay = %self.url, "Unsubscribed from all subscriptions"); | 380 | tracing::debug!(relay = %self.url, "Unsubscribed from all subscriptions"); |
| 335 | } | 381 | } |
| 382 | |||
| 383 | // ========================================================================= | ||
| 384 | // NIP-77 Negentropy Support | ||
| 385 | // ========================================================================= | ||
| 386 | |||
| 387 | /// Check if negentropy sync should be attempted | ||
| 388 | /// | ||
| 389 | /// Rather than relying on NIP-11 document detection (which can be unreliable), | ||
| 390 | /// this returns true to indicate we should try negentropy sync. The actual | ||
| 391 | /// sync will handle failures gracefully with fallback to REQ+EOSE. | ||
| 392 | /// | ||
| 393 | /// # Note | ||
| 394 | /// This uses a "try and fallback" approach because: | ||
| 395 | /// - Some relays support NIP-77 but don't advertise it in NIP-11 | ||
| 396 | /// - Some relays claim NIP-77 support but have bugs | ||
| 397 | /// - The nostr-sdk 0.44 API for relay document access varies | ||
| 398 | pub async fn supports_negentropy(&self) -> bool { | ||
| 399 | // Always return true to attempt negentropy - we handle failure gracefully | ||
| 400 | // in negentropy_sync_filter() which logs a warning and returns an error | ||
| 401 | // that the caller can use to fall back to REQ+EOSE | ||
| 402 | true | ||
| 403 | } | ||
| 404 | |||
| 405 | /// Perform negentropy synchronization for a filter | ||
| 406 | /// | ||
| 407 | /// Uses NIP-77 negentropy protocol to efficiently reconcile events matching | ||
| 408 | /// the filter between local database and remote relay. This is much more | ||
| 409 | /// efficient than REQ+EOSE for relays with overlapping event sets. | ||
| 410 | /// | ||
| 411 | /// # Arguments | ||
| 412 | /// * `filter` - The filter defining which events to sync | ||
| 413 | /// | ||
| 414 | /// # Returns | ||
| 415 | /// * `Ok(NegentropySyncResult)` - Sync completed successfully with reconciliation info | ||
| 416 | /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error) | ||
| 417 | /// | ||
| 418 | /// # Fallback Behavior | ||
| 419 | /// If this method fails, the caller should fall back to traditional REQ+EOSE sync. | ||
| 420 | /// Failure reasons include: | ||
| 421 | /// - Relay doesn't actually support NIP-77 (despite claiming to) | ||
| 422 | /// - Network errors during reconciliation | ||
| 423 | /// - Timeout during sync | ||
| 424 | pub async fn negentropy_sync_filter(&self, filter: Filter) -> Result<NegentropySyncResult, String> { | ||
| 425 | // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange | ||
| 426 | let sync_opts = SyncOptions::default(); | ||
| 427 | |||
| 428 | match self.client.sync(filter.clone(), &sync_opts).await { | ||
| 429 | Ok(output) => { | ||
| 430 | let reconciliation = output.val; | ||
| 431 | |||
| 432 | tracing::debug!( | ||
| 433 | relay = %self.url, | ||
| 434 | local_count = reconciliation.local.len(), | ||
| 435 | remote_count = reconciliation.remote.len(), | ||
| 436 | sent_count = reconciliation.sent.len(), | ||
| 437 | received_count = reconciliation.received.len(), | ||
| 438 | "Negentropy sync completed" | ||
| 439 | ); | ||
| 440 | |||
| 441 | // Check for any failures | ||
| 442 | if !output.failed.is_empty() { | ||
| 443 | tracing::warn!( | ||
| 444 | relay = %self.url, | ||
| 445 | failures = ?output.failed, | ||
| 446 | "Some relays failed during negentropy sync" | ||
| 447 | ); | ||
| 448 | } | ||
| 449 | |||
| 450 | Ok(NegentropySyncResult { | ||
| 451 | remote_only: reconciliation.remote.into_iter().collect(), | ||
| 452 | local_only: reconciliation.local.into_iter().collect(), | ||
| 453 | received: reconciliation.received.into_iter().collect(), | ||
| 454 | }) | ||
| 455 | } | ||
| 456 | Err(e) => { | ||
| 457 | // Log warning only once per relay to avoid spam | ||
| 458 | if !self | ||
| 459 | .nip77_warning_logged | ||
| 460 | .swap(true, std::sync::atomic::Ordering::Relaxed) | ||
| 461 | { | ||
| 462 | tracing::warn!( | ||
| 463 | relay = %self.url, | ||
| 464 | error = %e, | ||
| 465 | "Negentropy sync failed, will fall back to REQ+EOSE" | ||
| 466 | ); | ||
| 467 | } | ||
| 468 | Err(format!("Negentropy sync failed: {}", e)) | ||
| 469 | } | ||
| 470 | } | ||
| 471 | } | ||
| 472 | |||
| 473 | /// Perform negentropy sync and return received event IDs | ||
| 474 | /// | ||
| 475 | /// Convenience method that performs negentropy sync and returns the event IDs | ||
| 476 | /// that were received (i.e., events that exist on remote but not locally). | ||
| 477 | /// | ||
| 478 | /// # Arguments | ||
| 479 | /// * `filter` - The filter defining which events to sync | ||
| 480 | /// | ||
| 481 | /// # Returns | ||
| 482 | /// * `Ok(Vec<EventId>)` - Event IDs received from remote relay | ||
| 483 | /// * `Err(String)` - Sync failed | ||
| 484 | pub async fn negentropy_sync_and_fetch(&self, filter: Filter) -> Result<Vec<EventId>, String> { | ||
| 485 | let result = self.negentropy_sync_filter(filter).await?; | ||
| 486 | Ok(result.received) | ||
| 487 | } | ||
| 488 | |||
| 489 | /// Check if this connection has a database configured for negentropy | ||
| 490 | pub fn has_database(&self) -> bool { | ||
| 491 | self.database.is_some() | ||
| 492 | } | ||
| 336 | } | 493 | } |