upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 21:24:21 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 21:24:21 +0000
commit466a009d8248aab274a9da419e4c0d83a4b9f466 (patch)
tree19d69825c5013e8ba39721b5ccaf9a84665ce570
parentb6c908599c1e63852b8d3b4b20caae344e37a34a (diff)
feat(sync): broadcast synced events to WebSocket subscribers
Enable recursive relay discovery by broadcasting synced events to WebSocket subscribers via LocalRelay.notify_event(). This allows the SelfSubscriber to receive 30617 announcements synced from external relays and discover additional relay URLs to connect to. Changes: - Pass LocalRelay to SyncManager::new() from main.rs - Add local_relay field to SyncManager struct - Call notify_event() after saving synced events to database - Enable test_recursive_relay_discovery_syncs_announcement test The test verifies that when relay_a syncs announcement_x from bootstrap relay_b (which lists relay_c), relay_a discovers and connects to relay_c to sync announcement_y. Fixes recursive relay discovery from bootstrap sync.
-rw-r--r--src/main.rs1
-rw-r--r--src/sync/mod.rs37
-rw-r--r--tests/sync/discovery.rs5
3 files changed, 30 insertions, 13 deletions
diff --git a/src/main.rs b/src/main.rs
index f887e42..31c7b63 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -58,6 +58,7 @@ async fn main() -> Result<()> {
58 config.domain.clone(), 58 config.domain.clone(),
59 relay_with_db.database.clone(), 59 relay_with_db.database.clone(),
60 relay_with_db.write_policy.clone(), 60 relay_with_db.write_policy.clone(),
61 relay_with_db.relay.clone(),
61 &config, 62 &config,
62 ); 63 );
63 64
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index b38c4a9..1e60e4a 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -40,6 +40,7 @@ use tokio::sync::{broadcast, Mutex, RwLock};
40 40
41use crate::config::Config; 41use crate::config::Config;
42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 42use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
43use nostr_relay_builder::prelude::LocalRelay;
43 44
44// ============================================================================= 45// =============================================================================
45// Type Aliases for Index Structures 46// Type Aliases for Index Structures
@@ -467,6 +468,8 @@ pub struct SyncManager {
467 database: SharedDatabase, 468 database: SharedDatabase,
468 /// Write policy for validating incoming events 469 /// Write policy for validating incoming events
469 write_policy: Nip34WritePolicy, 470 write_policy: Nip34WritePolicy,
471 /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers)
472 local_relay: LocalRelay,
470 /// Configuration reference for sync settings 473 /// Configuration reference for sync settings
471 config: Config, 474 config: Config,
472 /// What we want to sync (source of truth) 475 /// What we want to sync (source of truth)
@@ -499,12 +502,14 @@ impl SyncManager {
499 /// * `service_domain` - The domain this relay serves (for filtering repos) 502 /// * `service_domain` - The domain this relay serves (for filtering repos)
500 /// * `database` - Shared database for event storage 503 /// * `database` - Shared database for event storage
501 /// * `write_policy` - Policy for validating events before storage 504 /// * `write_policy` - Policy for validating events before storage
505 /// * `local_relay` - Local relay for submitting synced events (enables WebSocket broadcast)
502 /// * `config` - Configuration for sync settings 506 /// * `config` - Configuration for sync settings
503 pub fn new( 507 pub fn new(
504 bootstrap_relay_url: Option<String>, 508 bootstrap_relay_url: Option<String>,
505 service_domain: String, 509 service_domain: String,
506 database: SharedDatabase, 510 database: SharedDatabase,
507 write_policy: Nip34WritePolicy, 511 write_policy: Nip34WritePolicy,
512 local_relay: LocalRelay,
508 config: &Config, 513 config: &Config,
509 ) -> Self { 514 ) -> Self {
510 Self { 515 Self {
@@ -512,6 +517,7 @@ impl SyncManager {
512 service_domain, 517 service_domain,
513 database, 518 database,
514 write_policy, 519 write_policy,
520 local_relay,
515 config: config.clone(), 521 config: config.clone(),
516 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 522 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
517 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 523 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
@@ -1269,6 +1275,7 @@ impl SyncManager {
1269 1275
1270 let database = Arc::clone(&self.database); 1276 let database = Arc::clone(&self.database);
1271 let write_policy = self.write_policy.clone(); 1277 let write_policy = self.write_policy.clone();
1278 let local_relay = self.local_relay.clone();
1272 let relay_sync_index = Arc::clone(&self.relay_sync_index); 1279 let relay_sync_index = Arc::clone(&self.relay_sync_index);
1273 1280
1274 // Check if this is a bootstrap relay 1281 // Check if this is a bootstrap relay
@@ -1340,6 +1347,7 @@ impl SyncManager {
1340 &relay_url_clone, 1347 &relay_url_clone,
1341 &database, 1348 &database,
1342 &write_policy, 1349 &write_policy,
1350 &local_relay,
1343 ) 1351 )
1344 .await; 1352 .await;
1345 } 1353 }
@@ -1393,11 +1401,18 @@ impl SyncManager {
1393 } 1401 }
1394 1402
1395 /// Process a single event from a relay (static version for spawned tasks) 1403 /// Process a single event from a relay (static version for spawned tasks)
1404 ///
1405 /// Processes events with dedup, policy check, database save, and broadcast:
1406 /// - Deduplication (skips if event already exists)
1407 /// - Write policy validation
1408 /// - Database save
1409 /// - Broadcast to WebSocket subscribers via notify_event (enables recursive relay discovery)
1396 async fn process_event_static( 1410 async fn process_event_static(
1397 event: &Event, 1411 event: &Event,
1398 relay_url: &str, 1412 relay_url: &str,
1399 database: &SharedDatabase, 1413 database: &SharedDatabase,
1400 write_policy: &Nip34WritePolicy, 1414 write_policy: &Nip34WritePolicy,
1415 local_relay: &LocalRelay,
1401 ) { 1416 ) {
1402 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; 1417 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy};
1403 use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 1418 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -1421,7 +1436,7 @@ impl SyncManager {
1421 1436
1422 match result { 1437 match result {
1423 PolicyResult::Accept => { 1438 PolicyResult::Accept => {
1424 // Save event 1439 // Save event to database
1425 if let Err(e) = database.save_event(event).await { 1440 if let Err(e) = database.save_event(event).await {
1426 tracing::error!( 1441 tracing::error!(
1427 event_id = %event.id, 1442 event_id = %event.id,
@@ -1429,14 +1444,20 @@ impl SyncManager {
1429 error = %e, 1444 error = %e,
1430 "Failed to save synced event" 1445 "Failed to save synced event"
1431 ); 1446 );
1432 } else { 1447 return;
1433 tracing::debug!(
1434 event_id = %event.id,
1435 relay = %relay_url,
1436 kind = %event.kind.as_u16(),
1437 "Saved synced event"
1438 );
1439 } 1448 }
1449
1450 // Broadcast to WebSocket subscribers (enables recursive relay discovery)
1451 // This allows SelfSubscriber to receive synced 30617 announcements
1452 let broadcast_success = local_relay.notify_event(event.clone());
1453
1454 tracing::debug!(
1455 event_id = %event.id,
1456 relay = %relay_url,
1457 kind = %event.kind.as_u16(),
1458 broadcast = broadcast_success,
1459 "Synced event saved and broadcast"
1460 );
1440 } 1461 }
1441 PolicyResult::Reject(reason) => { 1462 PolicyResult::Reject(reason) => {
1442 tracing::debug!( 1463 tracing::debug!(
diff --git a/tests/sync/discovery.rs b/tests/sync/discovery.rs
index 3cdf00d..9e27f9e 100644
--- a/tests/sync/discovery.rs
+++ b/tests/sync/discovery.rs
@@ -293,12 +293,7 @@ async fn test_layer2_discovery_with_chain() {
293/// 3. Discovers and connects to relay_c 293/// 3. Discovers and connects to relay_c
294/// 4. Syncs announcement_y from relay_c 294/// 4. Syncs announcement_y from relay_c
295/// 295///
296/// NOTE: This test is ignored because recursive relay discovery from synced
297/// announcements is not yet implemented. Currently, discovery only triggers
298/// when an announcement is directly submitted to a relay, not when it's
299/// synced from a bootstrap relay.
300#[tokio::test] 296#[tokio::test]
301#[ignore = "Recursive relay discovery from bootstrap sync not yet implemented"]
302async fn test_recursive_relay_discovery_syncs_announcement() { 297async fn test_recursive_relay_discovery_syncs_announcement() {
303 // 1. Start all three relays 298 // 1. Start all three relays
304 299