upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/negentropy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/negentropy.rs')
-rw-r--r-- src/sync/negentropy.rs 477
1 files changed, 477 insertions, 0 deletions
diff --git a/src/sync/negentropy.rs b/src/sync/negentropy.rs
new file mode 100644
index 0000000..5c0a246
--- /dev/null
+++ b/src/sync/negentropy.rs
@@ -0,0 +1,477 @@
1//! Negentropy Catchup Service for GRASP-02 Phase 5
2//!
3//! Implements gap-filling synchronization to ensure no events are missed during:
4//! - Startup (initial sync after warm-up period)
5//! - Reconnection (after connection restore)
6//! - Daily maintenance (periodic full reconciliation)
7//!
8//! ## Note on NIP-77
9//!
10//! This implementation uses a simplified gap-filling strategy (fetch and compare)
11//! rather than full NIP-77 negentropy set reconciliation. The nostr-sdk 0.44 does
12//! not include built-in negentropy support, so we implement an equivalent approach:
13//!
14//! 1. Fetch events from relay using same filters as live sync
15//! 2. Compare with local database (skip already-stored events)
16//! 3. Validate and store missing events through policy
17//!
18//! Full NIP-77 support can be added in a future release if needed.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use nostr_relay_builder::prelude::*;
25use nostr_sdk::prelude::*;
26use tokio::sync::RwLock;
27
28use super::filter::FilterService;
29use super::SYNC_SOURCE_ADDR;
30use crate::config::Config;
31use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
32
/// Warm-up period, in seconds, before the first catchup may run (30 seconds).
const DEFAULT_STARTUP_DELAY_SECS: u64 = 30;

/// Pause, in seconds, between a reconnection and the catchup that follows it (10 seconds).
const DEFAULT_RECONNECT_DELAY_SECS: u64 = 10;

/// How far back, in days, a reconnect catchup looks by default (3 days).
const DEFAULT_RECONNECT_LOOKBACK_DAYS: u64 = 3;

/// Minimum spacing, in seconds, between two daily catchups of the same relay (24 hours).
const DAILY_CATCHUP_INTERVAL_SECS: u64 = 24 * 60 * 60;

/// Offset, in seconds, applied between relays so their catchups do not coincide (5 minutes).
const RELAY_STAGGER_SECS: u64 = 5 * 60;

/// Upper bound, in seconds, on a single event fetch during catchup.
const CATCHUP_FETCH_TIMEOUT_SECS: u64 = 60;
50
51/// Negentropy Catchup Service
52///
53/// Manages gap-filling operations for different scenarios:
54/// - Startup catchup after warm-up period
55/// - Reconnect catchup after connection restore
56/// - Daily catchup for periodic maintenance
57#[derive(Debug)]
58pub struct NegentropyService {
59 /// Database for storing and querying events
60 database: SharedDatabase,
61 /// Filter service for building catchup filters
62 filter_service: Arc<FilterService>,
63 /// Write policy for validating synced events
64 write_policy: Nip34WritePolicy,
65 /// Startup time of the service
66 startup_time: Instant,
67 /// Configuration values
68 startup_delay_secs: u64,
69 reconnect_delay_secs: u64,
70 reconnect_lookback_days: u64,
71 /// Whether startup catchup has been run
72 startup_catchup_completed: Arc<RwLock<bool>>,
73 /// Last daily catchup time per relay
74 last_daily_catchup: Arc<RwLock<HashMap<String, Instant>>>,
75}
76
77impl NegentropyService {
78 /// Create a new NegentropyService
79 ///
80 /// # Arguments
81 /// * `database` - Shared database for storing events
82 /// * `filter_service` - Filter service for building catchup filters
83 /// * `write_policy` - Write policy for validating events
84 /// * `config` - Configuration for catchup timing
85 pub fn new(
86 database: SharedDatabase,
87 filter_service: Arc<FilterService>,
88 write_policy: Nip34WritePolicy,
89 config: &Config,
90 ) -> Self {
91 Self {
92 database,
93 filter_service,
94 write_policy,
95 startup_time: Instant::now(),
96 startup_delay_secs: config.sync_startup_delay_secs,
97 reconnect_delay_secs: config.sync_reconnect_delay_secs,
98 reconnect_lookback_days: config.sync_reconnect_lookback_days,
99 startup_catchup_completed: Arc::new(RwLock::new(false)),
100 last_daily_catchup: Arc::new(RwLock::new(HashMap::new())),
101 }
102 }
103
104 /// Create a NegentropyService with default configuration
105 pub fn with_defaults(
106 database: SharedDatabase,
107 filter_service: Arc<FilterService>,
108 write_policy: Nip34WritePolicy,
109 ) -> Self {
110 Self {
111 database,
112 filter_service,
113 write_policy,
114 startup_time: Instant::now(),
115 startup_delay_secs: DEFAULT_STARTUP_DELAY_SECS,
116 reconnect_delay_secs: DEFAULT_RECONNECT_DELAY_SECS,
117 reconnect_lookback_days: DEFAULT_RECONNECT_LOOKBACK_DAYS,
118 startup_catchup_completed: Arc::new(RwLock::new(false)),
119 last_daily_catchup: Arc::new(RwLock::new(HashMap::new())),
120 }
121 }
122
123 /// Check if startup catchup should run
124 ///
125 /// Returns true if:
126 /// - Startup delay has elapsed (default 30s)
127 /// - Startup catchup hasn't been completed yet
128 pub async fn should_run_startup_catchup(&self) -> bool {
129 let completed = *self.startup_catchup_completed.read().await;
130 if completed {
131 return false;
132 }
133
134 let elapsed = self.startup_time.elapsed();
135 elapsed >= Duration::from_secs(self.startup_delay_secs)
136 }
137
138 /// Check if daily catchup should run for a specific relay
139 ///
140 /// Returns true if 24 hours have elapsed since last daily catchup
141 pub async fn should_run_daily_catchup(&self, relay_url: &str) -> bool {
142 let last_catchup = self.last_daily_catchup.read().await;
143
144 match last_catchup.get(relay_url) {
145 None => true, // Never run, should run
146 Some(last_time) => {
147 last_time.elapsed() >= Duration::from_secs(DAILY_CATCHUP_INTERVAL_SECS)
148 }
149 }
150 }
151
152 /// Get the startup delay in seconds
153 pub fn startup_delay_secs(&self) -> u64 {
154 self.startup_delay_secs
155 }
156
157 /// Get the reconnect delay in seconds
158 pub fn reconnect_delay_secs(&self) -> u64 {
159 self.reconnect_delay_secs
160 }
161
162 /// Get the relay stagger delay in seconds
163 pub fn relay_stagger_secs(&self) -> u64 {
164 RELAY_STAGGER_SECS
165 }
166
167 /// Run startup catchup for a relay
168 ///
169 /// Fetches all events matching the sync filters and stores any missing ones.
170 /// This is called after the startup warm-up period (default 30s).
171 ///
172 /// Returns the count of gap events filled.
173 pub async fn run_startup_catchup(
174 &self,
175 relay_url: &str,
176 remote_domain: &str,
177 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
178 tracing::info!("Starting startup catchup for {}", relay_url);
179
180 // Run full catchup (no time restriction)
181 let gap_count = self
182 .run_catchup(relay_url, remote_domain, None, "startup")
183 .await?;
184
185 // Mark startup catchup as completed
186 {
187 let mut completed = self.startup_catchup_completed.write().await;
188 *completed = true;
189 }
190
191 if gap_count > 0 {
192 tracing::warn!(
193 "Startup catchup filled {} gaps from {}",
194 gap_count,
195 relay_url
196 );
197 } else {
198 tracing::info!("Startup catchup completed for {} (no gaps)", relay_url);
199 }
200
201 Ok(gap_count)
202 }
203
204 /// Run reconnect catchup for a relay
205 ///
206 /// Fetches events from the last 3 days (configurable) and stores any missing ones.
207 /// This is called after a connection is restored (after reconnect delay).
208 ///
209 /// Returns the count of gap events filled.
210 pub async fn run_reconnect_catchup(
211 &self,
212 relay_url: &str,
213 remote_domain: &str,
214 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
215 tracing::info!("Starting reconnect catchup for {}", relay_url);
216
217 // Calculate "since" timestamp (3 days ago)
218 let lookback_secs = self.reconnect_lookback_days * 24 * 60 * 60;
219 let since = Timestamp::now() - lookback_secs;
220
221 let gap_count = self
222 .run_catchup(relay_url, remote_domain, Some(since), "reconnect")
223 .await?;
224
225 if gap_count > 0 {
226 tracing::warn!(
227 "Reconnect catchup filled {} gaps from {}",
228 gap_count,
229 relay_url
230 );
231 } else {
232 tracing::debug!("Reconnect catchup completed for {} (no gaps)", relay_url);
233 }
234
235 Ok(gap_count)
236 }
237
238 /// Run daily catchup for a relay
239 ///
240 /// Performs full reconciliation and stores any missing events.
241 /// This is called once per day per relay (with stagger).
242 ///
243 /// Returns the count of gap events filled.
244 pub async fn run_daily_catchup(
245 &self,
246 relay_url: &str,
247 remote_domain: &str,
248 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
249 tracing::info!("Starting daily catchup for {}", relay_url);
250
251 // Run full catchup (no time restriction)
252 let gap_count = self
253 .run_catchup(relay_url, remote_domain, None, "daily")
254 .await?;
255
256 // Update last daily catchup time
257 {
258 let mut last_catchup = self.last_daily_catchup.write().await;
259 last_catchup.insert(relay_url.to_string(), Instant::now());
260 }
261
262 if gap_count > 0 {
263 tracing::warn!(
264 "Daily catchup filled {} gaps from {}",
265 gap_count,
266 relay_url
267 );
268 } else {
269 tracing::info!("Daily catchup completed for {} (no gaps)", relay_url);
270 }
271
272 Ok(gap_count)
273 }
274
275 /// Core catchup implementation
276 ///
277 /// Fetches events from relay matching sync filters, compares with local database,
278 /// validates through policy, and stores missing events.
279 ///
280 /// # Arguments
281 /// * `relay_url` - URL of the relay to fetch from
282 /// * `remote_domain` - Domain of the remote relay (for filter building)
283 /// * `since` - Optional timestamp to filter events (for reconnect catchup)
284 /// * `catchup_type` - Type of catchup for logging ("startup", "reconnect", "daily")
285 async fn run_catchup(
286 &self,
287 relay_url: &str,
288 remote_domain: &str,
289 since: Option<Timestamp>,
290 catchup_type: &str,
291 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
292 // Create a client for fetching events
293 let client = Client::default();
294 client.add_relay(relay_url).await?;
295 client.connect().await;
296
297 let mut gap_count = 0;
298
299 // Build filters (same as live sync uses)
300 let mut all_filters = Vec::new();
301
302 // Layer 1: Announcement discovery
303 let layer1_filters = self.filter_service.get_layer1_filters();
304 all_filters.extend(layer1_filters);
305
306 // Layer 2: Repository events
307 let layer2_filters = self.filter_service.get_layer2_filters(remote_domain).await;
308 all_filters.extend(layer2_filters);
309
310 // Layer 3: Related events
311 let layer3_filters = self.filter_service.get_layer3_filters().await;
312 all_filters.extend(layer3_filters);
313
314 // Apply "since" filter if specified (for reconnect catchup)
315 let filters: Vec<Filter> = if let Some(since_ts) = since {
316 all_filters
317 .into_iter()
318 .map(|f| f.since(since_ts))
319 .collect()
320 } else {
321 all_filters
322 };
323
324 if filters.is_empty() {
325 tracing::debug!("No filters for {} catchup on {}", catchup_type, relay_url);
326 client.disconnect().await;
327 return Ok(0);
328 }
329
330 tracing::debug!(
331 "Running {} catchup on {} with {} filters",
332 catchup_type,
333 relay_url,
334 filters.len()
335 );
336
337 // Fetch events for each filter
338 for filter in filters {
339 match client
340 .fetch_events(filter, Duration::from_secs(CATCHUP_FETCH_TIMEOUT_SECS))
341 .await
342 {
343 Ok(events) => {
344 for event in events.into_iter() {
345 // Check if event already exists in local database
346 if self.event_exists_locally(&event).await {
347 continue;
348 }
349
350 // Validate through write policy
351 let result = self
352 .write_policy
353 .admit_event(&event, &SYNC_SOURCE_ADDR)
354 .await;
355
356 match result {
357 PolicyResult::Accept => {
358 // Log gap event at WARN level to distinguish from live events
359 tracing::warn!(
360 "Gap event filled via {} catchup: {} (kind {})",
361 catchup_type,
362 event.id.to_hex(),
363 event.kind.as_u16()
364 );
365
366 // Store the event
367 if let Err(e) = self.database.save_event(&event).await {
368 tracing::error!(
369 "Failed to store gap event {}: {}",
370 event.id.to_hex(),
371 e
372 );
373 } else {
374 gap_count += 1;
375 }
376 }
377 PolicyResult::Reject(reason) => {
378 tracing::debug!(
379 "Gap event {} rejected by policy: {}",
380 event.id.to_hex(),
381 reason
382 );
383 }
384 }
385 }
386 }
387 Err(e) => {
388 tracing::warn!(
389 "Failed to fetch events for {} catchup from {}: {}",
390 catchup_type,
391 relay_url,
392 e
393 );
394 }
395 }
396 }
397
398 client.disconnect().await;
399
400 Ok(gap_count)
401 }
402
403 /// Check if an event already exists in the local database
404 async fn event_exists_locally(&self, event: &Event) -> bool {
405 // Query for the specific event by ID
406 let filter = Filter::new().id(event.id);
407
408 match self.database.query(filter).await {
409 Ok(events) => !events.is_empty(),
410 Err(e) => {
411 tracing::warn!(
412 "Failed to check if event {} exists locally: {}",
413 event.id.to_hex(),
414 e
415 );
416 // Assume it doesn't exist to avoid skipping events on error
417 false
418 }
419 }
420 }
421
422 /// Mark startup catchup as completed (for testing)
423 #[cfg(test)]
424 pub async fn mark_startup_completed(&self) {
425 let mut completed = self.startup_catchup_completed.write().await;
426 *completed = true;
427 }
428
429 /// Reset startup catchup status (for testing)
430 #[cfg(test)]
431 pub async fn reset_startup_status(&self) {
432 let mut completed = self.startup_catchup_completed.write().await;
433 *completed = false;
434 }
435}
436
437/// Create a shared NegentropyService wrapped in Arc
438pub fn create_negentropy_service(
439 database: SharedDatabase,
440 filter_service: Arc<FilterService>,
441 write_policy: Nip34WritePolicy,
442 config: &Config,
443) -> Arc<NegentropyService> {
444 Arc::new(NegentropyService::new(
445 database,
446 filter_service,
447 write_policy,
448 config,
449 ))
450}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// The default timing constants keep their documented values.
    #[test]
    fn test_default_constants() {
        let expectations = [
            (DEFAULT_STARTUP_DELAY_SECS, 30),
            (DEFAULT_RECONNECT_DELAY_SECS, 10),
            (DEFAULT_RECONNECT_LOOKBACK_DAYS, 3),
            (DAILY_CATCHUP_INTERVAL_SECS, 86400),
            (RELAY_STAGGER_SECS, 300),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }

    /// A three-day lookback converts to 259,200 seconds.
    #[test]
    fn test_reconnect_lookback_calculation() {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;

        let lookback_days: u64 = 3;
        assert_eq!(lookback_days * SECS_PER_DAY, 259200);
    }

    /// The relay stagger equals five minutes expressed in seconds.
    #[test]
    fn test_stagger_delay_is_5_minutes() {
        let five_minutes_secs: u64 = 300; // 5 * 60 = 300
        assert_eq!(RELAY_STAGGER_SECS, five_minutes_secs);
    }
}