1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
|
//! SyncManager - Coordinates proactive sync operations
//!
//! The SyncManager discovers relays from stored announcements, spawns connections
//! to each relay, receives events, validates them through the write policy,
//! and stores accepted events.
//!
//! ## Phase 2 Features
//!
//! - Relay discovery from stored kind 30617 announcements
//! - Multiple simultaneous relay connections
//! - Three-layer filter strategy via FilterService
//!
//! ## Phase 3 Features
//!
//! - Health tracking with exponential backoff
//! - Dead relay detection after 24h of failures
//! - Startup jitter to prevent thundering herd
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use nostr_relay_builder::prelude::*;
use rand::Rng;
use tokio::sync::mpsc;
use super::connection::{connect_with_retry, SyncedEvent};
use super::filter::FilterService;
use super::health::RelayHealthTracker;
use super::SYNC_SOURCE_ADDR;
use crate::config::Config;
use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
/// Maximum startup jitter in milliseconds (10 seconds)
const MAX_STARTUP_JITTER_MS: u64 = 10_000;
/// Coordinates proactive sync from configured and discovered relays
pub struct SyncManager {
/// Initial relay URL to sync from (from config)
initial_relay_url: Option<String>,
/// Our relay's domain (for filtering)
relay_domain: String,
/// Database for storing accepted events
database: SharedDatabase,
/// Write policy for validating events
write_policy: Nip34WritePolicy,
/// Health tracker for relay connections
health_tracker: Arc<RelayHealthTracker>,
}
impl SyncManager {
/// Create a new SyncManager
///
/// # Arguments
/// * `initial_relay_url` - Optional initial relay URL from config
/// * `relay_domain` - Our relay's domain (used to exclude self from sync)
/// * `database` - Shared database for storing events and querying announcements
/// * `write_policy` - Write policy for validating synced events
/// * `config` - Configuration for health tracking settings
pub fn new(
initial_relay_url: Option<String>,
relay_domain: String,
database: SharedDatabase,
write_policy: Nip34WritePolicy,
config: &Config,
) -> Self {
Self {
initial_relay_url,
relay_domain,
database,
write_policy,
health_tracker: Arc::new(RelayHealthTracker::new(config)),
}
}
/// Create a SyncManager with a single relay URL (Phase 1 compatibility)
pub fn with_single_relay(
sync_relay_url: String,
database: SharedDatabase,
write_policy: Nip34WritePolicy,
) -> Self {
// Extract domain from URL for filtering
let relay_domain = extract_domain_from_url(&sync_relay_url).unwrap_or_default();
Self {
initial_relay_url: Some(sync_relay_url),
relay_domain,
database,
write_policy,
health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
}
}
/// Get a reference to the health tracker
pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
self.health_tracker.clone()
}
/// Run the sync manager
///
/// This discovers relays from stored announcements, spawns connection tasks,
/// and processes incoming events. Runs indefinitely until cancelled.
pub async fn run(self) {
tracing::info!(
"Starting SyncManager (domain: {}, initial relay: {:?})",
self.relay_domain,
self.initial_relay_url
);
// Create the filter service
let filter_service = Arc::new(FilterService::new(
self.database.clone(),
self.relay_domain.clone(),
));
// Create channel for receiving events from all connections
let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
// Track active relay URLs to avoid duplicates
let mut active_relays: HashSet<String> = HashSet::new();
// Collect all relays to connect to
let mut relays_to_connect: Vec<String> = Vec::new();
// Start with initial relay if configured
if let Some(ref url) = self.initial_relay_url {
if !self.is_own_relay(url) {
relays_to_connect.push(url.clone());
active_relays.insert(url.clone());
} else {
tracing::info!("Skipping initial relay (is our own relay): {}", url);
}
}
// Discover additional relays from stored announcements
let discovered_urls = filter_service.discover_relay_urls().await;
for url in discovered_urls {
if !active_relays.contains(&url) && !self.is_own_relay(&url) {
relays_to_connect.push(url.clone());
active_relays.insert(url.clone());
}
}
// Spawn connections with startup jitter to prevent thundering herd
for url in relays_to_connect {
tracing::info!("Scheduling connection to sync relay: {}", url);
self.spawn_connection_with_jitter(url, tx.clone(), filter_service.clone());
}
if active_relays.is_empty() {
tracing::warn!("No sync relays configured or discovered, SyncManager idle");
} else {
tracing::info!(
"SyncManager connected to {} relays: {:?}",
active_relays.len(),
active_relays
);
}
// Process incoming events from all connections
while let Some(synced_event) = rx.recv().await {
// Check if this event reveals new relays to sync from
let new_urls = filter_service.extract_relay_urls_from_event(&synced_event.event);
for url in new_urls {
if !active_relays.contains(&url) && !self.is_own_relay(&url) {
tracing::info!("Discovered new relay from event, connecting: {}", url);
active_relays.insert(url.clone());
// New relays discovered during runtime don't need jitter
self.spawn_connection(url, tx.clone(), filter_service.clone());
}
}
self.process_event(synced_event).await;
}
tracing::warn!("SyncManager event channel closed, shutting down");
}
/// Check if a URL points to our own relay
fn is_own_relay(&self, url: &str) -> bool {
url.contains(&self.relay_domain)
}
/// Spawn a connection task for a relay with startup jitter
///
/// Adds a random delay (0-10s) before connecting to prevent thundering herd
/// on startup when multiple relays are configured.
fn spawn_connection_with_jitter(
&self,
url: String,
tx: mpsc::Sender<SyncedEvent>,
filter_service: Arc<FilterService>,
) {
let domain = self.relay_domain.clone();
let health_tracker = self.health_tracker.clone();
tokio::spawn(async move {
// Apply startup jitter
let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS);
tracing::debug!(
"Applying {}ms startup jitter before connecting to {}",
jitter_ms,
url
);
tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
});
}
/// Spawn a connection task for a relay without jitter
///
/// Used for relays discovered during runtime (not at startup).
fn spawn_connection(
&self,
url: String,
tx: mpsc::Sender<SyncedEvent>,
filter_service: Arc<FilterService>,
) {
let domain = self.relay_domain.clone();
let health_tracker = self.health_tracker.clone();
tokio::spawn(async move {
connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await;
});
}
/// Process a single synced event
async fn process_event(&self, synced_event: SyncedEvent) {
let event = &synced_event.event;
let event_id = event.id.to_hex();
tracing::debug!(
"Processing synced event {} (kind {}) from {}",
event_id,
event.kind.as_u16(),
synced_event.source_url
);
// Validate through write policy using SYNC_SOURCE_ADDR
let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
match result {
PolicyResult::Accept => {
tracing::info!(
"Synced event {} (kind {}) accepted, storing",
event_id,
event.kind.as_u16()
);
// Store the event in the database
if let Err(e) = self.database.save_event(event).await {
tracing::error!("Failed to store synced event {}: {}", event_id, e);
} else {
tracing::debug!("Synced event {} stored successfully", event_id);
}
}
PolicyResult::Reject(reason) => {
tracing::info!(
"Synced event {} (kind {}) rejected: {}",
event_id,
event.kind.as_u16(),
reason
);
}
}
}
}
/// Extract domain from a WebSocket URL
///
/// Examples:
/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
/// - "wss://relay.example.com" -> "relay.example.com"
fn extract_domain_from_url(url: &str) -> Option<String> {
let url = url.trim_start_matches("ws://").trim_start_matches("wss://");
let url = url.trim_start_matches("http://").trim_start_matches("https://");
// Remove path
let domain = url.split('/').next()?;
Some(domain.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_domain_ws() {
assert_eq!(
extract_domain_from_url("ws://127.0.0.1:8080"),
Some("127.0.0.1:8080".to_string())
);
}
#[test]
fn test_extract_domain_wss() {
assert_eq!(
extract_domain_from_url("wss://relay.example.com"),
Some("relay.example.com".to_string())
);
}
#[test]
fn test_extract_domain_with_path() {
assert_eq!(
extract_domain_from_url("ws://example.com/path"),
Some("example.com".to_string())
);
}
#[test]
fn test_extract_domain_http() {
assert_eq!(
extract_domain_from_url("http://example.com:3000"),
Some("example.com:3000".to_string())
);
}
}
|