upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/manager.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:12 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:18 +0000
commitefaad1e2857914b87307cf78903a957a604697a8 (patch)
treedadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/manager.rs
parent91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff)
basic sync stub
Diffstat (limited to 'src/sync/manager.rs')
-rw-r--r--src/sync/manager.rs762
1 files changed, 0 insertions, 762 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
deleted file mode 100644
index 6ae82ef..0000000
--- a/src/sync/manager.rs
+++ /dev/null
@@ -1,762 +0,0 @@
1//! SyncManager - Coordinates proactive sync operations
2//!
3//! The SyncManager connects to remote relays, receives events, validates them
4//! through the write policy, and stores accepted events.
5//!
6//! ## Simplified Relay Discovery Architecture
7//!
8//! All relay discovery is centralized in the self-subscriber:
9//! - Bootstrap relay: connected immediately (no jitter, single relay)
10//! - All other relays: discovered via self-subscriber announcements (with jitter)
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────┐
14//! │ ngit-grasp │
15//! │ │
16//! │ ┌─────────────┐ broadcasts ┌───────────────┐ │
17//! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │
18//! │ │ Database │ │ Client │ │
19//! │ └─────────────┘ └───────┬───────┘ │
20//! │ ▲ │ │
21//! │ │ stores │ extracts │
22//! │ │ │ relay │
23//! │ ┌─────┴─────┐ │ URLs │
24//! │ │ Remote │◀────────────────────────────────┘ │
25//! │ │Connections│ spawns new │
26//! │ └───────────┘ connections (with jitter) │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Key Design Decisions
31//!
32//! - **Single relay discovery path**: Only the self-subscriber discovers new relays
33//! - **Jitter at point of discovery**: Applied when spawning connections from announcements
34//! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect
35//! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd
36//!
37//! ## Phase 2 Features
38//!
39//! - Relay discovery from kind 30617 announcements (via self-subscriber)
40//! - Multiple simultaneous relay connections
41//! - Three-layer filter strategy via FilterService
42//!
43//! ## Phase 3 Features
44//!
45//! - Health tracking with exponential backoff
46//! - Dead relay detection after 24h of failures
47//! - Startup jitter to prevent thundering herd
48//!
49//! ## Phase 4 Features
50//!
51//! - Dynamic subscription updates handled per-connection
52//! - Each connection manages its own SubscriptionManager
53//! - Announcements trigger Layer 2 subscriptions
54//! - PRs/Issues trigger Layer 3 subscriptions
55//! - Consolidation when filter count exceeds 150
56
57use std::collections::HashSet;
58use std::net::{IpAddr, Ipv4Addr, SocketAddr};
59use std::sync::Arc;
60use std::time::Duration;
61
62use nostr_relay_builder::prelude::*;
63use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp};
64use rand::Rng;
65use tokio::sync::mpsc;
66
67use super::connection::{connect_with_retry, SyncedEvent};
68use super::filter::FilterService;
69use super::health::RelayHealthTracker;
70use super::metrics::SyncMetrics;
71use crate::config::Config;
72use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
73use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT;
74
75/// Default fallback address for sync source when bind_address cannot be parsed
76///
77/// This distinguishes synced events from directly-submitted events in logs and metrics.
78/// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker.
79pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr =
80 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
81
82/// Derive sync source address from config bind_address
83///
84/// Parses the bind_address string and returns a SocketAddr.
85/// Falls back to 127.0.0.1:8080 if parsing fails.
86fn get_sync_source_addr(bind_address: &str) -> SocketAddr {
87 bind_address
88 .parse()
89 .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR)
90}
91
92/// Derive the WebSocket URL for our own relay from bind_address
93fn derive_own_relay_url(bind_address: &str) -> String {
94 format!("ws://{}", bind_address)
95}
96
97/// Coordinates proactive sync from configured and discovered relays
98pub struct SyncManager {
99 /// Bootstrap relay URL for initial sync (from config)
100 /// Additional relays are discovered from repository announcements that list our service
101 bootstrap_relay_url: Option<String>,
102 /// Our relay's domain (for filtering)
103 relay_domain: String,
104 /// Our relay's WebSocket URL (for self-subscribe)
105 own_relay_url: String,
106 /// Database for storing accepted events
107 database: SharedDatabase,
108 /// Write policy for validating events
109 write_policy: Nip34WritePolicy,
110 /// Health tracker for relay connections
111 health_tracker: Arc<RelayHealthTracker>,
112 /// Sync metrics for Prometheus
113 metrics: Option<SyncMetrics>,
114 /// Source address for synced events (derived from config.bind_address)
115 sync_source_addr: SocketAddr,
116 /// Maximum startup jitter in milliseconds (from config)
117 startup_jitter_ms: u64,
118}
119
120impl SyncManager {
121 /// Create a new SyncManager
122 ///
123 /// # Arguments
124 /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config
125 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
126 /// * `database` - Shared database for storing events and querying announcements
127 /// * `write_policy` - Write policy for validating synced events
128 /// * `config` - Configuration for health tracking settings
129 pub fn new(
130 bootstrap_relay_url: Option<String>,
131 relay_domain: String,
132 database: SharedDatabase,
133 write_policy: Nip34WritePolicy,
134 config: &Config,
135 ) -> Self {
136 let own_relay_url = derive_own_relay_url(&config.bind_address);
137 Self {
138 bootstrap_relay_url,
139 relay_domain,
140 own_relay_url,
141 database,
142 write_policy,
143 health_tracker: Arc::new(RelayHealthTracker::new(config)),
144 metrics: None,
145 sync_source_addr: get_sync_source_addr(&config.bind_address),
146 startup_jitter_ms: config.sync_startup_jitter_ms,
147 }
148 }
149
150 /// Create a new SyncManager with metrics
151 ///
152 /// # Arguments
153 /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config
154 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
155 /// * `database` - Shared database for storing events and querying announcements
156 /// * `write_policy` - Write policy for validating synced events
157 /// * `config` - Configuration for health tracking settings
158 /// * `metrics` - Sync metrics for Prometheus
159 pub fn with_metrics(
160 bootstrap_relay_url: Option<String>,
161 relay_domain: String,
162 database: SharedDatabase,
163 write_policy: Nip34WritePolicy,
164 config: &Config,
165 metrics: SyncMetrics,
166 ) -> Self {
167 let own_relay_url = derive_own_relay_url(&config.bind_address);
168 Self {
169 bootstrap_relay_url,
170 relay_domain,
171 own_relay_url,
172 database,
173 write_policy,
174 health_tracker: Arc::new(RelayHealthTracker::new(config)),
175 metrics: Some(metrics),
176 sync_source_addr: get_sync_source_addr(&config.bind_address),
177 startup_jitter_ms: config.sync_startup_jitter_ms,
178 }
179 }
180
181 /// Create a SyncManager with a single relay URL (Phase 1 compatibility)
182 pub fn with_single_relay(
183 bootstrap_url: String,
184 database: SharedDatabase,
185 write_policy: Nip34WritePolicy,
186 ) -> Self {
187 // Extract domain from URL for filtering
188 let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default();
189 let own_relay_url = format!("ws://{}", relay_domain);
190 Self {
191 bootstrap_relay_url: Some(bootstrap_url),
192 relay_domain,
193 own_relay_url,
194 database,
195 write_policy,
196 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
197 metrics: None,
198 sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR,
199 startup_jitter_ms: 10_000, // Default 10 seconds
200 }
201 }
202
203 /// Set metrics for the sync manager
204 pub fn set_metrics(&mut self, metrics: SyncMetrics) {
205 self.metrics = Some(metrics);
206 }
207
208 /// Get a reference to the metrics
209 pub fn metrics(&self) -> Option<&SyncMetrics> {
210 self.metrics.as_ref()
211 }
212
213 /// Get a reference to the health tracker
214 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
215 self.health_tracker.clone()
216 }
217
218 /// Run the sync manager
219 ///
220 /// This spawns the bootstrap relay connection (if configured), sets up a
221 /// self-subscriber for event-driven relay discovery, and processes incoming
222 /// events. The self-subscriber handles ALL relay discovery from announcements.
223 /// Runs indefinitely until cancelled.
224 ///
225 /// ## Simplified Relay Discovery Architecture
226 ///
227 /// All relay discovery is centralized in the self-subscriber:
228 /// - Bootstrap relay: connected immediately (no jitter, single relay)
229 /// - All other relays: discovered via self-subscriber announcements (with jitter)
230 /// - Jitter applied at point of discovery (not startup)
231 ///
232 /// This eliminates three redundant discovery paths:
233 /// 1. DB query at startup (removed)
234 /// 2. Remote event extraction (removed)
235 /// 3. Self-subscriber (sole discovery path)
236 pub async fn run(self) {
237 tracing::info!(
238 "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})",
239 self.relay_domain,
240 self.own_relay_url,
241 self.bootstrap_relay_url
242 );
243
244 // Create the filter service
245 let filter_service = Arc::new(FilterService::new(
246 self.database.clone(),
247 self.relay_domain.clone(),
248 ));
249
250 // Create channel for receiving events from all connections
251 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
252
253 // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing)
254 let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::<String>::new()));
255
256 // Bootstrap relay - connect immediately (no jitter, just one relay)
257 if let Some(ref url) = self.bootstrap_relay_url {
258 if !self.is_own_relay(url) {
259 tracing::info!("Connecting to bootstrap relay: {}", url);
260 active_relays.lock().await.insert(url.clone());
261 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false);
262 } else {
263 tracing::info!("Skipping bootstrap relay (is our own relay): {}", url);
264 }
265 }
266
267 // Record initial tracked relay count
268 if let Some(ref metrics) = self.metrics {
269 let count = active_relays.lock().await.len();
270 metrics.set_tracked_count(count as i64);
271 }
272
273 {
274 let active = active_relays.lock().await;
275 if active.is_empty() {
276 tracing::info!(
277 "No bootstrap relay configured, waiting for announcements via self-subscriber..."
278 );
279 } else {
280 tracing::info!(
281 "SyncManager connected to {} relay(s): {:?}",
282 active.len(),
283 *active
284 );
285 }
286 }
287
288 // Spawn self-subscriber task for ALL relay discovery
289 let self_subscriber_handle = self.spawn_self_subscriber(
290 tx.clone(),
291 filter_service.clone(),
292 active_relays.clone(),
293 );
294
295 // Process incoming events - just validate and store, NO relay discovery
296 // (relay discovery is handled solely by the self-subscriber)
297 while let Some(synced_event) = rx.recv().await {
298 self.process_event(synced_event).await;
299 }
300
301 // Clean up self-subscriber
302 self_subscriber_handle.abort();
303 tracing::warn!("SyncManager event channel closed, shutting down");
304 }
305
306 /// Check if a URL points to our own relay
307 fn is_own_relay(&self, url: &str) -> bool {
308 url.contains(&self.relay_domain)
309 }
310
311 /// Spawn a self-subscriber task that connects to our own relay
312 /// and watches for kind 30617 announcements to discover new relays.
313 ///
314 /// This is the SOLE relay discovery path - all relay discovery happens here.
315 /// When a new announcement is saved to our database (from direct submission
316 /// or synced from another relay), the self-subscriber receives it immediately
317 /// and spawns connections to newly discovered relays (with jitter).
318 fn spawn_self_subscriber(
319 &self,
320 tx: mpsc::Sender<SyncedEvent>,
321 filter_service: Arc<FilterService>,
322 active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>,
323 ) -> tokio::task::JoinHandle<()> {
324 let own_relay_url = self.own_relay_url.clone();
325 let relay_domain = self.relay_domain.clone();
326 let metrics = self.metrics.clone();
327 let health_tracker = self.health_tracker.clone();
328 let startup_jitter_ms = self.startup_jitter_ms;
329
330 tokio::spawn(async move {
331 Self::run_self_subscriber_loop(
332 own_relay_url,
333 relay_domain,
334 tx,
335 filter_service,
336 active_relays,
337 metrics,
338 health_tracker,
339 startup_jitter_ms,
340 )
341 .await;
342 })
343 }
344
345 /// Main loop for the self-subscriber
346 ///
347 /// Connects to our own relay, subscribes to kind 30617 announcements,
348 /// and processes events to discover new relays. Handles reconnection
349 /// on disconnect.
350 ///
351 /// ## Reconnection Behavior
352 ///
353 /// - First connection: no `since` filter (get all historical announcements)
354 /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing
355 #[allow(clippy::too_many_arguments)]
356 async fn run_self_subscriber_loop(
357 own_relay_url: String,
358 relay_domain: String,
359 tx: mpsc::Sender<SyncedEvent>,
360 filter_service: Arc<FilterService>,
361 active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>,
362 metrics: Option<SyncMetrics>,
363 health_tracker: Arc<RelayHealthTracker>,
364 startup_jitter_ms: u64,
365 ) {
366 let mut reconnect_delay = Duration::from_secs(1);
367 let max_reconnect_delay = Duration::from_secs(60);
368 let mut is_first_connection = true;
369
370 loop {
371 tracing::info!(
372 "Self-subscriber connecting to own relay: {}",
373 own_relay_url
374 );
375
376 match Self::connect_self_subscriber(&own_relay_url).await {
377 Ok(client) => {
378 // Reset reconnect delay on successful connection
379 reconnect_delay = Duration::from_secs(1);
380
381 tracing::info!(
382 "Self-subscriber connected to own relay, subscribing to kind {} announcements{}",
383 KIND_REPOSITORY_ANNOUNCEMENT,
384 if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" }
385 );
386
387 // Subscribe to repository announcements
388 // First connection: get all historical; reconnections: only last 15 minutes
389 let filter = if is_first_connection {
390 Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT))
391 } else {
392 let since = Timestamp::now() - 15 * 60; // 15 minutes ago
393 Filter::new()
394 .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT))
395 .since(since)
396 };
397
398 is_first_connection = false;
399
400 if let Err(e) = client.subscribe(filter, None).await {
401 tracing::error!(
402 "Self-subscriber failed to subscribe on {}: {}",
403 own_relay_url,
404 e
405 );
406 // Will reconnect after delay
407 } else {
408 // Handle notifications until disconnect
409 Self::handle_self_subscriber_notifications(
410 &client,
411 &own_relay_url,
412 &relay_domain,
413 &tx,
414 &filter_service,
415 &active_relays,
416 &metrics,
417 &health_tracker,
418 startup_jitter_ms,
419 )
420 .await;
421 }
422
423 // Disconnect and cleanup
424 client.disconnect().await;
425 }
426 Err(e) => {
427 tracing::warn!(
428 "Self-subscriber failed to connect to {}: {}",
429 own_relay_url,
430 e
431 );
432 }
433 }
434
435 // Wait before reconnecting with exponential backoff
436 tracing::debug!(
437 "Self-subscriber will reconnect to {} in {:?}",
438 own_relay_url,
439 reconnect_delay
440 );
441 tokio::time::sleep(reconnect_delay).await;
442 reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay);
443 }
444 }
445
446 /// Connect to our own relay for self-subscribing
447 async fn connect_self_subscriber(
448 url: &str,
449 ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> {
450 let client = Client::default();
451 client.add_relay(url).await?;
452 client.connect().await;
453
454 // Wait for connection to establish (with timeout)
455 let timeout = Duration::from_secs(10);
456 let start = std::time::Instant::now();
457
458 while start.elapsed() < timeout {
459 let relays = client.relays().await;
460 if relays.values().any(|r| r.is_connected()) {
461 return Ok(client);
462 }
463 tokio::time::sleep(Duration::from_millis(100)).await;
464 }
465
466 Err("Timeout waiting for self-subscriber connection".into())
467 }
468
469 /// Handle notifications from the self-subscriber client
470 ///
471 /// Processes announcement events to discover new relay URLs.
472 /// Applies jitter before spawning connections to prevent thundering herd.
473 #[allow(clippy::too_many_arguments)]
474 async fn handle_self_subscriber_notifications(
475 client: &Client,
476 own_relay_url: &str,
477 relay_domain: &str,
478 tx: &mpsc::Sender<SyncedEvent>,
479 filter_service: &Arc<FilterService>,
480 active_relays: &Arc<tokio::sync::Mutex<HashSet<String>>>,
481 metrics: &Option<SyncMetrics>,
482 health_tracker: &Arc<RelayHealthTracker>,
483 startup_jitter_ms: u64,
484 ) {
485 let own_relay_url = own_relay_url.to_string();
486 let relay_domain = relay_domain.to_string();
487 let filter_service = filter_service.clone();
488 let active_relays = active_relays.clone();
489 let metrics = metrics.clone();
490 let health_tracker = health_tracker.clone();
491 let tx = tx.clone();
492
493 client
494 .handle_notifications(|notification| {
495 let own_relay_url = own_relay_url.clone();
496 let relay_domain = relay_domain.clone();
497 let filter_service = filter_service.clone();
498 let active_relays = active_relays.clone();
499 let metrics = metrics.clone();
500 let health_tracker = health_tracker.clone();
501 let tx = tx.clone();
502
503 async move {
504 match notification {
505 RelayPoolNotification::Event { event, .. } => {
506 // Only process repository announcement events
507 if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT {
508 return Ok(false);
509 }
510
511 tracing::debug!(
512 "Self-subscriber received announcement {} from {}",
513 event.id,
514 own_relay_url
515 );
516
517 // Extract relay URLs from the announcement
518 let new_urls = filter_service.extract_relay_urls_from_event(&event);
519
520 for url in new_urls {
521 // Check if we should connect to this relay
522 let should_connect = {
523 let mut active = active_relays.lock().await;
524 let is_new = !active.contains(&url);
525 let is_not_self = !url.contains(&relay_domain);
526
527 if is_new && is_not_self {
528 active.insert(url.clone());
529 true
530 } else {
531 false
532 }
533 };
534
535 if should_connect {
536 tracing::info!(
537 "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}",
538 event.id,
539 url
540 );
541
542 // Update tracked relay count
543 if let Some(ref m) = metrics {
544 m.inc_tracked_count();
545 }
546
547 // Spawn connection to the new relay WITH jitter at point of discovery
548 let url_clone = url.clone();
549 let tx_clone = tx.clone();
550 let filter_service_clone = filter_service.clone();
551 let domain_clone = relay_domain.clone();
552 let health_tracker_clone = health_tracker.clone();
553 let metrics_clone = metrics.clone();
554
555 tokio::spawn(async move {
556 // Apply jitter at point of discovery
557 if startup_jitter_ms > 0 {
558 let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms);
559 tracing::debug!(
560 "Applying {}ms jitter before connecting to discovered relay {}",
561 jitter_ms,
562 url_clone
563 );
564 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
565 }
566
567 connect_with_retry(
568 &url_clone,
569 tx_clone,
570 filter_service_clone,
571 &domain_clone,
572 health_tracker_clone,
573 metrics_clone,
574 )
575 .await;
576 });
577 }
578 }
579
580 Ok(false) // Continue processing
581 }
582 RelayPoolNotification::Shutdown => {
583 tracing::warn!(
584 "Self-subscriber connection shutdown for {}",
585 own_relay_url
586 );
587 Ok(true) // Stop on shutdown
588 }
589 RelayPoolNotification::Message { .. } => {
590 Ok(false) // Continue processing
591 }
592 }
593 }
594 })
595 .await
596 .ok();
597 }
598
599 /// Spawn a connection task for a relay
600 ///
601 /// # Arguments
602 /// * `url` - Relay URL to connect to
603 /// * `tx` - Channel sender for synced events
604 /// * `filter_service` - Filter service for subscriptions
605 /// * `apply_jitter` - Whether to apply startup jitter before connecting
606 fn spawn_connection(
607 &self,
608 url: String,
609 tx: mpsc::Sender<SyncedEvent>,
610 filter_service: Arc<FilterService>,
611 apply_jitter: bool,
612 ) {
613 let domain = self.relay_domain.clone();
614 let health_tracker = self.health_tracker.clone();
615 let metrics = self.metrics.clone();
616 let max_jitter = self.startup_jitter_ms;
617
618 tokio::spawn(async move {
619 // Apply startup jitter if requested
620 if apply_jitter && max_jitter > 0 {
621 let jitter_ms = rand::thread_rng().gen_range(0..max_jitter);
622 tracing::debug!(
623 "Applying {}ms jitter before connecting to {}",
624 jitter_ms,
625 url
626 );
627 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
628 }
629
630 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
631 });
632 }
633
634 /// Process a single synced event
635 ///
636 /// Events are validated through the write policy and stored if accepted.
637 /// Dynamic subscription updates are handled by each connection's SubscriptionManager.
638 async fn process_event(&self, synced_event: SyncedEvent) {
639 let event = &synced_event.event;
640 let event_id = event.id.to_hex();
641 let kind = event.kind.as_u16();
642
643 tracing::debug!(
644 "Processing synced event {} (kind {}) from {}",
645 event_id,
646 kind,
647 synced_event.source_url
648 );
649
650 // Log subscription-relevant events for debugging
651 match kind {
652 30617 | 30618 => {
653 tracing::debug!(
654 "Received announcement {} - connection will add Layer 2 subscription",
655 event_id
656 );
657 }
658 1617 | 1618 | 1619 | 1621 | 1622 => {
659 tracing::debug!(
660 "Received PR/Issue {} - connection will add Layer 3 subscription",
661 event_id
662 );
663 }
664 _ => {}
665 }
666
667 // Validate through write policy using sync_source_addr derived from config
668 let result = self
669 .write_policy
670 .admit_event(event, &self.sync_source_addr)
671 .await;
672
673 match result {
674 PolicyResult::Accept => {
675 tracing::info!(
676 "Synced event {} (kind {}) accepted, storing",
677 event_id,
678 event.kind.as_u16()
679 );
680
681 // Store the event in the database
682 if let Err(e) = self.database.save_event(event).await {
683 tracing::error!("Failed to store synced event {}: {}", event_id, e);
684 } else {
685 tracing::debug!("Synced event {} stored successfully", event_id);
686 }
687 }
688 PolicyResult::Reject(reason) => {
689 tracing::info!(
690 "Synced event {} (kind {}) rejected: {}",
691 event_id,
692 event.kind.as_u16(),
693 reason
694 );
695 }
696 }
697 }
698}
699
700/// Extract domain from a WebSocket URL
701///
702/// Examples:
703/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
704/// - "wss://relay.example.com" -> "relay.example.com"
705fn extract_domain_from_url(url: &str) -> Option<String> {
706 let url = url
707 .trim_start_matches("ws://")
708 .trim_start_matches("wss://");
709 let url = url
710 .trim_start_matches("http://")
711 .trim_start_matches("https://");
712
713 // Remove path
714 let domain = url.split('/').next()?;
715
716 Some(domain.to_string())
717}
718
719#[cfg(test)]
720mod tests {
721 use super::*;
722
723 #[test]
724 fn test_extract_domain_ws() {
725 assert_eq!(
726 extract_domain_from_url("ws://127.0.0.1:8080"),
727 Some("127.0.0.1:8080".to_string())
728 );
729 }
730
731 #[test]
732 fn test_extract_domain_wss() {
733 assert_eq!(
734 extract_domain_from_url("wss://relay.example.com"),
735 Some("relay.example.com".to_string())
736 );
737 }
738
739 #[test]
740 fn test_extract_domain_with_path() {
741 assert_eq!(
742 extract_domain_from_url("ws://example.com/path"),
743 Some("example.com".to_string())
744 );
745 }
746
747 #[test]
748 fn test_extract_domain_http() {
749 assert_eq!(
750 extract_domain_from_url("http://example.com:3000"),
751 Some("example.com:3000".to_string())
752 );
753 }
754
755 #[test]
756 fn test_derive_own_relay_url() {
757 assert_eq!(
758 derive_own_relay_url("127.0.0.1:8080"),
759 "ws://127.0.0.1:8080".to_string()
760 );
761 }
762} \ No newline at end of file