diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-08 16:47:46 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-08 16:47:46 +0000 |
| commit | 9f594fadf2a1d5bfda0ab027f2b3cf7a247900ec (patch) | |
| tree | decb942794374f04fabe94fde2afcf276ddf0537 /src/sync | |
| parent | efe3e43cf792abd8bb256121ebf84ae04836313a (diff) | |
proposed sync change to use self subscribe to trigger everything
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/manager.rs | 499 |
1 files changed, 414 insertions, 85 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 96bf0f4..6ae82ef 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -1,12 +1,42 @@ | |||
| 1 | //! SyncManager - Coordinates proactive sync operations | 1 | //! SyncManager - Coordinates proactive sync operations |
| 2 | //! | 2 | //! |
| 3 | //! The SyncManager discovers relays from stored announcements, spawns connections | 3 | //! The SyncManager connects to remote relays, receives events, validates them |
| 4 | //! to each relay, receives events, validates them through the write policy, | 4 | //! through the write policy, and stores accepted events. |
| 5 | //! and stores accepted events. | 5 | //! |
| 6 | //! ## Simplified Relay Discovery Architecture | ||
| 7 | //! | ||
| 8 | //! All relay discovery is centralized in the self-subscriber: | ||
| 9 | //! - Bootstrap relay: connected immediately (no jitter, single relay) | ||
| 10 | //! - All other relays: discovered via self-subscriber announcements (with jitter) | ||
| 11 | //! | ||
| 12 | //! ```text | ||
| 13 | //! ┌─────────────────────────────────────────────────────────────┐ | ||
| 14 | //! │ ngit-grasp │ | ||
| 15 | //! │ │ | ||
| 16 | //! │ ┌─────────────┐ broadcasts ┌───────────────┐ │ | ||
| 17 | //! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │ | ||
| 18 | //! │ │ Database │ │ Client │ │ | ||
| 19 | //! │ └─────────────┘ └───────┬───────┘ │ | ||
| 20 | //! │ ▲ │ │ | ||
| 21 | //! │ │ stores │ extracts │ | ||
| 22 | //! │ │ │ relay │ | ||
| 23 | //! │ ┌─────┴─────┐ │ URLs │ | ||
| 24 | //! │ │ Remote │◀────────────────────────────────┘ │ | ||
| 25 | //! │ │Connections│ spawns new │ | ||
| 26 | //! │ └───────────┘ connections (with jitter) │ | ||
| 27 | //! └─────────────────────────────────────────────────────────────┘ | ||
| 28 | //! ``` | ||
| 29 | //! | ||
| 30 | //! ## Key Design Decisions | ||
| 31 | //! | ||
| 32 | //! - **Single relay discovery path**: Only the self-subscriber discovers new relays | ||
| 33 | //! - **Jitter at point of discovery**: Applied when spawning connections from announcements | ||
| 34 | //! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect | ||
| 35 | //! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd | ||
| 6 | //! | 36 | //! |
| 7 | //! ## Phase 2 Features | 37 | //! ## Phase 2 Features |
| 8 | //! | 38 | //! |
| 9 | //! - Relay discovery from stored kind 30617 announcements | 39 | //! - Relay discovery from kind 30617 announcements (via self-subscriber) |
| 10 | //! - Multiple simultaneous relay connections | 40 | //! - Multiple simultaneous relay connections |
| 11 | //! - Three-layer filter strategy via FilterService | 41 | //! - Three-layer filter strategy via FilterService |
| 12 | //! | 42 | //! |
| @@ -30,6 +60,7 @@ use std::sync::Arc; | |||
| 30 | use std::time::Duration; | 60 | use std::time::Duration; |
| 31 | 61 | ||
| 32 | use nostr_relay_builder::prelude::*; | 62 | use nostr_relay_builder::prelude::*; |
| 63 | use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp}; | ||
| 33 | use rand::Rng; | 64 | use rand::Rng; |
| 34 | use tokio::sync::mpsc; | 65 | use tokio::sync::mpsc; |
| 35 | 66 | ||
| @@ -39,7 +70,7 @@ use super::health::RelayHealthTracker; | |||
| 39 | use super::metrics::SyncMetrics; | 70 | use super::metrics::SyncMetrics; |
| 40 | use crate::config::Config; | 71 | use crate::config::Config; |
| 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 72 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 42 | 73 | use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT; | |
| 43 | 74 | ||
| 44 | /// Default fallback address for sync source when bind_address cannot be parsed | 75 | /// Default fallback address for sync source when bind_address cannot be parsed |
| 45 | /// | 76 | /// |
| @@ -58,6 +89,11 @@ fn get_sync_source_addr(bind_address: &str) -> SocketAddr { | |||
| 58 | .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) | 89 | .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) |
| 59 | } | 90 | } |
| 60 | 91 | ||
| 92 | /// Derive the WebSocket URL for our own relay from bind_address | ||
| 93 | fn derive_own_relay_url(bind_address: &str) -> String { | ||
| 94 | format!("ws://{}", bind_address) | ||
| 95 | } | ||
| 96 | |||
| 61 | /// Coordinates proactive sync from configured and discovered relays | 97 | /// Coordinates proactive sync from configured and discovered relays |
| 62 | pub struct SyncManager { | 98 | pub struct SyncManager { |
| 63 | /// Bootstrap relay URL for initial sync (from config) | 99 | /// Bootstrap relay URL for initial sync (from config) |
| @@ -65,6 +101,8 @@ pub struct SyncManager { | |||
| 65 | bootstrap_relay_url: Option<String>, | 101 | bootstrap_relay_url: Option<String>, |
| 66 | /// Our relay's domain (for filtering) | 102 | /// Our relay's domain (for filtering) |
| 67 | relay_domain: String, | 103 | relay_domain: String, |
| 104 | /// Our relay's WebSocket URL (for self-subscribe) | ||
| 105 | own_relay_url: String, | ||
| 68 | /// Database for storing accepted events | 106 | /// Database for storing accepted events |
| 69 | database: SharedDatabase, | 107 | database: SharedDatabase, |
| 70 | /// Write policy for validating events | 108 | /// Write policy for validating events |
| @@ -95,9 +133,11 @@ impl SyncManager { | |||
| 95 | write_policy: Nip34WritePolicy, | 133 | write_policy: Nip34WritePolicy, |
| 96 | config: &Config, | 134 | config: &Config, |
| 97 | ) -> Self { | 135 | ) -> Self { |
| 136 | let own_relay_url = derive_own_relay_url(&config.bind_address); | ||
| 98 | Self { | 137 | Self { |
| 99 | bootstrap_relay_url, | 138 | bootstrap_relay_url, |
| 100 | relay_domain, | 139 | relay_domain, |
| 140 | own_relay_url, | ||
| 101 | database, | 141 | database, |
| 102 | write_policy, | 142 | write_policy, |
| 103 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 143 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| @@ -124,9 +164,11 @@ impl SyncManager { | |||
| 124 | config: &Config, | 164 | config: &Config, |
| 125 | metrics: SyncMetrics, | 165 | metrics: SyncMetrics, |
| 126 | ) -> Self { | 166 | ) -> Self { |
| 167 | let own_relay_url = derive_own_relay_url(&config.bind_address); | ||
| 127 | Self { | 168 | Self { |
| 128 | bootstrap_relay_url, | 169 | bootstrap_relay_url, |
| 129 | relay_domain, | 170 | relay_domain, |
| 171 | own_relay_url, | ||
| 130 | database, | 172 | database, |
| 131 | write_policy, | 173 | write_policy, |
| 132 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 174 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| @@ -144,9 +186,11 @@ impl SyncManager { | |||
| 144 | ) -> Self { | 186 | ) -> Self { |
| 145 | // Extract domain from URL for filtering | 187 | // Extract domain from URL for filtering |
| 146 | let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default(); | 188 | let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default(); |
| 189 | let own_relay_url = format!("ws://{}", relay_domain); | ||
| 147 | Self { | 190 | Self { |
| 148 | bootstrap_relay_url: Some(bootstrap_url), | 191 | bootstrap_relay_url: Some(bootstrap_url), |
| 149 | relay_domain, | 192 | relay_domain, |
| 193 | own_relay_url, | ||
| 150 | database, | 194 | database, |
| 151 | write_policy, | 195 | write_policy, |
| 152 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), | 196 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), |
| @@ -173,12 +217,27 @@ impl SyncManager { | |||
| 173 | 217 | ||
| 174 | /// Run the sync manager | 218 | /// Run the sync manager |
| 175 | /// | 219 | /// |
| 176 | /// This discovers relays from stored announcements, spawns connection tasks, | 220 | /// This spawns the bootstrap relay connection (if configured), sets up a |
| 177 | /// and processes incoming events. Runs indefinitely until cancelled. | 221 | /// self-subscriber for event-driven relay discovery, and processes incoming |
| 222 | /// events. The self-subscriber handles ALL relay discovery from announcements. | ||
| 223 | /// Runs indefinitely until cancelled. | ||
| 224 | /// | ||
| 225 | /// ## Simplified Relay Discovery Architecture | ||
| 226 | /// | ||
| 227 | /// All relay discovery is centralized in the self-subscriber: | ||
| 228 | /// - Bootstrap relay: connected immediately (no jitter, single relay) | ||
| 229 | /// - All other relays: discovered via self-subscriber announcements (with jitter) | ||
| 230 | /// - Jitter applied at point of discovery (not startup) | ||
| 231 | /// | ||
| 232 | /// This eliminates three redundant discovery paths: | ||
| 233 | /// 1. DB query at startup (removed) | ||
| 234 | /// 2. Remote event extraction (removed) | ||
| 235 | /// 3. Self-subscriber (sole discovery path) | ||
| 178 | pub async fn run(self) { | 236 | pub async fn run(self) { |
| 179 | tracing::info!( | 237 | tracing::info!( |
| 180 | "Starting SyncManager (domain: {}, bootstrap relay: {:?})", | 238 | "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})", |
| 181 | self.relay_domain, | 239 | self.relay_domain, |
| 240 | self.own_relay_url, | ||
| 182 | self.bootstrap_relay_url | 241 | self.bootstrap_relay_url |
| 183 | ); | 242 | ); |
| 184 | 243 | ||
| @@ -191,74 +250,56 @@ impl SyncManager { | |||
| 191 | // Create channel for receiving events from all connections | 250 | // Create channel for receiving events from all connections |
| 192 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); | 251 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); |
| 193 | 252 | ||
| 194 | // Track active relay URLs to avoid duplicates | 253 | // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing) |
| 195 | let mut active_relays: HashSet<String> = HashSet::new(); | 254 | let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::<String>::new())); |
| 196 | |||
| 197 | // Collect all relays to connect to | ||
| 198 | let mut relays_to_connect: Vec<String> = Vec::new(); | ||
| 199 | 255 | ||
| 200 | // Start with bootstrap relay if configured | 256 | // Bootstrap relay - connect immediately (no jitter, just one relay) |
| 201 | if let Some(ref url) = self.bootstrap_relay_url { | 257 | if let Some(ref url) = self.bootstrap_relay_url { |
| 202 | if !self.is_own_relay(url) { | 258 | if !self.is_own_relay(url) { |
| 203 | relays_to_connect.push(url.clone()); | 259 | tracing::info!("Connecting to bootstrap relay: {}", url); |
| 204 | active_relays.insert(url.clone()); | 260 | active_relays.lock().await.insert(url.clone()); |
| 261 | self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false); | ||
| 205 | } else { | 262 | } else { |
| 206 | tracing::info!("Skipping bootstrap relay (is our own relay): {}", url); | 263 | tracing::info!("Skipping bootstrap relay (is our own relay): {}", url); |
| 207 | } | 264 | } |
| 208 | } | 265 | } |
| 209 | 266 | ||
| 210 | // Discover additional relays from stored announcements | ||
| 211 | let discovered_urls = filter_service.discover_relay_urls().await; | ||
| 212 | for url in discovered_urls { | ||
| 213 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | ||
| 214 | relays_to_connect.push(url.clone()); | ||
| 215 | active_relays.insert(url.clone()); | ||
| 216 | } | ||
| 217 | } | ||
| 218 | |||
| 219 | // Record initial tracked relay count | 267 | // Record initial tracked relay count |
| 220 | if let Some(ref metrics) = self.metrics { | 268 | if let Some(ref metrics) = self.metrics { |
| 221 | metrics.set_tracked_count(active_relays.len() as i64); | 269 | let count = active_relays.lock().await.len(); |
| 270 | metrics.set_tracked_count(count as i64); | ||
| 222 | } | 271 | } |
| 223 | 272 | ||
| 224 | // Spawn connections with startup jitter to prevent thundering herd | 273 | { |
| 225 | for url in relays_to_connect { | 274 | let active = active_relays.lock().await; |
| 226 | tracing::info!("Scheduling connection to sync relay: {}", url); | 275 | if active.is_empty() { |
| 227 | self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone()); | 276 | tracing::info!( |
| 277 | "No bootstrap relay configured, waiting for announcements via self-subscriber..." | ||
| 278 | ); | ||
| 279 | } else { | ||
| 280 | tracing::info!( | ||
| 281 | "SyncManager connected to {} relay(s): {:?}", | ||
| 282 | active.len(), | ||
| 283 | *active | ||
| 284 | ); | ||
| 285 | } | ||
| 228 | } | 286 | } |
| 229 | 287 | ||
| 230 | if active_relays.is_empty() { | 288 | // Spawn self-subscriber task for ALL relay discovery |
| 231 | tracing::warn!("No sync relays configured or discovered, SyncManager idle"); | 289 | let self_subscriber_handle = self.spawn_self_subscriber( |
| 232 | } else { | 290 | tx.clone(), |
| 233 | tracing::info!( | 291 | filter_service.clone(), |
| 234 | "SyncManager connected to {} relays: {:?}", | 292 | active_relays.clone(), |
| 235 | active_relays.len(), | 293 | ); |
| 236 | active_relays | ||
| 237 | ); | ||
| 238 | } | ||
| 239 | 294 | ||
| 240 | // Process incoming events from all connections | 295 | // Process incoming events - just validate and store, NO relay discovery |
| 296 | // (relay discovery is handled solely by the self-subscriber) | ||
| 241 | while let Some(synced_event) = rx.recv().await { | 297 | while let Some(synced_event) = rx.recv().await { |
| 242 | // Check if this event reveals new relays to sync from | ||
| 243 | let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event); | ||
| 244 | for url in new_urls { | ||
| 245 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | ||
| 246 | tracing::info!("Discovered new relay from event, connecting: {}", url); | ||
| 247 | active_relays.insert(url.clone()); | ||
| 248 | |||
| 249 | // Update tracked relay count | ||
| 250 | if let Some(ref metrics) = self.metrics { | ||
| 251 | metrics.inc_tracked_count(); | ||
| 252 | } | ||
| 253 | |||
| 254 | // New relays discovered during runtime don't need jitter | ||
| 255 | self.spawn_connection(url, tx.clone(), filter_service.clone()); | ||
| 256 | } | ||
| 257 | } | ||
| 258 | |||
| 259 | self.process_event(synced_event).await; | 298 | self.process_event(synced_event).await; |
| 260 | } | 299 | } |
| 261 | 300 | ||
| 301 | // Clean up self-subscriber | ||
| 302 | self_subscriber_handle.abort(); | ||
| 262 | tracing::warn!("SyncManager event channel closed, shutting down"); | 303 | tracing::warn!("SyncManager event channel closed, shutting down"); |
| 263 | } | 304 | } |
| 264 | 305 | ||
| @@ -267,52 +308,325 @@ impl SyncManager { | |||
| 267 | url.contains(&self.relay_domain) | 308 | url.contains(&self.relay_domain) |
| 268 | } | 309 | } |
| 269 | 310 | ||
| 270 | /// Spawn a connection task for a relay with startup jitter | 311 | /// Spawn a self-subscriber task that connects to our own relay |
| 312 | /// and watches for kind 30617 announcements to discover new relays. | ||
| 271 | /// | 313 | /// |
| 272 | /// Adds a random delay (0 to startup_jitter_ms) before connecting to prevent | 314 | /// This is the SOLE relay discovery path - all relay discovery happens here. |
| 273 | /// thundering herd on startup when multiple relays are configured. | 315 | /// When a new announcement is saved to our database (from direct submission |
| 274 | /// Set startup_jitter_ms to 0 to disable jitter (useful for testing). | 316 | /// or synced from another relay), the self-subscriber receives it immediately |
| 275 | fn spawn_connection_with_jitter( | 317 | /// and spawns connections to newly discovered relays (with jitter). |
| 318 | fn spawn_self_subscriber( | ||
| 276 | &self, | 319 | &self, |
| 277 | url: String, | ||
| 278 | tx: mpsc::Sender<SyncedEvent>, | 320 | tx: mpsc::Sender<SyncedEvent>, |
| 279 | filter_service: Arc<FilterService>, | 321 | filter_service: Arc<FilterService>, |
| 280 | ) { | 322 | active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>, |
| 281 | let domain = self.relay_domain.clone(); | 323 | ) -> tokio::task::JoinHandle<()> { |
| 282 | let health_tracker = self.health_tracker.clone(); | 324 | let own_relay_url = self.own_relay_url.clone(); |
| 325 | let relay_domain = self.relay_domain.clone(); | ||
| 283 | let metrics = self.metrics.clone(); | 326 | let metrics = self.metrics.clone(); |
| 284 | let max_jitter = self.startup_jitter_ms; | 327 | let health_tracker = self.health_tracker.clone(); |
| 328 | let startup_jitter_ms = self.startup_jitter_ms; | ||
| 285 | 329 | ||
| 286 | tokio::spawn(async move { | 330 | tokio::spawn(async move { |
| 287 | // Apply startup jitter (if configured) | 331 | Self::run_self_subscriber_loop( |
| 288 | if max_jitter > 0 { | 332 | own_relay_url, |
| 289 | let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); | 333 | relay_domain, |
| 290 | tracing::debug!( | 334 | tx, |
| 291 | "Applying {}ms startup jitter before connecting to {}", | 335 | filter_service, |
| 292 | jitter_ms, | 336 | active_relays, |
| 293 | url | 337 | metrics, |
| 294 | ); | 338 | health_tracker, |
| 295 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | 339 | startup_jitter_ms, |
| 340 | ) | ||
| 341 | .await; | ||
| 342 | }) | ||
| 343 | } | ||
| 344 | |||
| 345 | /// Main loop for the self-subscriber | ||
| 346 | /// | ||
| 347 | /// Connects to our own relay, subscribes to kind 30617 announcements, | ||
| 348 | /// and processes events to discover new relays. Handles reconnection | ||
| 349 | /// on disconnect. | ||
| 350 | /// | ||
| 351 | /// ## Reconnection Behavior | ||
| 352 | /// | ||
| 353 | /// - First connection: no `since` filter (get all historical announcements) | ||
| 354 | /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing | ||
| 355 | #[allow(clippy::too_many_arguments)] | ||
| 356 | async fn run_self_subscriber_loop( | ||
| 357 | own_relay_url: String, | ||
| 358 | relay_domain: String, | ||
| 359 | tx: mpsc::Sender<SyncedEvent>, | ||
| 360 | filter_service: Arc<FilterService>, | ||
| 361 | active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>, | ||
| 362 | metrics: Option<SyncMetrics>, | ||
| 363 | health_tracker: Arc<RelayHealthTracker>, | ||
| 364 | startup_jitter_ms: u64, | ||
| 365 | ) { | ||
| 366 | let mut reconnect_delay = Duration::from_secs(1); | ||
| 367 | let max_reconnect_delay = Duration::from_secs(60); | ||
| 368 | let mut is_first_connection = true; | ||
| 369 | |||
| 370 | loop { | ||
| 371 | tracing::info!( | ||
| 372 | "Self-subscriber connecting to own relay: {}", | ||
| 373 | own_relay_url | ||
| 374 | ); | ||
| 375 | |||
| 376 | match Self::connect_self_subscriber(&own_relay_url).await { | ||
| 377 | Ok(client) => { | ||
| 378 | // Reset reconnect delay on successful connection | ||
| 379 | reconnect_delay = Duration::from_secs(1); | ||
| 380 | |||
| 381 | tracing::info!( | ||
| 382 | "Self-subscriber connected to own relay, subscribing to kind {} announcements{}", | ||
| 383 | KIND_REPOSITORY_ANNOUNCEMENT, | ||
| 384 | if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" } | ||
| 385 | ); | ||
| 386 | |||
| 387 | // Subscribe to repository announcements | ||
| 388 | // First connection: get all historical; reconnections: only last 15 minutes | ||
| 389 | let filter = if is_first_connection { | ||
| 390 | Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) | ||
| 391 | } else { | ||
| 392 | let since = Timestamp::now() - 15 * 60; // 15 minutes ago | ||
| 393 | Filter::new() | ||
| 394 | .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) | ||
| 395 | .since(since) | ||
| 396 | }; | ||
| 397 | |||
| 398 | is_first_connection = false; | ||
| 399 | |||
| 400 | if let Err(e) = client.subscribe(filter, None).await { | ||
| 401 | tracing::error!( | ||
| 402 | "Self-subscriber failed to subscribe on {}: {}", | ||
| 403 | own_relay_url, | ||
| 404 | e | ||
| 405 | ); | ||
| 406 | // Will reconnect after delay | ||
| 407 | } else { | ||
| 408 | // Handle notifications until disconnect | ||
| 409 | Self::handle_self_subscriber_notifications( | ||
| 410 | &client, | ||
| 411 | &own_relay_url, | ||
| 412 | &relay_domain, | ||
| 413 | &tx, | ||
| 414 | &filter_service, | ||
| 415 | &active_relays, | ||
| 416 | &metrics, | ||
| 417 | &health_tracker, | ||
| 418 | startup_jitter_ms, | ||
| 419 | ) | ||
| 420 | .await; | ||
| 421 | } | ||
| 422 | |||
| 423 | // Disconnect and cleanup | ||
| 424 | client.disconnect().await; | ||
| 425 | } | ||
| 426 | Err(e) => { | ||
| 427 | tracing::warn!( | ||
| 428 | "Self-subscriber failed to connect to {}: {}", | ||
| 429 | own_relay_url, | ||
| 430 | e | ||
| 431 | ); | ||
| 432 | } | ||
| 296 | } | 433 | } |
| 297 | 434 | ||
| 298 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; | 435 | // Wait before reconnecting with exponential backoff |
| 299 | }); | 436 | tracing::debug!( |
| 437 | "Self-subscriber will reconnect to {} in {:?}", | ||
| 438 | own_relay_url, | ||
| 439 | reconnect_delay | ||
| 440 | ); | ||
| 441 | tokio::time::sleep(reconnect_delay).await; | ||
| 442 | reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); | ||
| 443 | } | ||
| 444 | } | ||
| 445 | |||
| 446 | /// Connect to our own relay for self-subscribing | ||
| 447 | async fn connect_self_subscriber( | ||
| 448 | url: &str, | ||
| 449 | ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { | ||
| 450 | let client = Client::default(); | ||
| 451 | client.add_relay(url).await?; | ||
| 452 | client.connect().await; | ||
| 453 | |||
| 454 | // Wait for connection to establish (with timeout) | ||
| 455 | let timeout = Duration::from_secs(10); | ||
| 456 | let start = std::time::Instant::now(); | ||
| 457 | |||
| 458 | while start.elapsed() < timeout { | ||
| 459 | let relays = client.relays().await; | ||
| 460 | if relays.values().any(|r| r.is_connected()) { | ||
| 461 | return Ok(client); | ||
| 462 | } | ||
| 463 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 464 | } | ||
| 465 | |||
| 466 | Err("Timeout waiting for self-subscriber connection".into()) | ||
| 300 | } | 467 | } |
| 301 | 468 | ||
| 302 | /// Spawn a connection task for a relay without jitter | 469 | /// Handle notifications from the self-subscriber client |
| 303 | /// | 470 | /// |
| 304 | /// Used for relays discovered during runtime (not at startup). | 471 | /// Processes announcement events to discover new relay URLs. |
| 472 | /// Applies jitter before spawning connections to prevent thundering herd. | ||
| 473 | #[allow(clippy::too_many_arguments)] | ||
| 474 | async fn handle_self_subscriber_notifications( | ||
| 475 | client: &Client, | ||
| 476 | own_relay_url: &str, | ||
| 477 | relay_domain: &str, | ||
| 478 | tx: &mpsc::Sender<SyncedEvent>, | ||
| 479 | filter_service: &Arc<FilterService>, | ||
| 480 | active_relays: &Arc<tokio::sync::Mutex<HashSet<String>>>, | ||
| 481 | metrics: &Option<SyncMetrics>, | ||
| 482 | health_tracker: &Arc<RelayHealthTracker>, | ||
| 483 | startup_jitter_ms: u64, | ||
| 484 | ) { | ||
| 485 | let own_relay_url = own_relay_url.to_string(); | ||
| 486 | let relay_domain = relay_domain.to_string(); | ||
| 487 | let filter_service = filter_service.clone(); | ||
| 488 | let active_relays = active_relays.clone(); | ||
| 489 | let metrics = metrics.clone(); | ||
| 490 | let health_tracker = health_tracker.clone(); | ||
| 491 | let tx = tx.clone(); | ||
| 492 | |||
| 493 | client | ||
| 494 | .handle_notifications(|notification| { | ||
| 495 | let own_relay_url = own_relay_url.clone(); | ||
| 496 | let relay_domain = relay_domain.clone(); | ||
| 497 | let filter_service = filter_service.clone(); | ||
| 498 | let active_relays = active_relays.clone(); | ||
| 499 | let metrics = metrics.clone(); | ||
| 500 | let health_tracker = health_tracker.clone(); | ||
| 501 | let tx = tx.clone(); | ||
| 502 | |||
| 503 | async move { | ||
| 504 | match notification { | ||
| 505 | RelayPoolNotification::Event { event, .. } => { | ||
| 506 | // Only process repository announcement events | ||
| 507 | if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT { | ||
| 508 | return Ok(false); | ||
| 509 | } | ||
| 510 | |||
| 511 | tracing::debug!( | ||
| 512 | "Self-subscriber received announcement {} from {}", | ||
| 513 | event.id, | ||
| 514 | own_relay_url | ||
| 515 | ); | ||
| 516 | |||
| 517 | // Extract relay URLs from the announcement | ||
| 518 | let new_urls = filter_service.extract_relay_urls_from_event(&event); | ||
| 519 | |||
| 520 | for url in new_urls { | ||
| 521 | // Check if we should connect to this relay | ||
| 522 | let should_connect = { | ||
| 523 | let mut active = active_relays.lock().await; | ||
| 524 | let is_new = !active.contains(&url); | ||
| 525 | let is_not_self = !url.contains(&relay_domain); | ||
| 526 | |||
| 527 | if is_new && is_not_self { | ||
| 528 | active.insert(url.clone()); | ||
| 529 | true | ||
| 530 | } else { | ||
| 531 | false | ||
| 532 | } | ||
| 533 | }; | ||
| 534 | |||
| 535 | if should_connect { | ||
| 536 | tracing::info!( | ||
| 537 | "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}", | ||
| 538 | event.id, | ||
| 539 | url | ||
| 540 | ); | ||
| 541 | |||
| 542 | // Update tracked relay count | ||
| 543 | if let Some(ref m) = metrics { | ||
| 544 | m.inc_tracked_count(); | ||
| 545 | } | ||
| 546 | |||
| 547 | // Spawn connection to the new relay WITH jitter at point of discovery | ||
| 548 | let url_clone = url.clone(); | ||
| 549 | let tx_clone = tx.clone(); | ||
| 550 | let filter_service_clone = filter_service.clone(); | ||
| 551 | let domain_clone = relay_domain.clone(); | ||
| 552 | let health_tracker_clone = health_tracker.clone(); | ||
| 553 | let metrics_clone = metrics.clone(); | ||
| 554 | |||
| 555 | tokio::spawn(async move { | ||
| 556 | // Apply jitter at point of discovery | ||
| 557 | if startup_jitter_ms > 0 { | ||
| 558 | let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms); | ||
| 559 | tracing::debug!( | ||
| 560 | "Applying {}ms jitter before connecting to discovered relay {}", | ||
| 561 | jitter_ms, | ||
| 562 | url_clone | ||
| 563 | ); | ||
| 564 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | ||
| 565 | } | ||
| 566 | |||
| 567 | connect_with_retry( | ||
| 568 | &url_clone, | ||
| 569 | tx_clone, | ||
| 570 | filter_service_clone, | ||
| 571 | &domain_clone, | ||
| 572 | health_tracker_clone, | ||
| 573 | metrics_clone, | ||
| 574 | ) | ||
| 575 | .await; | ||
| 576 | }); | ||
| 577 | } | ||
| 578 | } | ||
| 579 | |||
| 580 | Ok(false) // Continue processing | ||
| 581 | } | ||
| 582 | RelayPoolNotification::Shutdown => { | ||
| 583 | tracing::warn!( | ||
| 584 | "Self-subscriber connection shutdown for {}", | ||
| 585 | own_relay_url | ||
| 586 | ); | ||
| 587 | Ok(true) // Stop on shutdown | ||
| 588 | } | ||
| 589 | RelayPoolNotification::Message { .. } => { | ||
| 590 | Ok(false) // Continue processing | ||
| 591 | } | ||
| 592 | } | ||
| 593 | } | ||
| 594 | }) | ||
| 595 | .await | ||
| 596 | .ok(); | ||
| 597 | } | ||
| 598 | |||
| 599 | /// Spawn a connection task for a relay | ||
| 600 | /// | ||
| 601 | /// # Arguments | ||
| 602 | /// * `url` - Relay URL to connect to | ||
| 603 | /// * `tx` - Channel sender for synced events | ||
| 604 | /// * `filter_service` - Filter service for subscriptions | ||
| 605 | /// * `apply_jitter` - Whether to apply startup jitter before connecting | ||
| 305 | fn spawn_connection( | 606 | fn spawn_connection( |
| 306 | &self, | 607 | &self, |
| 307 | url: String, | 608 | url: String, |
| 308 | tx: mpsc::Sender<SyncedEvent>, | 609 | tx: mpsc::Sender<SyncedEvent>, |
| 309 | filter_service: Arc<FilterService>, | 610 | filter_service: Arc<FilterService>, |
| 611 | apply_jitter: bool, | ||
| 310 | ) { | 612 | ) { |
| 311 | let domain = self.relay_domain.clone(); | 613 | let domain = self.relay_domain.clone(); |
| 312 | let health_tracker = self.health_tracker.clone(); | 614 | let health_tracker = self.health_tracker.clone(); |
| 313 | let metrics = self.metrics.clone(); | 615 | let metrics = self.metrics.clone(); |
| 616 | let max_jitter = self.startup_jitter_ms; | ||
| 314 | 617 | ||
| 315 | tokio::spawn(async move { | 618 | tokio::spawn(async move { |
| 619 | // Apply startup jitter if requested | ||
| 620 | if apply_jitter && max_jitter > 0 { | ||
| 621 | let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); | ||
| 622 | tracing::debug!( | ||
| 623 | "Applying {}ms jitter before connecting to {}", | ||
| 624 | jitter_ms, | ||
| 625 | url | ||
| 626 | ); | ||
| 627 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | ||
| 628 | } | ||
| 629 | |||
| 316 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; | 630 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; |
| 317 | }); | 631 | }); |
| 318 | } | 632 | } |
| @@ -351,7 +665,10 @@ impl SyncManager { | |||
| 351 | } | 665 | } |
| 352 | 666 | ||
| 353 | // Validate through write policy using sync_source_addr derived from config | 667 | // Validate through write policy using sync_source_addr derived from config |
| 354 | let result = self.write_policy.admit_event(event, &self.sync_source_addr).await; | 668 | let result = self |
| 669 | .write_policy | ||
| 670 | .admit_event(event, &self.sync_source_addr) | ||
| 671 | .await; | ||
| 355 | 672 | ||
| 356 | match result { | 673 | match result { |
| 357 | PolicyResult::Accept => { | 674 | PolicyResult::Accept => { |
| @@ -386,12 +703,16 @@ impl SyncManager { | |||
| 386 | /// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" | 703 | /// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" |
| 387 | /// - "wss://relay.example.com" -> "relay.example.com" | 704 | /// - "wss://relay.example.com" -> "relay.example.com" |
| 388 | fn extract_domain_from_url(url: &str) -> Option<String> { | 705 | fn extract_domain_from_url(url: &str) -> Option<String> { |
| 389 | let url = url.trim_start_matches("ws://").trim_start_matches("wss://"); | 706 | let url = url |
| 390 | let url = url.trim_start_matches("http://").trim_start_matches("https://"); | 707 | .trim_start_matches("ws://") |
| 391 | 708 | .trim_start_matches("wss://"); | |
| 709 | let url = url | ||
| 710 | .trim_start_matches("http://") | ||
| 711 | .trim_start_matches("https://"); | ||
| 712 | |||
| 392 | // Remove path | 713 | // Remove path |
| 393 | let domain = url.split('/').next()?; | 714 | let domain = url.split('/').next()?; |
| 394 | 715 | ||
| 395 | Some(domain.to_string()) | 716 | Some(domain.to_string()) |
| 396 | } | 717 | } |
| 397 | 718 | ||
| @@ -430,4 +751,12 @@ mod tests { | |||
| 430 | Some("example.com:3000".to_string()) | 751 | Some("example.com:3000".to_string()) |
| 431 | ); | 752 | ); |
| 432 | } | 753 | } |
| 754 | |||
| 755 | #[test] | ||
| 756 | fn test_derive_own_relay_url() { | ||
| 757 | assert_eq!( | ||
| 758 | derive_own_relay_url("127.0.0.1:8080"), | ||
| 759 | "ws://127.0.0.1:8080".to_string() | ||
| 760 | ); | ||
| 761 | } | ||
| 433 | } \ No newline at end of file | 762 | } \ No newline at end of file |