upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/manager.rs
blob: 1f70f42e4b3b49ff334043db8e5845911aa28399 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
//! SyncManager - Coordinates proactive sync operations
//!
//! The SyncManager discovers relays from stored announcements, spawns connections
//! to each relay, receives events, validates them through the write policy,
//! and stores accepted events.
//!
//! ## Phase 2 Features
//!
//! - Relay discovery from stored kind 30617 announcements
//! - Multiple simultaneous relay connections
//! - Three-layer filter strategy via FilterService
//!
//! ## Phase 3 Features
//!
//! - Health tracking with exponential backoff
//! - Dead relay detection after 24h of failures
//! - Startup jitter to prevent thundering herd

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use nostr_relay_builder::prelude::*;
use rand::Rng;
use tokio::sync::mpsc;

use super::connection::{connect_with_retry, SyncedEvent};
use super::filter::FilterService;
use super::health::RelayHealthTracker;
use super::SYNC_SOURCE_ADDR;
use crate::config::Config;
use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};

/// Exclusive upper bound, in milliseconds, on the random delay applied before each
/// startup connection (i.e. up to ten seconds).
const MAX_STARTUP_JITTER_MS: u64 = 10 * 1_000;

/// Coordinates proactive sync from configured and discovered relays
///
/// Holds everything a sync run needs:
/// - the optional configured relay;
/// - our own domain, so our own relay is excluded;
/// - the database that accepted events are written to;
/// - the write policy every synced event must pass;
/// - a health tracker that is handed to each per-relay connection task.
pub struct SyncManager {
    /// Initial relay URL to sync from (from config); when `None`, only relays
    /// discovered from stored announcements and incoming events are used
    initial_relay_url: Option<String>,
    /// Our relay's domain, used to exclude our own relay from sync; also passed
    /// to `FilterService` and to each connection task
    relay_domain: String,
    /// Database for storing accepted events (also queried for announcements)
    database: SharedDatabase,
    /// Write policy every synced event is validated against before it is stored
    write_policy: Nip34WritePolicy,
    /// Health tracker for relay connections; behind `Arc` so each spawned
    /// connection task can hold its own handle
    health_tracker: Arc<RelayHealthTracker>,
}

impl SyncManager {
    /// Create a new SyncManager
    ///
    /// # Arguments
    /// * `initial_relay_url` - Optional initial relay URL from config
    /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
    /// * `database` - Shared database for storing events and querying announcements
    /// * `write_policy` - Write policy for validating synced events
    /// * `config` - Configuration for health tracking settings
    pub fn new(
        initial_relay_url: Option<String>,
        relay_domain: String,
        database: SharedDatabase,
        write_policy: Nip34WritePolicy,
        config: &Config,
    ) -> Self {
        Self {
            initial_relay_url,
            relay_domain,
            database,
            write_policy,
            health_tracker: Arc::new(RelayHealthTracker::new(config)),
        }
    }

    /// Create a SyncManager with a single relay URL (Phase 1 compatibility)
    pub fn with_single_relay(
        sync_relay_url: String,
        database: SharedDatabase,
        write_policy: Nip34WritePolicy,
    ) -> Self {
        // Extract domain from URL for filtering
        let relay_domain = extract_domain_from_url(&sync_relay_url).unwrap_or_default();
        Self {
            initial_relay_url: Some(sync_relay_url),
            relay_domain,
            database,
            write_policy,
            health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
        }
    }

    /// Get a reference to the health tracker
    pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
        self.health_tracker.clone()
    }

    /// Run the sync manager
    ///
    /// This discovers relays from stored announcements, spawns connection tasks,
    /// and processes incoming events. Runs indefinitely until cancelled.
    pub async fn run(self) {
        tracing::info!(
            "Starting SyncManager (domain: {}, initial relay: {:?})",
            self.relay_domain,
            self.initial_relay_url
        );

        // Create the filter service
        let filter_service = Arc::new(FilterService::new(
            self.database.clone(),
            self.relay_domain.clone(),
        ));

        // Create channel for receiving events from all connections
        let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);

        // Track active relay URLs to avoid duplicates
        let mut active_relays: HashSet<String> = HashSet::new();

        // Collect all relays to connect to
        let mut relays_to_connect: Vec<String> = Vec::new();

        // Start with initial relay if configured
        if let Some(ref url) = self.initial_relay_url {
            if !self.is_own_relay(url) {
                relays_to_connect.push(url.clone());
                active_relays.insert(url.clone());
            } else {
                tracing::info!("Skipping initial relay (is our own relay): {}", url);
            }
        }

        // Discover additional relays from stored announcements
        let discovered_urls = filter_service.discover_relay_urls().await;
        for url in discovered_urls {
            if !active_relays.contains(&url) && !self.is_own_relay(&url) {
                relays_to_connect.push(url.clone());
                active_relays.insert(url.clone());
            }
        }

        // Spawn connections with startup jitter to prevent thundering herd
        for url in relays_to_connect {
            tracing::info!("Scheduling connection to sync relay: {}", url);
            self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone());
        }

        if active_relays.is_empty() {
            tracing::warn!("No sync relays configured or discovered, SyncManager idle");
        } else {
            tracing::info!(
                "SyncManager connected to {} relays: {:?}",
                active_relays.len(),
                active_relays
            );
        }

        // Process incoming events from all connections
        while let Some(synced_event) = rx.recv().await {
            // Check if this event reveals new relays to sync from
            let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event);
            for url in new_urls {
                if !active_relays.contains(&url) && !self.is_own_relay(&url) {
                    tracing::info!("Discovered new relay from event, connecting: {}", url);
                    active_relays.insert(url.clone());
                    // New relays discovered during runtime don't need jitter
                    self.spawn_connection(url, tx.clone(), filter_service.clone());
                }
            }

            self.process_event(synced_event).await;
        }

        tracing::warn!("SyncManager event channel closed, shutting down");
    }

    /// Check if a URL points to our own relay
    fn is_own_relay(&self, url: &str) -> bool {
        url.contains(&self.relay_domain)
    }

    /// Spawn a connection task for a relay with startup jitter
    ///
    /// Adds a random delay (0-10s) before connecting to prevent thundering herd
    /// on startup when multiple relays are configured.
    fn spawn_connection_with_jitter(
        &self,
        url: String,
        tx: mpsc::Sender<SyncedEvent>,
        filter_service: Arc<FilterService>,
    ) {
        let domain = self.relay_domain.clone();
        let health_tracker = self.health_tracker.clone();

        tokio::spawn(async move {
            // Apply startup jitter
            let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS);
            tracing::debug!(
                "Applying {}ms startup jitter before connecting to {}",
                jitter_ms,
                url
            );
            tokio::time::sleep(Duration::from_millis(jitter_ms)).await;

            connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
        });
    }

    /// Spawn a connection task for a relay without jitter
    ///
    /// Used for relays discovered during runtime (not at startup).
    fn spawn_connection(
        &self,
        url: String,
        tx: mpsc::Sender<SyncedEvent>,
        filter_service: Arc<FilterService>,
    ) {
        let domain = self.relay_domain.clone();
        let health_tracker = self.health_tracker.clone();

        tokio::spawn(async move {
            connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
        });
    }

    /// Process a single synced event
    async fn process_event(&self, synced_event: SyncedEvent) {
        let event = &synced_event.event;
        let event_id = event.id.to_hex();

        tracing::debug!(
            "Processing synced event {} (kind {}) from {}",
            event_id,
            event.kind.as_u16(),
            synced_event.source_url
        );

        // Validate through write policy using SYNC_SOURCE_ADDR
        let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;

        match result {
            PolicyResult::Accept => {
                tracing::info!(
                    "Synced event {} (kind {}) accepted, storing",
                    event_id,
                    event.kind.as_u16()
                );

                // Store the event in the database
                if let Err(e) = self.database.save_event(event).await {
                    tracing::error!("Failed to store synced event {}: {}", event_id, e);
                } else {
                    tracing::debug!("Synced event {} stored successfully", event_id);
                }
            }
            PolicyResult::Reject(reason) => {
                tracing::info!(
                    "Synced event {} (kind {}) rejected: {}",
                    event_id,
                    event.kind.as_u16(),
                    reason
                );
            }
        }
    }
}

/// Extract the authority (host, plus port if present) from a relay URL
///
/// - A leading `ws://`, `wss://`, `http://` or `https://` scheme is stripped
///   ASCII case-insensitively.
/// - Input without a scheme is treated as starting at the authority.
/// - Everything from the first `/`, `?` or `#` onward is dropped, as is any
///   `user@` userinfo prefix.
/// - Surrounding whitespace is ignored, and the host's case is preserved.
///
/// Returns `None` when no authority remains (e.g. `""` or `"wss://"`).
///
/// Examples:
/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
/// - "wss://relay.example.com" -> "relay.example.com"
/// - "wss://relay.example.com?x=1" -> "relay.example.com"
fn extract_domain_from_url(url: &str) -> Option<String> {
    const SCHEMES: [&str; 4] = ["wss://", "ws://", "https://", "http://"];

    let url = url.trim();
    let without_scheme = SCHEMES
        .iter()
        .find_map(|scheme| {
            url.get(..scheme.len())
                .filter(|prefix| prefix.eq_ignore_ascii_case(scheme))
                .map(|_| &url[scheme.len()..])
        })
        .unwrap_or(url);

    // The authority ends at the first path, query, or fragment delimiter.
    let authority = without_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(without_scheme);
    // Drop any `user[:pass]@` userinfo.
    let authority = authority.rsplit_once('@').map_or(authority, |(_, host)| host);

    (!authority.is_empty()).then(|| authority.to_string())
}

#[cfg(test)]
mod tests {
    use super::*;

    // Each case borrows the extracted authority as `Option<&str>` so the expected
    // value can be written as a plain string literal.

    #[test]
    fn test_extract_domain_ws() {
        let extracted = extract_domain_from_url("ws://127.0.0.1:8080");
        assert_eq!(extracted.as_deref(), Some("127.0.0.1:8080"));
    }

    #[test]
    fn test_extract_domain_wss() {
        let extracted = extract_domain_from_url("wss://relay.example.com");
        assert_eq!(extracted.as_deref(), Some("relay.example.com"));
    }

    #[test]
    fn test_extract_domain_with_path() {
        // Everything after the first '/' following the authority is discarded.
        let extracted = extract_domain_from_url("ws://example.com/path");
        assert_eq!(extracted.as_deref(), Some("example.com"));
    }

    #[test]
    fn test_extract_domain_http() {
        // The port is part of the returned authority.
        let extracted = extract_domain_from_url("http://example.com:3000");
        assert_eq!(extracted.as_deref(), Some("example.com:3000"));
    }
}