diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 10:33:07 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 10:33:07 +0000 |
| commit | 586fc2a7df1ce256469f0742d23f687ac4b075b1 (patch) | |
| tree | dc07dbec88ea1ca2e80b4ced91831256bb68ce4e /src/sync/mod.rs | |
| parent | 2bc95d7652ea7a8a53424fa9fffe3579c9fdff5b (diff) | |
stub of sync v4
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 1264 |
1 files changed, 300 insertions, 964 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 9dec982..c1f8bca 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
| @@ -1,1039 +1,375 @@ | |||
| 1 | //! Proactive Sync Module for GRASP-02 | 1 | //! Proactive Sync Module - GRASP-02 v4 Implementation |
| 2 | //! | 2 | //! |
| 3 | //! This module implements the proactive sync system that ensures data availability | 3 | //! This module implements proactive synchronization of repository data from external |
| 4 | //! for repositories hosted on this relay by syncing from other relays in the ecosystem. | 4 | //! relays based on relay URLs listed in 30617 repository announcements. |
| 5 | //! | 5 | //! |
| 6 | //! ## Architecture Overview | 6 | //! ## Architecture |
| 7 | //! | 7 | //! |
| 8 | //! The sync system is built around two core data structures: | 8 | //! The sync system uses three index structures: |
| 9 | //! - `RepoSyncIndex` - What we WANT to sync (source of truth from self-subscription) | ||
| 10 | //! - `RelaySyncIndex` - What we have CONFIRMED syncing + connection state | ||
| 11 | //! - `PendingSyncIndex` - In-flight batches awaiting EOSE confirmation | ||
| 9 | //! | 12 | //! |
| 10 | //! - **FollowingRepoRootEvents**: Tracks repository root events we're following | 13 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. |
| 11 | //! - **SyncRelays**: Tracks relays we sync from, including their repos and events | ||
| 12 | //! | ||
| 13 | //! These type aliases are colocated with SyncManager (following the pattern of | ||
| 14 | //! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity. | ||
| 15 | //! | ||
| 16 | //! ## Submodules | ||
| 17 | //! | ||
| 18 | //! - [`health`]: Relay health tracking with exponential backoff and dead relay detection | ||
| 19 | //! - [`metrics`]: Prometheus metrics for sync operations | ||
| 20 | //! | ||
| 21 | //! ## Memory Estimates (from design doc) | ||
| 22 | //! | ||
| 23 | //! At target scale (1,000 repos, 100 relays): | ||
| 24 | //! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB | ||
| 25 | //! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB | ||
| 26 | //! - **Total in-memory state**: ~10 MB | ||
| 27 | //! | ||
| 28 | //! ## Upper Bounds (triggers for redesign) | ||
| 29 | //! | ||
| 30 | //! - 10,000+ repos: Consider database-backed state | ||
| 31 | //! - 500+ sync relays: Consider connection pooling | ||
| 32 | //! - 500+ root events per repo: Consider per-repo pagination | ||
| 33 | //! | ||
| 34 | //! ## Design References | ||
| 35 | //! | ||
| 36 | //! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md) | ||
| 37 | //! for the complete design context. | ||
| 38 | 14 | ||
| 39 | use std::collections::{HashMap, HashSet}; | 15 | use std::collections::{HashMap, HashSet}; |
| 40 | use std::net::SocketAddr; | ||
| 41 | use std::sync::Arc; | 16 | use std::sync::Arc; |
| 42 | 17 | ||
| 43 | use nostr_relay_builder::prelude::{ | ||
| 44 | DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, | ||
| 45 | }; | ||
| 46 | use nostr_sdk::prelude::*; | 18 | use nostr_sdk::prelude::*; |
| 47 | use nostr_sdk::EventId; | 19 | use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; |
| 48 | use tokio::sync::{mpsc, RwLock}; | 20 | use tokio::sync::RwLock; |
| 49 | 21 | ||
| 50 | use crate::config::Config; | 22 | use crate::config::Config; |
| 51 | use crate::nostr::builder::Nip34WritePolicy; | 23 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 52 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | ||
| 53 | use crate::nostr::SharedDatabase; | ||
| 54 | |||
| 55 | mod relay_connection; | ||
| 56 | mod self_subscriber; | ||
| 57 | pub use relay_connection::{RelayConnection, RelayEvent}; | ||
| 58 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | ||
| 59 | 24 | ||
| 60 | // ============================================================================= | 25 | // ============================================================================= |
| 61 | // Type Aliases for Sync State | 26 | // Type Aliases for Index Structures |
| 62 | // ============================================================================= | 27 | // ============================================================================= |
| 63 | 28 | ||
| 64 | /// Repository root events we're following. | 29 | /// What we WANT to sync - derived from events received via self-subscription. |
| 65 | /// | 30 | /// Updated immediately when self-subscriber batch fires. |
| 66 | /// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) | 31 | /// Key: repo addressable ref - 30617:pubkey:identifier |
| 67 | /// we need to follow for each repository we host. | 32 | pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>; |
| 68 | /// | ||
| 69 | /// ## Key Format | ||
| 70 | /// | ||
| 71 | /// The key is a repository addressable reference in the format: | ||
| 72 | /// `"30617:<pubkey>:<identifier>"` | ||
| 73 | /// | ||
| 74 | /// For example: `"30617:abc123...def:my-project"` | ||
| 75 | /// | ||
| 76 | /// ## Value | ||
| 77 | /// | ||
| 78 | /// A set of event IDs representing root events (PRs, Issues, Patches, Status events) | ||
| 79 | /// that reference this repository via an `a` tag. | ||
| 80 | /// | ||
| 81 | /// ## Event Kinds Tracked | ||
| 82 | /// | ||
| 83 | /// - **1617**: Patches (NIP-34) | ||
| 84 | /// - **1618**: Issues (NIP-34) | ||
| 85 | /// - **1619**: PRs (Pull Requests, NIP-34) | ||
| 86 | /// - **1621**: Status events (NIP-34) | ||
| 87 | /// | ||
| 88 | /// ## Invariants | ||
| 89 | /// | ||
| 90 | /// - May include a few extra repo refs that aren't in `SyncRelays` | ||
| 91 | /// - This is acceptable - we won't query other relays for them | ||
| 92 | /// - Updated incrementally via self-subscription | ||
| 93 | /// | ||
| 94 | /// ## Thread Safety | ||
| 95 | /// | ||
| 96 | /// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple | ||
| 97 | /// async tasks performing sync operations. | ||
| 98 | /// | ||
| 99 | /// ## Example Usage | ||
| 100 | /// | ||
| 101 | /// ```rust,ignore | ||
| 102 | /// use ngit_grasp::sync::FollowingRepoRootEvents; | ||
| 103 | /// use std::collections::HashSet; | ||
| 104 | /// use nostr_sdk::EventId; | ||
| 105 | /// | ||
| 106 | /// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) { | ||
| 107 | /// let guard = state.read().await; | ||
| 108 | /// if let Some(events) = guard.get(repo_ref) { | ||
| 109 | /// println!("Tracking {} root events for {}", events.len(), repo_ref); | ||
| 110 | /// } | ||
| 111 | /// } | ||
| 112 | /// ``` | ||
| 113 | pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>; | ||
| 114 | |||
| 115 | /// Relays we sync from, including their repos and events. | ||
| 116 | /// | ||
| 117 | /// This structure tracks which relays we need to connect to for syncing, | ||
| 118 | /// and for each relay, which repositories and their root events we're interested in. | ||
| 119 | /// | ||
| 120 | /// ## Key Format (Outer HashMap) | ||
| 121 | /// | ||
| 122 | /// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"` | ||
| 123 | /// | ||
| 124 | /// ## Value Format (Inner HashMap) | ||
| 125 | /// | ||
| 126 | /// For each relay, we maintain a map of: | ||
| 127 | /// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`) | ||
| 128 | /// - Value: Set of event IDs for that repo which should be synced from this relay | ||
| 129 | /// | ||
| 130 | /// ## Relay Selection Criteria | ||
| 131 | /// | ||
| 132 | /// A relay is included if its URL appears in a repository announcement (kind 30617) | ||
| 133 | /// that **also** lists our service URL. This ensures we only sync from relays | ||
| 134 | /// for repositories that are actually hosted on our relay. | ||
| 135 | /// | ||
| 136 | /// ## Bootstrap Relay | ||
| 137 | /// | ||
| 138 | /// If configured, the bootstrap relay is always present in this map and is | ||
| 139 | /// excluded from automatic removal logic. The bootstrap relay is used for | ||
| 140 | /// initial sync and discovery even when no repositories explicitly list it. | ||
| 141 | /// | ||
| 142 | /// ## Thread Safety | ||
| 143 | /// | ||
| 144 | /// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple | ||
| 145 | /// async tasks performing sync operations. | ||
| 146 | /// | ||
| 147 | /// ## Example Usage | ||
| 148 | /// | ||
| 149 | /// ```rust,ignore | ||
| 150 | /// use ngit_grasp::sync::SyncRelays; | ||
| 151 | /// use std::collections::{HashMap, HashSet}; | ||
| 152 | /// | ||
| 153 | /// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) { | ||
| 154 | /// let guard = state.read().await; | ||
| 155 | /// if let Some(repos) = guard.get(relay_url) { | ||
| 156 | /// println!("Relay {} tracks {} repos", relay_url, repos.len()); | ||
| 157 | /// for (repo_ref, events) in repos { | ||
| 158 | /// println!(" {} -> {} events", repo_ref, events.len()); | ||
| 159 | /// } | ||
| 160 | /// } | ||
| 161 | /// } | ||
| 162 | /// ``` | ||
| 163 | pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>; | ||
| 164 | 33 | ||
| 165 | /// Creates a new empty `FollowingRepoRootEvents` state. | 34 | /// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. |
| 166 | /// | 35 | /// Key: relay URL |
| 167 | /// Use this to initialize the state before populating from database queries. | 36 | pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>; |
| 168 | pub fn new_following_repo_root_events() -> FollowingRepoRootEvents { | ||
| 169 | Arc::new(RwLock::new(HashMap::new())) | ||
| 170 | } | ||
| 171 | 37 | ||
| 172 | /// Creates a new empty `SyncRelays` state. | 38 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. |
| 173 | /// | 39 | /// Each batch has its own ID and can confirm independently. |
| 174 | /// Use this to initialize the state before populating from database queries. | 40 | /// Key: relay URL |
| 175 | pub fn new_sync_relays() -> SyncRelays { | 41 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; |
| 176 | Arc::new(RwLock::new(HashMap::new())) | ||
| 177 | } | ||
| 178 | 42 | ||
| 179 | // ============================================================================= | 43 | // ============================================================================= |
| 180 | // SyncManager | 44 | // Supporting Data Structures |
| 181 | // ============================================================================= | 45 | // ============================================================================= |
| 182 | 46 | ||
| 183 | /// Manages proactive synchronization with external relays. | 47 | /// What repos and root events need to be synced |
| 184 | /// | 48 | #[derive(Debug, Clone, Default)] |
| 185 | /// The SyncManager is responsible for: | 49 | pub struct RepoSyncNeeds { |
| 186 | /// - Discovering relays from stored repository announcements | 50 | /// Relay URLs listed in this repo's 30617 announcement |
| 187 | /// - Maintaining connections to sync relays | 51 | pub relays: HashSet<String>, |
| 188 | /// - Subscribing to events at external relays | 52 | /// Root event IDs - 1617/1618/1619/1621 - that reference this repo |
| 189 | /// - Applying the acceptance policy to synced events | 53 | pub root_events: HashSet<EventId>, |
| 190 | /// | 54 | } |
| 191 | /// ## Lifecycle | ||
| 192 | /// | ||
| 193 | /// 1. `new()` - Creates manager with database and config | ||
| 194 | /// 2. `run()` - Main async loop (call in a spawned task) | ||
| 195 | /// | ||
| 196 | /// ## Current Status | ||
| 197 | /// | ||
| 198 | /// Phase 2 implementation supports: | ||
| 199 | /// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter | ||
| 200 | /// - Event processing through write policy | ||
| 201 | /// - Storage of accepted events | ||
| 202 | /// | ||
| 203 | /// Core data structures: | ||
| 204 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following | ||
| 205 | /// - [`SyncRelays`]: Relays we sync from with their repos and events | ||
| 206 | pub struct SyncManager { | ||
| 207 | /// Bootstrap relay URL if configured | ||
| 208 | bootstrap_relay_url: Option<String>, | ||
| 209 | |||
| 210 | /// Our service domain for filtering repo announcements | ||
| 211 | #[allow(dead_code)] | ||
| 212 | service_domain: String, | ||
| 213 | |||
| 214 | /// Database for querying/storing events | ||
| 215 | database: SharedDatabase, | ||
| 216 | |||
| 217 | /// Write policy for applying acceptance rules | ||
| 218 | write_policy: Nip34WritePolicy, | ||
| 219 | |||
| 220 | /// Repository root events we're following (Phase 1 data structure) | ||
| 221 | #[allow(dead_code)] | ||
| 222 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 223 | |||
| 224 | /// Relays we sync from (Phase 1 data structure) | ||
| 225 | #[allow(dead_code)] | ||
| 226 | sync_relays: SyncRelays, | ||
| 227 | |||
| 228 | /// Max backoff duration for relay reconnection | ||
| 229 | #[allow(dead_code)] | ||
| 230 | max_backoff_secs: u64, | ||
| 231 | 55 | ||
| 232 | /// Socket address used for sync source (for write policy) | 56 | /// Connection status for a relay |
| 233 | sync_source_addr: SocketAddr, | 57 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] |
| 58 | pub enum ConnectionStatus { | ||
| 59 | /// Not currently connected | ||
| 60 | #[default] | ||
| 61 | Disconnected, | ||
| 62 | /// Connection attempt in progress | ||
| 63 | Connecting, | ||
| 64 | /// Successfully connected and subscribed | ||
| 65 | Connected, | ||
| 234 | } | 66 | } |
| 235 | 67 | ||
| 236 | impl SyncManager { | 68 | /// Complete state for a single relay - combines sync needs with connection lifecycle |
| 237 | /// Creates a new SyncManager. | 69 | #[derive(Debug)] |
| 238 | /// | 70 | pub struct RelayState { |
| 239 | /// # Arguments | 71 | /// Repos we have confirmed syncing from this relay |
| 240 | /// | 72 | pub repos: HashSet<String>, |
| 241 | /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync | 73 | /// Root events we have confirmed tracking |
| 242 | /// * `service_domain` - Our domain for filtering announcements | 74 | pub root_events: HashSet<EventId>, |
| 243 | /// * `database` - Database for event storage/queries | 75 | /// If true, never disconnect this relay |
| 244 | /// * `write_policy` - Policy for accepting events | 76 | pub is_bootstrap: bool, |
| 245 | /// * `config` - Configuration for sync parameters | 77 | /// Current connection status |
| 246 | pub fn new( | 78 | pub connection_status: ConnectionStatus, |
| 247 | bootstrap_relay_url: Option<String>, | 79 | /// When we last successfully connected - used for since filter on reconnect |
| 248 | service_domain: String, | 80 | pub last_connected: Option<Timestamp>, |
| 249 | database: SharedDatabase, | 81 | /// When we disconnected - for 15-minute state retention rule |
| 250 | write_policy: Nip34WritePolicy, | 82 | pub disconnected_at: Option<Timestamp>, |
| 251 | config: &Config, | 83 | // The active connection - will be added in Phase 4 |
| 252 | ) -> Self { | 84 | // pub connection: Option<RelayConnection>, |
| 253 | // Create a synthetic SocketAddr for sync source identification | 85 | } |
| 254 | // This is used when calling write_policy.admit_event() for synced events | ||
| 255 | let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); | ||
| 256 | 86 | ||
| 87 | impl Default for RelayState { | ||
| 88 | fn default() -> Self { | ||
| 257 | Self { | 89 | Self { |
| 258 | bootstrap_relay_url, | 90 | repos: HashSet::new(), |
| 259 | service_domain, | 91 | root_events: HashSet::new(), |
| 260 | database, | 92 | is_bootstrap: false, |
| 261 | write_policy, | 93 | connection_status: ConnectionStatus::Disconnected, |
| 262 | following_repo_root_events: new_following_repo_root_events(), | 94 | last_connected: None, |
| 263 | sync_relays: new_sync_relays(), | 95 | disconnected_at: None, |
| 264 | max_backoff_secs: config.sync_max_backoff_secs, | ||
| 265 | sync_source_addr, | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | /// Returns a reference to the following repo root events state. | ||
| 270 | /// | ||
| 271 | /// This is the Phase 1 data structure tracking which repository root events | ||
| 272 | /// (kinds 1617, 1618, 1619, 1621) we're following. | ||
| 273 | pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents { | ||
| 274 | &self.following_repo_root_events | ||
| 275 | } | ||
| 276 | |||
| 277 | /// Returns a reference to the sync relays state. | ||
| 278 | /// | ||
| 279 | /// This is the Phase 1 data structure tracking which relays we sync from | ||
| 280 | /// and their associated repositories/events. | ||
| 281 | pub fn sync_relays(&self) -> &SyncRelays { | ||
| 282 | &self.sync_relays | ||
| 283 | } | ||
| 284 | |||
| 285 | // ========================================================================= | ||
| 286 | // Phase 2: Database Initialization | ||
| 287 | // ========================================================================= | ||
| 288 | |||
| 289 | /// Initialize sync state from database queries at startup. | ||
| 290 | /// | ||
| 291 | /// This method performs two database queries: | ||
| 292 | /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events` | ||
| 293 | /// 2. Query kind 30617 to build `sync_relays` | ||
| 294 | /// | ||
| 295 | /// The bootstrap relay (if configured) is always added to `sync_relays`. | ||
| 296 | /// | ||
| 297 | /// # Errors | ||
| 298 | /// | ||
| 299 | /// Returns an error if database queries fail. | ||
| 300 | pub async fn initialize_from_database(&self) -> Result<(), String> { | ||
| 301 | // Initialize bootstrap relay if configured (never removed) | ||
| 302 | if let Some(bootstrap_url) = &self.bootstrap_relay_url { | ||
| 303 | self.sync_relays.write().await.insert( | ||
| 304 | bootstrap_url.clone(), | ||
| 305 | HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only) | ||
| 306 | ); | ||
| 307 | tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url); | ||
| 308 | } | ||
| 309 | |||
| 310 | // Query 1: Build following_repo_root_events | ||
| 311 | // Find all 1617/1618/1619/1621 events and extract their repo references | ||
| 312 | let root_event_kinds = vec![ | ||
| 313 | Kind::GitPatch, // 1617 | ||
| 314 | Kind::from(KIND_PR), // 1618 | ||
| 315 | Kind::from(KIND_PR_UPDATE), // 1619 | ||
| 316 | Kind::GitIssue, // 1621 | ||
| 317 | ]; | ||
| 318 | |||
| 319 | let filter = Filter::new().kinds(root_event_kinds); | ||
| 320 | let root_events = self | ||
| 321 | .database | ||
| 322 | .query(filter) | ||
| 323 | .await | ||
| 324 | .map_err(|e| format!("Failed to query root events: {}", e))?; | ||
| 325 | |||
| 326 | let mut root_events_count = 0; | ||
| 327 | for event in root_events { | ||
| 328 | // An event may have multiple 'a' tags pointing to different repos | ||
| 329 | let repo_refs = Self::extract_all_repo_refs(&event); | ||
| 330 | for repo_ref in repo_refs { | ||
| 331 | self.following_repo_root_events | ||
| 332 | .write() | ||
| 333 | .await | ||
| 334 | .entry(repo_ref) | ||
| 335 | .or_default() | ||
| 336 | .insert(event.id); | ||
| 337 | root_events_count += 1; | ||
| 338 | } | ||
| 339 | } | ||
| 340 | tracing::info!( | ||
| 341 | "Populated following_repo_root_events with {} repo-event mappings", | ||
| 342 | root_events_count | ||
| 343 | ); | ||
| 344 | |||
| 345 | // Query 2: Build sync_relays from kind 30617 announcements | ||
| 346 | let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT)); | ||
| 347 | let announcements = self | ||
| 348 | .database | ||
| 349 | .query(announcement_filter) | ||
| 350 | .await | ||
| 351 | .map_err(|e| format!("Failed to query announcements: {}", e))?; | ||
| 352 | |||
| 353 | let mut sync_relays_count = 0; | ||
| 354 | for event in announcements { | ||
| 355 | let repo_ref = Self::build_repo_ref(&event); | ||
| 356 | let relay_urls = Self::extract_relay_urls(&event); | ||
| 357 | |||
| 358 | // Only track repos that list BOTH a remote relay AND our service | ||
| 359 | if self.lists_our_service(&event) { | ||
| 360 | for relay_url in relay_urls { | ||
| 361 | if !self.is_own_relay(&relay_url) { | ||
| 362 | // Get events for this repo from following_repo_root_events | ||
| 363 | let events = self | ||
| 364 | .following_repo_root_events | ||
| 365 | .read() | ||
| 366 | .await | ||
| 367 | .get(&repo_ref) | ||
| 368 | .cloned() | ||
| 369 | .unwrap_or_default(); | ||
| 370 | |||
| 371 | self.sync_relays | ||
| 372 | .write() | ||
| 373 | .await | ||
| 374 | .entry(relay_url) | ||
| 375 | .or_default() | ||
| 376 | .insert(repo_ref.clone(), events); | ||
| 377 | sync_relays_count += 1; | ||
| 378 | } | ||
| 379 | } | ||
| 380 | } | ||
| 381 | } | ||
| 382 | tracing::info!( | ||
| 383 | "Populated sync_relays with {} relay-repo mappings", | ||
| 384 | sync_relays_count | ||
| 385 | ); | ||
| 386 | |||
| 387 | Ok(()) | ||
| 388 | } | ||
| 389 | |||
| 390 | // ========================================================================= | ||
| 391 | // Helper Methods for Event Extraction | ||
| 392 | // ========================================================================= | ||
| 393 | |||
| 394 | /// Extract ALL repo refs from an event (it may tag multiple repos). | ||
| 395 | /// | ||
| 396 | /// Looks for 'a' tags that reference kind 30617 (repository announcements). | ||
| 397 | /// Returns refs in format "30617:pubkey:identifier". | ||
| 398 | pub fn extract_all_repo_refs(event: &Event) -> Vec<String> { | ||
| 399 | event | ||
| 400 | .tags | ||
| 401 | .iter() | ||
| 402 | .filter_map(|tag| { | ||
| 403 | let tag_vec = tag.clone().to_vec(); | ||
| 404 | if tag_vec.len() >= 2 && tag_vec[0] == "a" { | ||
| 405 | // Validate it's a 30617 reference | ||
| 406 | if tag_vec[1].starts_with("30617:") { | ||
| 407 | Some(tag_vec[1].clone()) | ||
| 408 | } else { | ||
| 409 | None | ||
| 410 | } | ||
| 411 | } else { | ||
| 412 | None | ||
| 413 | } | ||
| 414 | }) | ||
| 415 | .collect() | ||
| 416 | } | ||
| 417 | |||
| 418 | /// Build a repo ref string from a 30617 announcement event. | ||
| 419 | /// | ||
| 420 | /// Returns format "30617:pubkey:identifier". | ||
| 421 | pub fn build_repo_ref(event: &Event) -> String { | ||
| 422 | // Extract 'd' tag for identifier | ||
| 423 | let identifier = event | ||
| 424 | .tags | ||
| 425 | .iter() | ||
| 426 | .find(|tag| tag.kind() == TagKind::d()) | ||
| 427 | .and_then(|tag| tag.content()) | ||
| 428 | .map(|s| s.to_string()) | ||
| 429 | .unwrap_or_default(); | ||
| 430 | |||
| 431 | format!("30617:{}:{}", event.pubkey.to_hex(), identifier) | ||
| 432 | } | ||
| 433 | |||
| 434 | /// Extract relay URLs from a repository announcement event. | ||
| 435 | /// | ||
| 436 | /// Looks for the 'relays' tag and returns all relay URLs. | ||
| 437 | pub fn extract_relay_urls(event: &Event) -> Vec<String> { | ||
| 438 | event | ||
| 439 | .tags | ||
| 440 | .iter() | ||
| 441 | .filter(|tag| matches!(tag.kind(), TagKind::Relays)) | ||
| 442 | .flat_map(|tag| { | ||
| 443 | let vec = tag.clone().to_vec(); | ||
| 444 | // Skip first element (tag name), rest are values | ||
| 445 | vec.into_iter().skip(1) | ||
| 446 | }) | ||
| 447 | .collect() | ||
| 448 | } | ||
| 449 | |||
| 450 | /// Check if event lists our service in the relays tag. | ||
| 451 | /// | ||
| 452 | /// Compares relay URLs against our service domain. | ||
| 453 | fn lists_our_service(&self, event: &Event) -> bool { | ||
| 454 | let relay_urls = Self::extract_relay_urls(event); | ||
| 455 | relay_urls.iter().any(|url| self.is_own_relay(url)) | ||
| 456 | } | ||
| 457 | |||
| 458 | /// Check if a relay URL matches our relay. | ||
| 459 | /// | ||
| 460 | /// Compares the URL against our service domain. | ||
| 461 | fn is_own_relay(&self, relay_url: &str) -> bool { | ||
| 462 | // Normalize comparison: check if URL contains our domain | ||
| 463 | relay_url.contains(&self.service_domain) | ||
| 464 | } | ||
| 465 | |||
| 466 | // ========================================================================= | ||
| 467 | // Main Run Loop | ||
| 468 | // ========================================================================= | ||
| 469 | |||
| 470 | /// Runs the sync manager main loop. | ||
| 471 | /// | ||
| 472 | /// This method should be called in a spawned task: | ||
| 473 | /// | ||
| 474 | /// ```rust,ignore | ||
| 475 | /// tokio::spawn(async move { | ||
| 476 | /// sync_manager.run().await; | ||
| 477 | /// }); | ||
| 478 | /// ``` | ||
| 479 | /// | ||
| 480 | /// ## Implementation Status | ||
| 481 | /// | ||
| 482 | /// - Phase 2: Layer 1 sync from bootstrap relay ✓ | ||
| 483 | /// - Phase 3: Self-subscription and relay discovery ✓ | ||
| 484 | /// - Phase 4-6: Filter building, connection management (TODO) | ||
| 485 | /// - Phase 7: Full sync loop (TODO) | ||
| 486 | pub async fn run(self) { | ||
| 487 | tracing::info!( | ||
| 488 | "SyncManager starting (bootstrap_relay={:?}, domain={})", | ||
| 489 | self.bootstrap_relay_url, | ||
| 490 | self.service_domain | ||
| 491 | ); | ||
| 492 | |||
| 493 | // Phase 3: Initialize state from database BEFORE spawning connections | ||
| 494 | if let Err(e) = self.initialize_from_database().await { | ||
| 495 | tracing::error!("Failed to initialize from database: {}", e); | ||
| 496 | // Continue anyway - we can still sync from bootstrap | ||
| 497 | } | ||
| 498 | |||
| 499 | // Create channel for relay actions from self-subscriber | ||
| 500 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); | ||
| 501 | |||
| 502 | // Construct our own relay URL for self-subscription | ||
| 503 | let own_relay_url = format!("ws://{}", self.service_domain); | ||
| 504 | |||
| 505 | // Spawn self-subscriber task | ||
| 506 | let self_subscriber = SelfSubscriber::new( | ||
| 507 | own_relay_url.clone(), | ||
| 508 | self.service_domain.clone(), | ||
| 509 | Arc::clone(&self.following_repo_root_events), | ||
| 510 | Arc::clone(&self.sync_relays), | ||
| 511 | action_tx, | ||
| 512 | ); | ||
| 513 | |||
| 514 | tokio::spawn(async move { | ||
| 515 | self_subscriber.run().await; | ||
| 516 | }); | ||
| 517 | |||
| 518 | tracing::info!("SelfSubscriber spawned for {}", own_relay_url); | ||
| 519 | |||
| 520 | // Track active relay connections (relay_url -> event_sender) | ||
| 521 | let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new(); | ||
| 522 | |||
| 523 | // Phase 2: Connect to bootstrap relay if configured | ||
| 524 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | ||
| 525 | if let Some(event_tx) = self | ||
| 526 | .spawn_relay_connection(bootstrap_url.clone(), None) | ||
| 527 | .await | ||
| 528 | { | ||
| 529 | active_relays.insert(bootstrap_url.clone(), event_tx); | ||
| 530 | } | ||
| 531 | } | ||
| 532 | |||
| 533 | // Main coordination loop | ||
| 534 | loop { | ||
| 535 | tokio::select! { | ||
| 536 | // Handle relay actions from self-subscriber | ||
| 537 | action = action_rx.recv() => { | ||
| 538 | match action { | ||
| 539 | Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { | ||
| 540 | tracing::info!("Spawning new relay connection to {}", relay_url); | ||
| 541 | if !active_relays.contains_key(&relay_url) { | ||
| 542 | if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { | ||
| 543 | active_relays.insert(relay_url, event_tx); | ||
| 544 | } | ||
| 545 | } | ||
| 546 | } | ||
| 547 | Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { | ||
| 548 | tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); | ||
| 549 | // TODO: Implement filter updates for existing connections | ||
| 550 | } | ||
| 551 | None => { | ||
| 552 | tracing::info!("Action channel closed, continuing without self-subscriber"); | ||
| 553 | } | ||
| 554 | } | ||
| 555 | } | ||
| 556 | // Sleep to prevent busy loop when no events | ||
| 557 | _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { | ||
| 558 | // Periodic maintenance could go here | ||
| 559 | } | ||
| 560 | } | ||
| 561 | } | ||
| 562 | } | ||
| 563 | |||
| 564 | /// Spawn a relay connection with optional Layer 2 filters. | ||
| 565 | /// | ||
| 566 | /// Returns the event sender channel if successfully spawned. | ||
| 567 | async fn spawn_relay_connection( | ||
| 568 | &self, | ||
| 569 | relay_url: String, | ||
| 570 | repos: Option<HashMap<String, HashSet<EventId>>>, | ||
| 571 | ) -> Option<mpsc::Sender<RelayEvent>> { | ||
| 572 | // Create channel for receiving events | ||
| 573 | let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100); | ||
| 574 | |||
| 575 | // Create connection | ||
| 576 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 577 | |||
| 578 | // Determine if this is bootstrap (no repos) or discovered relay (with repos) | ||
| 579 | let is_bootstrap = repos.is_none(); | ||
| 580 | |||
| 581 | match connection.connect_and_subscribe().await { | ||
| 582 | Ok(()) => { | ||
| 583 | if is_bootstrap { | ||
| 584 | tracing::info!("Bootstrap relay connection established: {}", relay_url); | ||
| 585 | } else { | ||
| 586 | tracing::info!( | ||
| 587 | "Discovered relay connection established: {} (with Layer 2 filters)", | ||
| 588 | relay_url | ||
| 589 | ); | ||
| 590 | |||
| 591 | // Add Layer 2 subscription for repo events | ||
| 592 | if let Some(ref repos) = repos { | ||
| 593 | if let Err(e) = self.add_layer2_subscription(&connection, repos).await { | ||
| 594 | tracing::warn!("Failed to add Layer 2 subscription: {}", e); | ||
| 595 | } | ||
| 596 | } | ||
| 597 | } | ||
| 598 | |||
| 599 | // Clone refs needed for event processing task | ||
| 600 | let database = Arc::clone(&self.database); | ||
| 601 | let write_policy = self.write_policy.clone(); | ||
| 602 | let sync_source_addr = self.sync_source_addr; | ||
| 603 | |||
| 604 | // Clone event_tx for the spawned task | ||
| 605 | let event_tx_clone = event_tx.clone(); | ||
| 606 | |||
| 607 | // Spawn event loop task | ||
| 608 | let conn_url = relay_url.clone(); | ||
| 609 | tokio::spawn(async move { | ||
| 610 | connection.run_event_loop(event_tx_clone).await; | ||
| 611 | }); | ||
| 612 | |||
| 613 | // Spawn event processing task | ||
| 614 | tokio::spawn(async move { | ||
| 615 | Self::process_relay_events( | ||
| 616 | event_rx, | ||
| 617 | database, | ||
| 618 | write_policy, | ||
| 619 | sync_source_addr, | ||
| 620 | conn_url, | ||
| 621 | ) | ||
| 622 | .await; | ||
| 623 | }); | ||
| 624 | |||
| 625 | Some(event_tx) | ||
| 626 | } | ||
| 627 | Err(e) => { | ||
| 628 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | ||
| 629 | None | ||
| 630 | } | ||
| 631 | } | ||
| 632 | } | ||
| 633 | |||
| 634 | /// Add Layer 2 subscription for repo-related events. | ||
| 635 | /// | ||
| 636 | /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. | ||
| 637 | async fn add_layer2_subscription( | ||
| 638 | &self, | ||
| 639 | connection: &RelayConnection, | ||
| 640 | repos: &HashMap<String, HashSet<EventId>>, | ||
| 641 | ) -> Result<(), String> { | ||
| 642 | if repos.is_empty() { | ||
| 643 | return Ok(()); | ||
| 644 | } | ||
| 645 | |||
| 646 | // Build repo refs list for filter | ||
| 647 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 648 | |||
| 649 | tracing::debug!( | ||
| 650 | "Adding Layer 2 subscription for {} repos to {}", | ||
| 651 | repo_refs.len(), | ||
| 652 | connection.url() | ||
| 653 | ); | ||
| 654 | |||
| 655 | // Chunk repo_refs into groups of 100 (per plan) | ||
| 656 | for chunk in repo_refs.chunks(100) { | ||
| 657 | // Build filter with lowercase 'a' tag for each repo ref | ||
| 658 | let mut filter = Filter::new().kinds([ | ||
| 659 | Kind::GitPatch, // 1617 | ||
| 660 | Kind::Custom(1618), // PR | ||
| 661 | Kind::Custom(1619), // PR update | ||
| 662 | Kind::GitIssue, // 1621 | ||
| 663 | ]); | ||
| 664 | |||
| 665 | // Add each repo ref as a custom tag filter | ||
| 666 | for repo_ref in chunk { | ||
| 667 | filter = | ||
| 668 | filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); | ||
| 669 | } | ||
| 670 | |||
| 671 | // Subscribe to this filter | ||
| 672 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 673 | return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); | ||
| 674 | } | ||
| 675 | } | 96 | } |
| 676 | |||
| 677 | Ok(()) | ||
| 678 | } | 97 | } |
| 98 | } | ||
| 679 | 99 | ||
| 680 | /// Process events from a single relay connection. | 100 | impl RelayState { |
| 681 | /// | 101 | /// Check if state should be cleared based on 15-minute rule |
| 682 | /// This is a static method that runs in its own task. | 102 | pub fn should_clear_state(&self) -> bool { |
| 683 | async fn process_relay_events( | 103 | match self.disconnected_at { |
| 684 | mut event_rx: mpsc::Receiver<RelayEvent>, | 104 | Some(disconnected) => { |
| 685 | database: SharedDatabase, | 105 | let now = Timestamp::now(); |
| 686 | write_policy: Nip34WritePolicy, | 106 | now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes |
| 687 | sync_source_addr: SocketAddr, | ||
| 688 | relay_url: String, | ||
| 689 | ) { | ||
| 690 | tracing::debug!("Starting event processing for relay: {}", relay_url); | ||
| 691 | |||
| 692 | while let Some(relay_event) = event_rx.recv().await { | ||
| 693 | match relay_event { | ||
| 694 | RelayEvent::Event(event) => { | ||
| 695 | Self::process_single_event_static( | ||
| 696 | &event, | ||
| 697 | &database, | ||
| 698 | &write_policy, | ||
| 699 | &sync_source_addr, | ||
| 700 | &relay_url, | ||
| 701 | ) | ||
| 702 | .await; | ||
| 703 | } | ||
| 704 | RelayEvent::EndOfStoredEvents => { | ||
| 705 | tracing::debug!("EOSE received from {}", relay_url); | ||
| 706 | } | ||
| 707 | RelayEvent::Closed(reason) => { | ||
| 708 | tracing::warn!("Connection to {} closed: {}", relay_url, reason); | ||
| 709 | break; | ||
| 710 | } | ||
| 711 | } | 107 | } |
| 108 | None => false, // Still connected or never connected | ||
| 712 | } | 109 | } |
| 713 | |||
| 714 | tracing::info!("Event processing ended for relay: {}", relay_url); | ||
| 715 | } | 110 | } |
| 716 | 111 | ||
| 717 | /// Process a single event (static version for use in spawned tasks). | 112 | /// Clear repos and root_events - called when reconnect takes > 15 minutes |
| 718 | async fn process_single_event_static( | 113 | pub fn clear_sync_state(&mut self) { |
| 719 | event: &Event, | 114 | self.repos.clear(); |
| 720 | database: &SharedDatabase, | 115 | self.root_events.clear(); |
| 721 | write_policy: &Nip34WritePolicy, | ||
| 722 | sync_source_addr: &SocketAddr, | ||
| 723 | relay_url: &str, | ||
| 724 | ) { | ||
| 725 | let event_id = event.id; | ||
| 726 | let kind = event.kind.as_u16(); | ||
| 727 | |||
| 728 | // Check if event already exists in database | ||
| 729 | match database.check_id(&event_id).await { | ||
| 730 | Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => { | ||
| 731 | tracing::trace!("Event {} already exists, skipping", event_id); | ||
| 732 | return; | ||
| 733 | } | ||
| 734 | Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing | ||
| 735 | Err(e) => { | ||
| 736 | tracing::warn!("Failed to check if event {} exists: {}", event_id, e); | ||
| 737 | } | ||
| 738 | } | ||
| 739 | |||
| 740 | // Pass through write policy | ||
| 741 | let policy_result = write_policy.admit_event(event, sync_source_addr).await; | ||
| 742 | |||
| 743 | match policy_result { | ||
| 744 | PolicyResult::Accept => match database.save_event(event).await { | ||
| 745 | Ok(SaveEventStatus::Success) => { | ||
| 746 | tracing::info!( | ||
| 747 | "Synced event {} (kind {}) from {}", | ||
| 748 | event_id, | ||
| 749 | kind, | ||
| 750 | relay_url | ||
| 751 | ); | ||
| 752 | } | ||
| 753 | Ok(_) => { | ||
| 754 | tracing::debug!( | ||
| 755 | "Event {} (kind {}) already stored or rejected by database", | ||
| 756 | event_id, | ||
| 757 | kind | ||
| 758 | ); | ||
| 759 | } | ||
| 760 | Err(e) => { | ||
| 761 | tracing::error!("Failed to save synced event {}: {}", event_id, e); | ||
| 762 | } | ||
| 763 | }, | ||
| 764 | PolicyResult::Reject(reason) => { | ||
| 765 | tracing::debug!( | ||
| 766 | "Rejected synced event {} (kind {}): {}", | ||
| 767 | event_id, | ||
| 768 | kind, | ||
| 769 | reason | ||
| 770 | ); | ||
| 771 | } | ||
| 772 | } | ||
| 773 | } | 116 | } |
| 774 | } | 117 | } |
| 775 | 118 | ||
| 776 | // ============================================================================= | 119 | /// A batch of items pending EOSE confirmation |
| 777 | // Submodules | 120 | #[derive(Debug, Clone)] |
| 778 | // ============================================================================= | 121 | pub struct PendingBatch { |
| 779 | 122 | /// Unique ID for this batch - for debugging/logging | |
| 780 | pub mod health; | 123 | pub batch_id: u64, |
| 781 | pub mod metrics; | 124 | /// The items this batch is syncing |
| 125 | pub items: PendingItems, | ||
| 126 | /// Subscription IDs that must ALL receive EOSE before confirming | ||
| 127 | pub outstanding_subs: HashSet<SubscriptionId>, | ||
| 128 | } | ||
| 782 | 129 | ||
| 783 | // Re-export commonly used types | 130 | /// Items included in a pending batch |
| 784 | pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; | 131 | #[derive(Debug, Clone, Default)] |
| 785 | pub use metrics::{event_source, SyncMetrics}; | 132 | pub struct PendingItems { |
| 133 | /// Repos being synced in this batch | ||
| 134 | pub repos: HashSet<String>, | ||
| 135 | /// Root events being synced in this batch | ||
| 136 | pub root_events: HashSet<EventId>, | ||
| 137 | } | ||
| 786 | 138 | ||
| 787 | // ============================================================================= | 139 | // ============================================================================= |
| 788 | // Tests | 140 | // SyncMetrics - Prometheus Metrics for Sync System |
| 789 | // ============================================================================= | 141 | // ============================================================================= |
| 790 | 142 | ||
| 791 | #[cfg(test)] | 143 | /// Prometheus metrics for the proactive sync system. |
| 792 | mod tests { | 144 | /// |
| 793 | use super::*; | 145 | /// Tracks relay connections, sync progress, and operational statistics. |
| 794 | use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; | 146 | /// Following the comprehensive v3 metrics design. |
| 147 | #[derive(Clone)] | ||
| 148 | pub struct SyncMetrics { | ||
| 149 | // === Connection metrics === | ||
| 150 | /// Per-relay connection status (1=connected, 0=disconnected) | ||
| 151 | relay_connected: IntGaugeVec, | ||
| 152 | /// Connection attempts by relay and result (success/failure) | ||
| 153 | connection_attempts_total: IntCounterVec, | ||
| 154 | |||
| 155 | // === Event metrics === | ||
| 156 | /// Events synced by source (live/startup/reconnect/daily) | ||
| 157 | events_total: IntCounterVec, | ||
| 158 | |||
| 159 | // === Summary metrics === | ||
| 160 | /// Total relays discovered and tracked | ||
| 161 | relays_tracked_total: IntGauge, | ||
| 162 | /// Currently connected relay count | ||
| 163 | relays_connected_total: IntGauge, | ||
| 164 | } | ||
| 795 | 165 | ||
| 796 | /// Helper to create a test event with specific tags | 166 | impl SyncMetrics { |
| 797 | fn create_test_event(kind: Kind, tags: Vec<Tag>) -> Event { | 167 | /// Register sync metrics with a Prometheus registry. |
| 798 | let keys = Keys::generate(); | 168 | /// |
| 799 | EventBuilder::new(kind, "test content") | 169 | /// Returns an error if metrics are already registered (e.g., in tests). |
| 800 | .tags(tags) | 170 | pub fn register(registry: &Registry) -> Result<Self, prometheus::Error> { |
| 801 | .sign_with_keys(&keys) | 171 | // Connection metrics |
| 802 | .expect("Failed to sign test event") | 172 | let relay_connected = IntGaugeVec::new( |
| 173 | Opts::new( | ||
| 174 | "ngit_sync_relay_connected", | ||
| 175 | "Relay connection status (1=connected, 0=disconnected)", | ||
| 176 | ), | ||
| 177 | &["relay"], | ||
| 178 | )?; | ||
| 179 | registry.register(Box::new(relay_connected.clone()))?; | ||
| 180 | |||
| 181 | let connection_attempts_total = IntCounterVec::new( | ||
| 182 | Opts::new( | ||
| 183 | "ngit_sync_connection_attempts_total", | ||
| 184 | "Total connection attempts by relay and result", | ||
| 185 | ), | ||
| 186 | &["relay", "result"], | ||
| 187 | )?; | ||
| 188 | registry.register(Box::new(connection_attempts_total.clone()))?; | ||
| 189 | |||
| 190 | // Event metrics | ||
| 191 | let events_total = IntCounterVec::new( | ||
| 192 | Opts::new( | ||
| 193 | "ngit_sync_events_total", | ||
| 194 | "Total events synced by source type", | ||
| 195 | ), | ||
| 196 | &["source"], | ||
| 197 | )?; | ||
| 198 | registry.register(Box::new(events_total.clone()))?; | ||
| 199 | |||
| 200 | // Summary metrics | ||
| 201 | let relays_tracked_total = IntGauge::with_opts(Opts::new( | ||
| 202 | "ngit_sync_relays_tracked_total", | ||
| 203 | "Total number of relays discovered and tracked", | ||
| 204 | ))?; | ||
| 205 | registry.register(Box::new(relays_tracked_total.clone()))?; | ||
| 206 | |||
| 207 | let relays_connected_total = IntGauge::with_opts(Opts::new( | ||
| 208 | "ngit_sync_relays_connected_total", | ||
| 209 | "Number of currently connected relays", | ||
| 210 | ))?; | ||
| 211 | registry.register(Box::new(relays_connected_total.clone()))?; | ||
| 212 | |||
| 213 | Ok(Self { | ||
| 214 | relay_connected, | ||
| 215 | connection_attempts_total, | ||
| 216 | events_total, | ||
| 217 | relays_tracked_total, | ||
| 218 | relays_connected_total, | ||
| 219 | }) | ||
| 803 | } | 220 | } |
| 804 | 221 | ||
| 805 | // ========================================================================= | 222 | // === Connection Recording Methods === |
| 806 | // Tests for extract_all_repo_refs | ||
| 807 | // ========================================================================= | ||
| 808 | 223 | ||
| 809 | #[test] | 224 | /// Record a connection attempt (success or failure) |
| 810 | fn test_extract_all_repo_refs_single_ref() { | 225 | pub fn record_connection_attempt(&self, relay: &str, success: bool) { |
| 811 | let event = create_test_event( | 226 | let result = if success { "success" } else { "failure" }; |
| 812 | Kind::GitPatch, | 227 | self.connection_attempts_total |
| 813 | vec![Tag::custom( | 228 | .with_label_values(&[relay, result]) |
| 814 | nostr_relay_builder::prelude::TagKind::custom("a"), | 229 | .inc(); |
| 815 | vec!["30617:abc123def456:my-project"], | ||
| 816 | )], | ||
| 817 | ); | ||
| 818 | |||
| 819 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 820 | assert_eq!(refs.len(), 1); | ||
| 821 | assert_eq!(refs[0], "30617:abc123def456:my-project"); | ||
| 822 | } | 230 | } |
| 823 | 231 | ||
| 824 | #[test] | 232 | /// Set relay connection status |
| 825 | fn test_extract_all_repo_refs_multiple_refs() { | 233 | pub fn set_relay_connected(&self, relay: &str, connected: bool) { |
| 826 | let event = create_test_event( | 234 | self.relay_connected |
| 827 | Kind::GitPatch, | 235 | .with_label_values(&[relay]) |
| 828 | vec![ | 236 | .set(if connected { 1 } else { 0 }); |
| 829 | Tag::custom( | ||
| 830 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 831 | vec!["30617:abc123:project1"], | ||
| 832 | ), | ||
| 833 | Tag::custom( | ||
| 834 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 835 | vec!["30617:def456:project2"], | ||
| 836 | ), | ||
| 837 | ], | ||
| 838 | ); | ||
| 839 | |||
| 840 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 841 | assert_eq!(refs.len(), 2); | ||
| 842 | assert!(refs.contains(&"30617:abc123:project1".to_string())); | ||
| 843 | assert!(refs.contains(&"30617:def456:project2".to_string())); | ||
| 844 | } | 237 | } |
| 845 | 238 | ||
| 846 | #[test] | 239 | /// Increment connected count |
| 847 | fn test_extract_all_repo_refs_ignores_non_30617() { | 240 | pub fn inc_connected_count(&self) { |
| 848 | let event = create_test_event( | 241 | self.relays_connected_total.inc(); |
| 849 | Kind::GitPatch, | ||
| 850 | vec![ | ||
| 851 | Tag::custom( | ||
| 852 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 853 | vec!["30617:abc123:valid-repo"], | ||
| 854 | ), | ||
| 855 | Tag::custom( | ||
| 856 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 857 | vec!["30618:def456:state-event"], // Not a repo ref | ||
| 858 | ), | ||
| 859 | ], | ||
| 860 | ); | ||
| 861 | |||
| 862 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 863 | assert_eq!(refs.len(), 1); | ||
| 864 | assert_eq!(refs[0], "30617:abc123:valid-repo"); | ||
| 865 | } | 242 | } |
| 866 | 243 | ||
| 867 | #[test] | 244 | /// Decrement connected count |
| 868 | fn test_extract_all_repo_refs_empty_when_no_a_tags() { | 245 | pub fn dec_connected_count(&self) { |
| 869 | let event = create_test_event( | 246 | self.relays_connected_total.dec(); |
| 870 | Kind::GitPatch, | ||
| 871 | vec![Tag::custom( | ||
| 872 | nostr_relay_builder::prelude::TagKind::custom("e"), | ||
| 873 | vec!["some-event-id"], | ||
| 874 | )], | ||
| 875 | ); | ||
| 876 | |||
| 877 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 878 | assert!(refs.is_empty()); | ||
| 879 | } | 247 | } |
| 880 | 248 | ||
| 881 | // ========================================================================= | 249 | // === Event Recording Methods === |
| 882 | // Tests for build_repo_ref | ||
| 883 | // ========================================================================= | ||
| 884 | 250 | ||
| 885 | #[test] | 251 | /// Record a synced event by source type |
| 886 | fn test_build_repo_ref() { | 252 | /// |
| 887 | let keys = Keys::generate(); | 253 | /// Source types: |
| 888 | let event = EventBuilder::new(Kind::from(30617_u16), "announcement") | 254 | /// - "live" - Real-time subscription events |
| 889 | .tags(vec![Tag::custom( | 255 | /// - "startup" - Events from startup catchup |
| 890 | nostr_relay_builder::prelude::TagKind::d(), | 256 | /// - "reconnect" - Events from reconnection catchup |
| 891 | vec!["my-identifier"], | 257 | pub fn record_event(&self, source: &str) { |
| 892 | )]) | 258 | self.events_total.with_label_values(&[source]).inc(); |
| 893 | .sign_with_keys(&keys) | ||
| 894 | .expect("Failed to sign test event"); | ||
| 895 | |||
| 896 | let repo_ref = SyncManager::build_repo_ref(&event); | ||
| 897 | assert!(repo_ref.starts_with("30617:")); | ||
| 898 | assert!(repo_ref.ends_with(":my-identifier")); | ||
| 899 | assert!(repo_ref.contains(&event.pubkey.to_hex())); | ||
| 900 | } | 259 | } |
| 901 | 260 | ||
| 902 | #[test] | 261 | /// Record multiple events synced by source type |
| 903 | fn test_build_repo_ref_empty_identifier() { | 262 | pub fn record_events(&self, source: &str, count: u64) { |
| 904 | let keys = Keys::generate(); | 263 | self.events_total |
| 905 | let event = EventBuilder::new(Kind::from(30617_u16), "announcement") | 264 | .with_label_values(&[source]) |
| 906 | .sign_with_keys(&keys) | 265 | .inc_by(count); |
| 907 | .expect("Failed to sign test event"); | ||
| 908 | |||
| 909 | let repo_ref = SyncManager::build_repo_ref(&event); | ||
| 910 | assert!(repo_ref.starts_with("30617:")); | ||
| 911 | assert!(repo_ref.ends_with(":")); // Empty identifier | ||
| 912 | } | 266 | } |
| 913 | 267 | ||
| 914 | // ========================================================================= | 268 | // === Summary Recording Methods === |
| 915 | // Tests for extract_relay_urls | ||
| 916 | // ========================================================================= | ||
| 917 | |||
| 918 | #[test] | ||
| 919 | fn test_extract_relay_urls_single() { | ||
| 920 | let event = create_test_event( | ||
| 921 | Kind::from(30617_u16), | ||
| 922 | vec![Tag::custom( | ||
| 923 | nostr_relay_builder::prelude::TagKind::Relays, | ||
| 924 | vec!["wss://relay.example.com"], | ||
| 925 | )], | ||
| 926 | ); | ||
| 927 | 269 | ||
| 928 | let urls = SyncManager::extract_relay_urls(&event); | 270 | /// Set the total tracked relay count |
| 929 | assert_eq!(urls.len(), 1); | 271 | pub fn set_tracked_count(&self, count: i64) { |
| 930 | assert_eq!(urls[0], "wss://relay.example.com"); | 272 | self.relays_tracked_total.set(count); |
| 931 | } | 273 | } |
| 932 | 274 | ||
| 933 | #[test] | 275 | /// Increment tracked relay count |
| 934 | fn test_extract_relay_urls_multiple() { | 276 | pub fn inc_tracked_count(&self) { |
| 935 | let event = create_test_event( | 277 | self.relays_tracked_total.inc(); |
| 936 | Kind::from(30617_u16), | ||
| 937 | vec![Tag::custom( | ||
| 938 | nostr_relay_builder::prelude::TagKind::Relays, | ||
| 939 | vec!["wss://relay1.example.com", "wss://relay2.example.com"], | ||
| 940 | )], | ||
| 941 | ); | ||
| 942 | |||
| 943 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 944 | assert_eq!(urls.len(), 2); | ||
| 945 | assert!(urls.contains(&"wss://relay1.example.com".to_string())); | ||
| 946 | assert!(urls.contains(&"wss://relay2.example.com".to_string())); | ||
| 947 | } | 278 | } |
| 948 | 279 | ||
| 949 | #[test] | 280 | /// Get current tracked relay count |
| 950 | fn test_extract_relay_urls_empty_when_no_relays_tag() { | 281 | pub fn get_tracked_count(&self) -> i64 { |
| 951 | let event = create_test_event( | 282 | self.relays_tracked_total.get() |
| 952 | Kind::from(30617_u16), | ||
| 953 | vec![Tag::custom( | ||
| 954 | nostr_relay_builder::prelude::TagKind::custom("d"), | ||
| 955 | vec!["my-project"], | ||
| 956 | )], | ||
| 957 | ); | ||
| 958 | |||
| 959 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 960 | assert!(urls.is_empty()); | ||
| 961 | } | 283 | } |
| 962 | 284 | ||
| 963 | // ========================================================================= | 285 | /// Get current connected relay count |
| 964 | // Original data structure tests | 286 | pub fn get_connected_count(&self) -> i64 { |
| 965 | // ========================================================================= | 287 | self.relays_connected_total.get() |
| 966 | |||
| 967 | #[tokio::test] | ||
| 968 | async fn test_following_repo_root_events_basic_operations() { | ||
| 969 | let state = new_following_repo_root_events(); | ||
| 970 | |||
| 971 | // Insert some events | ||
| 972 | { | ||
| 973 | let mut guard = state.write().await; | ||
| 974 | let repo_ref = "30617:abc123:my-project".to_string(); | ||
| 975 | guard | ||
| 976 | .entry(repo_ref) | ||
| 977 | .or_default() | ||
| 978 | .insert(EventId::all_zeros()); | ||
| 979 | } | ||
| 980 | |||
| 981 | // Read back | ||
| 982 | { | ||
| 983 | let guard = state.read().await; | ||
| 984 | assert_eq!(guard.len(), 1); | ||
| 985 | assert!(guard.contains_key("30617:abc123:my-project")); | ||
| 986 | } | ||
| 987 | } | 288 | } |
| 289 | } | ||
| 988 | 290 | ||
| 989 | #[tokio::test] | 291 | /// Event source types for metrics tracking |
| 990 | async fn test_sync_relays_basic_operations() { | 292 | pub mod event_source { |
| 991 | let state = new_sync_relays(); | 293 | /// Real-time subscription events |
| 294 | pub const LIVE: &str = "live"; | ||
| 295 | /// Events from startup catchup | ||
| 296 | pub const STARTUP: &str = "startup"; | ||
| 297 | /// Events from reconnection catchup | ||
| 298 | pub const RECONNECT: &str = "reconnect"; | ||
| 299 | } | ||
| 992 | 300 | ||
| 993 | // Insert relay with repos | 301 | // ============================================================================= |
| 994 | { | 302 | // SyncManager - Main Entry Point |
| 995 | let mut guard = state.write().await; | 303 | // ============================================================================= |
| 996 | let relay_url = "wss://relay.example.com".to_string(); | ||
| 997 | let repo_ref = "30617:abc123:my-project".to_string(); | ||
| 998 | 304 | ||
| 999 | guard | 305 | /// Manages proactive synchronization with external relays |
| 1000 | .entry(relay_url) | 306 | /// |
| 1001 | .or_default() | 307 | /// The SyncManager runs as a background task, subscribing to repository |
| 1002 | .entry(repo_ref) | 308 | /// announcements on the local relay and syncing data from external relays |
| 1003 | .or_default() | 309 | /// listed in those announcements. |
| 1004 | .insert(EventId::all_zeros()); | 310 | #[allow(dead_code)] // Fields will be used in later phases |
| 1005 | } | 311 | pub struct SyncManager { |
| 312 | /// Bootstrap relay URL for initial sync (optional) | ||
| 313 | bootstrap_relay_url: Option<String>, | ||
| 314 | /// Our service domain - used for filtering relevant repos | ||
| 315 | service_domain: String, | ||
| 316 | /// Database for event storage and queries | ||
| 317 | database: SharedDatabase, | ||
| 318 | /// Write policy for validating incoming events | ||
| 319 | write_policy: Nip34WritePolicy, | ||
| 320 | /// Configuration reference for sync settings | ||
| 321 | config: Config, | ||
| 322 | /// What we want to sync (source of truth) | ||
| 323 | repo_sync_index: RepoSyncIndex, | ||
| 324 | /// What we've confirmed syncing + connection state | ||
| 325 | relay_sync_index: RelaySyncIndex, | ||
| 326 | /// In-flight subscription batches | ||
| 327 | pending_sync_index: PendingSyncIndex, | ||
| 328 | } | ||
| 1006 | 329 | ||
| 1007 | // Read back | 330 | impl SyncManager { |
| 1008 | { | 331 | /// Create a new SyncManager |
| 1009 | let guard = state.read().await; | 332 | /// |
| 1010 | assert_eq!(guard.len(), 1); | 333 | /// # Arguments |
| 1011 | let relay_repos = guard.get("wss://relay.example.com").unwrap(); | 334 | /// * `bootstrap_relay_url` - Optional relay URL for initial historical sync |
| 1012 | assert_eq!(relay_repos.len(), 1); | 335 | /// * `service_domain` - The domain this relay serves (for filtering repos) |
| 1013 | let events = relay_repos.get("30617:abc123:my-project").unwrap(); | 336 | /// * `database` - Shared database for event storage |
| 1014 | assert_eq!(events.len(), 1); | 337 | /// * `write_policy` - Policy for validating events before storage |
| 338 | /// * `config` - Configuration for sync settings | ||
| 339 | pub fn new( | ||
| 340 | bootstrap_relay_url: Option<String>, | ||
| 341 | service_domain: String, | ||
| 342 | database: SharedDatabase, | ||
| 343 | write_policy: Nip34WritePolicy, | ||
| 344 | config: &Config, | ||
| 345 | ) -> Self { | ||
| 346 | Self { | ||
| 347 | bootstrap_relay_url, | ||
| 348 | service_domain, | ||
| 349 | database, | ||
| 350 | write_policy, | ||
| 351 | config: config.clone(), | ||
| 352 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | ||
| 353 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | ||
| 354 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | ||
| 1015 | } | 355 | } |
| 1016 | } | 356 | } |
| 1017 | 357 | ||
| 1018 | #[tokio::test] | 358 | /// Run the sync manager (placeholder for Phase 1) |
| 1019 | async fn test_concurrent_access() { | 359 | /// |
| 1020 | let state = new_following_repo_root_events(); | 360 | /// This will be implemented in later phases to: |
| 1021 | let state_clone = Arc::clone(&state); | 361 | /// 1. Subscribe to local relay for 30617 events |
| 1022 | 362 | /// 2. Process events to build RepoSyncIndex | |
| 1023 | // Writer task | 363 | /// 3. Compute and execute sync actions |
| 1024 | let writer = tokio::spawn(async move { | 364 | /// 4. Handle reconnection and catch-up logic |
| 1025 | let mut guard = state_clone.write().await; | 365 | pub async fn run(self) { |
| 1026 | guard | 366 | tracing::info!( |
| 1027 | .entry("30617:writer:repo".to_string()) | 367 | bootstrap_relay = ?self.bootstrap_relay_url, |
| 1028 | .or_default() | 368 | service_domain = %self.service_domain, |
| 1029 | .insert(EventId::all_zeros()); | 369 | "SyncManager starting (placeholder - not yet implemented)" |
| 1030 | }); | 370 | ); |
| 1031 | |||
| 1032 | // Wait for writer | ||
| 1033 | writer.await.unwrap(); | ||
| 1034 | 371 | ||
| 1035 | // Reader should see the change | 372 | // Phase 1: Just log and return |
| 1036 | let guard = state.read().await; | 373 | // Full implementation will be added in subsequent phases |
| 1037 | assert!(guard.contains_key("30617:writer:repo")); | ||
| 1038 | } | 374 | } |
| 1039 | } | 375 | } \ No newline at end of file |