diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 21:24:21 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 21:24:21 +0000 |
| commit | 466a009d8248aab274a9da419e4c0d83a4b9f466 (patch) | |
| tree | 19d69825c5013e8ba39721b5ccaf9a84665ce570 /src/sync | |
| parent | b6c908599c1e63852b8d3b4b20caae344e37a34a (diff) | |
feat(sync): broadcast synced events to WebSocket subscribers
Enable recursive relay discovery by broadcasting synced events to
WebSocket subscribers via LocalRelay.notify_event(). This allows the
SelfSubscriber to receive 30617 announcements synced from external
relays and discover additional relay URLs to connect to.
Changes:
- Pass LocalRelay to SyncManager::new() from main.rs
- Add local_relay field to SyncManager struct
- Call notify_event() after saving synced events to database
- Enable test_recursive_relay_discovery_syncs_announcement test
The test verifies that when relay_a syncs announcement_x from bootstrap
relay_b (which lists relay_c), relay_a discovers and connects to
relay_c to sync announcement_y.
Fixes recursive relay discovery from bootstrap sync.
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 37 |
1 files changed, 29 insertions, 8 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index b38c4a9..1e60e4a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -40,6 +40,7 @@ use tokio::sync::{broadcast, Mutex, RwLock}; | |||
| 40 | 40 | ||
| 41 | use crate::config::Config; | 41 | use crate::config::Config; |
| 42 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 42 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 43 | use nostr_relay_builder::prelude::LocalRelay; | ||
| 43 | 44 | ||
| 44 | // ============================================================================= | 45 | // ============================================================================= |
| 45 | // Type Aliases for Index Structures | 46 | // Type Aliases for Index Structures |
| @@ -467,6 +468,8 @@ pub struct SyncManager { | |||
| 467 | database: SharedDatabase, | 468 | database: SharedDatabase, |
| 468 | /// Write policy for validating incoming events | 469 | /// Write policy for validating incoming events |
| 469 | write_policy: Nip34WritePolicy, | 470 | write_policy: Nip34WritePolicy, |
| 471 | /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) | ||
| 472 | local_relay: LocalRelay, | ||
| 470 | /// Configuration reference for sync settings | 473 | /// Configuration reference for sync settings |
| 471 | config: Config, | 474 | config: Config, |
| 472 | /// What we want to sync (source of truth) | 475 | /// What we want to sync (source of truth) |
| @@ -499,12 +502,14 @@ impl SyncManager { | |||
| 499 | /// * `service_domain` - The domain this relay serves (for filtering repos) | 502 | /// * `service_domain` - The domain this relay serves (for filtering repos) |
| 500 | /// * `database` - Shared database for event storage | 503 | /// * `database` - Shared database for event storage |
| 501 | /// * `write_policy` - Policy for validating events before storage | 504 | /// * `write_policy` - Policy for validating events before storage |
| 505 | /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast) | ||
| 502 | /// * `config` - Configuration for sync settings | 506 | /// * `config` - Configuration for sync settings |
| 503 | pub fn new( | 507 | pub fn new( |
| 504 | bootstrap_relay_url: Option<String>, | 508 | bootstrap_relay_url: Option<String>, |
| 505 | service_domain: String, | 509 | service_domain: String, |
| 506 | database: SharedDatabase, | 510 | database: SharedDatabase, |
| 507 | write_policy: Nip34WritePolicy, | 511 | write_policy: Nip34WritePolicy, |
| 512 | local_relay: LocalRelay, | ||
| 508 | config: &Config, | 513 | config: &Config, |
| 509 | ) -> Self { | 514 | ) -> Self { |
| 510 | Self { | 515 | Self { |
| @@ -512,6 +517,7 @@ impl SyncManager { | |||
| 512 | service_domain, | 517 | service_domain, |
| 513 | database, | 518 | database, |
| 514 | write_policy, | 519 | write_policy, |
| 520 | local_relay, | ||
| 515 | config: config.clone(), | 521 | config: config.clone(), |
| 516 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 522 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 517 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 523 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| @@ -1269,6 +1275,7 @@ impl SyncManager { | |||
| 1269 | 1275 | ||
| 1270 | let database = Arc::clone(&self.database); | 1276 | let database = Arc::clone(&self.database); |
| 1271 | let write_policy = self.write_policy.clone(); | 1277 | let write_policy = self.write_policy.clone(); |
| 1278 | let local_relay = self.local_relay.clone(); | ||
| 1272 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | 1279 | let relay_sync_index = Arc::clone(&self.relay_sync_index); |
| 1273 | 1280 | ||
| 1274 | // Check if this is a bootstrap relay | 1281 | // Check if this is a bootstrap relay |
| @@ -1340,6 +1347,7 @@ impl SyncManager { | |||
| 1340 | &relay_url_clone, | 1347 | &relay_url_clone, |
| 1341 | &database, | 1348 | &database, |
| 1342 | &write_policy, | 1349 | &write_policy, |
| 1350 | &local_relay, | ||
| 1343 | ) | 1351 | ) |
| 1344 | .await; | 1352 | .await; |
| 1345 | } | 1353 | } |
| @@ -1393,11 +1401,18 @@ impl SyncManager { | |||
| 1393 | } | 1401 | } |
| 1394 | 1402 | ||
| 1395 | /// Process a single event from a relay (static version for spawned tasks) | 1403 | /// Process a single event from a relay (static version for spawned tasks) |
| 1404 | /// | ||
| 1405 | /// Processes events with dedup, policy check, database save, and broadcast: | ||
| 1406 | /// - Deduplication (skips if event already exists) | ||
| 1407 | /// - Write policy validation | ||
| 1408 | /// - Database save | ||
| 1409 | /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery) | ||
| 1396 | async fn process_event_static( | 1410 | async fn process_event_static( |
| 1397 | event: &Event, | 1411 | event: &Event, |
| 1398 | relay_url: &str, | 1412 | relay_url: &str, |
| 1399 | database: &SharedDatabase, | 1413 | database: &SharedDatabase, |
| 1400 | write_policy: &Nip34WritePolicy, | 1414 | write_policy: &Nip34WritePolicy, |
| 1415 | local_relay: &LocalRelay, | ||
| 1401 | ) { | 1416 | ) { |
| 1402 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; | 1417 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; |
| 1403 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 1418 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |
| @@ -1421,7 +1436,7 @@ impl SyncManager { | |||
| 1421 | 1436 | ||
| 1422 | match result { | 1437 | match result { |
| 1423 | PolicyResult::Accept => { | 1438 | PolicyResult::Accept => { |
| 1424 | // Save event | 1439 | // Save event to database |
| 1425 | if let Err(e) = database.save_event(event).await { | 1440 | if let Err(e) = database.save_event(event).await { |
| 1426 | tracing::error!( | 1441 | tracing::error!( |
| 1427 | event_id = %event.id, | 1442 | event_id = %event.id, |
| @@ -1429,14 +1444,20 @@ impl SyncManager { | |||
| 1429 | error = %e, | 1444 | error = %e, |
| 1430 | "Failed to save synced event" | 1445 | "Failed to save synced event" |
| 1431 | ); | 1446 | ); |
| 1432 | } else { | 1447 | return; |
| 1433 | tracing::debug!( | ||
| 1434 | event_id = %event.id, | ||
| 1435 | relay = %relay_url, | ||
| 1436 | kind = %event.kind.as_u16(), | ||
| 1437 | "Saved synced event" | ||
| 1438 | ); | ||
| 1439 | } | 1448 | } |
| 1449 | |||
| 1450 | // Broadcast to WebSocket subscribers (enables recursive relay discovery) | ||
| 1451 | // This allows SelfSubscriber to receive synced 30617 announcements | ||
| 1452 | let broadcast_success = local_relay.notify_event(event.clone()); | ||
| 1453 | |||
| 1454 | tracing::debug!( | ||
| 1455 | event_id = %event.id, | ||
| 1456 | relay = %relay_url, | ||
| 1457 | kind = %event.kind.as_u16(), | ||
| 1458 | broadcast = broadcast_success, | ||
| 1459 | "Synced event saved and broadcast" | ||
| 1460 | ); | ||
| 1440 | } | 1461 | } |
| 1441 | PolicyResult::Reject(reason) => { | 1462 | PolicyResult::Reject(reason) => { |
| 1442 | tracing::debug!( | 1463 | tracing::debug!( |