upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-08 16:47:46 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-08 16:47:46 +0000
commit9f594fadf2a1d5bfda0ab027f2b3cf7a247900ec (patch)
treedecb942794374f04fabe94fde2afcf276ddf0537
parentefe3e43cf792abd8bb256121ebf84ae04836313a (diff)
proposed sync change to use self subscribe to trigger everything
-rw-r--r--docs/explanation/grasp-02-proactive-sync.md91
-rw-r--r--src/sync/manager.rs499
2 files changed, 485 insertions, 105 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
index 666b048..c07f07c 100644
--- a/docs/explanation/grasp-02-proactive-sync.md
+++ b/docs/explanation/grasp-02-proactive-sync.md
@@ -18,7 +18,11 @@ This document covers **event syncing only**. Git data syncing is out of scope fo
18```mermaid 18```mermaid
19flowchart TB 19flowchart TB
20 subgraph ngit-grasp 20 subgraph ngit-grasp
21 SM[SyncManager] 21 subgraph SyncManager
22 SS[Self-Subscriber]
23 RC[Remote Connections]
24 end
25 WS[WebSocket Server]
22 FS[FilterService] 26 FS[FilterService]
23 RH[RelayHealthTracker] 27 RH[RelayHealthTracker]
24 DB[(Database)] 28 DB[(Database)]
@@ -32,41 +36,88 @@ flowchart TB
32 R3[nostr.land] 36 R3[nostr.land]
33 end 37 end
34 38
35 SM -->|builds filters| FS 39 WS -->|broadcasts events| SS
36 SM -->|tracks health| RH 40 SS -->|discovers relays| RC
37 SM -->|stores events| DB 41 RC -->|builds filters| FS
38 SM -->|validates| AP 42 RC -->|tracks health| RH
43 RC -->|stores events| DB
44 RC -->|validates| AP
39 45
40 SM <-->|WebSocket + NEG| R1 46 RC <-->|WebSocket + NEG| R1
41 SM <-->|WebSocket + NEG| R2 47 RC <-->|WebSocket + NEG| R2
42 SM <-->|WebSocket + NEG| R3 48 RC <-->|WebSocket + NEG| R3
43 49
44 RH -->|exposes state| MET 50 RH -->|exposes state| MET
45``` 51```
46 52
53**Key Insight: Self-Subscribe Architecture**
54
55The SyncManager uses a "self-subscribe" pattern for relay discovery. Rather than polling the database periodically, it connects to its own WebSocket server as a client and subscribes to kind 30617 events. When new announcements are saved (from any source), the self-subscriber receives them instantly and can spawn connections to newly discovered relays.
56
47## Connection Management 57## Connection Management
48 58
49### Relay Discovery 59### Relay Discovery
50 60
51Relays to connect to are discovered from **all stored repository announcements**: 61Relays to connect to are discovered using a **self-subscribe architecture** rather than periodic polling. The SyncManager connects to its own relay as a client and subscribes to kind 30617 (repository announcement) events. When a new announcement is saved to the database (from direct submission or sync), the self-subscriber receives it immediately and discovers new relays to connect to.
62
63```mermaid
64flowchart LR
65 subgraph Relay
66 WS[WebSocket Server]
67 DB[(Database)]
68 end
69
70 subgraph SyncManager
71 SS[Self-Subscribe Client]
72 RC[Remote Connections]
73 end
74
75 WS -->|broadcast| SS
76 SS -->|extract relay URLs| RC
77 RC -->|sync events| WS
78```
79
80**Why Self-Subscribe vs Polling?**
81
82| Approach | Latency | Complexity | Resource Use |
83|----------|---------|------------|--------------|
84| Self-Subscribe | Instant | Low | Minimal (1 WS connection) |
85| Periodic Polling | 30s+ delay | Higher | DB queries every N seconds |
86
87The self-subscribe approach provides:
88- **Immediate discovery**: New relays discovered instantly when announcement saved
89- **No polling overhead**: No periodic database queries
90- **Simple architecture**: Reuses existing WebSocket infrastructure
91
92**Implementation Pattern:**
52 93
53```rust 94```rust
54// Pseudocode for relay discovery 95// In SyncManager::run()
55fn discover_relays(database: &Database) -> HashSet<RelayUrl> { 96let self_client = Client::default();
56 let announcements = database.query(Filter::new().kind(30617)); 97self_client.add_relay(&own_relay_url).await?;
57 let mut relays = HashSet::new(); 98self_client.connect().await;
58 99
59 for announcement in announcements { 100let filter = Filter::new().kind(Kind::Custom(30617));
60 for relay_url in announcement.relays_tags() { 101self_client.subscribe(filter, None).await?;
61 if relay_url != our_domain { // Exclude ourselves 102
62 relays.insert(relay_url); 103// Handle notifications - when announcement arrives, extract relay URLs
104client.handle_notifications(|notification| async {
105 if let RelayPoolNotification::Event { event, .. } = notification {
106 let new_urls = filter_service.extract_relay_urls_from_event(&event);
107 for url in new_urls {
108 if !active_relays.contains(&url) && !is_own_relay(&url) {
109 spawn_connection(url, tx.clone(), filter_service.clone());
63 } 110 }
64 } 111 }
65 } 112 }
66 relays 113 Ok(false) // Continue processing
67} 114});
68``` 115```
69 116
117**Startup Discovery:** At startup, existing announcements in the database are queried once to discover initial relays. After startup, all discovery is event-driven via self-subscribe.
118
119**Reconnection:** The self-subscriber has built-in exponential backoff reconnection (1s → 60s max) to handle temporary disconnections from our own relay.
120
70### Connection Lifecycle 121### Connection Lifecycle
71 122
72```mermaid 123```mermaid
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 96bf0f4..6ae82ef 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -1,12 +1,42 @@
1//! SyncManager - Coordinates proactive sync operations 1//! SyncManager - Coordinates proactive sync operations
2//! 2//!
3//! The SyncManager discovers relays from stored announcements, spawns connections 3//! The SyncManager connects to remote relays, receives events, validates them
4//! to each relay, receives events, validates them through the write policy, 4//! through the write policy, and stores accepted events.
5//! and stores accepted events. 5//!
6//! ## Simplified Relay Discovery Architecture
7//!
8//! All relay discovery is centralized in the self-subscriber:
9//! - Bootstrap relay: connected immediately (no jitter, single relay)
10//! - All other relays: discovered via self-subscriber announcements (with jitter)
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────┐
14//! │ ngit-grasp │
15//! │ │
16//! │ ┌─────────────┐ broadcasts ┌───────────────┐ │
17//! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │
18//! │ │ Database │ │ Client │ │
19//! │ └─────────────┘ └───────┬───────┘ │
20//! │ ▲ │ │
21//! │ │ stores │ extracts │
22//! │ │ │ relay │
23//! │ ┌─────┴─────┐ │ URLs │
24//! │ │ Remote │◀────────────────────────────────┘ │
25//! │ │Connections│ spawns new │
26//! │ └───────────┘ connections (with jitter) │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Key Design Decisions
31//!
32//! - **Single relay discovery path**: Only the self-subscriber discovers new relays
33//! - **Jitter at point of discovery**: Applied when spawning connections from announcements
34//! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect
35//! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd
6//! 36//!
7//! ## Phase 2 Features 37//! ## Phase 2 Features
8//! 38//!
9//! - Relay discovery from stored kind 30617 announcements 39//! - Relay discovery from kind 30617 announcements (via self-subscriber)
10//! - Multiple simultaneous relay connections 40//! - Multiple simultaneous relay connections
11//! - Three-layer filter strategy via FilterService 41//! - Three-layer filter strategy via FilterService
12//! 42//!
@@ -30,6 +60,7 @@ use std::sync::Arc;
30use std::time::Duration; 60use std::time::Duration;
31 61
32use nostr_relay_builder::prelude::*; 62use nostr_relay_builder::prelude::*;
63use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp};
33use rand::Rng; 64use rand::Rng;
34use tokio::sync::mpsc; 65use tokio::sync::mpsc;
35 66
@@ -39,7 +70,7 @@ use super::health::RelayHealthTracker;
39use super::metrics::SyncMetrics; 70use super::metrics::SyncMetrics;
40use crate::config::Config; 71use crate::config::Config;
41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 72use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
42 73use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT;
43 74
44/// Default fallback address for sync source when bind_address cannot be parsed 75/// Default fallback address for sync source when bind_address cannot be parsed
45/// 76///
@@ -58,6 +89,11 @@ fn get_sync_source_addr(bind_address: &str) -> SocketAddr {
58 .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) 89 .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR)
59} 90}
60 91
92/// Derive the WebSocket URL for our own relay from bind_address
93fn derive_own_relay_url(bind_address: &str) -> String {
94 format!("ws://{}", bind_address)
95}
96
61/// Coordinates proactive sync from configured and discovered relays 97/// Coordinates proactive sync from configured and discovered relays
62pub struct SyncManager { 98pub struct SyncManager {
63 /// Bootstrap relay URL for initial sync (from config) 99 /// Bootstrap relay URL for initial sync (from config)
@@ -65,6 +101,8 @@ pub struct SyncManager {
65 bootstrap_relay_url: Option<String>, 101 bootstrap_relay_url: Option<String>,
66 /// Our relay's domain (for filtering) 102 /// Our relay's domain (for filtering)
67 relay_domain: String, 103 relay_domain: String,
104 /// Our relay's WebSocket URL (for self-subscribe)
105 own_relay_url: String,
68 /// Database for storing accepted events 106 /// Database for storing accepted events
69 database: SharedDatabase, 107 database: SharedDatabase,
70 /// Write policy for validating events 108 /// Write policy for validating events
@@ -95,9 +133,11 @@ impl SyncManager {
95 write_policy: Nip34WritePolicy, 133 write_policy: Nip34WritePolicy,
96 config: &Config, 134 config: &Config,
97 ) -> Self { 135 ) -> Self {
136 let own_relay_url = derive_own_relay_url(&config.bind_address);
98 Self { 137 Self {
99 bootstrap_relay_url, 138 bootstrap_relay_url,
100 relay_domain, 139 relay_domain,
140 own_relay_url,
101 database, 141 database,
102 write_policy, 142 write_policy,
103 health_tracker: Arc::new(RelayHealthTracker::new(config)), 143 health_tracker: Arc::new(RelayHealthTracker::new(config)),
@@ -124,9 +164,11 @@ impl SyncManager {
124 config: &Config, 164 config: &Config,
125 metrics: SyncMetrics, 165 metrics: SyncMetrics,
126 ) -> Self { 166 ) -> Self {
167 let own_relay_url = derive_own_relay_url(&config.bind_address);
127 Self { 168 Self {
128 bootstrap_relay_url, 169 bootstrap_relay_url,
129 relay_domain, 170 relay_domain,
171 own_relay_url,
130 database, 172 database,
131 write_policy, 173 write_policy,
132 health_tracker: Arc::new(RelayHealthTracker::new(config)), 174 health_tracker: Arc::new(RelayHealthTracker::new(config)),
@@ -144,9 +186,11 @@ impl SyncManager {
144 ) -> Self { 186 ) -> Self {
145 // Extract domain from URL for filtering 187 // Extract domain from URL for filtering
146 let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default(); 188 let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default();
189 let own_relay_url = format!("ws://{}", relay_domain);
147 Self { 190 Self {
148 bootstrap_relay_url: Some(bootstrap_url), 191 bootstrap_relay_url: Some(bootstrap_url),
149 relay_domain, 192 relay_domain,
193 own_relay_url,
150 database, 194 database,
151 write_policy, 195 write_policy,
152 health_tracker: Arc::new(RelayHealthTracker::with_defaults()), 196 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
@@ -173,12 +217,27 @@ impl SyncManager {
173 217
174 /// Run the sync manager 218 /// Run the sync manager
175 /// 219 ///
176 /// This discovers relays from stored announcements, spawns connection tasks, 220 /// This spawns the bootstrap relay connection (if configured), sets up a
177 /// and processes incoming events. Runs indefinitely until cancelled. 221 /// self-subscriber for event-driven relay discovery, and processes incoming
222 /// events. The self-subscriber handles ALL relay discovery from announcements.
223 /// Runs indefinitely until cancelled.
224 ///
225 /// ## Simplified Relay Discovery Architecture
226 ///
227 /// All relay discovery is centralized in the self-subscriber:
228 /// - Bootstrap relay: connected immediately (no jitter, single relay)
229 /// - All other relays: discovered via self-subscriber announcements (with jitter)
230 /// - Jitter applied at point of discovery (not startup)
231 ///
232 /// This eliminates three redundant discovery paths:
233 /// 1. DB query at startup (removed)
234 /// 2. Remote event extraction (removed)
235 /// 3. Self-subscriber (sole discovery path)
178 pub async fn run(self) { 236 pub async fn run(self) {
179 tracing::info!( 237 tracing::info!(
180 "Starting SyncManager (domain: {}, bootstrap relay: {:?})", 238 "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})",
181 self.relay_domain, 239 self.relay_domain,
240 self.own_relay_url,
182 self.bootstrap_relay_url 241 self.bootstrap_relay_url
183 ); 242 );
184 243
@@ -191,74 +250,56 @@ impl SyncManager {
191 // Create channel for receiving events from all connections 250 // Create channel for receiving events from all connections
192 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); 251 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
193 252
194 // Track active relay URLs to avoid duplicates 253 // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing)
195 let mut active_relays: HashSet<String> = HashSet::new(); 254 let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::<String>::new()));
196
197 // Collect all relays to connect to
198 let mut relays_to_connect: Vec<String> = Vec::new();
199 255
200 // Start with bootstrap relay if configured 256 // Bootstrap relay - connect immediately (no jitter, just one relay)
201 if let Some(ref url) = self.bootstrap_relay_url { 257 if let Some(ref url) = self.bootstrap_relay_url {
202 if !self.is_own_relay(url) { 258 if !self.is_own_relay(url) {
203 relays_to_connect.push(url.clone()); 259 tracing::info!("Connecting to bootstrap relay: {}", url);
204 active_relays.insert(url.clone()); 260 active_relays.lock().await.insert(url.clone());
261 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false);
205 } else { 262 } else {
206 tracing::info!("Skipping bootstrap relay (is our own relay): {}", url); 263 tracing::info!("Skipping bootstrap relay (is our own relay): {}", url);
207 } 264 }
208 } 265 }
209 266
210 // Discover additional relays from stored announcements
211 let discovered_urls = filter_service.discover_relay_urls().await;
212 for url in discovered_urls {
213 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
214 relays_to_connect.push(url.clone());
215 active_relays.insert(url.clone());
216 }
217 }
218
219 // Record initial tracked relay count 267 // Record initial tracked relay count
220 if let Some(ref metrics) = self.metrics { 268 if let Some(ref metrics) = self.metrics {
221 metrics.set_tracked_count(active_relays.len() as i64); 269 let count = active_relays.lock().await.len();
270 metrics.set_tracked_count(count as i64);
222 } 271 }
223 272
224 // Spawn connections with startup jitter to prevent thundering herd 273 {
225 for url in relays_to_connect { 274 let active = active_relays.lock().await;
226 tracing::info!("Scheduling connection to sync relay: {}", url); 275 if active.is_empty() {
227 self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); 276 tracing::info!(
277 "No bootstrap relay configured, waiting for announcements via self-subscriber..."
278 );
279 } else {
280 tracing::info!(
281 "SyncManager connected to {} relay(s): {:?}",
282 active.len(),
283 *active
284 );
285 }
228 } 286 }
229 287
230 if active_relays.is_empty() { 288 // Spawn self-subscriber task for ALL relay discovery
231 tracing::warn!("No sync relays configured or discovered, SyncManager idle"); 289 let self_subscriber_handle = self.spawn_self_subscriber(
232 } else { 290 tx.clone(),
233 tracing::info!( 291 filter_service.clone(),
234 "SyncManager connected to {} relays: {:?}", 292 active_relays.clone(),
235 active_relays.len(), 293 );
236 active_relays
237 );
238 }
239 294
240 // Process incoming events from all connections 295 // Process incoming events - just validate and store, NO relay discovery
296 // (relay discovery is handled solely by the self-subscriber)
241 while let Some(synced_event) = rx.recv().await { 297 while let Some(synced_event) = rx.recv().await {
242 // Check if this event reveals new relays to sync from
243 let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event);
244 for url in new_urls {
245 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
246 tracing::info!("Discovered new relay from event, connecting: {}", url);
247 active_relays.insert(url.clone());
248
249 // Update tracked relay count
250 if let Some(ref metrics) = self.metrics {
251 metrics.inc_tracked_count();
252 }
253
254 // New relays discovered during runtime don't need jitter
255 self.spawn_connection(url, tx.clone(), filter_service.clone());
256 }
257 }
258
259 self.process_event(synced_event).await; 298 self.process_event(synced_event).await;
260 } 299 }
261 300
301 // Clean up self-subscriber
302 self_subscriber_handle.abort();
262 tracing::warn!("SyncManager event channel closed, shutting down"); 303 tracing::warn!("SyncManager event channel closed, shutting down");
263 } 304 }
264 305
@@ -267,52 +308,325 @@ impl SyncManager {
267 url.contains(&self.relay_domain) 308 url.contains(&self.relay_domain)
268 } 309 }
269 310
270 /// Spawn a connection task for a relay with startup jitter 311 /// Spawn a self-subscriber task that connects to our own relay
312 /// and watches for kind 30617 announcements to discover new relays.
271 /// 313 ///
272 /// Adds a random delay (0 to startup_jitter_ms) before connecting to prevent 314 /// This is the SOLE relay discovery path - all relay discovery happens here.
273 /// thundering herd on startup when multiple relays are configured. 315 /// When a new announcement is saved to our database (from direct submission
274 /// Set startup_jitter_ms to 0 to disable jitter (useful for testing). 316 /// or synced from another relay), the self-subscriber receives it immediately
275 fn spawn_connection_with_jitter( 317 /// and spawns connections to newly discovered relays (with jitter).
318 fn spawn_self_subscriber(
276 &self, 319 &self,
277 url: String,
278 tx: mpsc::Sender<SyncedEvent>, 320 tx: mpsc::Sender<SyncedEvent>,
279 filter_service: Arc<FilterService>, 321 filter_service: Arc<FilterService>,
280 ) { 322 active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>,
281 let domain = self.relay_domain.clone(); 323 ) -> tokio::task::JoinHandle<()> {
282 let health_tracker = self.health_tracker.clone(); 324 let own_relay_url = self.own_relay_url.clone();
325 let relay_domain = self.relay_domain.clone();
283 let metrics = self.metrics.clone(); 326 let metrics = self.metrics.clone();
284 let max_jitter = self.startup_jitter_ms; 327 let health_tracker = self.health_tracker.clone();
328 let startup_jitter_ms = self.startup_jitter_ms;
285 329
286 tokio::spawn(async move { 330 tokio::spawn(async move {
287 // Apply startup jitter (if configured) 331 Self::run_self_subscriber_loop(
288 if max_jitter > 0 { 332 own_relay_url,
289 let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); 333 relay_domain,
290 tracing::debug!( 334 tx,
291 "Applying {}ms startup jitter before connecting to {}", 335 filter_service,
292 jitter_ms, 336 active_relays,
293 url 337 metrics,
294 ); 338 health_tracker,
295 tokio::time::sleep(Duration::from_millis(jitter_ms)).await; 339 startup_jitter_ms,
340 )
341 .await;
342 })
343 }
344
345 /// Main loop for the self-subscriber
346 ///
347 /// Connects to our own relay, subscribes to kind 30617 announcements,
348 /// and processes events to discover new relays. Handles reconnection
349 /// on disconnect.
350 ///
351 /// ## Reconnection Behavior
352 ///
353 /// - First connection: no `since` filter (get all historical announcements)
354 /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing
355 #[allow(clippy::too_many_arguments)]
356 async fn run_self_subscriber_loop(
357 own_relay_url: String,
358 relay_domain: String,
359 tx: mpsc::Sender<SyncedEvent>,
360 filter_service: Arc<FilterService>,
361 active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>,
362 metrics: Option<SyncMetrics>,
363 health_tracker: Arc<RelayHealthTracker>,
364 startup_jitter_ms: u64,
365 ) {
366 let mut reconnect_delay = Duration::from_secs(1);
367 let max_reconnect_delay = Duration::from_secs(60);
368 let mut is_first_connection = true;
369
370 loop {
371 tracing::info!(
372 "Self-subscriber connecting to own relay: {}",
373 own_relay_url
374 );
375
376 match Self::connect_self_subscriber(&own_relay_url).await {
377 Ok(client) => {
378 // Reset reconnect delay on successful connection
379 reconnect_delay = Duration::from_secs(1);
380
381 tracing::info!(
382 "Self-subscriber connected to own relay, subscribing to kind {} announcements{}",
383 KIND_REPOSITORY_ANNOUNCEMENT,
384 if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" }
385 );
386
387 // Subscribe to repository announcements
388 // First connection: get all historical; reconnections: only last 15 minutes
389 let filter = if is_first_connection {
390 Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT))
391 } else {
392 let since = Timestamp::now() - 15 * 60; // 15 minutes ago
393 Filter::new()
394 .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT))
395 .since(since)
396 };
397
398 is_first_connection = false;
399
400 if let Err(e) = client.subscribe(filter, None).await {
401 tracing::error!(
402 "Self-subscriber failed to subscribe on {}: {}",
403 own_relay_url,
404 e
405 );
406 // Will reconnect after delay
407 } else {
408 // Handle notifications until disconnect
409 Self::handle_self_subscriber_notifications(
410 &client,
411 &own_relay_url,
412 &relay_domain,
413 &tx,
414 &filter_service,
415 &active_relays,
416 &metrics,
417 &health_tracker,
418 startup_jitter_ms,
419 )
420 .await;
421 }
422
423 // Disconnect and cleanup
424 client.disconnect().await;
425 }
426 Err(e) => {
427 tracing::warn!(
428 "Self-subscriber failed to connect to {}: {}",
429 own_relay_url,
430 e
431 );
432 }
296 } 433 }
297 434
298 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; 435 // Wait before reconnecting with exponential backoff
299 }); 436 tracing::debug!(
437 "Self-subscriber will reconnect to {} in {:?}",
438 own_relay_url,
439 reconnect_delay
440 );
441 tokio::time::sleep(reconnect_delay).await;
442 reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay);
443 }
444 }
445
446 /// Connect to our own relay for self-subscribing
447 async fn connect_self_subscriber(
448 url: &str,
449 ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> {
450 let client = Client::default();
451 client.add_relay(url).await?;
452 client.connect().await;
453
454 // Wait for connection to establish (with timeout)
455 let timeout = Duration::from_secs(10);
456 let start = std::time::Instant::now();
457
458 while start.elapsed() < timeout {
459 let relays = client.relays().await;
460 if relays.values().any(|r| r.is_connected()) {
461 return Ok(client);
462 }
463 tokio::time::sleep(Duration::from_millis(100)).await;
464 }
465
466 Err("Timeout waiting for self-subscriber connection".into())
300 } 467 }
301 468
302 /// Spawn a connection task for a relay without jitter 469 /// Handle notifications from the self-subscriber client
303 /// 470 ///
304 /// Used for relays discovered during runtime (not at startup). 471 /// Processes announcement events to discover new relay URLs.
472 /// Applies jitter before spawning connections to prevent thundering herd.
473 #[allow(clippy::too_many_arguments)]
474 async fn handle_self_subscriber_notifications(
475 client: &Client,
476 own_relay_url: &str,
477 relay_domain: &str,
478 tx: &mpsc::Sender<SyncedEvent>,
479 filter_service: &Arc<FilterService>,
480 active_relays: &Arc<tokio::sync::Mutex<HashSet<String>>>,
481 metrics: &Option<SyncMetrics>,
482 health_tracker: &Arc<RelayHealthTracker>,
483 startup_jitter_ms: u64,
484 ) {
485 let own_relay_url = own_relay_url.to_string();
486 let relay_domain = relay_domain.to_string();
487 let filter_service = filter_service.clone();
488 let active_relays = active_relays.clone();
489 let metrics = metrics.clone();
490 let health_tracker = health_tracker.clone();
491 let tx = tx.clone();
492
493 client
494 .handle_notifications(|notification| {
495 let own_relay_url = own_relay_url.clone();
496 let relay_domain = relay_domain.clone();
497 let filter_service = filter_service.clone();
498 let active_relays = active_relays.clone();
499 let metrics = metrics.clone();
500 let health_tracker = health_tracker.clone();
501 let tx = tx.clone();
502
503 async move {
504 match notification {
505 RelayPoolNotification::Event { event, .. } => {
506 // Only process repository announcement events
507 if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT {
508 return Ok(false);
509 }
510
511 tracing::debug!(
512 "Self-subscriber received announcement {} from {}",
513 event.id,
514 own_relay_url
515 );
516
517 // Extract relay URLs from the announcement
518 let new_urls = filter_service.extract_relay_urls_from_event(&event);
519
520 for url in new_urls {
521 // Check if we should connect to this relay
522 let should_connect = {
523 let mut active = active_relays.lock().await;
524 let is_new = !active.contains(&url);
525 let is_not_self = !url.contains(&relay_domain);
526
527 if is_new && is_not_self {
528 active.insert(url.clone());
529 true
530 } else {
531 false
532 }
533 };
534
535 if should_connect {
536 tracing::info!(
537 "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}",
538 event.id,
539 url
540 );
541
542 // Update tracked relay count
543 if let Some(ref m) = metrics {
544 m.inc_tracked_count();
545 }
546
547 // Spawn connection to the new relay WITH jitter at point of discovery
548 let url_clone = url.clone();
549 let tx_clone = tx.clone();
550 let filter_service_clone = filter_service.clone();
551 let domain_clone = relay_domain.clone();
552 let health_tracker_clone = health_tracker.clone();
553 let metrics_clone = metrics.clone();
554
555 tokio::spawn(async move {
556 // Apply jitter at point of discovery
557 if startup_jitter_ms > 0 {
558 let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms);
559 tracing::debug!(
560 "Applying {}ms jitter before connecting to discovered relay {}",
561 jitter_ms,
562 url_clone
563 );
564 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
565 }
566
567 connect_with_retry(
568 &url_clone,
569 tx_clone,
570 filter_service_clone,
571 &domain_clone,
572 health_tracker_clone,
573 metrics_clone,
574 )
575 .await;
576 });
577 }
578 }
579
580 Ok(false) // Continue processing
581 }
582 RelayPoolNotification::Shutdown => {
583 tracing::warn!(
584 "Self-subscriber connection shutdown for {}",
585 own_relay_url
586 );
587 Ok(true) // Stop on shutdown
588 }
589 RelayPoolNotification::Message { .. } => {
590 Ok(false) // Continue processing
591 }
592 }
593 }
594 })
595 .await
596 .ok();
597 }
598
599 /// Spawn a connection task for a relay
600 ///
601 /// # Arguments
602 /// * `url` - Relay URL to connect to
603 /// * `tx` - Channel sender for synced events
604 /// * `filter_service` - Filter service for subscriptions
605 /// * `apply_jitter` - Whether to apply startup jitter before connecting
305 fn spawn_connection( 606 fn spawn_connection(
306 &self, 607 &self,
307 url: String, 608 url: String,
308 tx: mpsc::Sender<SyncedEvent>, 609 tx: mpsc::Sender<SyncedEvent>,
309 filter_service: Arc<FilterService>, 610 filter_service: Arc<FilterService>,
611 apply_jitter: bool,
310 ) { 612 ) {
311 let domain = self.relay_domain.clone(); 613 let domain = self.relay_domain.clone();
312 let health_tracker = self.health_tracker.clone(); 614 let health_tracker = self.health_tracker.clone();
313 let metrics = self.metrics.clone(); 615 let metrics = self.metrics.clone();
616 let max_jitter = self.startup_jitter_ms;
314 617
315 tokio::spawn(async move { 618 tokio::spawn(async move {
619 // Apply startup jitter if requested
620 if apply_jitter && max_jitter > 0 {
621 let jitter_ms = rand::thread_rng().gen_range(0..max_jitter);
622 tracing::debug!(
623 "Applying {}ms jitter before connecting to {}",
624 jitter_ms,
625 url
626 );
627 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
628 }
629
316 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; 630 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
317 }); 631 });
318 } 632 }
@@ -351,7 +665,10 @@ impl SyncManager {
351 } 665 }
352 666
353 // Validate through write policy using sync_source_addr derived from config 667 // Validate through write policy using sync_source_addr derived from config
354 let result = self.write_policy.admit_event(event, &self.sync_source_addr).await; 668 let result = self
669 .write_policy
670 .admit_event(event, &self.sync_source_addr)
671 .await;
355 672
356 match result { 673 match result {
357 PolicyResult::Accept => { 674 PolicyResult::Accept => {
@@ -386,12 +703,16 @@ impl SyncManager {
386/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" 703/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
387/// - "wss://relay.example.com" -> "relay.example.com" 704/// - "wss://relay.example.com" -> "relay.example.com"
388fn extract_domain_from_url(url: &str) -> Option<String> { 705fn extract_domain_from_url(url: &str) -> Option<String> {
389 let url = url.trim_start_matches("ws://").trim_start_matches("wss://"); 706 let url = url
390 let url = url.trim_start_matches("http://").trim_start_matches("https://"); 707 .trim_start_matches("ws://")
391 708 .trim_start_matches("wss://");
709 let url = url
710 .trim_start_matches("http://")
711 .trim_start_matches("https://");
712
392 // Remove path 713 // Remove path
393 let domain = url.split('/').next()?; 714 let domain = url.split('/').next()?;
394 715
395 Some(domain.to_string()) 716 Some(domain.to_string())
396} 717}
397 718
@@ -430,4 +751,12 @@ mod tests {
430 Some("example.com:3000".to_string()) 751 Some("example.com:3000".to_string())
431 ); 752 );
432 } 753 }
754
755 #[test]
756 fn test_derive_own_relay_url() {
757 assert_eq!(
758 derive_own_relay_url("127.0.0.1:8080"),
759 "ws://127.0.0.1:8080".to_string()
760 );
761 }
433} \ No newline at end of file 762} \ No newline at end of file