upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r-- src/sync/mod.rs 1264
1 file changed, 300 insertions, 964 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 9dec982..c1f8bca 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,1039 +1,375 @@
1//! Proactive Sync Module for GRASP-02 1//! Proactive Sync Module - GRASP-02 v4 Implementation
2//! 2//!
3//! This module implements the proactive sync system that ensures data availability 3//! This module implements proactive synchronization of repository data from external
4//! for repositories hosted on this relay by syncing from other relays in the ecosystem. 4//! relays based on relay URLs listed in 30617 repository announcements.
5//! 5//!
6//! ## Architecture Overview 6//! ## Architecture
7//! 7//!
8//! The sync system is built around two core data structures: 8//! The sync system uses three index structures:
9//! - `RepoSyncIndex` - What we WANT to sync (source of truth from self-subscription)
10//! - `RelaySyncIndex` - What we have CONFIRMED syncing + connection state
11//! - `PendingSyncIndex` - In-flight batches awaiting EOSE confirmation
9//! 12//!
10//! - **FollowingRepoRootEvents**: Tracks repository root events we're following 13//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
11//! - **SyncRelays**: Tracks relays we sync from, including their repos and events
12//!
13//! These type aliases are colocated with SyncManager (following the pattern of
14//! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity.
15//!
16//! ## Submodules
17//!
18//! - [`health`]: Relay health tracking with exponential backoff and dead relay detection
19//! - [`metrics`]: Prometheus metrics for sync operations
20//!
21//! ## Memory Estimates (from design doc)
22//!
23//! At target scale (1,000 repos, 100 relays):
24//! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB
25//! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB
26//! - **Total in-memory state**: ~10 MB
27//!
28//! ## Upper Bounds (triggers for redesign)
29//!
30//! - 10,000+ repos: Consider database-backed state
31//! - 500+ sync relays: Consider connection pooling
32//! - 500+ root events per repo: Consider per-repo pagination
33//!
34//! ## Design References
35//!
36//! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md)
37//! for the complete design context.
38 14
39use std::collections::{HashMap, HashSet}; 15use std::collections::{HashMap, HashSet};
40use std::net::SocketAddr;
41use std::sync::Arc; 16use std::sync::Arc;
42 17
43use nostr_relay_builder::prelude::{
44 DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy,
45};
46use nostr_sdk::prelude::*; 18use nostr_sdk::prelude::*;
47use nostr_sdk::EventId; 19use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
48use tokio::sync::{mpsc, RwLock}; 20use tokio::sync::RwLock;
49 21
50use crate::config::Config; 22use crate::config::Config;
51use crate::nostr::builder::Nip34WritePolicy; 23use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
52use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
53use crate::nostr::SharedDatabase;
54
55mod relay_connection;
56mod self_subscriber;
57pub use relay_connection::{RelayConnection, RelayEvent};
58pub use self_subscriber::{RelayAction, SelfSubscriber};
59 24
60// ============================================================================= 25// =============================================================================
61// Type Aliases for Sync State 26// Type Aliases for Index Structures
62// ============================================================================= 27// =============================================================================
63 28
64/// Repository root events we're following. 29/// What we WANT to sync - derived from events received via self-subscription.
65/// 30/// Updated immediately when self-subscriber batch fires.
66/// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) 31/// Key: repo addressable ref - 30617:pubkey:identifier
67/// we need to follow for each repository we host. 32pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;
68///
69/// ## Key Format
70///
71/// The key is a repository addressable reference in the format:
72/// `"30617:<pubkey>:<identifier>"`
73///
74/// For example: `"30617:abc123...def:my-project"`
75///
76/// ## Value
77///
78/// A set of event IDs representing root events (Patches, PRs, PR updates, Issues)
79/// that reference this repository via an `a` tag.
80///
81/// ## Event Kinds Tracked
82///
83/// - **1617**: Patches (NIP-34)
84/// - **1618**: PRs (Pull Requests, NIP-34)
85/// - **1619**: PR updates (NIP-34)
86/// - **1621**: Issues (NIP-34)
87///
88/// ## Invariants
89///
90/// - May include a few extra repo refs that aren't in `SyncRelays`
91/// - This is acceptable - we won't query other relays for them
92/// - Updated incrementally via self-subscription
93///
94/// ## Thread Safety
95///
96/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
97/// async tasks performing sync operations.
98///
99/// ## Example Usage
100///
101/// ```rust,ignore
102/// use ngit_grasp::sync::FollowingRepoRootEvents;
103/// use std::collections::HashSet;
104/// use nostr_sdk::EventId;
105///
106/// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) {
107/// let guard = state.read().await;
108/// if let Some(events) = guard.get(repo_ref) {
109/// println!("Tracking {} root events for {}", events.len(), repo_ref);
110/// }
111/// }
112/// ```
113pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>;
114
115/// Relays we sync from, including their repos and events.
116///
117/// This structure tracks which relays we need to connect to for syncing,
118/// and for each relay, which repositories and their root events we're interested in.
119///
120/// ## Key Format (Outer HashMap)
121///
122/// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"`
123///
124/// ## Value Format (Inner HashMap)
125///
126/// For each relay, we maintain a map of:
127/// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`)
128/// - Value: Set of event IDs for that repo which should be synced from this relay
129///
130/// ## Relay Selection Criteria
131///
132/// A relay is included if its URL appears in a repository announcement (kind 30617)
133/// that **also** lists our service URL. This ensures we only sync from relays
134/// for repositories that are actually hosted on our relay.
135///
136/// ## Bootstrap Relay
137///
138/// If configured, the bootstrap relay is always present in this map and is
139/// excluded from automatic removal logic. The bootstrap relay is used for
140/// initial sync and discovery even when no repositories explicitly list it.
141///
142/// ## Thread Safety
143///
144/// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple
145/// async tasks performing sync operations.
146///
147/// ## Example Usage
148///
149/// ```rust,ignore
150/// use ngit_grasp::sync::SyncRelays;
151/// use std::collections::{HashMap, HashSet};
152///
153/// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) {
154/// let guard = state.read().await;
155/// if let Some(repos) = guard.get(relay_url) {
156/// println!("Relay {} tracks {} repos", relay_url, repos.len());
157/// for (repo_ref, events) in repos {
158/// println!(" {} -> {} events", repo_ref, events.len());
159/// }
160/// }
161/// }
162/// ```
163pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>;
164 33
165/// Creates a new empty `FollowingRepoRootEvents` state. 34/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
166/// 35/// Key: relay URL
167/// Use this to initialize the state before populating from database queries. 36pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;
168pub fn new_following_repo_root_events() -> FollowingRepoRootEvents {
169 Arc::new(RwLock::new(HashMap::new()))
170}
171 37
172/// Creates a new empty `SyncRelays` state. 38/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
173/// 39/// Each batch has its own ID and can confirm independently.
174/// Use this to initialize the state before populating from database queries. 40/// Key: relay URL
175pub fn new_sync_relays() -> SyncRelays { 41pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
176 Arc::new(RwLock::new(HashMap::new()))
177}
178 42
179// ============================================================================= 43// =============================================================================
180// SyncManager 44// Supporting Data Structures
181// ============================================================================= 45// =============================================================================
182 46
183/// Manages proactive synchronization with external relays. 47/// What repos and root events need to be synced
184/// 48#[derive(Debug, Clone, Default)]
185/// The SyncManager is responsible for: 49pub struct RepoSyncNeeds {
186/// - Discovering relays from stored repository announcements 50 /// Relay URLs listed in this repo's 30617 announcement
187/// - Maintaining connections to sync relays 51 pub relays: HashSet<String>,
188/// - Subscribing to events at external relays 52 /// Root event IDs - 1617/1618/1619/1621 - that reference this repo
189/// - Applying the acceptance policy to synced events 53 pub root_events: HashSet<EventId>,
190/// 54}
191/// ## Lifecycle
192///
193/// 1. `new()` - Creates manager with database and config
194/// 2. `run()` - Main async loop (call in a spawned task)
195///
196/// ## Current Status
197///
198/// Phase 2 implementation supports:
199/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter
200/// - Event processing through write policy
201/// - Storage of accepted events
202///
203/// Core data structures:
204/// - [`FollowingRepoRootEvents`]: Repository root events we're following
205/// - [`SyncRelays`]: Relays we sync from with their repos and events
206pub struct SyncManager {
207 /// Bootstrap relay URL if configured
208 bootstrap_relay_url: Option<String>,
209
210 /// Our service domain for filtering repo announcements
211 #[allow(dead_code)]
212 service_domain: String,
213
214 /// Database for querying/storing events
215 database: SharedDatabase,
216
217 /// Write policy for applying acceptance rules
218 write_policy: Nip34WritePolicy,
219
220 /// Repository root events we're following (Phase 1 data structure)
221 #[allow(dead_code)]
222 following_repo_root_events: FollowingRepoRootEvents,
223
224 /// Relays we sync from (Phase 1 data structure)
225 #[allow(dead_code)]
226 sync_relays: SyncRelays,
227
228 /// Max backoff duration for relay reconnection
229 #[allow(dead_code)]
230 max_backoff_secs: u64,
231 55
232 /// Socket address used for sync source (for write policy) 56/// Connection status for a relay
233 sync_source_addr: SocketAddr, 57#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
58pub enum ConnectionStatus {
59 /// Not currently connected
60 #[default]
61 Disconnected,
62 /// Connection attempt in progress
63 Connecting,
64 /// Successfully connected and subscribed
65 Connected,
234} 66}
235 67
236impl SyncManager { 68/// Complete state for a single relay - combines sync needs with connection lifecycle
237 /// Creates a new SyncManager. 69#[derive(Debug)]
238 /// 70pub struct RelayState {
239 /// # Arguments 71 /// Repos we have confirmed syncing from this relay
240 /// 72 pub repos: HashSet<String>,
241 /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync 73 /// Root events we have confirmed tracking
242 /// * `service_domain` - Our domain for filtering announcements 74 pub root_events: HashSet<EventId>,
243 /// * `database` - Database for event storage/queries 75 /// If true, never disconnect this relay
244 /// * `write_policy` - Policy for accepting events 76 pub is_bootstrap: bool,
245 /// * `config` - Configuration for sync parameters 77 /// Current connection status
246 pub fn new( 78 pub connection_status: ConnectionStatus,
247 bootstrap_relay_url: Option<String>, 79 /// When we last successfully connected - used for since filter on reconnect
248 service_domain: String, 80 pub last_connected: Option<Timestamp>,
249 database: SharedDatabase, 81 /// When we disconnected - for 15-minute state retention rule
250 write_policy: Nip34WritePolicy, 82 pub disconnected_at: Option<Timestamp>,
251 config: &Config, 83 // The active connection - will be added in Phase 4
252 ) -> Self { 84 // pub connection: Option<RelayConnection>,
253 // Create a synthetic SocketAddr for sync source identification 85}
254 // This is used when calling write_policy.admit_event() for synced events
255 let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
256 86
87impl Default for RelayState {
88 fn default() -> Self {
257 Self { 89 Self {
258 bootstrap_relay_url, 90 repos: HashSet::new(),
259 service_domain, 91 root_events: HashSet::new(),
260 database, 92 is_bootstrap: false,
261 write_policy, 93 connection_status: ConnectionStatus::Disconnected,
262 following_repo_root_events: new_following_repo_root_events(), 94 last_connected: None,
263 sync_relays: new_sync_relays(), 95 disconnected_at: None,
264 max_backoff_secs: config.sync_max_backoff_secs,
265 sync_source_addr,
266 }
267 }
268
269 /// Returns a reference to the following repo root events state.
270 ///
271 /// This is the Phase 1 data structure tracking which repository root events
272 /// (kinds 1617, 1618, 1619, 1621) we're following.
273 pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents {
274 &self.following_repo_root_events
275 }
276
277 /// Returns a reference to the sync relays state.
278 ///
279 /// This is the Phase 1 data structure tracking which relays we sync from
280 /// and their associated repositories/events.
281 pub fn sync_relays(&self) -> &SyncRelays {
282 &self.sync_relays
283 }
284
285 // =========================================================================
286 // Phase 2: Database Initialization
287 // =========================================================================
288
289 /// Initialize sync state from database queries at startup.
290 ///
291 /// This method performs two database queries:
292 /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events`
293 /// 2. Query kind 30617 to build `sync_relays`
294 ///
295 /// The bootstrap relay (if configured) is always added to `sync_relays`.
296 ///
297 /// # Errors
298 ///
299 /// Returns an error if database queries fail.
300 pub async fn initialize_from_database(&self) -> Result<(), String> {
301 // Initialize bootstrap relay if configured (never removed)
302 if let Some(bootstrap_url) = &self.bootstrap_relay_url {
303 self.sync_relays.write().await.insert(
304 bootstrap_url.clone(),
305 HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only)
306 );
307 tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url);
308 }
309
310 // Query 1: Build following_repo_root_events
311 // Find all 1617/1618/1619/1621 events and extract their repo references
312 let root_event_kinds = vec![
313 Kind::GitPatch, // 1617
314 Kind::from(KIND_PR), // 1618
315 Kind::from(KIND_PR_UPDATE), // 1619
316 Kind::GitIssue, // 1621
317 ];
318
319 let filter = Filter::new().kinds(root_event_kinds);
320 let root_events = self
321 .database
322 .query(filter)
323 .await
324 .map_err(|e| format!("Failed to query root events: {}", e))?;
325
326 let mut root_events_count = 0;
327 for event in root_events {
328 // An event may have multiple 'a' tags pointing to different repos
329 let repo_refs = Self::extract_all_repo_refs(&event);
330 for repo_ref in repo_refs {
331 self.following_repo_root_events
332 .write()
333 .await
334 .entry(repo_ref)
335 .or_default()
336 .insert(event.id);
337 root_events_count += 1;
338 }
339 }
340 tracing::info!(
341 "Populated following_repo_root_events with {} repo-event mappings",
342 root_events_count
343 );
344
345 // Query 2: Build sync_relays from kind 30617 announcements
346 let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT));
347 let announcements = self
348 .database
349 .query(announcement_filter)
350 .await
351 .map_err(|e| format!("Failed to query announcements: {}", e))?;
352
353 let mut sync_relays_count = 0;
354 for event in announcements {
355 let repo_ref = Self::build_repo_ref(&event);
356 let relay_urls = Self::extract_relay_urls(&event);
357
358 // Only track repos that list BOTH a remote relay AND our service
359 if self.lists_our_service(&event) {
360 for relay_url in relay_urls {
361 if !self.is_own_relay(&relay_url) {
362 // Get events for this repo from following_repo_root_events
363 let events = self
364 .following_repo_root_events
365 .read()
366 .await
367 .get(&repo_ref)
368 .cloned()
369 .unwrap_or_default();
370
371 self.sync_relays
372 .write()
373 .await
374 .entry(relay_url)
375 .or_default()
376 .insert(repo_ref.clone(), events);
377 sync_relays_count += 1;
378 }
379 }
380 }
381 }
382 tracing::info!(
383 "Populated sync_relays with {} relay-repo mappings",
384 sync_relays_count
385 );
386
387 Ok(())
388 }
389
390 // =========================================================================
391 // Helper Methods for Event Extraction
392 // =========================================================================
393
394 /// Extract ALL repo refs from an event (it may tag multiple repos).
395 ///
396 /// Looks for 'a' tags that reference kind 30617 (repository announcements).
397 /// Returns refs in format "30617:pubkey:identifier".
398 pub fn extract_all_repo_refs(event: &Event) -> Vec<String> {
399 event
400 .tags
401 .iter()
402 .filter_map(|tag| {
403 let tag_vec = tag.clone().to_vec();
404 if tag_vec.len() >= 2 && tag_vec[0] == "a" {
405 // Validate it's a 30617 reference
406 if tag_vec[1].starts_with("30617:") {
407 Some(tag_vec[1].clone())
408 } else {
409 None
410 }
411 } else {
412 None
413 }
414 })
415 .collect()
416 }
417
418 /// Build a repo ref string from a 30617 announcement event.
419 ///
420 /// Returns format "30617:pubkey:identifier".
421 pub fn build_repo_ref(event: &Event) -> String {
422 // Extract 'd' tag for identifier
423 let identifier = event
424 .tags
425 .iter()
426 .find(|tag| tag.kind() == TagKind::d())
427 .and_then(|tag| tag.content())
428 .map(|s| s.to_string())
429 .unwrap_or_default();
430
431 format!("30617:{}:{}", event.pubkey.to_hex(), identifier)
432 }
433
434 /// Extract relay URLs from a repository announcement event.
435 ///
436 /// Looks for the 'relays' tag and returns all relay URLs.
437 pub fn extract_relay_urls(event: &Event) -> Vec<String> {
438 event
439 .tags
440 .iter()
441 .filter(|tag| matches!(tag.kind(), TagKind::Relays))
442 .flat_map(|tag| {
443 let vec = tag.clone().to_vec();
444 // Skip first element (tag name), rest are values
445 vec.into_iter().skip(1)
446 })
447 .collect()
448 }
449
450 /// Check if event lists our service in the relays tag.
451 ///
452 /// Compares relay URLs against our service domain.
453 fn lists_our_service(&self, event: &Event) -> bool {
454 let relay_urls = Self::extract_relay_urls(event);
455 relay_urls.iter().any(|url| self.is_own_relay(url))
456 }
457
458 /// Check if a relay URL matches our relay.
459 ///
460 /// Compares the URL against our service domain.
461 fn is_own_relay(&self, relay_url: &str) -> bool {
462 // Normalize comparison: check if URL contains our domain
463 relay_url.contains(&self.service_domain)
464 }
465
466 // =========================================================================
467 // Main Run Loop
468 // =========================================================================
469
470 /// Runs the sync manager main loop.
471 ///
472 /// This method should be called in a spawned task:
473 ///
474 /// ```rust,ignore
475 /// tokio::spawn(async move {
476 /// sync_manager.run().await;
477 /// });
478 /// ```
479 ///
480 /// ## Implementation Status
481 ///
482 /// - Phase 2: Layer 1 sync from bootstrap relay ✓
483 /// - Phase 3: Self-subscription and relay discovery ✓
484 /// - Phase 4-6: Filter building, connection management (TODO)
485 /// - Phase 7: Full sync loop (TODO)
486 pub async fn run(self) {
487 tracing::info!(
488 "SyncManager starting (bootstrap_relay={:?}, domain={})",
489 self.bootstrap_relay_url,
490 self.service_domain
491 );
492
493 // Phase 3: Initialize state from database BEFORE spawning connections
494 if let Err(e) = self.initialize_from_database().await {
495 tracing::error!("Failed to initialize from database: {}", e);
496 // Continue anyway - we can still sync from bootstrap
497 }
498
499 // Create channel for relay actions from self-subscriber
500 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
501
502 // Construct our own relay URL for self-subscription
503 let own_relay_url = format!("ws://{}", self.service_domain);
504
505 // Spawn self-subscriber task
506 let self_subscriber = SelfSubscriber::new(
507 own_relay_url.clone(),
508 self.service_domain.clone(),
509 Arc::clone(&self.following_repo_root_events),
510 Arc::clone(&self.sync_relays),
511 action_tx,
512 );
513
514 tokio::spawn(async move {
515 self_subscriber.run().await;
516 });
517
518 tracing::info!("SelfSubscriber spawned for {}", own_relay_url);
519
520 // Track active relay connections (relay_url -> event_sender)
521 let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new();
522
523 // Phase 2: Connect to bootstrap relay if configured
524 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
525 if let Some(event_tx) = self
526 .spawn_relay_connection(bootstrap_url.clone(), None)
527 .await
528 {
529 active_relays.insert(bootstrap_url.clone(), event_tx);
530 }
531 }
532
533 // Main coordination loop
534 loop {
535 tokio::select! {
536 // Handle relay actions from self-subscriber
537 action = action_rx.recv() => {
538 match action {
539 Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => {
540 tracing::info!("Spawning new relay connection to {}", relay_url);
541 if !active_relays.contains_key(&relay_url) {
542 if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await {
543 active_relays.insert(relay_url, event_tx);
544 }
545 }
546 }
547 Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => {
548 tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len());
549 // TODO: Implement filter updates for existing connections
550 }
551 None => {
552 tracing::info!("Action channel closed, continuing without self-subscriber");
553 }
554 }
555 }
556 // Sleep to prevent busy loop when no events
557 _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
558 // Periodic maintenance could go here
559 }
560 }
561 }
562 }
563
564 /// Spawn a relay connection with optional Layer 2 filters.
565 ///
566 /// Returns the event sender channel if successfully spawned.
567 async fn spawn_relay_connection(
568 &self,
569 relay_url: String,
570 repos: Option<HashMap<String, HashSet<EventId>>>,
571 ) -> Option<mpsc::Sender<RelayEvent>> {
572 // Create channel for receiving events
573 let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100);
574
575 // Create connection
576 let connection = RelayConnection::new(relay_url.clone());
577
578 // Determine if this is bootstrap (no repos) or discovered relay (with repos)
579 let is_bootstrap = repos.is_none();
580
581 match connection.connect_and_subscribe().await {
582 Ok(()) => {
583 if is_bootstrap {
584 tracing::info!("Bootstrap relay connection established: {}", relay_url);
585 } else {
586 tracing::info!(
587 "Discovered relay connection established: {} (with Layer 2 filters)",
588 relay_url
589 );
590
591 // Add Layer 2 subscription for repo events
592 if let Some(ref repos) = repos {
593 if let Err(e) = self.add_layer2_subscription(&connection, repos).await {
594 tracing::warn!("Failed to add Layer 2 subscription: {}", e);
595 }
596 }
597 }
598
599 // Clone refs needed for event processing task
600 let database = Arc::clone(&self.database);
601 let write_policy = self.write_policy.clone();
602 let sync_source_addr = self.sync_source_addr;
603
604 // Clone event_tx for the spawned task
605 let event_tx_clone = event_tx.clone();
606
607 // Spawn event loop task
608 let conn_url = relay_url.clone();
609 tokio::spawn(async move {
610 connection.run_event_loop(event_tx_clone).await;
611 });
612
613 // Spawn event processing task
614 tokio::spawn(async move {
615 Self::process_relay_events(
616 event_rx,
617 database,
618 write_policy,
619 sync_source_addr,
620 conn_url,
621 )
622 .await;
623 });
624
625 Some(event_tx)
626 }
627 Err(e) => {
628 tracing::error!("Failed to connect to relay {}: {}", relay_url, e);
629 None
630 }
631 }
632 }
633
634 /// Add Layer 2 subscription for repo-related events.
635 ///
636 /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track.
637 async fn add_layer2_subscription(
638 &self,
639 connection: &RelayConnection,
640 repos: &HashMap<String, HashSet<EventId>>,
641 ) -> Result<(), String> {
642 if repos.is_empty() {
643 return Ok(());
644 }
645
646 // Build repo refs list for filter
647 let repo_refs: Vec<String> = repos.keys().cloned().collect();
648
649 tracing::debug!(
650 "Adding Layer 2 subscription for {} repos to {}",
651 repo_refs.len(),
652 connection.url()
653 );
654
655 // Chunk repo_refs into groups of 100 (per plan)
656 for chunk in repo_refs.chunks(100) {
657 // Build filter with lowercase 'a' tag for each repo ref
658 let mut filter = Filter::new().kinds([
659 Kind::GitPatch, // 1617
660 Kind::Custom(1618), // PR
661 Kind::Custom(1619), // PR update
662 Kind::GitIssue, // 1621
663 ]);
664
665 // Add each repo ref as a custom tag filter
666 for repo_ref in chunk {
667 filter =
668 filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone());
669 }
670
671 // Subscribe to this filter
672 if let Err(e) = connection.subscribe_filter(filter).await {
673 return Err(format!("Failed to subscribe with Layer 2 filter: {}", e));
674 }
675 } 96 }
676
677 Ok(())
678 } 97 }
98}
679 99
680 /// Process events from a single relay connection. 100impl RelayState {
681 /// 101 /// Check if state should be cleared based on 15-minute rule
682 /// This is a static method that runs in its own task. 102 pub fn should_clear_state(&self) -> bool {
683 async fn process_relay_events( 103 match self.disconnected_at {
684 mut event_rx: mpsc::Receiver<RelayEvent>, 104 Some(disconnected) => {
685 database: SharedDatabase, 105 let now = Timestamp::now();
686 write_policy: Nip34WritePolicy, 106 now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes
687 sync_source_addr: SocketAddr,
688 relay_url: String,
689 ) {
690 tracing::debug!("Starting event processing for relay: {}", relay_url);
691
692 while let Some(relay_event) = event_rx.recv().await {
693 match relay_event {
694 RelayEvent::Event(event) => {
695 Self::process_single_event_static(
696 &event,
697 &database,
698 &write_policy,
699 &sync_source_addr,
700 &relay_url,
701 )
702 .await;
703 }
704 RelayEvent::EndOfStoredEvents => {
705 tracing::debug!("EOSE received from {}", relay_url);
706 }
707 RelayEvent::Closed(reason) => {
708 tracing::warn!("Connection to {} closed: {}", relay_url, reason);
709 break;
710 }
711 } 107 }
108 None => false, // Still connected or never connected
712 } 109 }
713
714 tracing::info!("Event processing ended for relay: {}", relay_url);
715 } 110 }
716 111
717 /// Process a single event (static version for use in spawned tasks). 112 /// Clear repos and root_events - called when reconnect takes > 15 minutes
718 async fn process_single_event_static( 113 pub fn clear_sync_state(&mut self) {
719 event: &Event, 114 self.repos.clear();
720 database: &SharedDatabase, 115 self.root_events.clear();
721 write_policy: &Nip34WritePolicy,
722 sync_source_addr: &SocketAddr,
723 relay_url: &str,
724 ) {
725 let event_id = event.id;
726 let kind = event.kind.as_u16();
727
728 // Check if event already exists in database
729 match database.check_id(&event_id).await {
730 Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => {
731 tracing::trace!("Event {} already exists, skipping", event_id);
732 return;
733 }
734 Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing
735 Err(e) => {
736 tracing::warn!("Failed to check if event {} exists: {}", event_id, e);
737 }
738 }
739
740 // Pass through write policy
741 let policy_result = write_policy.admit_event(event, sync_source_addr).await;
742
743 match policy_result {
744 PolicyResult::Accept => match database.save_event(event).await {
745 Ok(SaveEventStatus::Success) => {
746 tracing::info!(
747 "Synced event {} (kind {}) from {}",
748 event_id,
749 kind,
750 relay_url
751 );
752 }
753 Ok(_) => {
754 tracing::debug!(
755 "Event {} (kind {}) already stored or rejected by database",
756 event_id,
757 kind
758 );
759 }
760 Err(e) => {
761 tracing::error!("Failed to save synced event {}: {}", event_id, e);
762 }
763 },
764 PolicyResult::Reject(reason) => {
765 tracing::debug!(
766 "Rejected synced event {} (kind {}): {}",
767 event_id,
768 kind,
769 reason
770 );
771 }
772 }
773 } 116 }
774} 117}
775 118
776// ============================================================================= 119/// A batch of items pending EOSE confirmation
777// Submodules 120#[derive(Debug, Clone)]
778// ============================================================================= 121pub struct PendingBatch {
779 122 /// Unique ID for this batch - for debugging/logging
780pub mod health; 123 pub batch_id: u64,
781pub mod metrics; 124 /// The items this batch is syncing
125 pub items: PendingItems,
126 /// Subscription IDs that must ALL receive EOSE before confirming
127 pub outstanding_subs: HashSet<SubscriptionId>,
128}
782 129
783// Re-export commonly used types 130/// Items included in a pending batch
784pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; 131#[derive(Debug, Clone, Default)]
785pub use metrics::{event_source, SyncMetrics}; 132pub struct PendingItems {
133 /// Repos being synced in this batch
134 pub repos: HashSet<String>,
135 /// Root events being synced in this batch
136 pub root_events: HashSet<EventId>,
137}
786 138
787// ============================================================================= 139// =============================================================================
788// Tests 140// SyncMetrics - Prometheus Metrics for Sync System
789// ============================================================================= 141// =============================================================================
790 142
791#[cfg(test)] 143/// Prometheus metrics for the proactive sync system.
792mod tests { 144///
793 use super::*; 145/// Tracks relay connections, sync progress, and operational statistics.
794 use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; 146/// Following the comprehensive v3 metrics design.
147#[derive(Clone)]
148pub struct SyncMetrics {
149 // === Connection metrics ===
150 /// Per-relay connection status (1=connected, 0=disconnected)
151 relay_connected: IntGaugeVec,
152 /// Connection attempts by relay and result (success/failure)
153 connection_attempts_total: IntCounterVec,
154
155 // === Event metrics ===
156 /// Events synced by source (live/startup/reconnect/daily)
157 events_total: IntCounterVec,
158
159 // === Summary metrics ===
160 /// Total relays discovered and tracked
161 relays_tracked_total: IntGauge,
162 /// Currently connected relay count
163 relays_connected_total: IntGauge,
164}
795 165
796 /// Helper to create a test event with specific tags 166impl SyncMetrics {
797 fn create_test_event(kind: Kind, tags: Vec<Tag>) -> Event { 167 /// Register sync metrics with a Prometheus registry.
798 let keys = Keys::generate(); 168 ///
799 EventBuilder::new(kind, "test content") 169 /// Returns an error if metrics are already registered (e.g., in tests).
800 .tags(tags) 170 pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> {
801 .sign_with_keys(&keys) 171 // Connection metrics
802 .expect("Failed to sign test event") 172 let relay_connected = IntGaugeVec::new(
173 Opts::new(
174 "ngit_sync_relay_connected",
175 "Relay connection status (1=connected, 0=disconnected)",
176 ),
177 &["relay"],
178 )?;
179 registry.register(Box::new(relay_connected.clone()))?;
180
181 let connection_attempts_total = IntCounterVec::new(
182 Opts::new(
183 "ngit_sync_connection_attempts_total",
184 "Total connection attempts by relay and result",
185 ),
186 &["relay", "result"],
187 )?;
188 registry.register(Box::new(connection_attempts_total.clone()))?;
189
190 // Event metrics
191 let events_total = IntCounterVec::new(
192 Opts::new(
193 "ngit_sync_events_total",
194 "Total events synced by source type",
195 ),
196 &["source"],
197 )?;
198 registry.register(Box::new(events_total.clone()))?;
199
200 // Summary metrics
201 let relays_tracked_total = IntGauge::with_opts(Opts::new(
202 "ngit_sync_relays_tracked_total",
203 "Total number of relays discovered and tracked",
204 ))?;
205 registry.register(Box::new(relays_tracked_total.clone()))?;
206
207 let relays_connected_total = IntGauge::with_opts(Opts::new(
208 "ngit_sync_relays_connected_total",
209 "Number of currently connected relays",
210 ))?;
211 registry.register(Box::new(relays_connected_total.clone()))?;
212
213 Ok(Self {
214 relay_connected,
215 connection_attempts_total,
216 events_total,
217 relays_tracked_total,
218 relays_connected_total,
219 })
803 } 220 }
804 221
805 // ========================================================================= 222 // === Connection Recording Methods ===
806 // Tests for extract_all_repo_refs
807 // =========================================================================
808 223
809 #[test] 224 /// Record a connection attempt (success or failure)
810 fn test_extract_all_repo_refs_single_ref() { 225 pub fn record_connection_attempt(&self, relay: &str, success: bool) {
811 let event = create_test_event( 226 let result = if success { "success" } else { "failure" };
812 Kind::GitPatch, 227 self.connection_attempts_total
813 vec![Tag::custom( 228 .with_label_values(&[relay, result])
814 nostr_relay_builder::prelude::TagKind::custom("a"), 229 .inc();
815 vec!["30617:abc123def456:my-project"],
816 )],
817 );
818
819 let refs = SyncManager::extract_all_repo_refs(&event);
820 assert_eq!(refs.len(), 1);
821 assert_eq!(refs[0], "30617:abc123def456:my-project");
822 } 230 }
823 231
824 #[test] 232 /// Set relay connection status
825 fn test_extract_all_repo_refs_multiple_refs() { 233 pub fn set_relay_connected(&self, relay: &str, connected: bool) {
826 let event = create_test_event( 234 self.relay_connected
827 Kind::GitPatch, 235 .with_label_values(&[relay])
828 vec![ 236 .set(if connected { 1 } else { 0 });
829 Tag::custom(
830 nostr_relay_builder::prelude::TagKind::custom("a"),
831 vec!["30617:abc123:project1"],
832 ),
833 Tag::custom(
834 nostr_relay_builder::prelude::TagKind::custom("a"),
835 vec!["30617:def456:project2"],
836 ),
837 ],
838 );
839
840 let refs = SyncManager::extract_all_repo_refs(&event);
841 assert_eq!(refs.len(), 2);
842 assert!(refs.contains(&"30617:abc123:project1".to_string()));
843 assert!(refs.contains(&"30617:def456:project2".to_string()));
844 } 237 }
845 238
846 #[test] 239 /// Increment connected count
847 fn test_extract_all_repo_refs_ignores_non_30617() { 240 pub fn inc_connected_count(&self) {
848 let event = create_test_event( 241 self.relays_connected_total.inc();
849 Kind::GitPatch,
850 vec![
851 Tag::custom(
852 nostr_relay_builder::prelude::TagKind::custom("a"),
853 vec!["30617:abc123:valid-repo"],
854 ),
855 Tag::custom(
856 nostr_relay_builder::prelude::TagKind::custom("a"),
857 vec!["30618:def456:state-event"], // Not a repo ref
858 ),
859 ],
860 );
861
862 let refs = SyncManager::extract_all_repo_refs(&event);
863 assert_eq!(refs.len(), 1);
864 assert_eq!(refs[0], "30617:abc123:valid-repo");
865 } 242 }
866 243
867 #[test] 244 /// Decrement connected count
868 fn test_extract_all_repo_refs_empty_when_no_a_tags() { 245 pub fn dec_connected_count(&self) {
869 let event = create_test_event( 246 self.relays_connected_total.dec();
870 Kind::GitPatch,
871 vec![Tag::custom(
872 nostr_relay_builder::prelude::TagKind::custom("e"),
873 vec!["some-event-id"],
874 )],
875 );
876
877 let refs = SyncManager::extract_all_repo_refs(&event);
878 assert!(refs.is_empty());
879 } 247 }
880 248
881 // ========================================================================= 249 // === Event Recording Methods ===
882 // Tests for build_repo_ref
883 // =========================================================================
884 250
885 #[test] 251 /// Record a synced event by source type
886 fn test_build_repo_ref() { 252 ///
887 let keys = Keys::generate(); 253 /// Source types:
888 let event = EventBuilder::new(Kind::from(30617_u16), "announcement") 254 /// - "live" - Real-time subscription events
889 .tags(vec![Tag::custom( 255 /// - "startup" - Events from startup catchup
890 nostr_relay_builder::prelude::TagKind::d(), 256 /// - "reconnect" - Events from reconnection catchup
891 vec!["my-identifier"], 257 pub fn record_event(&self, source: &str) {
892 )]) 258 self.events_total.with_label_values(&[source]).inc();
893 .sign_with_keys(&keys)
894 .expect("Failed to sign test event");
895
896 let repo_ref = SyncManager::build_repo_ref(&event);
897 assert!(repo_ref.starts_with("30617:"));
898 assert!(repo_ref.ends_with(":my-identifier"));
899 assert!(repo_ref.contains(&event.pubkey.to_hex()));
900 } 259 }
901 260
902 #[test] 261 /// Record multiple events synced by source type
903 fn test_build_repo_ref_empty_identifier() { 262 pub fn record_events(&self, source: &str, count: u64) {
904 let keys = Keys::generate(); 263 self.events_total
905 let event = EventBuilder::new(Kind::from(30617_u16), "announcement") 264 .with_label_values(&[source])
906 .sign_with_keys(&keys) 265 .inc_by(count);
907 .expect("Failed to sign test event");
908
909 let repo_ref = SyncManager::build_repo_ref(&event);
910 assert!(repo_ref.starts_with("30617:"));
911 assert!(repo_ref.ends_with(":")); // Empty identifier
912 } 266 }
913 267
914 // ========================================================================= 268 // === Summary Recording Methods ===
915 // Tests for extract_relay_urls
916 // =========================================================================
917
918 #[test]
919 fn test_extract_relay_urls_single() {
920 let event = create_test_event(
921 Kind::from(30617_u16),
922 vec![Tag::custom(
923 nostr_relay_builder::prelude::TagKind::Relays,
924 vec!["wss://relay.example.com"],
925 )],
926 );
927 269
928 let urls = SyncManager::extract_relay_urls(&event); 270 /// Set the total tracked relay count
929 assert_eq!(urls.len(), 1); 271 pub fn set_tracked_count(&self, count: i64) {
930 assert_eq!(urls[0], "wss://relay.example.com"); 272 self.relays_tracked_total.set(count);
931 } 273 }
932 274
933 #[test] 275 /// Increment tracked relay count
934 fn test_extract_relay_urls_multiple() { 276 pub fn inc_tracked_count(&self) {
935 let event = create_test_event( 277 self.relays_tracked_total.inc();
936 Kind::from(30617_u16),
937 vec![Tag::custom(
938 nostr_relay_builder::prelude::TagKind::Relays,
939 vec!["wss://relay1.example.com", "wss://relay2.example.com"],
940 )],
941 );
942
943 let urls = SyncManager::extract_relay_urls(&event);
944 assert_eq!(urls.len(), 2);
945 assert!(urls.contains(&"wss://relay1.example.com".to_string()));
946 assert!(urls.contains(&"wss://relay2.example.com".to_string()));
947 } 278 }
948 279
949 #[test] 280 /// Get current tracked relay count
950 fn test_extract_relay_urls_empty_when_no_relays_tag() { 281 pub fn get_tracked_count(&self) -> i64 {
951 let event = create_test_event( 282 self.relays_tracked_total.get()
952 Kind::from(30617_u16),
953 vec![Tag::custom(
954 nostr_relay_builder::prelude::TagKind::custom("d"),
955 vec!["my-project"],
956 )],
957 );
958
959 let urls = SyncManager::extract_relay_urls(&event);
960 assert!(urls.is_empty());
961 } 283 }
962 284
963 // ========================================================================= 285 /// Get current connected relay count
964 // Original data structure tests 286 pub fn get_connected_count(&self) -> i64 {
965 // ========================================================================= 287 self.relays_connected_total.get()
966
967 #[tokio::test]
968 async fn test_following_repo_root_events_basic_operations() {
969 let state = new_following_repo_root_events();
970
971 // Insert some events
972 {
973 let mut guard = state.write().await;
974 let repo_ref = "30617:abc123:my-project".to_string();
975 guard
976 .entry(repo_ref)
977 .or_default()
978 .insert(EventId::all_zeros());
979 }
980
981 // Read back
982 {
983 let guard = state.read().await;
984 assert_eq!(guard.len(), 1);
985 assert!(guard.contains_key("30617:abc123:my-project"));
986 }
987 } 288 }
289}
988 290
989 #[tokio::test] 291/// Event source types for metrics tracking
990 async fn test_sync_relays_basic_operations() { 292pub mod event_source {
991 let state = new_sync_relays(); 293 /// Real-time subscription events
294 pub const LIVE: &str = "live";
295 /// Events from startup catchup
296 pub const STARTUP: &str = "startup";
297 /// Events from reconnection catchup
298 pub const RECONNECT: &str = "reconnect";
299}
992 300
993 // Insert relay with repos 301// =============================================================================
994 { 302// SyncManager - Main Entry Point
995 let mut guard = state.write().await; 303// =============================================================================
996 let relay_url = "wss://relay.example.com".to_string();
997 let repo_ref = "30617:abc123:my-project".to_string();
998 304
999 guard 305/// Manages proactive synchronization with external relays
1000 .entry(relay_url) 306///
1001 .or_default() 307/// The SyncManager runs as a background task, subscribing to repository
1002 .entry(repo_ref) 308/// announcements on the local relay and syncing data from external relays
1003 .or_default() 309/// listed in those announcements.
1004 .insert(EventId::all_zeros()); 310#[allow(dead_code)] // Fields will be used in later phases
1005 } 311pub struct SyncManager {
312 /// Bootstrap relay URL for initial sync (optional)
313 bootstrap_relay_url: Option<String>,
314 /// Our service domain - used for filtering relevant repos
315 service_domain: String,
316 /// Database for event storage and queries
317 database: SharedDatabase,
318 /// Write policy for validating incoming events
319 write_policy: Nip34WritePolicy,
320 /// Configuration reference for sync settings
321 config: Config,
322 /// What we want to sync (source of truth)
323 repo_sync_index: RepoSyncIndex,
324 /// What we've confirmed syncing + connection state
325 relay_sync_index: RelaySyncIndex,
326 /// In-flight subscription batches
327 pending_sync_index: PendingSyncIndex,
328}
1006 329
1007 // Read back 330impl SyncManager {
1008 { 331 /// Create a new SyncManager
1009 let guard = state.read().await; 332 ///
1010 assert_eq!(guard.len(), 1); 333 /// # Arguments
1011 let relay_repos = guard.get("wss://relay.example.com").unwrap(); 334 /// * `bootstrap_relay_url` - Optional relay URL for initial historical sync
1012 assert_eq!(relay_repos.len(), 1); 335 /// * `service_domain` - The domain this relay serves (for filtering repos)
1013 let events = relay_repos.get("30617:abc123:my-project").unwrap(); 336 /// * `database` - Shared database for event storage
1014 assert_eq!(events.len(), 1); 337 /// * `write_policy` - Policy for validating events before storage
338 /// * `config` - Configuration for sync settings
339 pub fn new(
340 bootstrap_relay_url: Option<String>,
341 service_domain: String,
342 database: SharedDatabase,
343 write_policy: Nip34WritePolicy,
344 config: &Config,
345 ) -> Self {
346 Self {
347 bootstrap_relay_url,
348 service_domain,
349 database,
350 write_policy,
351 config: config.clone(),
352 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
353 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
354 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
1015 } 355 }
1016 } 356 }
1017 357
1018 #[tokio::test] 358 /// Run the sync manager (placeholder for Phase 1)
1019 async fn test_concurrent_access() { 359 ///
1020 let state = new_following_repo_root_events(); 360 /// This will be implemented in later phases to:
1021 let state_clone = Arc::clone(&state); 361 /// 1. Subscribe to local relay for 30617 events
1022 362 /// 2. Process events to build RepoSyncIndex
1023 // Writer task 363 /// 3. Compute and execute sync actions
1024 let writer = tokio::spawn(async move { 364 /// 4. Handle reconnection and catch-up logic
1025 let mut guard = state_clone.write().await; 365 pub async fn run(self) {
1026 guard 366 tracing::info!(
1027 .entry("30617:writer:repo".to_string()) 367 bootstrap_relay = ?self.bootstrap_relay_url,
1028 .or_default() 368 service_domain = %self.service_domain,
1029 .insert(EventId::all_zeros()); 369 "SyncManager starting (placeholder - not yet implemented)"
1030 }); 370 );
1031
1032 // Wait for writer
1033 writer.await.unwrap();
1034 371
1035 // Reader should see the change 372 // Phase 1: Just log and return
1036 let guard = state.read().await; 373 // Full implementation will be added in subsequent phases
1037 assert!(guard.contains_key("30617:writer:repo"));
1038 } 374 }
1039} 375} \ No newline at end of file