diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-09 09:28:12 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-09 09:28:18 +0000 |
| commit | efaad1e2857914b87307cf78903a957a604697a8 (patch) | |
| tree | dadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/manager.rs | |
| parent | 91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff) | |
basic sync stub
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 762 |
1 files changed, 0 insertions, 762 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs deleted file mode 100644 index 6ae82ef..0000000 --- a/src/sync/manager.rs +++ /dev/null | |||
| @@ -1,762 +0,0 @@ | |||
| 1 | //! SyncManager - Coordinates proactive sync operations | ||
| 2 | //! | ||
| 3 | //! The SyncManager connects to remote relays, receives events, validates them | ||
| 4 | //! through the write policy, and stores accepted events. | ||
| 5 | //! | ||
| 6 | //! ## Simplified Relay Discovery Architecture | ||
| 7 | //! | ||
| 8 | //! All relay discovery is centralized in the self-subscriber: | ||
| 9 | //! - Bootstrap relay: connected immediately (no jitter, single relay) | ||
| 10 | //! - All other relays: discovered via self-subscriber announcements (with jitter) | ||
| 11 | //! | ||
| 12 | //! ```text | ||
| 13 | //! ┌─────────────────────────────────────────────────────────────┐ | ||
| 14 | //! │ ngit-grasp │ | ||
| 15 | //! │ │ | ||
| 16 | //! │ ┌─────────────┐ broadcasts ┌───────────────┐ │ | ||
| 17 | //! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │ | ||
| 18 | //! │ │ Database │ │ Client │ │ | ||
| 19 | //! │ └─────────────┘ └───────┬───────┘ │ | ||
| 20 | //! │ ▲ │ │ | ||
| 21 | //! │ │ stores │ extracts │ | ||
| 22 | //! │ │ │ relay │ | ||
| 23 | //! │ ┌─────┴─────┐ │ URLs │ | ||
| 24 | //! │ │ Remote │◀────────────────────────────────┘ │ | ||
| 25 | //! │ │Connections│ spawns new │ | ||
| 26 | //! │ └───────────┘ connections (with jitter) │ | ||
| 27 | //! └─────────────────────────────────────────────────────────────┘ | ||
| 28 | //! ``` | ||
| 29 | //! | ||
| 30 | //! ## Key Design Decisions | ||
| 31 | //! | ||
| 32 | //! - **Single relay discovery path**: Only the self-subscriber discovers new relays | ||
| 33 | //! - **Jitter at point of discovery**: Applied when spawning connections from announcements | ||
| 34 | //! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect | ||
| 35 | //! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd | ||
| 36 | //! | ||
| 37 | //! ## Phase 2 Features | ||
| 38 | //! | ||
| 39 | //! - Relay discovery from kind 30617 announcements (via self-subscriber) | ||
| 40 | //! - Multiple simultaneous relay connections | ||
| 41 | //! - Three-layer filter strategy via FilterService | ||
| 42 | //! | ||
| 43 | //! ## Phase 3 Features | ||
| 44 | //! | ||
| 45 | //! - Health tracking with exponential backoff | ||
| 46 | //! - Dead relay detection after 24h of failures | ||
| 47 | //! - Startup jitter to prevent thundering herd | ||
| 48 | //! | ||
| 49 | //! ## Phase 4 Features | ||
| 50 | //! | ||
| 51 | //! - Dynamic subscription updates handled per-connection | ||
| 52 | //! - Each connection manages its own SubscriptionManager | ||
| 53 | //! - Announcements trigger Layer 2 subscriptions | ||
| 54 | //! - PRs/Issues trigger Layer 3 subscriptions | ||
| 55 | //! - Consolidation when filter count exceeds 150 | ||
| 56 | |||
| 57 | use std::collections::HashSet; | ||
| 58 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
| 59 | use std::sync::Arc; | ||
| 60 | use std::time::Duration; | ||
| 61 | |||
| 62 | use nostr_relay_builder::prelude::*; | ||
| 63 | use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp}; | ||
| 64 | use rand::Rng; | ||
| 65 | use tokio::sync::mpsc; | ||
| 66 | |||
| 67 | use super::connection::{connect_with_retry, SyncedEvent}; | ||
| 68 | use super::filter::FilterService; | ||
| 69 | use super::health::RelayHealthTracker; | ||
| 70 | use super::metrics::SyncMetrics; | ||
| 71 | use crate::config::Config; | ||
| 72 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | ||
| 73 | use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT; | ||
| 74 | |||
| 75 | /// Default fallback address for sync source when bind_address cannot be parsed | ||
| 76 | /// | ||
| 77 | /// This distinguishes synced events from directly-submitted events in logs and metrics. | ||
| 78 | /// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker. | ||
| 79 | pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr = | ||
| 80 | SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); | ||
| 81 | |||
| 82 | /// Derive sync source address from config bind_address | ||
| 83 | /// | ||
| 84 | /// Parses the bind_address string and returns a SocketAddr. | ||
| 85 | /// Falls back to 127.0.0.1:8080 if parsing fails. | ||
| 86 | fn get_sync_source_addr(bind_address: &str) -> SocketAddr { | ||
| 87 | bind_address | ||
| 88 | .parse() | ||
| 89 | .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) | ||
| 90 | } | ||
| 91 | |||
| 92 | /// Derive the WebSocket URL for our own relay from bind_address | ||
| 93 | fn derive_own_relay_url(bind_address: &str) -> String { | ||
| 94 | format!("ws://{}", bind_address) | ||
| 95 | } | ||
| 96 | |||
| 97 | /// Coordinates proactive sync from configured and discovered relays | ||
| 98 | pub struct SyncManager { | ||
| 99 | /// Bootstrap relay URL for initial sync (from config) | ||
| 100 | /// Additional relays are discovered from repository announcements that list our service | ||
| 101 | bootstrap_relay_url: Option<String>, | ||
| 102 | /// Our relay's domain (for filtering) | ||
| 103 | relay_domain: String, | ||
| 104 | /// Our relay's WebSocket URL (for self-subscribe) | ||
| 105 | own_relay_url: String, | ||
| 106 | /// Database for storing accepted events | ||
| 107 | database: SharedDatabase, | ||
| 108 | /// Write policy for validating events | ||
| 109 | write_policy: Nip34WritePolicy, | ||
| 110 | /// Health tracker for relay connections | ||
| 111 | health_tracker: Arc<RelayHealthTracker>, | ||
| 112 | /// Sync metrics for Prometheus | ||
| 113 | metrics: Option<SyncMetrics>, | ||
| 114 | /// Source address for synced events (derived from config.bind_address) | ||
| 115 | sync_source_addr: SocketAddr, | ||
| 116 | /// Maximum startup jitter in milliseconds (from config) | ||
| 117 | startup_jitter_ms: u64, | ||
| 118 | } | ||
| 119 | |||
| 120 | impl SyncManager { | ||
| 121 | /// Create a new SyncManager | ||
| 122 | /// | ||
| 123 | /// # Arguments | ||
| 124 | /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config | ||
| 125 | /// * `relay_domain` - Our relay's domain (used to exclude self from sync) | ||
| 126 | /// * `database` - Shared database for storing events and querying announcements | ||
| 127 | /// * `write_policy` - Write policy for validating synced events | ||
| 128 | /// * `config` - Configuration for health tracking settings | ||
| 129 | pub fn new( | ||
| 130 | bootstrap_relay_url: Option<String>, | ||
| 131 | relay_domain: String, | ||
| 132 | database: SharedDatabase, | ||
| 133 | write_policy: Nip34WritePolicy, | ||
| 134 | config: &Config, | ||
| 135 | ) -> Self { | ||
| 136 | let own_relay_url = derive_own_relay_url(&config.bind_address); | ||
| 137 | Self { | ||
| 138 | bootstrap_relay_url, | ||
| 139 | relay_domain, | ||
| 140 | own_relay_url, | ||
| 141 | database, | ||
| 142 | write_policy, | ||
| 143 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | ||
| 144 | metrics: None, | ||
| 145 | sync_source_addr: get_sync_source_addr(&config.bind_address), | ||
| 146 | startup_jitter_ms: config.sync_startup_jitter_ms, | ||
| 147 | } | ||
| 148 | } | ||
| 149 | |||
| 150 | /// Create a new SyncManager with metrics | ||
| 151 | /// | ||
| 152 | /// # Arguments | ||
| 153 | /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config | ||
| 154 | /// * `relay_domain` - Our relay's domain (used to exclude self from sync) | ||
| 155 | /// * `database` - Shared database for storing events and querying announcements | ||
| 156 | /// * `write_policy` - Write policy for validating synced events | ||
| 157 | /// * `config` - Configuration for health tracking settings | ||
| 158 | /// * `metrics` - Sync metrics for Prometheus | ||
| 159 | pub fn with_metrics( | ||
| 160 | bootstrap_relay_url: Option<String>, | ||
| 161 | relay_domain: String, | ||
| 162 | database: SharedDatabase, | ||
| 163 | write_policy: Nip34WritePolicy, | ||
| 164 | config: &Config, | ||
| 165 | metrics: SyncMetrics, | ||
| 166 | ) -> Self { | ||
| 167 | let own_relay_url = derive_own_relay_url(&config.bind_address); | ||
| 168 | Self { | ||
| 169 | bootstrap_relay_url, | ||
| 170 | relay_domain, | ||
| 171 | own_relay_url, | ||
| 172 | database, | ||
| 173 | write_policy, | ||
| 174 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | ||
| 175 | metrics: Some(metrics), | ||
| 176 | sync_source_addr: get_sync_source_addr(&config.bind_address), | ||
| 177 | startup_jitter_ms: config.sync_startup_jitter_ms, | ||
| 178 | } | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Create a SyncManager with a single relay URL (Phase 1 compatibility) | ||
| 182 | pub fn with_single_relay( | ||
| 183 | bootstrap_url: String, | ||
| 184 | database: SharedDatabase, | ||
| 185 | write_policy: Nip34WritePolicy, | ||
| 186 | ) -> Self { | ||
| 187 | // Extract domain from URL for filtering | ||
| 188 | let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default(); | ||
| 189 | let own_relay_url = format!("ws://{}", relay_domain); | ||
| 190 | Self { | ||
| 191 | bootstrap_relay_url: Some(bootstrap_url), | ||
| 192 | relay_domain, | ||
| 193 | own_relay_url, | ||
| 194 | database, | ||
| 195 | write_policy, | ||
| 196 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), | ||
| 197 | metrics: None, | ||
| 198 | sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, | ||
| 199 | startup_jitter_ms: 10_000, // Default 10 seconds | ||
| 200 | } | ||
| 201 | } | ||
| 202 | |||
| 203 | /// Set metrics for the sync manager | ||
| 204 | pub fn set_metrics(&mut self, metrics: SyncMetrics) { | ||
| 205 | self.metrics = Some(metrics); | ||
| 206 | } | ||
| 207 | |||
| 208 | /// Get a reference to the metrics | ||
| 209 | pub fn metrics(&self) -> Option<&SyncMetrics> { | ||
| 210 | self.metrics.as_ref() | ||
| 211 | } | ||
| 212 | |||
| 213 | /// Get a reference to the health tracker | ||
| 214 | pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { | ||
| 215 | self.health_tracker.clone() | ||
| 216 | } | ||
| 217 | |||
| 218 | /// Run the sync manager | ||
| 219 | /// | ||
| 220 | /// This spawns the bootstrap relay connection (if configured), sets up a | ||
| 221 | /// self-subscriber for event-driven relay discovery, and processes incoming | ||
| 222 | /// events. The self-subscriber handles ALL relay discovery from announcements. | ||
| 223 | /// Runs indefinitely until cancelled. | ||
| 224 | /// | ||
| 225 | /// ## Simplified Relay Discovery Architecture | ||
| 226 | /// | ||
| 227 | /// All relay discovery is centralized in the self-subscriber: | ||
| 228 | /// - Bootstrap relay: connected immediately (no jitter, single relay) | ||
| 229 | /// - All other relays: discovered via self-subscriber announcements (with jitter) | ||
| 230 | /// - Jitter applied at point of discovery (not startup) | ||
| 231 | /// | ||
| 232 | /// This eliminates three redundant discovery paths: | ||
| 233 | /// 1. DB query at startup (removed) | ||
| 234 | /// 2. Remote event extraction (removed) | ||
| 235 | /// 3. Self-subscriber (sole discovery path) | ||
| 236 | pub async fn run(self) { | ||
| 237 | tracing::info!( | ||
| 238 | "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})", | ||
| 239 | self.relay_domain, | ||
| 240 | self.own_relay_url, | ||
| 241 | self.bootstrap_relay_url | ||
| 242 | ); | ||
| 243 | |||
| 244 | // Create the filter service | ||
| 245 | let filter_service = Arc::new(FilterService::new( | ||
| 246 | self.database.clone(), | ||
| 247 | self.relay_domain.clone(), | ||
| 248 | )); | ||
| 249 | |||
| 250 | // Create channel for receiving events from all connections | ||
| 251 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); | ||
| 252 | |||
| 253 | // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing) | ||
| 254 | let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::<String>::new())); | ||
| 255 | |||
| 256 | // Bootstrap relay - connect immediately (no jitter, just one relay) | ||
| 257 | if let Some(ref url) = self.bootstrap_relay_url { | ||
| 258 | if !self.is_own_relay(url) { | ||
| 259 | tracing::info!("Connecting to bootstrap relay: {}", url); | ||
| 260 | active_relays.lock().await.insert(url.clone()); | ||
| 261 | self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false); | ||
| 262 | } else { | ||
| 263 | tracing::info!("Skipping bootstrap relay (is our own relay): {}", url); | ||
| 264 | } | ||
| 265 | } | ||
| 266 | |||
| 267 | // Record initial tracked relay count | ||
| 268 | if let Some(ref metrics) = self.metrics { | ||
| 269 | let count = active_relays.lock().await.len(); | ||
| 270 | metrics.set_tracked_count(count as i64); | ||
| 271 | } | ||
| 272 | |||
| 273 | { | ||
| 274 | let active = active_relays.lock().await; | ||
| 275 | if active.is_empty() { | ||
| 276 | tracing::info!( | ||
| 277 | "No bootstrap relay configured, waiting for announcements via self-subscriber..." | ||
| 278 | ); | ||
| 279 | } else { | ||
| 280 | tracing::info!( | ||
| 281 | "SyncManager connected to {} relay(s): {:?}", | ||
| 282 | active.len(), | ||
| 283 | *active | ||
| 284 | ); | ||
| 285 | } | ||
| 286 | } | ||
| 287 | |||
| 288 | // Spawn self-subscriber task for ALL relay discovery | ||
| 289 | let self_subscriber_handle = self.spawn_self_subscriber( | ||
| 290 | tx.clone(), | ||
| 291 | filter_service.clone(), | ||
| 292 | active_relays.clone(), | ||
| 293 | ); | ||
| 294 | |||
| 295 | // Process incoming events - just validate and store, NO relay discovery | ||
| 296 | // (relay discovery is handled solely by the self-subscriber) | ||
| 297 | while let Some(synced_event) = rx.recv().await { | ||
| 298 | self.process_event(synced_event).await; | ||
| 299 | } | ||
| 300 | |||
| 301 | // Clean up self-subscriber | ||
| 302 | self_subscriber_handle.abort(); | ||
| 303 | tracing::warn!("SyncManager event channel closed, shutting down"); | ||
| 304 | } | ||
| 305 | |||
| 306 | /// Check if a URL points to our own relay | ||
| 307 | fn is_own_relay(&self, url: &str) -> bool { | ||
| 308 | url.contains(&self.relay_domain) | ||
| 309 | } | ||
| 310 | |||
| 311 | /// Spawn a self-subscriber task that connects to our own relay | ||
| 312 | /// and watches for kind 30617 announcements to discover new relays. | ||
| 313 | /// | ||
| 314 | /// This is the SOLE relay discovery path - all relay discovery happens here. | ||
| 315 | /// When a new announcement is saved to our database (from direct submission | ||
| 316 | /// or synced from another relay), the self-subscriber receives it immediately | ||
| 317 | /// and spawns connections to newly discovered relays (with jitter). | ||
| 318 | fn spawn_self_subscriber( | ||
| 319 | &self, | ||
| 320 | tx: mpsc::Sender<SyncedEvent>, | ||
| 321 | filter_service: Arc<FilterService>, | ||
| 322 | active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>, | ||
| 323 | ) -> tokio::task::JoinHandle<()> { | ||
| 324 | let own_relay_url = self.own_relay_url.clone(); | ||
| 325 | let relay_domain = self.relay_domain.clone(); | ||
| 326 | let metrics = self.metrics.clone(); | ||
| 327 | let health_tracker = self.health_tracker.clone(); | ||
| 328 | let startup_jitter_ms = self.startup_jitter_ms; | ||
| 329 | |||
| 330 | tokio::spawn(async move { | ||
| 331 | Self::run_self_subscriber_loop( | ||
| 332 | own_relay_url, | ||
| 333 | relay_domain, | ||
| 334 | tx, | ||
| 335 | filter_service, | ||
| 336 | active_relays, | ||
| 337 | metrics, | ||
| 338 | health_tracker, | ||
| 339 | startup_jitter_ms, | ||
| 340 | ) | ||
| 341 | .await; | ||
| 342 | }) | ||
| 343 | } | ||
| 344 | |||
| 345 | /// Main loop for the self-subscriber | ||
| 346 | /// | ||
| 347 | /// Connects to our own relay, subscribes to kind 30617 announcements, | ||
| 348 | /// and processes events to discover new relays. Handles reconnection | ||
| 349 | /// on disconnect. | ||
| 350 | /// | ||
| 351 | /// ## Reconnection Behavior | ||
| 352 | /// | ||
| 353 | /// - First connection: no `since` filter (get all historical announcements) | ||
| 354 | /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing | ||
| 355 | #[allow(clippy::too_many_arguments)] | ||
| 356 | async fn run_self_subscriber_loop( | ||
| 357 | own_relay_url: String, | ||
| 358 | relay_domain: String, | ||
| 359 | tx: mpsc::Sender<SyncedEvent>, | ||
| 360 | filter_service: Arc<FilterService>, | ||
| 361 | active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>, | ||
| 362 | metrics: Option<SyncMetrics>, | ||
| 363 | health_tracker: Arc<RelayHealthTracker>, | ||
| 364 | startup_jitter_ms: u64, | ||
| 365 | ) { | ||
| 366 | let mut reconnect_delay = Duration::from_secs(1); | ||
| 367 | let max_reconnect_delay = Duration::from_secs(60); | ||
| 368 | let mut is_first_connection = true; | ||
| 369 | |||
| 370 | loop { | ||
| 371 | tracing::info!( | ||
| 372 | "Self-subscriber connecting to own relay: {}", | ||
| 373 | own_relay_url | ||
| 374 | ); | ||
| 375 | |||
| 376 | match Self::connect_self_subscriber(&own_relay_url).await { | ||
| 377 | Ok(client) => { | ||
| 378 | // Reset reconnect delay on successful connection | ||
| 379 | reconnect_delay = Duration::from_secs(1); | ||
| 380 | |||
| 381 | tracing::info!( | ||
| 382 | "Self-subscriber connected to own relay, subscribing to kind {} announcements{}", | ||
| 383 | KIND_REPOSITORY_ANNOUNCEMENT, | ||
| 384 | if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" } | ||
| 385 | ); | ||
| 386 | |||
| 387 | // Subscribe to repository announcements | ||
| 388 | // First connection: get all historical; reconnections: only last 15 minutes | ||
| 389 | let filter = if is_first_connection { | ||
| 390 | Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) | ||
| 391 | } else { | ||
| 392 | let since = Timestamp::now() - 15 * 60; // 15 minutes ago | ||
| 393 | Filter::new() | ||
| 394 | .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT)) | ||
| 395 | .since(since) | ||
| 396 | }; | ||
| 397 | |||
| 398 | is_first_connection = false; | ||
| 399 | |||
| 400 | if let Err(e) = client.subscribe(filter, None).await { | ||
| 401 | tracing::error!( | ||
| 402 | "Self-subscriber failed to subscribe on {}: {}", | ||
| 403 | own_relay_url, | ||
| 404 | e | ||
| 405 | ); | ||
| 406 | // Will reconnect after delay | ||
| 407 | } else { | ||
| 408 | // Handle notifications until disconnect | ||
| 409 | Self::handle_self_subscriber_notifications( | ||
| 410 | &client, | ||
| 411 | &own_relay_url, | ||
| 412 | &relay_domain, | ||
| 413 | &tx, | ||
| 414 | &filter_service, | ||
| 415 | &active_relays, | ||
| 416 | &metrics, | ||
| 417 | &health_tracker, | ||
| 418 | startup_jitter_ms, | ||
| 419 | ) | ||
| 420 | .await; | ||
| 421 | } | ||
| 422 | |||
| 423 | // Disconnect and cleanup | ||
| 424 | client.disconnect().await; | ||
| 425 | } | ||
| 426 | Err(e) => { | ||
| 427 | tracing::warn!( | ||
| 428 | "Self-subscriber failed to connect to {}: {}", | ||
| 429 | own_relay_url, | ||
| 430 | e | ||
| 431 | ); | ||
| 432 | } | ||
| 433 | } | ||
| 434 | |||
| 435 | // Wait before reconnecting with exponential backoff | ||
| 436 | tracing::debug!( | ||
| 437 | "Self-subscriber will reconnect to {} in {:?}", | ||
| 438 | own_relay_url, | ||
| 439 | reconnect_delay | ||
| 440 | ); | ||
| 441 | tokio::time::sleep(reconnect_delay).await; | ||
| 442 | reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); | ||
| 443 | } | ||
| 444 | } | ||
| 445 | |||
| 446 | /// Connect to our own relay for self-subscribing | ||
| 447 | async fn connect_self_subscriber( | ||
| 448 | url: &str, | ||
| 449 | ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { | ||
| 450 | let client = Client::default(); | ||
| 451 | client.add_relay(url).await?; | ||
| 452 | client.connect().await; | ||
| 453 | |||
| 454 | // Wait for connection to establish (with timeout) | ||
| 455 | let timeout = Duration::from_secs(10); | ||
| 456 | let start = std::time::Instant::now(); | ||
| 457 | |||
| 458 | while start.elapsed() < timeout { | ||
| 459 | let relays = client.relays().await; | ||
| 460 | if relays.values().any(|r| r.is_connected()) { | ||
| 461 | return Ok(client); | ||
| 462 | } | ||
| 463 | tokio::time::sleep(Duration::from_millis(100)).await; | ||
| 464 | } | ||
| 465 | |||
| 466 | Err("Timeout waiting for self-subscriber connection".into()) | ||
| 467 | } | ||
| 468 | |||
| 469 | /// Handle notifications from the self-subscriber client | ||
| 470 | /// | ||
| 471 | /// Processes announcement events to discover new relay URLs. | ||
| 472 | /// Applies jitter before spawning connections to prevent thundering herd. | ||
| 473 | #[allow(clippy::too_many_arguments)] | ||
| 474 | async fn handle_self_subscriber_notifications( | ||
| 475 | client: &Client, | ||
| 476 | own_relay_url: &str, | ||
| 477 | relay_domain: &str, | ||
| 478 | tx: &mpsc::Sender<SyncedEvent>, | ||
| 479 | filter_service: &Arc<FilterService>, | ||
| 480 | active_relays: &Arc<tokio::sync::Mutex<HashSet<String>>>, | ||
| 481 | metrics: &Option<SyncMetrics>, | ||
| 482 | health_tracker: &Arc<RelayHealthTracker>, | ||
| 483 | startup_jitter_ms: u64, | ||
| 484 | ) { | ||
| 485 | let own_relay_url = own_relay_url.to_string(); | ||
| 486 | let relay_domain = relay_domain.to_string(); | ||
| 487 | let filter_service = filter_service.clone(); | ||
| 488 | let active_relays = active_relays.clone(); | ||
| 489 | let metrics = metrics.clone(); | ||
| 490 | let health_tracker = health_tracker.clone(); | ||
| 491 | let tx = tx.clone(); | ||
| 492 | |||
| 493 | client | ||
| 494 | .handle_notifications(|notification| { | ||
| 495 | let own_relay_url = own_relay_url.clone(); | ||
| 496 | let relay_domain = relay_domain.clone(); | ||
| 497 | let filter_service = filter_service.clone(); | ||
| 498 | let active_relays = active_relays.clone(); | ||
| 499 | let metrics = metrics.clone(); | ||
| 500 | let health_tracker = health_tracker.clone(); | ||
| 501 | let tx = tx.clone(); | ||
| 502 | |||
| 503 | async move { | ||
| 504 | match notification { | ||
| 505 | RelayPoolNotification::Event { event, .. } => { | ||
| 506 | // Only process repository announcement events | ||
| 507 | if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT { | ||
| 508 | return Ok(false); | ||
| 509 | } | ||
| 510 | |||
| 511 | tracing::debug!( | ||
| 512 | "Self-subscriber received announcement {} from {}", | ||
| 513 | event.id, | ||
| 514 | own_relay_url | ||
| 515 | ); | ||
| 516 | |||
| 517 | // Extract relay URLs from the announcement | ||
| 518 | let new_urls = filter_service.extract_relay_urls_from_event(&event); | ||
| 519 | |||
| 520 | for url in new_urls { | ||
| 521 | // Check if we should connect to this relay | ||
| 522 | let should_connect = { | ||
| 523 | let mut active = active_relays.lock().await; | ||
| 524 | let is_new = !active.contains(&url); | ||
| 525 | let is_not_self = !url.contains(&relay_domain); | ||
| 526 | |||
| 527 | if is_new && is_not_self { | ||
| 528 | active.insert(url.clone()); | ||
| 529 | true | ||
| 530 | } else { | ||
| 531 | false | ||
| 532 | } | ||
| 533 | }; | ||
| 534 | |||
| 535 | if should_connect { | ||
| 536 | tracing::info!( | ||
| 537 | "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}", | ||
| 538 | event.id, | ||
| 539 | url | ||
| 540 | ); | ||
| 541 | |||
| 542 | // Update tracked relay count | ||
| 543 | if let Some(ref m) = metrics { | ||
| 544 | m.inc_tracked_count(); | ||
| 545 | } | ||
| 546 | |||
| 547 | // Spawn connection to the new relay WITH jitter at point of discovery | ||
| 548 | let url_clone = url.clone(); | ||
| 549 | let tx_clone = tx.clone(); | ||
| 550 | let filter_service_clone = filter_service.clone(); | ||
| 551 | let domain_clone = relay_domain.clone(); | ||
| 552 | let health_tracker_clone = health_tracker.clone(); | ||
| 553 | let metrics_clone = metrics.clone(); | ||
| 554 | |||
| 555 | tokio::spawn(async move { | ||
| 556 | // Apply jitter at point of discovery | ||
| 557 | if startup_jitter_ms > 0 { | ||
| 558 | let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms); | ||
| 559 | tracing::debug!( | ||
| 560 | "Applying {}ms jitter before connecting to discovered relay {}", | ||
| 561 | jitter_ms, | ||
| 562 | url_clone | ||
| 563 | ); | ||
| 564 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | ||
| 565 | } | ||
| 566 | |||
| 567 | connect_with_retry( | ||
| 568 | &url_clone, | ||
| 569 | tx_clone, | ||
| 570 | filter_service_clone, | ||
| 571 | &domain_clone, | ||
| 572 | health_tracker_clone, | ||
| 573 | metrics_clone, | ||
| 574 | ) | ||
| 575 | .await; | ||
| 576 | }); | ||
| 577 | } | ||
| 578 | } | ||
| 579 | |||
| 580 | Ok(false) // Continue processing | ||
| 581 | } | ||
| 582 | RelayPoolNotification::Shutdown => { | ||
| 583 | tracing::warn!( | ||
| 584 | "Self-subscriber connection shutdown for {}", | ||
| 585 | own_relay_url | ||
| 586 | ); | ||
| 587 | Ok(true) // Stop on shutdown | ||
| 588 | } | ||
| 589 | RelayPoolNotification::Message { .. } => { | ||
| 590 | Ok(false) // Continue processing | ||
| 591 | } | ||
| 592 | } | ||
| 593 | } | ||
| 594 | }) | ||
| 595 | .await | ||
| 596 | .ok(); | ||
| 597 | } | ||
| 598 | |||
| 599 | /// Spawn a connection task for a relay | ||
| 600 | /// | ||
| 601 | /// # Arguments | ||
| 602 | /// * `url` - Relay URL to connect to | ||
| 603 | /// * `tx` - Channel sender for synced events | ||
| 604 | /// * `filter_service` - Filter service for subscriptions | ||
| 605 | /// * `apply_jitter` - Whether to apply startup jitter before connecting | ||
| 606 | fn spawn_connection( | ||
| 607 | &self, | ||
| 608 | url: String, | ||
| 609 | tx: mpsc::Sender<SyncedEvent>, | ||
| 610 | filter_service: Arc<FilterService>, | ||
| 611 | apply_jitter: bool, | ||
| 612 | ) { | ||
| 613 | let domain = self.relay_domain.clone(); | ||
| 614 | let health_tracker = self.health_tracker.clone(); | ||
| 615 | let metrics = self.metrics.clone(); | ||
| 616 | let max_jitter = self.startup_jitter_ms; | ||
| 617 | |||
| 618 | tokio::spawn(async move { | ||
| 619 | // Apply startup jitter if requested | ||
| 620 | if apply_jitter && max_jitter > 0 { | ||
| 621 | let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); | ||
| 622 | tracing::debug!( | ||
| 623 | "Applying {}ms jitter before connecting to {}", | ||
| 624 | jitter_ms, | ||
| 625 | url | ||
| 626 | ); | ||
| 627 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | ||
| 628 | } | ||
| 629 | |||
| 630 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; | ||
| 631 | }); | ||
| 632 | } | ||
| 633 | |||
| 634 | /// Process a single synced event | ||
| 635 | /// | ||
| 636 | /// Events are validated through the write policy and stored if accepted. | ||
| 637 | /// Dynamic subscription updates are handled by each connection's SubscriptionManager. | ||
| 638 | async fn process_event(&self, synced_event: SyncedEvent) { | ||
| 639 | let event = &synced_event.event; | ||
| 640 | let event_id = event.id.to_hex(); | ||
| 641 | let kind = event.kind.as_u16(); | ||
| 642 | |||
| 643 | tracing::debug!( | ||
| 644 | "Processing synced event {} (kind {}) from {}", | ||
| 645 | event_id, | ||
| 646 | kind, | ||
| 647 | synced_event.source_url | ||
| 648 | ); | ||
| 649 | |||
| 650 | // Log subscription-relevant events for debugging | ||
| 651 | match kind { | ||
| 652 | 30617 | 30618 => { | ||
| 653 | tracing::debug!( | ||
| 654 | "Received announcement {} - connection will add Layer 2 subscription", | ||
| 655 | event_id | ||
| 656 | ); | ||
| 657 | } | ||
| 658 | 1617 | 1618 | 1619 | 1621 | 1622 => { | ||
| 659 | tracing::debug!( | ||
| 660 | "Received PR/Issue {} - connection will add Layer 3 subscription", | ||
| 661 | event_id | ||
| 662 | ); | ||
| 663 | } | ||
| 664 | _ => {} | ||
| 665 | } | ||
| 666 | |||
| 667 | // Validate through write policy using sync_source_addr derived from config | ||
| 668 | let result = self | ||
| 669 | .write_policy | ||
| 670 | .admit_event(event, &self.sync_source_addr) | ||
| 671 | .await; | ||
| 672 | |||
| 673 | match result { | ||
| 674 | PolicyResult::Accept => { | ||
| 675 | tracing::info!( | ||
| 676 | "Synced event {} (kind {}) accepted, storing", | ||
| 677 | event_id, | ||
| 678 | event.kind.as_u16() | ||
| 679 | ); | ||
| 680 | |||
| 681 | // Store the event in the database | ||
| 682 | if let Err(e) = self.database.save_event(event).await { | ||
| 683 | tracing::error!("Failed to store synced event {}: {}", event_id, e); | ||
| 684 | } else { | ||
| 685 | tracing::debug!("Synced event {} stored successfully", event_id); | ||
| 686 | } | ||
| 687 | } | ||
| 688 | PolicyResult::Reject(reason) => { | ||
| 689 | tracing::info!( | ||
| 690 | "Synced event {} (kind {}) rejected: {}", | ||
| 691 | event_id, | ||
| 692 | event.kind.as_u16(), | ||
| 693 | reason | ||
| 694 | ); | ||
| 695 | } | ||
| 696 | } | ||
| 697 | } | ||
| 698 | } | ||
| 699 | |||
| 700 | /// Extract domain from a WebSocket URL | ||
| 701 | /// | ||
| 702 | /// Examples: | ||
| 703 | /// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080" | ||
| 704 | /// - "wss://relay.example.com" -> "relay.example.com" | ||
| 705 | fn extract_domain_from_url(url: &str) -> Option<String> { | ||
| 706 | let url = url | ||
| 707 | .trim_start_matches("ws://") | ||
| 708 | .trim_start_matches("wss://"); | ||
| 709 | let url = url | ||
| 710 | .trim_start_matches("http://") | ||
| 711 | .trim_start_matches("https://"); | ||
| 712 | |||
| 713 | // Remove path | ||
| 714 | let domain = url.split('/').next()?; | ||
| 715 | |||
| 716 | Some(domain.to_string()) | ||
| 717 | } | ||
| 718 | |||
| 719 | #[cfg(test)] | ||
| 720 | mod tests { | ||
| 721 | use super::*; | ||
| 722 | |||
| 723 | #[test] | ||
| 724 | fn test_extract_domain_ws() { | ||
| 725 | assert_eq!( | ||
| 726 | extract_domain_from_url("ws://127.0.0.1:8080"), | ||
| 727 | Some("127.0.0.1:8080".to_string()) | ||
| 728 | ); | ||
| 729 | } | ||
| 730 | |||
| 731 | #[test] | ||
| 732 | fn test_extract_domain_wss() { | ||
| 733 | assert_eq!( | ||
| 734 | extract_domain_from_url("wss://relay.example.com"), | ||
| 735 | Some("relay.example.com".to_string()) | ||
| 736 | ); | ||
| 737 | } | ||
| 738 | |||
| 739 | #[test] | ||
| 740 | fn test_extract_domain_with_path() { | ||
| 741 | assert_eq!( | ||
| 742 | extract_domain_from_url("ws://example.com/path"), | ||
| 743 | Some("example.com".to_string()) | ||
| 744 | ); | ||
| 745 | } | ||
| 746 | |||
| 747 | #[test] | ||
| 748 | fn test_extract_domain_http() { | ||
| 749 | assert_eq!( | ||
| 750 | extract_domain_from_url("http://example.com:3000"), | ||
| 751 | Some("example.com:3000".to_string()) | ||
| 752 | ); | ||
| 753 | } | ||
| 754 | |||
| 755 | #[test] | ||
| 756 | fn test_derive_own_relay_url() { | ||
| 757 | assert_eq!( | ||
| 758 | derive_own_relay_url("127.0.0.1:8080"), | ||
| 759 | "ws://127.0.0.1:8080".to_string() | ||
| 760 | ); | ||
| 761 | } | ||
| 762 | } \ No newline at end of file | ||