upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:12 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:18 +0000
commitefaad1e2857914b87307cf78903a957a604697a8 (patch)
treedadd0285727b324328166d06d86a6e1e6fb935cf /src/sync
parent91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff)
basic sync stub
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/connection.rs473
-rw-r--r--src/sync/filter.rs451
-rw-r--r--src/sync/manager.rs762
-rw-r--r--src/sync/mod.rs416
-rw-r--r--src/sync/negentropy.rs477
-rw-r--r--src/sync/subscription.rs229
6 files changed, 385 insertions, 2423 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
deleted file mode 100644
index 61a33f8..0000000
--- a/src/sync/connection.rs
+++ /dev/null
@@ -1,473 +0,0 @@
1//! WebSocket connection handling for sync
2//!
3//! Manages the connection to a source relay, subscribes to events using
4//! the three-layer filter strategy, and passes them through validation.
5//!
6//! ## Phase 2 Features
7//!
8//! - Three-layer filter subscriptions:
9//! 1. Layer 1: kinds 30617 + 30618 (announcements)
10//! 2. Layer 2: A/a tags for repository events
11//! 3. Layer 3: E/e tags for related events (PRs, Issues, etc.)
12//!
13//! ## Phase 3 Features
14//!
15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates when new announcements/PRs arrive
22//! - Per-connection subscription tracking
23//! - Filter consolidation when count exceeds threshold (>150)
24//! - Duplicate subscription prevention
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use nostr_sdk::prelude::*;
30use tokio::sync::mpsc;
31
32use super::filter::FilterService;
33use super::health::RelayHealthTracker;
34use super::metrics::{event_source, SyncMetrics};
35use super::subscription::SubscriptionManager;
36
37/// Event received from the sync connection
38#[derive(Debug, Clone)]
39pub struct SyncedEvent {
40 pub event: Event,
41 pub source_url: String,
42}
43
44/// Manages a WebSocket connection to a single relay for syncing
45pub struct SyncConnection {
46 url: String,
47 client: Client,
48 filter_service: Arc<FilterService>,
49 remote_domain: String,
50 subscription_manager: SubscriptionManager,
51 metrics: Option<SyncMetrics>,
52}
53
54impl SyncConnection {
55 /// Create a new sync connection to the given relay URL
56 pub async fn new(
57 url: &str,
58 filter_service: Arc<FilterService>,
59 remote_domain: &str,
60 metrics: Option<SyncMetrics>,
61 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
62 let client = Client::default();
63
64 // Add the relay
65 client.add_relay(url).await?;
66
67 // Connect to the relay
68 client.connect().await;
69
70 tracing::info!("Sync connection established to {}", url);
71
72 // Create subscription manager for this connection
73 let subscription_manager =
74 SubscriptionManager::new(filter_service.clone(), remote_domain.to_string());
75
76 Ok(Self {
77 url: url.to_string(),
78 client,
79 filter_service,
80 remote_domain: remote_domain.to_string(),
81 subscription_manager,
82 metrics,
83 })
84 }
85
86 /// Start receiving events and send them through the channel
87 ///
88 /// This method runs indefinitely, handling events from all three filter layers.
89 /// Dynamic subscription updates are triggered when new announcements or PRs arrive.
90 pub async fn run(mut self, tx: mpsc::Sender<SyncedEvent>) {
91 // Subscribe to all three filter layers
92
93 // Layer 1: Announcement discovery (kinds 30617 + 30618)
94 let layer1_filters = self.filter_service.get_layer1_filters();
95 for filter in &layer1_filters {
96 match self.client.subscribe(filter.clone(), None).await {
97 Ok(output) => {
98 tracing::info!(
99 "Subscribed to Layer 1 (announcements) on {} (subscription: {})",
100 self.url,
101 output.id()
102 );
103 }
104 Err(e) => {
105 tracing::error!("Failed to subscribe Layer 1 on {}: {}", self.url, e);
106 }
107 }
108 }
109
110 // Layer 2: Repository events (A/a tags)
111 let layer2_filters = self
112 .filter_service
113 .get_layer2_filters(&self.remote_domain)
114 .await;
115 for filter in &layer2_filters {
116 match self.client.subscribe(filter.clone(), None).await {
117 Ok(output) => {
118 tracing::info!(
119 "Subscribed to Layer 2 (repo events) on {} (subscription: {})",
120 self.url,
121 output.id()
122 );
123 }
124 Err(e) => {
125 tracing::error!("Failed to subscribe Layer 2 on {}: {}", self.url, e);
126 }
127 }
128 }
129
130 // Layer 3: Related events (E/e tags)
131 let layer3_filters = self.filter_service.get_layer3_filters().await;
132 for filter in &layer3_filters {
133 match self.client.subscribe(filter.clone(), None).await {
134 Ok(output) => {
135 tracing::info!(
136 "Subscribed to Layer 3 (related events) on {} (subscription: {})",
137 self.url,
138 output.id()
139 );
140 }
141 Err(e) => {
142 tracing::error!("Failed to subscribe Layer 3 on {}: {}", self.url, e);
143 }
144 }
145 }
146
147 tracing::info!(
148 "Sync subscriptions active on {} (L1: {}, L2: {}, L3: {})",
149 self.url,
150 layer1_filters.len(),
151 layer2_filters.len(),
152 layer3_filters.len()
153 );
154
155 // Handle incoming notifications
156 let url = self.url.clone();
157 let metrics = self.metrics.clone();
158 self.client
159 .handle_notifications(|notification| {
160 let tx = tx.clone();
161 let url = url.clone();
162 let metrics = metrics.clone();
163 async move {
164 match notification {
165 RelayPoolNotification::Event { event, .. } => {
166 tracing::debug!(
167 "Received event {} from {} (kind {})",
168 event.id,
169 url,
170 event.kind.as_u16()
171 );
172
173 // Record live event metric
174 if let Some(ref m) = metrics {
175 m.record_event(event_source::LIVE);
176 }
177
178 // Send the event to the manager for processing
179 let synced = SyncedEvent {
180 event: (*event).clone(),
181 source_url: url.clone(),
182 };
183
184 if let Err(e) = tx.send(synced).await {
185 tracing::warn!("Failed to send synced event: {}", e);
186 return Ok(true); // Stop if channel is closed
187 }
188 }
189 RelayPoolNotification::Shutdown => {
190 tracing::warn!("Relay connection shutdown for {}", url);
191 return Ok(true); // Stop on shutdown
192 }
193 RelayPoolNotification::Message { message, .. } => {
194 tracing::trace!("Received message from {}: {:?}", url, message);
195 }
196 }
197 Ok(false) // Continue processing
198 }
199 })
200 .await
201 .ok();
202 }
203
204 /// Handle dynamic subscription updates based on incoming event kind
205 ///
206 /// - kind 30617/30618: New announcement → add Layer 2 subscription
207 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
208 async fn handle_dynamic_subscription(&mut self, event: &Event) {
209 // Check if this is an announcement kind (triggers Layer 2 subscription)
210 if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
211 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
212 tracing::info!(
213 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
214 event.id.to_hex(),
215 self.url,
216 new_filters.len(),
217 self.subscription_manager.get_filter_count()
218 );
219 self.subscribe_to_filters(new_filters, "Layer 2").await;
220 }
221 }
222
223 // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription)
224 if matches!(
225 event.kind,
226 Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618)
227 ) {
228 if let Some(new_filters) = self.subscription_manager.add_event(event) {
229 tracing::info!(
230 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
231 event.id.to_hex(),
232 self.url,
233 new_filters.len(),
234 self.subscription_manager.get_filter_count()
235 );
236 self.subscribe_to_filters(new_filters, "Layer 3").await;
237 }
238 }
239
240 // Check if we need to consolidate
241 if self.subscription_manager.should_consolidate() {
242 self.consolidate_subscriptions().await;
243 }
244 }
245
246 /// Subscribe to new filters
247 async fn subscribe_to_filters(&self, filters: Vec<Filter>, layer_name: &str) {
248 for filter in filters {
249 match self.client.subscribe(filter, None).await {
250 Ok(output) => {
251 tracing::debug!(
252 "Dynamic {} subscription on {} (subscription: {})",
253 layer_name,
254 self.url,
255 output.id()
256 );
257 }
258 Err(e) => {
259 tracing::warn!(
260 "Failed to add dynamic {} subscription on {}: {}",
261 layer_name,
262 self.url,
263 e
264 );
265 }
266 }
267 }
268 }
269
270 /// Consolidate subscriptions back to Layer 1 only
271 ///
272 /// This is triggered when the filter count exceeds 150.
273 /// All existing subscriptions are closed and only Layer 1 is re-subscribed.
274 async fn consolidate_subscriptions(&mut self) {
275 tracing::warn!(
276 "Filter count {} exceeds threshold, consolidating subscriptions on {}",
277 self.subscription_manager.get_filter_count(),
278 self.url
279 );
280
281 // Get consolidated filters (clears tracking and returns Layer 1 only)
282 let layer1_filters = self.subscription_manager.consolidate();
283
284 // Note: nostr-sdk doesn't provide a way to close all subscriptions easily
285 // The client will manage subscription count internally
286 // We just add the new Layer 1 subscription
287
288 for filter in layer1_filters {
289 match self.client.subscribe(filter, None).await {
290 Ok(output) => {
291 tracing::info!(
292 "Consolidated to Layer 1 subscription on {} (subscription: {})",
293 self.url,
294 output.id()
295 );
296 }
297 Err(e) => {
298 tracing::error!(
299 "Failed to subscribe Layer 1 after consolidation on {}: {}",
300 self.url,
301 e
302 );
303 }
304 }
305 }
306 }
307
308 /// Get the current filter count from the subscription manager
309 pub fn get_filter_count(&self) -> usize {
310 self.subscription_manager.get_filter_count()
311 }
312
313 /// Check if subscriptions have been consolidated
314 pub fn is_consolidated(&self) -> bool {
315 self.subscription_manager.is_consolidated()
316 }
317}
318
319/// Reconnect loop with health-aware exponential backoff
320///
321/// This function manages the connection lifecycle with health tracking:
322/// - Checks health state before attempting connections
323/// - Reports success/failure to the health tracker
324/// - Respects backoff delays from the health tracker
325/// - Handles dead relay detection (24h+ failures)
326///
327/// # Arguments
328/// * `url` - The relay URL to connect to
329/// * `tx` - Channel sender for synced events
330/// * `filter_service` - FilterService for building subscriptions
331/// * `our_domain` - Our relay's domain (used to extract remote domain)
332/// * `health_tracker` - Health tracker for managing connection state
333/// * `metrics` - Optional sync metrics for Prometheus
334pub async fn connect_with_retry(
335 url: &str,
336 tx: mpsc::Sender<SyncedEvent>,
337 filter_service: Arc<FilterService>,
338 _our_domain: &str,
339 health_tracker: Arc<RelayHealthTracker>,
340 metrics: Option<SyncMetrics>,
341) {
342 // Extract remote domain from URL
343 let remote_domain = extract_domain_from_url(url).unwrap_or_else(|| url.to_string());
344
345 loop {
346 // Check if we should attempt connection based on health state
347 if !health_tracker.should_attempt_connection(url) {
348 // Wait for remaining backoff
349 if let Some(remaining) = health_tracker.get_remaining_backoff(url) {
350 tracing::debug!(
351 "Relay {} in backoff, waiting {:?} before retry",
352 url,
353 remaining
354 );
355 tokio::time::sleep(remaining).await;
356 continue;
357 }
358 }
359
360 // Log current health state for dead relays
361 if health_tracker.is_dead(url) {
362 tracing::info!(
363 "Attempting reconnection to dead relay {} (daily retry)",
364 url
365 );
366 }
367
368 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone())
369 .await
370 {
371 Ok(conn) => {
372 // Record successful connection
373 health_tracker.record_success(url);
374
375 // Record metrics
376 if let Some(ref m) = metrics {
377 m.record_connection_attempt(url, true);
378 m.set_relay_connected(url, true);
379 m.inc_connected_count();
380 m.record_health_state(url, health_tracker.get_state(url));
381 m.record_failure_count(url, 0);
382 }
383
384 tracing::info!("Sync connection established to {}", url);
385
386 // Run the connection (this blocks until disconnection)
387 conn.run(tx.clone()).await;
388
389 // Connection ended - record as failure for reconnection backoff
390 // (The connection ending is considered a failure even if it worked for a while)
391 health_tracker.record_failure(url);
392
393 // Update metrics for disconnection
394 if let Some(ref m) = metrics {
395 m.set_relay_connected(url, false);
396 m.dec_connected_count();
397 m.record_health_state(url, health_tracker.get_state(url));
398 m.record_failure_count(url, health_tracker.get_failure_count(url));
399 }
400
401 tracing::warn!("Sync connection to {} ended, will reconnect", url);
402 }
403 Err(e) => {
404 // Record connection failure
405 health_tracker.record_failure(url);
406
407 let failure_count = health_tracker.get_failure_count(url);
408 let state = health_tracker.get_state(url);
409
410 // Record metrics
411 if let Some(ref m) = metrics {
412 m.record_connection_attempt(url, false);
413 m.set_relay_connected(url, false);
414 m.record_health_state(url, state);
415 m.record_failure_count(url, failure_count);
416
417 // Track dead relays
418 if state == super::health::HealthState::Dead {
419 m.inc_dead_count();
420 }
421 }
422
423 tracing::error!(
424 "Failed to connect to sync relay {} (attempt #{}, state: {}): {}",
425 url,
426 failure_count,
427 state,
428 e
429 );
430 }
431 }
432
433 // Get the backoff duration from health tracker
434 // If the health tracker has no backoff set (shouldn't happen), use a small default
435 let wait_duration = health_tracker
436 .get_remaining_backoff(url)
437 .unwrap_or(Duration::from_secs(5));
438
439 tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url);
440 tokio::time::sleep(wait_duration).await;
441 }
442}
443
444/// Extract domain from a URL
445fn extract_domain_from_url(url: &str) -> Option<String> {
446 let url = url
447 .trim_start_matches("ws://")
448 .trim_start_matches("wss://")
449 .trim_start_matches("http://")
450 .trim_start_matches("https://");
451
452 // Remove path
453 let domain = url.split('/').next()?;
454
455 Some(domain.to_string())
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461
462 #[test]
463 fn test_extract_domain() {
464 assert_eq!(
465 extract_domain_from_url("ws://127.0.0.1:8080"),
466 Some("127.0.0.1:8080".to_string())
467 );
468 assert_eq!(
469 extract_domain_from_url("wss://relay.example.com/path"),
470 Some("relay.example.com".to_string())
471 );
472 }
473}
diff --git a/src/sync/filter.rs b/src/sync/filter.rs
deleted file mode 100644
index 108c92a..0000000
--- a/src/sync/filter.rs
+++ /dev/null
@@ -1,451 +0,0 @@
1//! Filter Service for GRASP-02 Proactive Sync
2//!
3//! Implements the three-layer filter strategy for comprehensive event syncing:
4//! - Layer 1: Announcement discovery (kinds 30617 + 30618)
5//! - Layer 2: Repository events (A/a tags pointing to shared repos)
6//! - Layer 3: Related events (E/e tags pointing to Layer 2 events)
7
8use std::collections::HashSet;
9
10use nostr_sdk::prelude::*;
11
12use crate::nostr::builder::SharedDatabase;
13use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT;
14
15/// Maximum number of tags per filter to stay within relay limits
16const MAX_TAGS_PER_FILTER: usize = 100;
17
18/// Kind for maintainer metadata (NIP-34)
19const KIND_MAINTAINER_LIST: u16 = 30618;
20
21/// FilterService builds subscription filters for proactive sync
22///
23/// Uses a three-layer strategy:
24/// 1. Layer 1: Discover new repository announcements and maintainer metadata
25/// 2. Layer 2: Sync events directly related to repositories we track
26/// 3. Layer 3: Sync discussions and updates related to Layer 2 events
27#[derive(Debug)]
28pub struct FilterService {
29 database: SharedDatabase,
30 /// Our relay's domain for filtering
31 relay_domain: String,
32}
33
34impl FilterService {
35 /// Create a new FilterService
36 ///
37 /// # Arguments
38 /// * `database` - Shared database for querying stored events
39 /// * `relay_domain` - Our relay's domain (used for filtering shared repos)
40 pub fn new(database: SharedDatabase, relay_domain: String) -> Self {
41 Self {
42 database,
43 relay_domain,
44 }
45 }
46
47 /// Get Layer 1 filters for announcement discovery
48 ///
49 /// Returns filters for kinds 30617 (repository announcements) and 30618 (maintainer metadata)
50 pub fn get_layer1_filters(&self) -> Vec<Filter> {
51 vec![Filter::new().kinds(vec![
52 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT),
53 Kind::Custom(KIND_MAINTAINER_LIST),
54 ])]
55 }
56
57 /// Get Layer 2 filters for repository-related events
58 ///
59 /// Queries the database for kind 30617 events and builds filters for events
60 /// with `a` tags pointing to repositories that reference both:
61 /// - Our relay (from clone tags)
62 /// - Are stored in our database (meaning they're relevant to us)
63 ///
64 /// # Arguments
65 /// * `remote_relay_domain` - The domain of the remote relay we're syncing from
66 pub async fn get_layer2_filters(&self, remote_relay_domain: &str) -> Vec<Filter> {
67 // Query all kind 30617 events from our database
68 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT));
69
70 let events = match self.database.query(filter).await {
71 Ok(events) => events,
72 Err(e) => {
73 tracing::warn!("Failed to query announcements for Layer 2 filters: {}", e);
74 return Vec::new();
75 }
76 };
77
78 // Build a set of addressable coordinates for repos that list both relays
79 let mut coords: Vec<String> = Vec::new();
80
81 for event in events {
82 // Check if this repo lists our domain in clone tags
83 let has_our_relay = event.tags.iter().any(|tag| {
84 let tag_vec = tag.clone().to_vec();
85 tag_vec.len() >= 2
86 && (tag_vec[0] == "clone" || tag_vec[0] == "relays")
87 && tag_vec.iter().any(|v| v.contains(&self.relay_domain))
88 });
89
90 // Check if this repo lists the remote relay in clone/relays tags
91 let has_remote_relay = event.tags.iter().any(|tag| {
92 let tag_vec = tag.clone().to_vec();
93 tag_vec.len() >= 2
94 && (tag_vec[0] == "clone" || tag_vec[0] == "relays")
95 && tag_vec.iter().any(|v| v.contains(remote_relay_domain))
96 });
97
98 if has_our_relay && has_remote_relay {
99 // Extract the d tag (identifier)
100 if let Some(identifier) = event.tags.iter().find_map(|tag| {
101 let tag_vec = tag.clone().to_vec();
102 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
103 Some(tag_vec[1].clone())
104 } else {
105 None
106 }
107 }) {
108 // Build the addressable coordinate: kind:pubkey:identifier
109 let coord = format!(
110 "{}:{}:{}",
111 KIND_REPOSITORY_ANNOUNCEMENT,
112 event.pubkey.to_hex(),
113 identifier
114 );
115 coords.push(coord);
116 }
117 }
118 }
119
120 if coords.is_empty() {
121 return Vec::new();
122 }
123
124 // Batch coordinates into filters with A/a/q tags
125 Self::batch_layer2_filters(coords)
126 }
127
128 /// Get Layer 3 filters for related events
129 ///
130 /// Queries the database for events with `a` tags (PRs, Issues, etc.)
131 /// and builds filters for events that reference them with `e` tags.
132 pub async fn get_layer3_filters(&self) -> Vec<Filter> {
133 // Query events that reference repositories (have 'a' tags with 30617)
134 // These are typically PRs (1618), Issues (1621), etc.
135
136 // First, get all kind 30617 announcements
137 let announcement_filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT));
138
139 let announcements = match self.database.query(announcement_filter).await {
140 Ok(events) => events,
141 Err(e) => {
142 tracing::warn!("Failed to query announcements for Layer 3 filters: {}", e);
143 return Vec::new();
144 }
145 };
146
147 // Build a set of event IDs from PRs, Issues, etc. that reference our repos
148 let mut event_ids: Vec<String> = Vec::new();
149
150 // Get the set of valid repository coordinates
151 let repo_coords: HashSet<String> = announcements
152 .iter()
153 .filter_map(|e| {
154 e.tags.iter().find_map(|tag| {
155 let tag_vec = tag.clone().to_vec();
156 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
157 Some(format!(
158 "{}:{}:{}",
159 KIND_REPOSITORY_ANNOUNCEMENT,
160 e.pubkey.to_hex(),
161 tag_vec[1]
162 ))
163 } else {
164 None
165 }
166 })
167 })
168 .collect();
169
170 if repo_coords.is_empty() {
171 return Vec::new();
172 }
173
174 // Query for PR and Patch events from our repositories
175 let repos_pr_patch_filter = Filter::new().kinds(vec![
176 Kind::Custom(1617), // Patch
177 Kind::Custom(1618), // PR
178 ]);
179
180 let related_events = match self.database.query(repos_pr_patch_filter).await {
181 Ok(events) => events,
182 Err(e) => {
183 tracing::warn!("Failed to query related events for Layer 3 filters: {}", e);
184 return Vec::new();
185 }
186 };
187
188 // Collect event IDs that reference our repositories
189 for event in related_events {
190 // Check if this event has an 'a' tag pointing to one of our repos
191 let references_our_repo = event.tags.iter().any(|tag| {
192 let tag_vec = tag.clone().to_vec();
193 tag_vec.len() >= 2 && tag_vec[0] == "a" && repo_coords.contains(&tag_vec[1])
194 });
195
196 if references_our_repo {
197 event_ids.push(event.id.to_hex());
198 }
199 }
200
201 if event_ids.is_empty() {
202 return Vec::new();
203 }
204
205 // Batch event IDs into filters with 'E', 'e', and 'q' tags
206 Self::batch_layer3_filters(event_ids)
207 }
208
209 /// Batch a list of addressable coordinates into Layer 2 filters with 'A', 'a', and 'q' tags
210 ///
211 /// Different Nostr clients use different tag conventions for referencing repository
212 /// announcements. This function generates THREE filters per chunk to capture all variants:
213 /// - Uppercase 'A' tags (used by some clients)
214 /// - Lowercase 'a' tags (standard addressable event reference)
215 /// - Lowercase 'q' tags (quote tags, used by some clients)
216 ///
217 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets.
218 fn batch_layer2_filters(coords: Vec<String>) -> Vec<Filter> {
219 if coords.is_empty() {
220 return Vec::new();
221 }
222
223 coords
224 .chunks(MAX_TAGS_PER_FILTER)
225 .flat_map(|chunk| {
226 // Create THREE filters per chunk - one for each tag type
227 vec![
228 // Uppercase A tag filter
229 Filter::new().custom_tags(
230 SingleLetterTag::uppercase(Alphabet::A),
231 chunk.iter().cloned(),
232 ),
233 // Lowercase a tag filter
234 Filter::new().custom_tags(
235 SingleLetterTag::lowercase(Alphabet::A),
236 chunk.iter().cloned(),
237 ),
238 // Quote q tag filter
239 Filter::new().custom_tags(
240 SingleLetterTag::lowercase(Alphabet::Q),
241 chunk.iter().cloned(),
242 ),
243 ]
244 })
245 .collect()
246 }
247
248 /// Batch a list of event IDs into Layer 3 filters with 'E', 'e', and 'q' tags
249 ///
250 /// Different Nostr clients use different tag conventions for referencing events.
251 /// This function generates THREE filters per chunk to capture all variants:
252 /// - Uppercase 'E' tags (used by some clients)
253 /// - Lowercase 'e' tags (standard event reference)
254 /// - Lowercase 'q' tags (quote tags, used by some clients)
255 ///
256 /// When tag counts exceed MAX_TAGS_PER_FILTER, creates multiple filter sets.
257 fn batch_layer3_filters(event_ids: Vec<String>) -> Vec<Filter> {
258 if event_ids.is_empty() {
259 return Vec::new();
260 }
261
262 event_ids
263 .chunks(MAX_TAGS_PER_FILTER)
264 .flat_map(|chunk| {
265 // Create THREE filters per chunk - one for each tag type
266 vec![
267 // Uppercase E tag filter
268 Filter::new().custom_tags(
269 SingleLetterTag::uppercase(Alphabet::E),
270 chunk.iter().cloned(),
271 ),
272 // Lowercase e tag filter
273 Filter::new().custom_tags(
274 SingleLetterTag::lowercase(Alphabet::E),
275 chunk.iter().cloned(),
276 ),
277 // Quote q tag filter
278 Filter::new().custom_tags(
279 SingleLetterTag::lowercase(Alphabet::Q),
280 chunk.iter().cloned(),
281 ),
282 ]
283 })
284 .collect()
285 }
286
287 /// Discover relay URLs from stored kind 30617 announcements
288 ///
289 /// Only returns relay URLs from repositories that list **our** relay.
290 /// This ensures we only connect to relays where we have shared repos,
291 /// avoiding wasted connections with empty Layer 2 filters.
292 ///
293 /// Extracts unique relay URLs from `clone` and `relays` tags,
294 /// excluding our own relay domain.
295 pub async fn discover_relay_urls(&self) -> Vec<String> {
296 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT));
297
298 let events = match self.database.query(filter).await {
299 Ok(events) => events,
300 Err(e) => {
301 tracing::warn!("Failed to query announcements for relay discovery: {}", e);
302 return Vec::new();
303 }
304 };
305
306 let mut relay_urls: HashSet<String> = HashSet::new();
307
308 for event in events {
309 // First check: Does this repo list our relay?
310 // Only process repos that reference us - otherwise we'd connect to relays
311 // where we have no shared repos, resulting in empty Layer 2 filters.
312 let has_our_relay = event.tags.iter().any(|tag| {
313 let tag_vec = tag.clone().to_vec();
314 tag_vec.len() >= 2
315 && (tag_vec[0] == "clone" || tag_vec[0] == "relays")
316 && tag_vec.iter().any(|v| v.contains(&self.relay_domain))
317 });
318
319 if !has_our_relay {
320 // Skip repos that don't list our relay - no shared repos possible
321 continue;
322 }
323
324 // Extract relay URLs from repos that list us
325 for tag in event.tags.iter() {
326 let tag_vec = tag.clone().to_vec();
327 if tag_vec.len() < 2 {
328 continue;
329 }
330
331 // Extract URLs from clone and relays tags
332 if tag_vec[0] == "clone" || tag_vec[0] == "relays" {
333 for value in tag_vec.iter().skip(1) {
334 // Check if it looks like a URL
335 if value.starts_with("ws://")
336 || value.starts_with("wss://")
337 || value.starts_with("http://")
338 || value.starts_with("https://")
339 {
340 // Exclude our own relay
341 if !value.contains(&self.relay_domain) {
342 relay_urls.insert(value.clone());
343 }
344 }
345 }
346 }
347 }
348 }
349
350 relay_urls.into_iter().collect()
351 }
352
353 /// Extract relay URLs from a specific event's clone tags
354 ///
355 /// Returns URLs that are not our own relay.
356 pub fn extract_relay_urls_from_event(&self, event: &Event) -> Vec<String> {
357 let mut urls = Vec::new();
358
359 for tag in event.tags.iter() {
360 let tag_vec = tag.clone().to_vec();
361 if tag_vec.len() < 2 {
362 continue;
363 }
364
365 if tag_vec[0] == "clone" || tag_vec[0] == "relays" {
366 for value in tag_vec.iter().skip(1) {
367 if value.starts_with("ws://")
368 || value.starts_with("wss://")
369 || value.starts_with("http://")
370 || value.starts_with("https://")
371 {
372 if !value.contains(&self.relay_domain) {
373 urls.push(value.clone());
374 }
375 }
376 }
377 }
378 }
379
380 urls
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 #[test]
389 fn test_batch_layer2_filters_empty() {
390 let filters = FilterService::batch_layer2_filters(vec![]);
391 assert!(filters.is_empty());
392 }
393
394 #[test]
395 fn test_batch_layer2_filters_small() {
396 let coords = vec!["30617:abc:repo1".to_string(), "30617:def:repo2".to_string()];
397 let filters = FilterService::batch_layer2_filters(coords);
398 // 1 chunk × 3 tag types (A, a, q) = 3 filters
399 assert_eq!(filters.len(), 3);
400 }
401
402 #[test]
403 fn test_batch_layer2_filters_large() {
404 // Create 250 coordinates to test batching
405 let coords: Vec<String> = (0..250)
406 .map(|i| format!("30617:pubkey{}:repo{}", i, i))
407 .collect();
408
409 let filters = FilterService::batch_layer2_filters(coords);
410 // 3 chunks (100 + 100 + 50) × 3 tag types (A, a, q) = 9 filters
411 assert_eq!(filters.len(), 9);
412 }
413
414 #[test]
415 fn test_batch_layer3_filters_empty() {
416 let filters = FilterService::batch_layer3_filters(vec![]);
417 assert!(filters.is_empty());
418 }
419
420 #[test]
421 fn test_batch_layer3_filters_small() {
422 let event_ids = vec!["eventid1".to_string(), "eventid2".to_string()];
423 let filters = FilterService::batch_layer3_filters(event_ids);
424 // 1 chunk × 3 tag types (E, e, q) = 3 filters
425 assert_eq!(filters.len(), 3);
426 }
427
428 #[test]
429 fn test_batch_layer3_filters_large() {
430 // Create 250 event IDs to test batching
431 let event_ids: Vec<String> = (0..250).map(|i| format!("eventid{:064}", i)).collect();
432
433 let filters = FilterService::batch_layer3_filters(event_ids);
434 // 3 chunks (100 + 100 + 50) × 3 tag types (E, e, q) = 9 filters
435 assert_eq!(filters.len(), 9);
436 }
437
438 #[test]
439 fn test_layer1_filters() {
440 // Create a mock database - we'll use a memory database for testing
441 // This test just verifies the filter structure
442 let filter = Filter::new().kinds(vec![
443 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT),
444 Kind::Custom(KIND_MAINTAINER_LIST),
445 ]);
446
447 // Verify the filter has the correct kinds
448 // Note: We can't easily inspect Filter internals, but we can ensure it compiles
449 assert!(!filter.is_empty());
450 }
451}
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
deleted file mode 100644
index 6ae82ef..0000000
--- a/src/sync/manager.rs
+++ /dev/null
@@ -1,762 +0,0 @@
1//! SyncManager - Coordinates proactive sync operations
2//!
3//! The SyncManager connects to remote relays, receives events, validates them
4//! through the write policy, and stores accepted events.
5//!
6//! ## Simplified Relay Discovery Architecture
7//!
8//! All relay discovery is centralized in the self-subscriber:
9//! - Bootstrap relay: connected immediately (no jitter, single relay)
10//! - All other relays: discovered via self-subscriber announcements (with jitter)
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────┐
14//! │ ngit-grasp │
15//! │ │
16//! │ ┌─────────────┐ broadcasts ┌───────────────┐ │
17//! │ │ Relay │ ─────────────────────▶ │ Self-Subscribe│ │
18//! │ │ Database │ │ Client │ │
19//! │ └─────────────┘ └───────┬───────┘ │
20//! │ ▲ │ │
21//! │ │ stores │ extracts │
22//! │ │ │ relay │
23//! │ ┌─────┴─────┐ │ URLs │
24//! │ │ Remote │◀────────────────────────────────┘ │
25//! │ │Connections│ spawns new │
26//! │ └───────────┘ connections (with jitter) │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Key Design Decisions
31//!
32//! - **Single relay discovery path**: Only the self-subscriber discovers new relays
33//! - **Jitter at point of discovery**: Applied when spawning connections from announcements
34//! - **Since filter on reconnection**: Avoids re-processing old announcements after disconnect
35//! - **Bootstrap relay has no jitter**: Single relay doesn't cause thundering herd
36//!
37//! ## Phase 2 Features
38//!
39//! - Relay discovery from kind 30617 announcements (via self-subscriber)
40//! - Multiple simultaneous relay connections
41//! - Three-layer filter strategy via FilterService
42//!
43//! ## Phase 3 Features
44//!
45//! - Health tracking with exponential backoff
46//! - Dead relay detection after 24h of failures
47//! - Startup jitter to prevent thundering herd
48//!
49//! ## Phase 4 Features
50//!
51//! - Dynamic subscription updates handled per-connection
52//! - Each connection manages its own SubscriptionManager
53//! - Announcements trigger Layer 2 subscriptions
54//! - PRs/Issues trigger Layer 3 subscriptions
55//! - Consolidation when filter count exceeds 150
56
57use std::collections::HashSet;
58use std::net::{IpAddr, Ipv4Addr, SocketAddr};
59use std::sync::Arc;
60use std::time::Duration;
61
62use nostr_relay_builder::prelude::*;
63use nostr_sdk::prelude::{Client, Filter, Kind, RelayPoolNotification, Timestamp};
64use rand::Rng;
65use tokio::sync::mpsc;
66
67use super::connection::{connect_with_retry, SyncedEvent};
68use super::filter::FilterService;
69use super::health::RelayHealthTracker;
70use super::metrics::SyncMetrics;
71use crate::config::Config;
72use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
73use crate::nostr::events::KIND_REPOSITORY_ANNOUNCEMENT;
74
75/// Default fallback address for sync source when bind_address cannot be parsed
76///
77/// This distinguishes synced events from directly-submitted events in logs and metrics.
78/// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker.
79pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr =
80 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
81
82/// Derive sync source address from config bind_address
83///
84/// Parses the bind_address string and returns a SocketAddr.
85/// Falls back to 127.0.0.1:8080 if parsing fails.
86fn get_sync_source_addr(bind_address: &str) -> SocketAddr {
87 bind_address
88 .parse()
89 .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR)
90}
91
92/// Derive the WebSocket URL for our own relay from bind_address
93fn derive_own_relay_url(bind_address: &str) -> String {
94 format!("ws://{}", bind_address)
95}
96
97/// Coordinates proactive sync from configured and discovered relays
98pub struct SyncManager {
99 /// Bootstrap relay URL for initial sync (from config)
100 /// Additional relays are discovered from repository announcements that list our service
101 bootstrap_relay_url: Option<String>,
102 /// Our relay's domain (for filtering)
103 relay_domain: String,
104 /// Our relay's WebSocket URL (for self-subscribe)
105 own_relay_url: String,
106 /// Database for storing accepted events
107 database: SharedDatabase,
108 /// Write policy for validating events
109 write_policy: Nip34WritePolicy,
110 /// Health tracker for relay connections
111 health_tracker: Arc<RelayHealthTracker>,
112 /// Sync metrics for Prometheus
113 metrics: Option<SyncMetrics>,
114 /// Source address for synced events (derived from config.bind_address)
115 sync_source_addr: SocketAddr,
116 /// Maximum startup jitter in milliseconds (from config)
117 startup_jitter_ms: u64,
118}
119
120impl SyncManager {
121 /// Create a new SyncManager
122 ///
123 /// # Arguments
124 /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config
125 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
126 /// * `database` - Shared database for storing events and querying announcements
127 /// * `write_policy` - Write policy for validating synced events
128 /// * `config` - Configuration for health tracking settings
129 pub fn new(
130 bootstrap_relay_url: Option<String>,
131 relay_domain: String,
132 database: SharedDatabase,
133 write_policy: Nip34WritePolicy,
134 config: &Config,
135 ) -> Self {
136 let own_relay_url = derive_own_relay_url(&config.bind_address);
137 Self {
138 bootstrap_relay_url,
139 relay_domain,
140 own_relay_url,
141 database,
142 write_policy,
143 health_tracker: Arc::new(RelayHealthTracker::new(config)),
144 metrics: None,
145 sync_source_addr: get_sync_source_addr(&config.bind_address),
146 startup_jitter_ms: config.sync_startup_jitter_ms,
147 }
148 }
149
150 /// Create a new SyncManager with metrics
151 ///
152 /// # Arguments
153 /// * `bootstrap_relay_url` - Optional bootstrap relay URL from config
154 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
155 /// * `database` - Shared database for storing events and querying announcements
156 /// * `write_policy` - Write policy for validating synced events
157 /// * `config` - Configuration for health tracking settings
158 /// * `metrics` - Sync metrics for Prometheus
159 pub fn with_metrics(
160 bootstrap_relay_url: Option<String>,
161 relay_domain: String,
162 database: SharedDatabase,
163 write_policy: Nip34WritePolicy,
164 config: &Config,
165 metrics: SyncMetrics,
166 ) -> Self {
167 let own_relay_url = derive_own_relay_url(&config.bind_address);
168 Self {
169 bootstrap_relay_url,
170 relay_domain,
171 own_relay_url,
172 database,
173 write_policy,
174 health_tracker: Arc::new(RelayHealthTracker::new(config)),
175 metrics: Some(metrics),
176 sync_source_addr: get_sync_source_addr(&config.bind_address),
177 startup_jitter_ms: config.sync_startup_jitter_ms,
178 }
179 }
180
181 /// Create a SyncManager with a single relay URL (Phase 1 compatibility)
182 pub fn with_single_relay(
183 bootstrap_url: String,
184 database: SharedDatabase,
185 write_policy: Nip34WritePolicy,
186 ) -> Self {
187 // Extract domain from URL for filtering
188 let relay_domain = extract_domain_from_url(&bootstrap_url).unwrap_or_default();
189 let own_relay_url = format!("ws://{}", relay_domain);
190 Self {
191 bootstrap_relay_url: Some(bootstrap_url),
192 relay_domain,
193 own_relay_url,
194 database,
195 write_policy,
196 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
197 metrics: None,
198 sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR,
199 startup_jitter_ms: 10_000, // Default 10 seconds
200 }
201 }
202
203 /// Set metrics for the sync manager
204 pub fn set_metrics(&mut self, metrics: SyncMetrics) {
205 self.metrics = Some(metrics);
206 }
207
208 /// Get a reference to the metrics
209 pub fn metrics(&self) -> Option<&SyncMetrics> {
210 self.metrics.as_ref()
211 }
212
213 /// Get a reference to the health tracker
214 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
215 self.health_tracker.clone()
216 }
217
218 /// Run the sync manager
219 ///
220 /// This spawns the bootstrap relay connection (if configured), sets up a
221 /// self-subscriber for event-driven relay discovery, and processes incoming
222 /// events. The self-subscriber handles ALL relay discovery from announcements.
223 /// Runs indefinitely until cancelled.
224 ///
225 /// ## Simplified Relay Discovery Architecture
226 ///
227 /// All relay discovery is centralized in the self-subscriber:
228 /// - Bootstrap relay: connected immediately (no jitter, single relay)
229 /// - All other relays: discovered via self-subscriber announcements (with jitter)
230 /// - Jitter applied at point of discovery (not startup)
231 ///
232 /// This eliminates three redundant discovery paths:
233 /// 1. DB query at startup (removed)
234 /// 2. Remote event extraction (removed)
235 /// 3. Self-subscriber (sole discovery path)
236 pub async fn run(self) {
237 tracing::info!(
238 "Starting SyncManager (domain: {}, own_relay: {}, bootstrap relay: {:?})",
239 self.relay_domain,
240 self.own_relay_url,
241 self.bootstrap_relay_url
242 );
243
244 // Create the filter service
245 let filter_service = Arc::new(FilterService::new(
246 self.database.clone(),
247 self.relay_domain.clone(),
248 ));
249
250 // Create channel for receiving events from all connections
251 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
252
253 // Track active relay URLs to avoid duplicates (wrapped in Arc for sharing)
254 let active_relays = Arc::new(tokio::sync::Mutex::new(HashSet::<String>::new()));
255
256 // Bootstrap relay - connect immediately (no jitter, just one relay)
257 if let Some(ref url) = self.bootstrap_relay_url {
258 if !self.is_own_relay(url) {
259 tracing::info!("Connecting to bootstrap relay: {}", url);
260 active_relays.lock().await.insert(url.clone());
261 self.spawn_connection(url.clone(), tx.clone(), filter_service.clone(), false);
262 } else {
263 tracing::info!("Skipping bootstrap relay (is our own relay): {}", url);
264 }
265 }
266
267 // Record initial tracked relay count
268 if let Some(ref metrics) = self.metrics {
269 let count = active_relays.lock().await.len();
270 metrics.set_tracked_count(count as i64);
271 }
272
273 {
274 let active = active_relays.lock().await;
275 if active.is_empty() {
276 tracing::info!(
277 "No bootstrap relay configured, waiting for announcements via self-subscriber..."
278 );
279 } else {
280 tracing::info!(
281 "SyncManager connected to {} relay(s): {:?}",
282 active.len(),
283 *active
284 );
285 }
286 }
287
288 // Spawn self-subscriber task for ALL relay discovery
289 let self_subscriber_handle = self.spawn_self_subscriber(
290 tx.clone(),
291 filter_service.clone(),
292 active_relays.clone(),
293 );
294
295 // Process incoming events - just validate and store, NO relay discovery
296 // (relay discovery is handled solely by the self-subscriber)
297 while let Some(synced_event) = rx.recv().await {
298 self.process_event(synced_event).await;
299 }
300
301 // Clean up self-subscriber
302 self_subscriber_handle.abort();
303 tracing::warn!("SyncManager event channel closed, shutting down");
304 }
305
306 /// Check if a URL points to our own relay
307 fn is_own_relay(&self, url: &str) -> bool {
308 url.contains(&self.relay_domain)
309 }
310
311 /// Spawn a self-subscriber task that connects to our own relay
312 /// and watches for kind 30617 announcements to discover new relays.
313 ///
314 /// This is the SOLE relay discovery path - all relay discovery happens here.
315 /// When a new announcement is saved to our database (from direct submission
316 /// or synced from another relay), the self-subscriber receives it immediately
317 /// and spawns connections to newly discovered relays (with jitter).
318 fn spawn_self_subscriber(
319 &self,
320 tx: mpsc::Sender<SyncedEvent>,
321 filter_service: Arc<FilterService>,
322 active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>,
323 ) -> tokio::task::JoinHandle<()> {
324 let own_relay_url = self.own_relay_url.clone();
325 let relay_domain = self.relay_domain.clone();
326 let metrics = self.metrics.clone();
327 let health_tracker = self.health_tracker.clone();
328 let startup_jitter_ms = self.startup_jitter_ms;
329
330 tokio::spawn(async move {
331 Self::run_self_subscriber_loop(
332 own_relay_url,
333 relay_domain,
334 tx,
335 filter_service,
336 active_relays,
337 metrics,
338 health_tracker,
339 startup_jitter_ms,
340 )
341 .await;
342 })
343 }
344
345 /// Main loop for the self-subscriber
346 ///
347 /// Connects to our own relay, subscribes to kind 30617 announcements,
348 /// and processes events to discover new relays. Handles reconnection
349 /// on disconnect.
350 ///
351 /// ## Reconnection Behavior
352 ///
353 /// - First connection: no `since` filter (get all historical announcements)
354 /// - Reconnections: use `since` filter (15 minutes ago) to avoid re-processing
355 #[allow(clippy::too_many_arguments)]
356 async fn run_self_subscriber_loop(
357 own_relay_url: String,
358 relay_domain: String,
359 tx: mpsc::Sender<SyncedEvent>,
360 filter_service: Arc<FilterService>,
361 active_relays: Arc<tokio::sync::Mutex<HashSet<String>>>,
362 metrics: Option<SyncMetrics>,
363 health_tracker: Arc<RelayHealthTracker>,
364 startup_jitter_ms: u64,
365 ) {
366 let mut reconnect_delay = Duration::from_secs(1);
367 let max_reconnect_delay = Duration::from_secs(60);
368 let mut is_first_connection = true;
369
370 loop {
371 tracing::info!(
372 "Self-subscriber connecting to own relay: {}",
373 own_relay_url
374 );
375
376 match Self::connect_self_subscriber(&own_relay_url).await {
377 Ok(client) => {
378 // Reset reconnect delay on successful connection
379 reconnect_delay = Duration::from_secs(1);
380
381 tracing::info!(
382 "Self-subscriber connected to own relay, subscribing to kind {} announcements{}",
383 KIND_REPOSITORY_ANNOUNCEMENT,
384 if is_first_connection { " (initial, no since filter)" } else { " (reconnection, with since filter)" }
385 );
386
387 // Subscribe to repository announcements
388 // First connection: get all historical; reconnections: only last 15 minutes
389 let filter = if is_first_connection {
390 Filter::new().kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT))
391 } else {
392 let since = Timestamp::now() - 15 * 60; // 15 minutes ago
393 Filter::new()
394 .kind(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT))
395 .since(since)
396 };
397
398 is_first_connection = false;
399
400 if let Err(e) = client.subscribe(filter, None).await {
401 tracing::error!(
402 "Self-subscriber failed to subscribe on {}: {}",
403 own_relay_url,
404 e
405 );
406 // Will reconnect after delay
407 } else {
408 // Handle notifications until disconnect
409 Self::handle_self_subscriber_notifications(
410 &client,
411 &own_relay_url,
412 &relay_domain,
413 &tx,
414 &filter_service,
415 &active_relays,
416 &metrics,
417 &health_tracker,
418 startup_jitter_ms,
419 )
420 .await;
421 }
422
423 // Disconnect and cleanup
424 client.disconnect().await;
425 }
426 Err(e) => {
427 tracing::warn!(
428 "Self-subscriber failed to connect to {}: {}",
429 own_relay_url,
430 e
431 );
432 }
433 }
434
435 // Wait before reconnecting with exponential backoff
436 tracing::debug!(
437 "Self-subscriber will reconnect to {} in {:?}",
438 own_relay_url,
439 reconnect_delay
440 );
441 tokio::time::sleep(reconnect_delay).await;
442 reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay);
443 }
444 }
445
446 /// Connect to our own relay for self-subscribing
447 async fn connect_self_subscriber(
448 url: &str,
449 ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> {
450 let client = Client::default();
451 client.add_relay(url).await?;
452 client.connect().await;
453
454 // Wait for connection to establish (with timeout)
455 let timeout = Duration::from_secs(10);
456 let start = std::time::Instant::now();
457
458 while start.elapsed() < timeout {
459 let relays = client.relays().await;
460 if relays.values().any(|r| r.is_connected()) {
461 return Ok(client);
462 }
463 tokio::time::sleep(Duration::from_millis(100)).await;
464 }
465
466 Err("Timeout waiting for self-subscriber connection".into())
467 }
468
469 /// Handle notifications from the self-subscriber client
470 ///
471 /// Processes announcement events to discover new relay URLs.
472 /// Applies jitter before spawning connections to prevent thundering herd.
473 #[allow(clippy::too_many_arguments)]
474 async fn handle_self_subscriber_notifications(
475 client: &Client,
476 own_relay_url: &str,
477 relay_domain: &str,
478 tx: &mpsc::Sender<SyncedEvent>,
479 filter_service: &Arc<FilterService>,
480 active_relays: &Arc<tokio::sync::Mutex<HashSet<String>>>,
481 metrics: &Option<SyncMetrics>,
482 health_tracker: &Arc<RelayHealthTracker>,
483 startup_jitter_ms: u64,
484 ) {
485 let own_relay_url = own_relay_url.to_string();
486 let relay_domain = relay_domain.to_string();
487 let filter_service = filter_service.clone();
488 let active_relays = active_relays.clone();
489 let metrics = metrics.clone();
490 let health_tracker = health_tracker.clone();
491 let tx = tx.clone();
492
493 client
494 .handle_notifications(|notification| {
495 let own_relay_url = own_relay_url.clone();
496 let relay_domain = relay_domain.clone();
497 let filter_service = filter_service.clone();
498 let active_relays = active_relays.clone();
499 let metrics = metrics.clone();
500 let health_tracker = health_tracker.clone();
501 let tx = tx.clone();
502
503 async move {
504 match notification {
505 RelayPoolNotification::Event { event, .. } => {
506 // Only process repository announcement events
507 if event.kind.as_u16() != KIND_REPOSITORY_ANNOUNCEMENT {
508 return Ok(false);
509 }
510
511 tracing::debug!(
512 "Self-subscriber received announcement {} from {}",
513 event.id,
514 own_relay_url
515 );
516
517 // Extract relay URLs from the announcement
518 let new_urls = filter_service.extract_relay_urls_from_event(&event);
519
520 for url in new_urls {
521 // Check if we should connect to this relay
522 let should_connect = {
523 let mut active = active_relays.lock().await;
524 let is_new = !active.contains(&url);
525 let is_not_self = !url.contains(&relay_domain);
526
527 if is_new && is_not_self {
528 active.insert(url.clone());
529 true
530 } else {
531 false
532 }
533 };
534
535 if should_connect {
536 tracing::info!(
537 "Self-subscriber discovered new relay from announcement {}, scheduling connection: {}",
538 event.id,
539 url
540 );
541
542 // Update tracked relay count
543 if let Some(ref m) = metrics {
544 m.inc_tracked_count();
545 }
546
547 // Spawn connection to the new relay WITH jitter at point of discovery
548 let url_clone = url.clone();
549 let tx_clone = tx.clone();
550 let filter_service_clone = filter_service.clone();
551 let domain_clone = relay_domain.clone();
552 let health_tracker_clone = health_tracker.clone();
553 let metrics_clone = metrics.clone();
554
555 tokio::spawn(async move {
556 // Apply jitter at point of discovery
557 if startup_jitter_ms > 0 {
558 let jitter_ms = rand::thread_rng().gen_range(0..startup_jitter_ms);
559 tracing::debug!(
560 "Applying {}ms jitter before connecting to discovered relay {}",
561 jitter_ms,
562 url_clone
563 );
564 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
565 }
566
567 connect_with_retry(
568 &url_clone,
569 tx_clone,
570 filter_service_clone,
571 &domain_clone,
572 health_tracker_clone,
573 metrics_clone,
574 )
575 .await;
576 });
577 }
578 }
579
580 Ok(false) // Continue processing
581 }
582 RelayPoolNotification::Shutdown => {
583 tracing::warn!(
584 "Self-subscriber connection shutdown for {}",
585 own_relay_url
586 );
587 Ok(true) // Stop on shutdown
588 }
589 RelayPoolNotification::Message { .. } => {
590 Ok(false) // Continue processing
591 }
592 }
593 }
594 })
595 .await
596 .ok();
597 }
598
599 /// Spawn a connection task for a relay
600 ///
601 /// # Arguments
602 /// * `url` - Relay URL to connect to
603 /// * `tx` - Channel sender for synced events
604 /// * `filter_service` - Filter service for subscriptions
605 /// * `apply_jitter` - Whether to apply startup jitter before connecting
606 fn spawn_connection(
607 &self,
608 url: String,
609 tx: mpsc::Sender<SyncedEvent>,
610 filter_service: Arc<FilterService>,
611 apply_jitter: bool,
612 ) {
613 let domain = self.relay_domain.clone();
614 let health_tracker = self.health_tracker.clone();
615 let metrics = self.metrics.clone();
616 let max_jitter = self.startup_jitter_ms;
617
618 tokio::spawn(async move {
619 // Apply startup jitter if requested
620 if apply_jitter && max_jitter > 0 {
621 let jitter_ms = rand::thread_rng().gen_range(0..max_jitter);
622 tracing::debug!(
623 "Applying {}ms jitter before connecting to {}",
624 jitter_ms,
625 url
626 );
627 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
628 }
629
630 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
631 });
632 }
633
634 /// Process a single synced event
635 ///
636 /// Events are validated through the write policy and stored if accepted.
637 /// Dynamic subscription updates are handled by each connection's SubscriptionManager.
638 async fn process_event(&self, synced_event: SyncedEvent) {
639 let event = &synced_event.event;
640 let event_id = event.id.to_hex();
641 let kind = event.kind.as_u16();
642
643 tracing::debug!(
644 "Processing synced event {} (kind {}) from {}",
645 event_id,
646 kind,
647 synced_event.source_url
648 );
649
650 // Log subscription-relevant events for debugging
651 match kind {
652 30617 | 30618 => {
653 tracing::debug!(
654 "Received announcement {} - connection will add Layer 2 subscription",
655 event_id
656 );
657 }
658 1617 | 1618 | 1619 | 1621 | 1622 => {
659 tracing::debug!(
660 "Received PR/Issue {} - connection will add Layer 3 subscription",
661 event_id
662 );
663 }
664 _ => {}
665 }
666
667 // Validate through write policy using sync_source_addr derived from config
668 let result = self
669 .write_policy
670 .admit_event(event, &self.sync_source_addr)
671 .await;
672
673 match result {
674 PolicyResult::Accept => {
675 tracing::info!(
676 "Synced event {} (kind {}) accepted, storing",
677 event_id,
678 event.kind.as_u16()
679 );
680
681 // Store the event in the database
682 if let Err(e) = self.database.save_event(event).await {
683 tracing::error!("Failed to store synced event {}: {}", event_id, e);
684 } else {
685 tracing::debug!("Synced event {} stored successfully", event_id);
686 }
687 }
688 PolicyResult::Reject(reason) => {
689 tracing::info!(
690 "Synced event {} (kind {}) rejected: {}",
691 event_id,
692 event.kind.as_u16(),
693 reason
694 );
695 }
696 }
697 }
698}
699
700/// Extract domain from a WebSocket URL
701///
702/// Examples:
703/// - "ws://127.0.0.1:8080" -> "127.0.0.1:8080"
704/// - "wss://relay.example.com" -> "relay.example.com"
705fn extract_domain_from_url(url: &str) -> Option<String> {
706 let url = url
707 .trim_start_matches("ws://")
708 .trim_start_matches("wss://");
709 let url = url
710 .trim_start_matches("http://")
711 .trim_start_matches("https://");
712
713 // Remove path
714 let domain = url.split('/').next()?;
715
716 Some(domain.to_string())
717}
718
719#[cfg(test)]
720mod tests {
721 use super::*;
722
723 #[test]
724 fn test_extract_domain_ws() {
725 assert_eq!(
726 extract_domain_from_url("ws://127.0.0.1:8080"),
727 Some("127.0.0.1:8080".to_string())
728 );
729 }
730
731 #[test]
732 fn test_extract_domain_wss() {
733 assert_eq!(
734 extract_domain_from_url("wss://relay.example.com"),
735 Some("relay.example.com".to_string())
736 );
737 }
738
739 #[test]
740 fn test_extract_domain_with_path() {
741 assert_eq!(
742 extract_domain_from_url("ws://example.com/path"),
743 Some("example.com".to_string())
744 );
745 }
746
747 #[test]
748 fn test_extract_domain_http() {
749 assert_eq!(
750 extract_domain_from_url("http://example.com:3000"),
751 Some("example.com:3000".to_string())
752 );
753 }
754
755 #[test]
756 fn test_derive_own_relay_url() {
757 assert_eq!(
758 derive_own_relay_url("127.0.0.1:8080"),
759 "ws://127.0.0.1:8080".to_string()
760 );
761 }
762} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 17418d0..aa34490 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,40 +1,394 @@
1//! Proactive Sync Module for GRASP-02 1//! Proactive Sync Module for GRASP-02
2//! 2//!
3//! This module implements proactive synchronization of kind 30617 (repository state) 3//! This module implements the proactive sync system that ensures data availability
4//! events from configured relay(s). Events are validated through the same write policy 4//! for repositories hosted on this relay by syncing from other relays in the ecosystem.
5//! as directly-submitted events.
6//! 5//!
7//! ## Three-Layer Filter Strategy (Phase 2) 6//! ## Architecture Overview
8//! 7//!
9//! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) 8//! The sync system is built around two core data structures:
10//! - **Layer 2**: Repository events (A/a tags for shared repos)
11//! - **Layer 3**: Related events (E/e tags for discussions, reviews)
12//! 9//!
13//! ## Resilience & Health Tracking (Phase 3) 10//! - **FollowingRepoRootEvents**: Tracks repository root events we're following
11//! - **SyncRelays**: Tracks relays we sync from, including their repos and events
14//! 12//!
15//! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead) 13//! These type aliases are colocated with SyncManager (following the pattern of
16//! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max) 14//! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity.
17//! - **Dead relay handling**: Minimal retry for 24h+ failed relays 15//!
18//! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay) 16//! ## Submodules
17//!
18//! - [`health`]: Relay health tracking with exponential backoff and dead relay detection
19//! - [`metrics`]: Prometheus metrics for sync operations
20//!
21//! ## Memory Estimates (from design doc)
22//!
23//! At target scale (1,000 repos, 100 relays):
24//! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB
25//! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB
26//! - **Total in-memory state**: ~10 MB
27//!
28//! ## Upper Bounds (triggers for redesign)
29//!
30//! - 10,000+ repos: Consider database-backed state
31//! - 500+ sync relays: Consider connection pooling
32//! - 500+ root events per repo: Consider per-repo pagination
33//!
34//! ## Design References
35//!
36//! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md)
37//! for the complete design context.
38
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41
42use nostr_sdk::EventId;
43use tokio::sync::RwLock;
44
45use crate::config::Config;
46use crate::nostr::builder::Nip34WritePolicy;
47use crate::nostr::SharedDatabase;
48
49// =============================================================================
50// Type Aliases for Sync State
51// =============================================================================
52
53/// Repository root events we're following.
54///
55/// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621)
56/// we need to follow for each repository we host.
57///
58/// ## Key Format
59///
60/// The key is a repository addressable reference in the format:
61/// `"30617:<pubkey>:<identifier>"`
62///
63/// For example: `"30617:abc123...def:my-project"`
64///
65/// ## Value
66///
67/// A set of event IDs representing root events (PRs, Issues, Patches, Status events)
68/// that reference this repository via an `a` tag.
69///
70/// ## Event Kinds Tracked
71///
72/// - **1617**: Patches (NIP-34)
73/// - **1618**: Issues (NIP-34)
74/// - **1619**: PRs (Pull Requests, NIP-34)
75/// - **1621**: Status events (NIP-34)
76///
77/// ## Invariants
78///
79/// - May include a few extra repo refs that aren't in `SyncRelays`
80/// - This is acceptable - we won't query other relays for them
81/// - Updated incrementally via self-subscription
82///
83/// ## Thread Safety
84///
85/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
86/// async tasks performing sync operations.
87///
88/// ## Example Usage
89///
90/// ```rust,ignore
91/// use ngit_grasp::sync::FollowingRepoRootEvents;
92/// use std::collections::HashSet;
93/// use nostr_sdk::EventId;
94///
95/// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) {
96/// let guard = state.read().await;
97/// if let Some(events) = guard.get(repo_ref) {
98/// println!("Tracking {} root events for {}", events.len(), repo_ref);
99/// }
100/// }
101/// ```
102pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
103
104/// Relays we sync from, including their repos and events.
105///
106/// This structure tracks which relays we need to connect to for syncing,
107/// and for each relay, which repositories and their root events we're interested in.
108///
109/// ## Key Format (Outer HashMap)
110///
111/// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"`
112///
113/// ## Value Format (Inner HashMap)
114///
115/// For each relay, we maintain a map of:
116/// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`)
117/// - Value: Set of event IDs for that repo which should be synced from this relay
118///
119/// ## Relay Selection Criteria
120///
121/// A relay is included if its URL appears in a repository announcement (kind 30617)
122/// that **also** lists our service URL. This ensures we only sync from relays
123/// for repositories that are actually hosted on our relay.
124///
125/// ## Bootstrap Relay
126///
127/// If configured, the bootstrap relay is always present in this map and is
128/// excluded from automatic removal logic. The bootstrap relay is used for
129/// initial sync and discovery even when no repositories explicitly list it.
130///
131/// ## Thread Safety
132///
133/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
134/// async tasks performing sync operations.
135///
136/// ## Example Usage
137///
138/// ```rust,ignore
139/// use ngit_grasp::sync::SyncRelays;
140/// use std::collections::{HashMap, HashSet};
141///
142/// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) {
143/// let guard = state.read().await;
144/// if let Some(repos) = guard.get(relay_url) {
145/// println!("Relay {} tracks {} repos", relay_url, repos.len());
146/// for (repo_ref, events) in repos {
147/// println!(" {} -> {} events", repo_ref, events.len());
148/// }
149/// }
150/// }
151/// ```
152pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
153
154/// Creates a new empty `FollowingRepoRootEvents` state.
155///
156/// Use this to initialize the state before populating from database queries.
157pub fn new_following_repo_root_events() -> FollowingRepoRootEvents {
158 Arc::new(RwLock::new(HashMap::new()))
159}
160
161/// Creates a new empty `SyncRelays` state.
162///
163/// Use this to initialize the state before populating from database queries.
164pub fn new_sync_relays() -> SyncRelays {
165 Arc::new(RwLock::new(HashMap::new()))
166}
167
168// =============================================================================
169// SyncManager
170// =============================================================================
171
172/// Manages proactive synchronization with external relays.
173///
174/// The SyncManager is responsible for:
175/// - Discovering relays from stored repository announcements
176/// - Maintaining connections to sync relays
177/// - Subscribing to events at external relays
178/// - Applying the acceptance policy to synced events
179///
180/// ## Lifecycle
181///
182/// 1. `new()` - Creates manager with database and config
183/// 2. `run()` - Main async loop (call in a spawned task)
184///
185/// ## Current Status
186///
187/// This is a stub implementation. The core data structures are:
188/// - [`FollowingRepoRootEvents`]: Repository root events we're following
189/// - [`SyncRelays`]: Relays we sync from with their repos and events
190///
191/// Full implementation will come in later phases.
192pub struct SyncManager {
193 /// Bootstrap relay URL if configured
194 #[allow(dead_code)]
195 bootstrap_relay_url: Option<String>,
196
197 /// Our service domain for filtering repo announcements
198 #[allow(dead_code)]
199 service_domain: String,
200
201 /// Database for querying/storing events
202 #[allow(dead_code)]
203 database: SharedDatabase,
204
205 /// Write policy for applying acceptance rules
206 #[allow(dead_code)]
207 write_policy: Nip34WritePolicy,
208
209 /// Repository root events we're following (Phase 1 data structure)
210 #[allow(dead_code)]
211 following_repo_root_events: FollowingRepoRootEvents,
212
213 /// Relays we sync from (Phase 1 data structure)
214 #[allow(dead_code)]
215 sync_relays: SyncRelays,
216
217 /// Max backoff duration for relay reconnection
218 #[allow(dead_code)]
219 max_backoff_secs: u64,
220}
221
222impl SyncManager {
223 /// Creates a new SyncManager.
224 ///
225 /// # Arguments
226 ///
227 /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync
228 /// * `service_domain` - Our domain for filtering announcements
229 /// * `database` - Database for event storage/queries
230 /// * `write_policy` - Policy for accepting events
231 /// * `config` - Configuration for sync parameters
232 pub fn new(
233 bootstrap_relay_url: Option<String>,
234 service_domain: String,
235 database: SharedDatabase,
236 write_policy: Nip34WritePolicy,
237 config: &Config,
238 ) -> Self {
239 Self {
240 bootstrap_relay_url,
241 service_domain,
242 database,
243 write_policy,
244 following_repo_root_events: new_following_repo_root_events(),
245 sync_relays: new_sync_relays(),
246 max_backoff_secs: config.sync_max_backoff_secs,
247 }
248 }
249
250 /// Returns a reference to the following repo root events state.
251 ///
252 /// This is the Phase 1 data structure tracking which repository root events
253 /// (kinds 1617, 1618, 1619, 1621) we're following.
254 pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents {
255 &self.following_repo_root_events
256 }
257
258 /// Returns a reference to the sync relays state.
259 ///
260 /// This is the Phase 1 data structure tracking which relays we sync from
261 /// and their associated repositories/events.
262 pub fn sync_relays(&self) -> &SyncRelays {
263 &self.sync_relays
264 }
265
266 /// Runs the sync manager main loop.
267 ///
268 /// This method should be called in a spawned task:
269 ///
270 /// ```rust,ignore
271 /// tokio::spawn(async move {
272 /// sync_manager.run().await;
273 /// });
274 /// ```
275 ///
276 /// ## Current Status
277 ///
278 /// This is a stub that logs and then waits indefinitely.
279 /// Full implementation includes:
280 /// - Phase 2: Database initialization queries
281 /// - Phase 3: Self-subscription for incremental updates
282 /// - Phase 4-6: Filter building, connection management
283 /// - Phase 7: Full sync loop
284 pub async fn run(self) {
285 tracing::info!(
286 "SyncManager stub started (bootstrap_relay={:?}, domain={})",
287 self.bootstrap_relay_url,
288 self.service_domain
289 );
290
291 tracing::info!(
292 "Phase 1 data structures initialized: following_repo_root_events, sync_relays"
293 );
294
295 // Stub: just wait indefinitely until full implementation
296 // This prevents the spawned task from immediately completing
297 loop {
298 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
299 }
300 }
301}
302
303// =============================================================================
304// Submodules
305// =============================================================================
19 306
20mod connection;
21mod filter;
22pub mod health; 307pub mod health;
23mod manager;
24pub mod metrics; 308pub mod metrics;
25pub mod negentropy; 309
26mod subscription; 310// Re-export commonly used types
27 311pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker};
28pub use filter::FilterService; 312pub use metrics::{event_source, SyncMetrics};
29pub use health::{HealthState, RelayHealth, RelayHealthTracker}; 313
30pub use manager::SyncManager; 314// =============================================================================
31pub use metrics::SyncMetrics; 315// Tests
32pub use negentropy::NegentropyService; 316// =============================================================================
33pub use subscription::SubscriptionManager; 317
34 318#[cfg(test)]
35// Re-export default sync source address for backward compatibility with modules like negentropy.rs 319mod tests {
36// Manager.rs derives sync_source_addr from config.bind_address at runtime 320 use super::*;
37pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR; 321
38 322 #[tokio::test]
39/// Kind for repository state events (NIP-34) 323 async fn test_following_repo_root_events_basic_operations() {
40pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file 324 let state = new_following_repo_root_events();
325
326 // Insert some events
327 {
328 let mut guard = state.write().await;
329 let repo_ref = "30617:abc123:my-project".to_string();
330 guard
331 .entry(repo_ref)
332 .or_default()
333 .insert(EventId::all_zeros());
334 }
335
336 // Read back
337 {
338 let guard = state.read().await;
339 assert_eq!(guard.len(), 1);
340 assert!(guard.contains_key("30617:abc123:my-project"));
341 }
342 }
343
344 #[tokio::test]
345 async fn test_sync_relays_basic_operations() {
346 let state = new_sync_relays();
347
348 // Insert relay with repos
349 {
350 let mut guard = state.write().await;
351 let relay_url = "wss://relay.example.com".to_string();
352 let repo_ref = "30617:abc123:my-project".to_string();
353
354 guard
355 .entry(relay_url)
356 .or_default()
357 .entry(repo_ref)
358 .or_default()
359 .insert(EventId::all_zeros());
360 }
361
362 // Read back
363 {
364 let guard = state.read().await;
365 assert_eq!(guard.len(), 1);
366 let relay_repos = guard.get("wss://relay.example.com").unwrap();
367 assert_eq!(relay_repos.len(), 1);
368 let events = relay_repos.get("30617:abc123:my-project").unwrap();
369 assert_eq!(events.len(), 1);
370 }
371 }
372
373 #[tokio::test]
374 async fn test_concurrent_access() {
375 let state = new_following_repo_root_events();
376 let state_clone = Arc::clone(&state);
377
378 // Writer task
379 let writer = tokio::spawn(async move {
380 let mut guard = state_clone.write().await;
381 guard
382 .entry("30617:writer:repo".to_string())
383 .or_default()
384 .insert(EventId::all_zeros());
385 });
386
387 // Wait for writer
388 writer.await.unwrap();
389
390 // Reader should see the change
391 let guard = state.read().await;
392 assert!(guard.contains_key("30617:writer:repo"));
393 }
394} \ No newline at end of file
diff --git a/src/sync/negentropy.rs b/src/sync/negentropy.rs
deleted file mode 100644
index 5c0a246..0000000
--- a/src/sync/negentropy.rs
+++ /dev/null
@@ -1,477 +0,0 @@
1//! Negentropy Catchup Service for GRASP-02 Phase 5
2//!
3//! Implements gap-filling synchronization to ensure no events are missed during:
4//! - Startup (initial sync after warm-up period)
5//! - Reconnection (after connection restore)
6//! - Daily maintenance (periodic full reconciliation)
7//!
8//! ## Note on NIP-77
9//!
10//! This implementation uses a simplified gap-filling strategy (fetch and compare)
11//! rather than full NIP-77 negentropy set reconciliation. The nostr-sdk 0.44 does
12//! not include built-in negentropy support, so we implement an equivalent approach:
13//!
14//! 1. Fetch events from relay using same filters as live sync
15//! 2. Compare with local database (skip already-stored events)
16//! 3. Validate and store missing events through policy
17//!
18//! Full NIP-77 support can be added in a future release if needed.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use nostr_relay_builder::prelude::*;
25use nostr_sdk::prelude::*;
26use tokio::sync::RwLock;
27
28use super::filter::FilterService;
29use super::SYNC_SOURCE_ADDR;
30use crate::config::Config;
31use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
32
33/// Default startup delay before first catchup (30 seconds)
34const DEFAULT_STARTUP_DELAY_SECS: u64 = 30;
35
36/// Default delay after reconnection before catchup (10 seconds)
37const DEFAULT_RECONNECT_DELAY_SECS: u64 = 10;
38
39/// Default lookback period for reconnect catchup (3 days)
40const DEFAULT_RECONNECT_LOOKBACK_DAYS: u64 = 3;
41
42/// Daily catchup interval (24 hours)
43const DAILY_CATCHUP_INTERVAL_SECS: u64 = 86400;
44
45/// Stagger delay between relays for catchup operations (5 minutes)
46const RELAY_STAGGER_SECS: u64 = 300;
47
48/// Timeout for fetching events during catchup
49const CATCHUP_FETCH_TIMEOUT_SECS: u64 = 60;
50
51/// Negentropy Catchup Service
52///
53/// Manages gap-filling operations for different scenarios:
54/// - Startup catchup after warm-up period
55/// - Reconnect catchup after connection restore
56/// - Daily catchup for periodic maintenance
57#[derive(Debug)]
58pub struct NegentropyService {
59 /// Database for storing and querying events
60 database: SharedDatabase,
61 /// Filter service for building catchup filters
62 filter_service: Arc<FilterService>,
63 /// Write policy for validating synced events
64 write_policy: Nip34WritePolicy,
65 /// Startup time of the service
66 startup_time: Instant,
67 /// Configuration values
68 startup_delay_secs: u64,
69 reconnect_delay_secs: u64,
70 reconnect_lookback_days: u64,
71 /// Whether startup catchup has been run
72 startup_catchup_completed: Arc<RwLock<bool>>,
73 /// Last daily catchup time per relay
74 last_daily_catchup: Arc<RwLock<HashMap<String, Instant>>>,
75}
76
77impl NegentropyService {
78 /// Create a new NegentropyService
79 ///
80 /// # Arguments
81 /// * `database` - Shared database for storing events
82 /// * `filter_service` - Filter service for building catchup filters
83 /// * `write_policy` - Write policy for validating events
84 /// * `config` - Configuration for catchup timing
85 pub fn new(
86 database: SharedDatabase,
87 filter_service: Arc<FilterService>,
88 write_policy: Nip34WritePolicy,
89 config: &Config,
90 ) -> Self {
91 Self {
92 database,
93 filter_service,
94 write_policy,
95 startup_time: Instant::now(),
96 startup_delay_secs: config.sync_startup_delay_secs,
97 reconnect_delay_secs: config.sync_reconnect_delay_secs,
98 reconnect_lookback_days: config.sync_reconnect_lookback_days,
99 startup_catchup_completed: Arc::new(RwLock::new(false)),
100 last_daily_catchup: Arc::new(RwLock::new(HashMap::new())),
101 }
102 }
103
104 /// Create a NegentropyService with default configuration
105 pub fn with_defaults(
106 database: SharedDatabase,
107 filter_service: Arc<FilterService>,
108 write_policy: Nip34WritePolicy,
109 ) -> Self {
110 Self {
111 database,
112 filter_service,
113 write_policy,
114 startup_time: Instant::now(),
115 startup_delay_secs: DEFAULT_STARTUP_DELAY_SECS,
116 reconnect_delay_secs: DEFAULT_RECONNECT_DELAY_SECS,
117 reconnect_lookback_days: DEFAULT_RECONNECT_LOOKBACK_DAYS,
118 startup_catchup_completed: Arc::new(RwLock::new(false)),
119 last_daily_catchup: Arc::new(RwLock::new(HashMap::new())),
120 }
121 }
122
123 /// Check if startup catchup should run
124 ///
125 /// Returns true if:
126 /// - Startup delay has elapsed (default 30s)
127 /// - Startup catchup hasn't been completed yet
128 pub async fn should_run_startup_catchup(&self) -> bool {
129 let completed = *self.startup_catchup_completed.read().await;
130 if completed {
131 return false;
132 }
133
134 let elapsed = self.startup_time.elapsed();
135 elapsed >= Duration::from_secs(self.startup_delay_secs)
136 }
137
138 /// Check if daily catchup should run for a specific relay
139 ///
140 /// Returns true if 24 hours have elapsed since last daily catchup
141 pub async fn should_run_daily_catchup(&self, relay_url: &str) -> bool {
142 let last_catchup = self.last_daily_catchup.read().await;
143
144 match last_catchup.get(relay_url) {
145 None => true, // Never run, should run
146 Some(last_time) => {
147 last_time.elapsed() >= Duration::from_secs(DAILY_CATCHUP_INTERVAL_SECS)
148 }
149 }
150 }
151
152 /// Get the startup delay in seconds
153 pub fn startup_delay_secs(&self) -> u64 {
154 self.startup_delay_secs
155 }
156
157 /// Get the reconnect delay in seconds
158 pub fn reconnect_delay_secs(&self) -> u64 {
159 self.reconnect_delay_secs
160 }
161
162 /// Get the relay stagger delay in seconds
163 pub fn relay_stagger_secs(&self) -> u64 {
164 RELAY_STAGGER_SECS
165 }
166
167 /// Run startup catchup for a relay
168 ///
169 /// Fetches all events matching the sync filters and stores any missing ones.
170 /// This is called after the startup warm-up period (default 30s).
171 ///
172 /// Returns the count of gap events filled.
173 pub async fn run_startup_catchup(
174 &self,
175 relay_url: &str,
176 remote_domain: &str,
177 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
178 tracing::info!("Starting startup catchup for {}", relay_url);
179
180 // Run full catchup (no time restriction)
181 let gap_count = self
182 .run_catchup(relay_url, remote_domain, None, "startup")
183 .await?;
184
185 // Mark startup catchup as completed
186 {
187 let mut completed = self.startup_catchup_completed.write().await;
188 *completed = true;
189 }
190
191 if gap_count > 0 {
192 tracing::warn!(
193 "Startup catchup filled {} gaps from {}",
194 gap_count,
195 relay_url
196 );
197 } else {
198 tracing::info!("Startup catchup completed for {} (no gaps)", relay_url);
199 }
200
201 Ok(gap_count)
202 }
203
204 /// Run reconnect catchup for a relay
205 ///
206 /// Fetches events from the last 3 days (configurable) and stores any missing ones.
207 /// This is called after a connection is restored (after reconnect delay).
208 ///
209 /// Returns the count of gap events filled.
210 pub async fn run_reconnect_catchup(
211 &self,
212 relay_url: &str,
213 remote_domain: &str,
214 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
215 tracing::info!("Starting reconnect catchup for {}", relay_url);
216
217 // Calculate "since" timestamp (3 days ago)
218 let lookback_secs = self.reconnect_lookback_days * 24 * 60 * 60;
219 let since = Timestamp::now() - lookback_secs;
220
221 let gap_count = self
222 .run_catchup(relay_url, remote_domain, Some(since), "reconnect")
223 .await?;
224
225 if gap_count > 0 {
226 tracing::warn!(
227 "Reconnect catchup filled {} gaps from {}",
228 gap_count,
229 relay_url
230 );
231 } else {
232 tracing::debug!("Reconnect catchup completed for {} (no gaps)", relay_url);
233 }
234
235 Ok(gap_count)
236 }
237
238 /// Run daily catchup for a relay
239 ///
240 /// Performs full reconciliation and stores any missing events.
241 /// This is called once per day per relay (with stagger).
242 ///
243 /// Returns the count of gap events filled.
244 pub async fn run_daily_catchup(
245 &self,
246 relay_url: &str,
247 remote_domain: &str,
248 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
249 tracing::info!("Starting daily catchup for {}", relay_url);
250
251 // Run full catchup (no time restriction)
252 let gap_count = self
253 .run_catchup(relay_url, remote_domain, None, "daily")
254 .await?;
255
256 // Update last daily catchup time
257 {
258 let mut last_catchup = self.last_daily_catchup.write().await;
259 last_catchup.insert(relay_url.to_string(), Instant::now());
260 }
261
262 if gap_count > 0 {
263 tracing::warn!(
264 "Daily catchup filled {} gaps from {}",
265 gap_count,
266 relay_url
267 );
268 } else {
269 tracing::info!("Daily catchup completed for {} (no gaps)", relay_url);
270 }
271
272 Ok(gap_count)
273 }
274
275 /// Core catchup implementation
276 ///
277 /// Fetches events from relay matching sync filters, compares with local database,
278 /// validates through policy, and stores missing events.
279 ///
280 /// # Arguments
281 /// * `relay_url` - URL of the relay to fetch from
282 /// * `remote_domain` - Domain of the remote relay (for filter building)
283 /// * `since` - Optional timestamp to filter events (for reconnect catchup)
284 /// * `catchup_type` - Type of catchup for logging ("startup", "reconnect", "daily")
285 async fn run_catchup(
286 &self,
287 relay_url: &str,
288 remote_domain: &str,
289 since: Option<Timestamp>,
290 catchup_type: &str,
291 ) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
292 // Create a client for fetching events
293 let client = Client::default();
294 client.add_relay(relay_url).await?;
295 client.connect().await;
296
297 let mut gap_count = 0;
298
299 // Build filters (same as live sync uses)
300 let mut all_filters = Vec::new();
301
302 // Layer 1: Announcement discovery
303 let layer1_filters = self.filter_service.get_layer1_filters();
304 all_filters.extend(layer1_filters);
305
306 // Layer 2: Repository events
307 let layer2_filters = self.filter_service.get_layer2_filters(remote_domain).await;
308 all_filters.extend(layer2_filters);
309
310 // Layer 3: Related events
311 let layer3_filters = self.filter_service.get_layer3_filters().await;
312 all_filters.extend(layer3_filters);
313
314 // Apply "since" filter if specified (for reconnect catchup)
315 let filters: Vec<Filter> = if let Some(since_ts) = since {
316 all_filters
317 .into_iter()
318 .map(|f| f.since(since_ts))
319 .collect()
320 } else {
321 all_filters
322 };
323
324 if filters.is_empty() {
325 tracing::debug!("No filters for {} catchup on {}", catchup_type, relay_url);
326 client.disconnect().await;
327 return Ok(0);
328 }
329
330 tracing::debug!(
331 "Running {} catchup on {} with {} filters",
332 catchup_type,
333 relay_url,
334 filters.len()
335 );
336
337 // Fetch events for each filter
338 for filter in filters {
339 match client
340 .fetch_events(filter, Duration::from_secs(CATCHUP_FETCH_TIMEOUT_SECS))
341 .await
342 {
343 Ok(events) => {
344 for event in events.into_iter() {
345 // Check if event already exists in local database
346 if self.event_exists_locally(&event).await {
347 continue;
348 }
349
350 // Validate through write policy
351 let result = self
352 .write_policy
353 .admit_event(&event, &SYNC_SOURCE_ADDR)
354 .await;
355
356 match result {
357 PolicyResult::Accept => {
358 // Log gap event at WARN level to distinguish from live events
359 tracing::warn!(
360 "Gap event filled via {} catchup: {} (kind {})",
361 catchup_type,
362 event.id.to_hex(),
363 event.kind.as_u16()
364 );
365
366 // Store the event
367 if let Err(e) = self.database.save_event(&event).await {
368 tracing::error!(
369 "Failed to store gap event {}: {}",
370 event.id.to_hex(),
371 e
372 );
373 } else {
374 gap_count += 1;
375 }
376 }
377 PolicyResult::Reject(reason) => {
378 tracing::debug!(
379 "Gap event {} rejected by policy: {}",
380 event.id.to_hex(),
381 reason
382 );
383 }
384 }
385 }
386 }
387 Err(e) => {
388 tracing::warn!(
389 "Failed to fetch events for {} catchup from {}: {}",
390 catchup_type,
391 relay_url,
392 e
393 );
394 }
395 }
396 }
397
398 client.disconnect().await;
399
400 Ok(gap_count)
401 }
402
403 /// Check if an event already exists in the local database
404 async fn event_exists_locally(&self, event: &Event) -> bool {
405 // Query for the specific event by ID
406 let filter = Filter::new().id(event.id);
407
408 match self.database.query(filter).await {
409 Ok(events) => !events.is_empty(),
410 Err(e) => {
411 tracing::warn!(
412 "Failed to check if event {} exists locally: {}",
413 event.id.to_hex(),
414 e
415 );
416 // Assume it doesn't exist to avoid skipping events on error
417 false
418 }
419 }
420 }
421
422 /// Mark startup catchup as completed (for testing)
423 #[cfg(test)]
424 pub async fn mark_startup_completed(&self) {
425 let mut completed = self.startup_catchup_completed.write().await;
426 *completed = true;
427 }
428
429 /// Reset startup catchup status (for testing)
430 #[cfg(test)]
431 pub async fn reset_startup_status(&self) {
432 let mut completed = self.startup_catchup_completed.write().await;
433 *completed = false;
434 }
435}
436
437/// Create a shared NegentropyService wrapped in Arc
438pub fn create_negentropy_service(
439 database: SharedDatabase,
440 filter_service: Arc<FilterService>,
441 write_policy: Nip34WritePolicy,
442 config: &Config,
443) -> Arc<NegentropyService> {
444 Arc::new(NegentropyService::new(
445 database,
446 filter_service,
447 write_policy,
448 config,
449 ))
450}
451
452#[cfg(test)]
453mod tests {
454 use super::*;
455
456 #[test]
457 fn test_default_constants() {
458 assert_eq!(DEFAULT_STARTUP_DELAY_SECS, 30);
459 assert_eq!(DEFAULT_RECONNECT_DELAY_SECS, 10);
460 assert_eq!(DEFAULT_RECONNECT_LOOKBACK_DAYS, 3);
461 assert_eq!(DAILY_CATCHUP_INTERVAL_SECS, 86400);
462 assert_eq!(RELAY_STAGGER_SECS, 300);
463 }
464
465 #[test]
466 fn test_reconnect_lookback_calculation() {
467 // 3 days = 3 * 24 * 60 * 60 = 259,200 seconds
468 let lookback_days: u64 = 3;
469 let lookback_secs = lookback_days * 24 * 60 * 60;
470 assert_eq!(lookback_secs, 259200);
471 }
472
473 #[test]
474 fn test_stagger_delay_is_5_minutes() {
475 assert_eq!(RELAY_STAGGER_SECS, 300); // 5 * 60 = 300
476 }
477} \ No newline at end of file
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs
deleted file mode 100644
index bbeaa2a..0000000
--- a/src/sync/subscription.rs
+++ /dev/null
@@ -1,229 +0,0 @@
1//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
2//!
3//! Manages dynamic subscription updates per connection, including:
4//! - Tracking subscribed announcements and events
5//! - Adding new subscriptions when announcements/PRs arrive
6//! - Consolidating filters when count exceeds threshold
7//! - Preventing duplicate subscriptions
8//!
9//! ## Dynamic Subscription Strategy
10//!
11//! Initial: Layer 1 (announcements)
12//! ↓ (announcement received)
13//! Add: Layer 2 (events for that repo)
14//! ↓ (PR/Issue received)
15//! Add: Layer 3 (events for that PR/Issue)
16//! ↓ (filter count > 150)
17//! Consolidate: Back to Layer 1 only
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use nostr_sdk::prelude::*;
23
24use super::filter::FilterService;
25
26/// Maximum number of filters before consolidation is triggered
27const CONSOLIDATION_THRESHOLD: usize = 150;
28
29/// Manages subscriptions for a single relay connection
30///
31/// Tracks which announcements and events have been subscribed to,
32/// and handles dynamic subscription updates as new events arrive.
33#[derive(Debug)]
34pub struct SubscriptionManager {
35 /// Event IDs of announcements we've subscribed to (for Layer 2)
36 subscribed_announcements: HashSet<String>,
37 /// Event IDs of PRs/Issues we've subscribed to (for Layer 3)
38 subscribed_events: HashSet<String>,
39 /// Whether we've consolidated back to Layer 1 only
40 is_consolidated: bool,
41 /// FilterService for building filters
42 filter_service: Arc<FilterService>,
43 /// Remote relay domain for Layer 2 filters
44 remote_domain: String,
45}
46
47impl SubscriptionManager {
48 /// Create a new SubscriptionManager
49 ///
50 /// # Arguments
51 /// * `filter_service` - FilterService for building subscription filters
52 /// * `remote_domain` - The domain of the remote relay we're syncing from
53 pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
54 Self {
55 subscribed_announcements: HashSet::new(),
56 subscribed_events: HashSet::new(),
57 is_consolidated: false,
58 filter_service,
59 remote_domain,
60 }
61 }
62
63 /// Add an announcement and return new filters to subscribe to
64 ///
65 /// When a new announcement (kind 30617/30618) arrives, this creates
66 /// Layer 2 filters to subscribe to events for that repository.
67 ///
68 /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed.
69 pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
70 let event_id = event.id.to_hex();
71
72 // Check if already subscribed or consolidated
73 if self.is_consolidated || self.subscribed_announcements.contains(&event_id) {
74 return None;
75 }
76
77 // Add to tracked announcements
78 self.subscribed_announcements.insert(event_id);
79
80 // Build Layer 2 filters for this announcement
81 // Layer 2 filters target events with 'a' tags pointing to this repo
82 let filters = self.build_layer2_filter_for_announcement(event);
83
84 if filters.is_empty() {
85 None
86 } else {
87 Some(filters)
88 }
89 }
90
91 /// Add a PR/Issue/Patch event and return new filters to subscribe to
92 ///
93 /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives,
94 /// this creates Layer 3 filters to subscribe to related events.
95 ///
96 /// Returns `Some(filters)` if this is a new event, `None` if already subscribed.
97 pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
98 let event_id = event.id.to_hex();
99
100 // Check if already subscribed or consolidated
101 if self.is_consolidated || self.subscribed_events.contains(&event_id) {
102 return None;
103 }
104
105 // Add to tracked events
106 self.subscribed_events.insert(event_id.clone());
107
108 // Build Layer 3 filter for this event
109 // Layer 3 filters target events with 'e' tags pointing to this event
110 let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id);
111
112 Some(vec![filter])
113 }
114
115 /// Check if consolidation is needed
116 ///
117 /// Returns true if the total filter count exceeds the threshold (150).
118 pub fn should_consolidate(&self) -> bool {
119 !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD
120 }
121
122 /// Consolidate all subscriptions back to Layer 1 only
123 ///
124 /// Clears all tracked announcements and events, marks as consolidated,
125 /// and returns the Layer 1 filters to re-subscribe to.
126 pub fn consolidate(&mut self) -> Vec<Filter> {
127 tracing::info!(
128 "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
129 self.subscribed_announcements.len(),
130 self.subscribed_events.len()
131 );
132
133 // Clear tracked subscriptions
134 self.subscribed_announcements.clear();
135 self.subscribed_events.clear();
136 self.is_consolidated = true;
137
138 // Return Layer 1 filters
139 self.filter_service.get_layer1_filters()
140 }
141
142 /// Get the total count of active filters
143 ///
144 /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3),
145 /// plus the base Layer 1 filter count.
146 pub fn get_filter_count(&self) -> usize {
147 if self.is_consolidated {
148 // When consolidated, we only have Layer 1 filters
149 1
150 } else {
151 // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events)
152 1 + self.subscribed_announcements.len() + self.subscribed_events.len()
153 }
154 }
155
156 /// Check if an announcement has been subscribed to
157 pub fn has_announcement(&self, event_id: &str) -> bool {
158 self.subscribed_announcements.contains(event_id)
159 }
160
161 /// Check if an event has been subscribed to
162 pub fn has_event(&self, event_id: &str) -> bool {
163 self.subscribed_events.contains(event_id)
164 }
165
166 /// Check if subscriptions have been consolidated
167 pub fn is_consolidated(&self) -> bool {
168 self.is_consolidated
169 }
170
171 /// Get the count of subscribed announcements
172 pub fn announcement_count(&self) -> usize {
173 self.subscribed_announcements.len()
174 }
175
176 /// Get the count of subscribed events
177 pub fn event_count(&self) -> usize {
178 self.subscribed_events.len()
179 }
180
181 /// Build Layer 2 filter for a specific announcement event
182 ///
183 /// Creates a filter with an 'a' tag pointing to the announcement's coordinates.
184 fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
185 // Extract the d tag (identifier) from the event
186 let identifier = event.tags.iter().find_map(|tag| {
187 let tag_vec = tag.clone().to_vec();
188 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
189 Some(tag_vec[1].clone())
190 } else {
191 None
192 }
193 });
194
195 let identifier = match identifier {
196 Some(id) => id,
197 None => {
198 tracing::warn!(
199 "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
200 event.id.to_hex()
201 );
202 return Vec::new();
203 }
204 };
205
206 // Verify this is an announcement kind
207 if !matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
208 tracing::warn!(
209 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
210 event.id.to_hex(),
211 event.kind
212 );
213 return Vec::new();
214 }
215
216 // Build the addressable coordinate: kind:pubkey:identifier
217 let coord = format!(
218 "{}:{}:{}",
219 event.kind.as_u16(),
220 event.pubkey.to_hex(),
221 identifier
222 );
223
224 // Create filter with 'a' tag for this coordinate
225 let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord);
226
227 vec![filter]
228 }
229}