diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-09 09:28:12 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-09 09:28:18 +0000 |
| commit | efaad1e2857914b87307cf78903a957a604697a8 (patch) | |
| tree | dadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/mod.rs | |
| parent | 91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff) | |
basic sync stub
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 416 |
1 files changed, 385 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 17418d0..aa34490 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1,40 +1,394 @@ | |||
| 1 | //! Proactive Sync Module for GRASP-02 | 1 | //! Proactive Sync Module for GRASP-02 |
| 2 | //! | 2 | //! |
| 3 | //! This module implements proactive synchronization of kind 30617 (repository state) | 3 | //! This module implements the proactive sync system that ensures data availability |
| 4 | //! events from configured relay(s). Events are validated through the same write policy | 4 | //! for repositories hosted on this relay by syncing from other relays in the ecosystem. |
| 5 | //! as directly-submitted events. | ||
| 6 | //! | 5 | //! |
| 7 | //! ## Three-Layer Filter Strategy (Phase 2) | 6 | //! ## Architecture Overview |
| 8 | //! | 7 | //! |
| 9 | //! - **Layer 1**: Announcement discovery (kinds 30617 + 30618) | 8 | //! The sync system is built around two core data structures: |
| 10 | //! - **Layer 2**: Repository events (A/a tags for shared repos) | ||
| 11 | //! - **Layer 3**: Related events (E/e tags for discussions, reviews) | ||
| 12 | //! | 9 | //! |
| 13 | //! ## Resilience & Health Tracking (Phase 3) | 10 | //! - **FollowingRepoRootEvents**: Tracks repository root events we're following |
| 11 | //! - **SyncRelays**: Tracks relays we sync from, including their repos and events | ||
| 14 | //! | 12 | //! |
| 15 | //! - **Health tracking**: Per-relay connection health states (Healthy, Degraded, Dead) | 13 | //! These type aliases are colocated with SyncManager (following the pattern of |
| 16 | //! - **Exponential backoff**: Smart retry delays on failures (5s -> 1h max) | 14 | //! `src/http/mod.rs` and `src/metrics/mod.rs`) to reduce file count while maintaining clarity. |
| 17 | //! - **Dead relay handling**: Minimal retry for 24h+ failed relays | 15 | //! |
| 18 | //! - **Startup jitter**: Prevent thundering herd on launch (0-10s random delay) | 16 | //! ## Submodules |
| 17 | //! | ||
| 18 | //! - [`health`]: Relay health tracking with exponential backoff and dead relay detection | ||
| 19 | //! - [`metrics`]: Prometheus metrics for sync operations | ||
| 20 | //! | ||
| 21 | //! ## Memory Estimates (from design doc) | ||
| 22 | //! | ||
| 23 | //! At target scale (1,000 repos, 100 relays): | ||
| 24 | //! - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB | ||
| 25 | //! - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB | ||
| 26 | //! - **Total in-memory state**: ~10 MB | ||
| 27 | //! | ||
| 28 | //! ## Upper Bounds (triggers for redesign) | ||
| 29 | //! | ||
| 30 | //! - 10,000+ repos: Consider database-backed state | ||
| 31 | //! - 500+ sync relays: Consider connection pooling | ||
| 32 | //! - 500+ root events per repo: Consider per-repo pagination | ||
| 33 | //! | ||
| 34 | //! ## Design References | ||
| 35 | //! | ||
| 36 | //! See [`docs/explanation/grasp-02-proactive-sync-v2.md`](../../docs/explanation/grasp-02-proactive-sync-v2.md) | ||
| 37 | //! for the complete design context. | ||
| 38 | |||
| 39 | use std::collections::{HashMap, HashSet}; | ||
| 40 | use std::sync::Arc; | ||
| 41 | |||
| 42 | use nostr_sdk::EventId; | ||
| 43 | use tokio::sync::RwLock; | ||
| 44 | |||
| 45 | use crate::config::Config; | ||
| 46 | use crate::nostr::builder::Nip34WritePolicy; | ||
| 47 | use crate::nostr::SharedDatabase; | ||
| 48 | |||
| 49 | // ============================================================================= | ||
| 50 | // Type Aliases for Sync State | ||
| 51 | // ============================================================================= | ||
| 52 | |||
| 53 | /// Repository root events we're following. | ||
| 54 | /// | ||
| 55 | /// This structure tracks which repository root events (kinds 1617, 1618, 1619, 1621) | ||
| 56 | /// we need to follow for each repository we host. | ||
| 57 | /// | ||
| 58 | /// ## Key Format | ||
| 59 | /// | ||
| 60 | /// The key is a repository addressable reference in the format: | ||
| 61 | /// `"30617:<pubkey>:<identifier>"` | ||
| 62 | /// | ||
| 63 | /// For example: `"30617:abc123...def:my-project"` | ||
| 64 | /// | ||
| 65 | /// ## Value | ||
| 66 | /// | ||
| 67 | /// A set of event IDs representing root events (PRs, Issues, Patches, Status events) | ||
| 68 | /// that reference this repository via an `a` tag. | ||
| 69 | /// | ||
| 70 | /// ## Event Kinds Tracked | ||
| 71 | /// | ||
| 72 | /// - **1617**: Patches (NIP-34) | ||
| 73 | /// - **1618**: Issues (NIP-34) | ||
| 74 | /// - **1619**: PRs (Pull Requests, NIP-34) | ||
| 75 | /// - **1621**: Status events (NIP-34) | ||
| 76 | /// | ||
| 77 | /// ## Invariants | ||
| 78 | /// | ||
| 79 | /// - May include a few extra repo refs that aren't in `SyncRelays` | ||
| 80 | /// - This is acceptable - we won't query other relays for them | ||
| 81 | /// - Updated incrementally via self-subscription | ||
| 82 | /// | ||
| 83 | /// ## Thread Safety | ||
| 84 | /// | ||
| 85 | /// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple | ||
| 86 | /// async tasks performing sync operations. | ||
| 87 | /// | ||
| 88 | /// ## Example Usage | ||
| 89 | /// | ||
| 90 | /// ```rust,ignore | ||
| 91 | /// use ngit_grasp::sync::FollowingRepoRootEvents; | ||
| 92 | /// use std::collections::HashSet; | ||
| 93 | /// use nostr_sdk::EventId; | ||
| 94 | /// | ||
| 95 | /// async fn check_repo(state: &FollowingRepoRootEvents, repo_ref: &str) { | ||
| 96 | /// let guard = state.read().await; | ||
| 97 | /// if let Some(events) = guard.get(repo_ref) { | ||
| 98 | /// println!("Tracking {} root events for {}", events.len(), repo_ref); | ||
| 99 | /// } | ||
| 100 | /// } | ||
| 101 | /// ``` | ||
| 102 | pub type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>; | ||
| 103 | |||
| 104 | /// Relays we sync from, including their repos and events. | ||
| 105 | /// | ||
| 106 | /// This structure tracks which relays we need to connect to for syncing, | ||
| 107 | /// and for each relay, which repositories and their root events we're interested in. | ||
| 108 | /// | ||
| 109 | /// ## Key Format (Outer HashMap) | ||
| 110 | /// | ||
| 111 | /// The outer key is a relay WebSocket URL, e.g., `"wss://relay.example.com"` | ||
| 112 | /// | ||
| 113 | /// ## Value Format (Inner HashMap) | ||
| 114 | /// | ||
| 115 | /// For each relay, we maintain a map of: | ||
| 116 | /// - Key: Repository addressable reference (`"30617:<pubkey>:<identifier>"`) | ||
| 117 | /// - Value: Set of event IDs for that repo which should be synced from this relay | ||
| 118 | /// | ||
| 119 | /// ## Relay Selection Criteria | ||
| 120 | /// | ||
| 121 | /// A relay is included if its URL appears in a repository announcement (kind 30617) | ||
| 122 | /// that **also** lists our service URL. This ensures we only sync from relays | ||
| 123 | /// for repositories that are actually hosted on our relay. | ||
| 124 | /// | ||
| 125 | /// ## Bootstrap Relay | ||
| 126 | /// | ||
| 127 | /// If configured, the bootstrap relay is always present in this map and is | ||
| 128 | /// excluded from automatic removal logic. The bootstrap relay is used for | ||
| 129 | /// initial sync and discovery even when no repositories explicitly list it. | ||
| 130 | /// | ||
| 131 | /// ## Thread Safety | ||
| 132 | /// | ||
| 133 | /// Wrapped in `Arc<RwLock<...>>` for safe concurrent access from multiple | ||
| 134 | /// async tasks performing sync operations. | ||
| 135 | /// | ||
| 136 | /// ## Example Usage | ||
| 137 | /// | ||
| 138 | /// ```rust,ignore | ||
| 139 | /// use ngit_grasp::sync::SyncRelays; | ||
| 140 | /// use std::collections::{HashMap, HashSet}; | ||
| 141 | /// | ||
| 142 | /// async fn get_relay_repos(state: &SyncRelays, relay_url: &str) { | ||
| 143 | /// let guard = state.read().await; | ||
| 144 | /// if let Some(repos) = guard.get(relay_url) { | ||
| 145 | /// println!("Relay {} tracks {} repos", relay_url, repos.len()); | ||
| 146 | /// for (repo_ref, events) in repos { | ||
| 147 | /// println!(" {} -> {} events", repo_ref, events.len()); | ||
| 148 | /// } | ||
| 149 | /// } | ||
| 150 | /// } | ||
| 151 | /// ``` | ||
| 152 | pub type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>; | ||
| 153 | |||
| 154 | /// Creates a new empty `FollowingRepoRootEvents` state. | ||
| 155 | /// | ||
| 156 | /// Use this to initialize the state before populating from database queries. | ||
| 157 | pub fn new_following_repo_root_events() -> FollowingRepoRootEvents { | ||
| 158 | Arc::new(RwLock::new(HashMap::new())) | ||
| 159 | } | ||
| 160 | |||
| 161 | /// Creates a new empty `SyncRelays` state. | ||
| 162 | /// | ||
| 163 | /// Use this to initialize the state before populating from database queries. | ||
| 164 | pub fn new_sync_relays() -> SyncRelays { | ||
| 165 | Arc::new(RwLock::new(HashMap::new())) | ||
| 166 | } | ||
| 167 | |||
| 168 | // ============================================================================= | ||
| 169 | // SyncManager | ||
| 170 | // ============================================================================= | ||
| 171 | |||
| 172 | /// Manages proactive synchronization with external relays. | ||
| 173 | /// | ||
| 174 | /// The SyncManager is responsible for: | ||
| 175 | /// - Discovering relays from stored repository announcements | ||
| 176 | /// - Maintaining connections to sync relays | ||
| 177 | /// - Subscribing to events at external relays | ||
| 178 | /// - Applying the acceptance policy to synced events | ||
| 179 | /// | ||
| 180 | /// ## Lifecycle | ||
| 181 | /// | ||
| 182 | /// 1. `new()` - Creates manager with database and config | ||
| 183 | /// 2. `run()` - Main async loop (call in a spawned task) | ||
| 184 | /// | ||
| 185 | /// ## Current Status | ||
| 186 | /// | ||
| 187 | /// This is a stub implementation. The core data structures are: | ||
| 188 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following | ||
| 189 | /// - [`SyncRelays`]: Relays we sync from with their repos and events | ||
| 190 | /// | ||
| 191 | /// Full implementation will come in later phases. | ||
| 192 | pub struct SyncManager { | ||
| 193 | /// Bootstrap relay URL if configured | ||
| 194 | #[allow(dead_code)] | ||
| 195 | bootstrap_relay_url: Option<String>, | ||
| 196 | |||
| 197 | /// Our service domain for filtering repo announcements | ||
| 198 | #[allow(dead_code)] | ||
| 199 | service_domain: String, | ||
| 200 | |||
| 201 | /// Database for querying/storing events | ||
| 202 | #[allow(dead_code)] | ||
| 203 | database: SharedDatabase, | ||
| 204 | |||
| 205 | /// Write policy for applying acceptance rules | ||
| 206 | #[allow(dead_code)] | ||
| 207 | write_policy: Nip34WritePolicy, | ||
| 208 | |||
| 209 | /// Repository root events we're following (Phase 1 data structure) | ||
| 210 | #[allow(dead_code)] | ||
| 211 | following_repo_root_events: FollowingRepoRootEvents, | ||
| 212 | |||
| 213 | /// Relays we sync from (Phase 1 data structure) | ||
| 214 | #[allow(dead_code)] | ||
| 215 | sync_relays: SyncRelays, | ||
| 216 | |||
| 217 | /// Max backoff duration for relay reconnection | ||
| 218 | #[allow(dead_code)] | ||
| 219 | max_backoff_secs: u64, | ||
| 220 | } | ||
| 221 | |||
| 222 | impl SyncManager { | ||
| 223 | /// Creates a new SyncManager. | ||
| 224 | /// | ||
| 225 | /// # Arguments | ||
| 226 | /// | ||
| 227 | /// * `bootstrap_relay_url` - Optional bootstrap relay for initial sync | ||
| 228 | /// * `service_domain` - Our domain for filtering announcements | ||
| 229 | /// * `database` - Database for event storage/queries | ||
| 230 | /// * `write_policy` - Policy for accepting events | ||
| 231 | /// * `config` - Configuration for sync parameters | ||
| 232 | pub fn new( | ||
| 233 | bootstrap_relay_url: Option<String>, | ||
| 234 | service_domain: String, | ||
| 235 | database: SharedDatabase, | ||
| 236 | write_policy: Nip34WritePolicy, | ||
| 237 | config: &Config, | ||
| 238 | ) -> Self { | ||
| 239 | Self { | ||
| 240 | bootstrap_relay_url, | ||
| 241 | service_domain, | ||
| 242 | database, | ||
| 243 | write_policy, | ||
| 244 | following_repo_root_events: new_following_repo_root_events(), | ||
| 245 | sync_relays: new_sync_relays(), | ||
| 246 | max_backoff_secs: config.sync_max_backoff_secs, | ||
| 247 | } | ||
| 248 | } | ||
| 249 | |||
| 250 | /// Returns a reference to the following repo root events state. | ||
| 251 | /// | ||
| 252 | /// This is the Phase 1 data structure tracking which repository root events | ||
| 253 | /// (kinds 1617, 1618, 1619, 1621) we're following. | ||
| 254 | pub fn following_repo_root_events(&self) -> &FollowingRepoRootEvents { | ||
| 255 | &self.following_repo_root_events | ||
| 256 | } | ||
| 257 | |||
| 258 | /// Returns a reference to the sync relays state. | ||
| 259 | /// | ||
| 260 | /// This is the Phase 1 data structure tracking which relays we sync from | ||
| 261 | /// and their associated repositories/events. | ||
| 262 | pub fn sync_relays(&self) -> &SyncRelays { | ||
| 263 | &self.sync_relays | ||
| 264 | } | ||
| 265 | |||
| 266 | /// Runs the sync manager main loop. | ||
| 267 | /// | ||
| 268 | /// This method should be called in a spawned task: | ||
| 269 | /// | ||
| 270 | /// ```rust,ignore | ||
| 271 | /// tokio::spawn(async move { | ||
| 272 | /// sync_manager.run().await; | ||
| 273 | /// }); | ||
| 274 | /// ``` | ||
| 275 | /// | ||
| 276 | /// ## Current Status | ||
| 277 | /// | ||
| 278 | /// This is a stub that logs and then waits indefinitely. | ||
| 279 | /// Full implementation includes: | ||
| 280 | /// - Phase 2: Database initialization queries | ||
| 281 | /// - Phase 3: Self-subscription for incremental updates | ||
| 282 | /// - Phase 4-6: Filter building, connection management | ||
| 283 | /// - Phase 7: Full sync loop | ||
| 284 | pub async fn run(self) { | ||
| 285 | tracing::info!( | ||
| 286 | "SyncManager stub started (bootstrap_relay={:?}, domain={})", | ||
| 287 | self.bootstrap_relay_url, | ||
| 288 | self.service_domain | ||
| 289 | ); | ||
| 290 | |||
| 291 | tracing::info!( | ||
| 292 | "Phase 1 data structures initialized: following_repo_root_events, sync_relays" | ||
| 293 | ); | ||
| 294 | |||
| 295 | // Stub: just wait indefinitely until full implementation | ||
| 296 | // This prevents the spawned task from immediately completing | ||
| 297 | loop { | ||
| 298 | tokio::time::sleep(std::time::Duration::from_secs(3600)).await; | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
| 302 | |||
| 303 | // ============================================================================= | ||
| 304 | // Submodules | ||
| 305 | // ============================================================================= | ||
| 19 | 306 | ||
| 20 | mod connection; | ||
| 21 | mod filter; | ||
| 22 | pub mod health; | 307 | pub mod health; |
| 23 | mod manager; | ||
| 24 | pub mod metrics; | 308 | pub mod metrics; |
| 25 | pub mod negentropy; | 309 | |
| 26 | mod subscription; | 310 | // Re-export commonly used types |
| 27 | 311 | pub use health::{create_health_tracker, HealthState, RelayHealth, RelayHealthTracker}; | |
| 28 | pub use filter::FilterService; | 312 | pub use metrics::{event_source, SyncMetrics}; |
| 29 | pub use health::{HealthState, RelayHealth, RelayHealthTracker}; | 313 | |
| 30 | pub use manager::SyncManager; | 314 | // ============================================================================= |
| 31 | pub use metrics::SyncMetrics; | 315 | // Tests |
| 32 | pub use negentropy::NegentropyService; | 316 | // ============================================================================= |
| 33 | pub use subscription::SubscriptionManager; | 317 | |
| 34 | 318 | #[cfg(test)] | |
| 35 | // Re-export default sync source address for backward compatibility with modules like negentropy.rs | 319 | mod tests { |
| 36 | // Manager.rs derives sync_source_addr from config.bind_address at runtime | 320 | use super::*; |
| 37 | pub use manager::DEFAULT_SYNC_SOURCE_ADDR as SYNC_SOURCE_ADDR; | 321 | |
| 38 | 322 | #[tokio::test] | |
| 39 | /// Kind for repository state events (NIP-34) | 323 | async fn test_following_repo_root_events_basic_operations() { |
| 40 | pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file | 324 | let state = new_following_repo_root_events(); |
| 325 | |||
| 326 | // Insert some events | ||
| 327 | { | ||
| 328 | let mut guard = state.write().await; | ||
| 329 | let repo_ref = "30617:abc123:my-project".to_string(); | ||
| 330 | guard | ||
| 331 | .entry(repo_ref) | ||
| 332 | .or_default() | ||
| 333 | .insert(EventId::all_zeros()); | ||
| 334 | } | ||
| 335 | |||
| 336 | // Read back | ||
| 337 | { | ||
| 338 | let guard = state.read().await; | ||
| 339 | assert_eq!(guard.len(), 1); | ||
| 340 | assert!(guard.contains_key("30617:abc123:my-project")); | ||
| 341 | } | ||
| 342 | } | ||
| 343 | |||
| 344 | #[tokio::test] | ||
| 345 | async fn test_sync_relays_basic_operations() { | ||
| 346 | let state = new_sync_relays(); | ||
| 347 | |||
| 348 | // Insert relay with repos | ||
| 349 | { | ||
| 350 | let mut guard = state.write().await; | ||
| 351 | let relay_url = "wss://relay.example.com".to_string(); | ||
| 352 | let repo_ref = "30617:abc123:my-project".to_string(); | ||
| 353 | |||
| 354 | guard | ||
| 355 | .entry(relay_url) | ||
| 356 | .or_default() | ||
| 357 | .entry(repo_ref) | ||
| 358 | .or_default() | ||
| 359 | .insert(EventId::all_zeros()); | ||
| 360 | } | ||
| 361 | |||
| 362 | // Read back | ||
| 363 | { | ||
| 364 | let guard = state.read().await; | ||
| 365 | assert_eq!(guard.len(), 1); | ||
| 366 | let relay_repos = guard.get("wss://relay.example.com").unwrap(); | ||
| 367 | assert_eq!(relay_repos.len(), 1); | ||
| 368 | let events = relay_repos.get("30617:abc123:my-project").unwrap(); | ||
| 369 | assert_eq!(events.len(), 1); | ||
| 370 | } | ||
| 371 | } | ||
| 372 | |||
| 373 | #[tokio::test] | ||
| 374 | async fn test_concurrent_access() { | ||
| 375 | let state = new_following_repo_root_events(); | ||
| 376 | let state_clone = Arc::clone(&state); | ||
| 377 | |||
| 378 | // Writer task | ||
| 379 | let writer = tokio::spawn(async move { | ||
| 380 | let mut guard = state_clone.write().await; | ||
| 381 | guard | ||
| 382 | .entry("30617:writer:repo".to_string()) | ||
| 383 | .or_default() | ||
| 384 | .insert(EventId::all_zeros()); | ||
| 385 | }); | ||
| 386 | |||
| 387 | // Wait for writer | ||
| 388 | writer.await.unwrap(); | ||
| 389 | |||
| 390 | // Reader should see the change | ||
| 391 | let guard = state.read().await; | ||
| 392 | assert!(guard.contains_key("30617:writer:repo")); | ||
| 393 | } | ||
| 394 | } \ No newline at end of file | ||