Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v2.md')
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync-v2.md | 785 |
1 files changed, 0 insertions, 785 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md
deleted file mode 100644
index 311e93c..0000000
--- a/docs/explanation/grasp-02-proactive-sync-v2.md
+++ /dev/null
| @@ -1,785 +0,0 @@ | |||
| 1 | # GRASP-02: Proactive Sync v2 - Simplified Design | ||
| 2 | |||
| 3 | ## Overview | ||
| 4 | |||
| 5 | This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription. | ||
| 6 | |||
| 7 | ## Goals (Same as v1) | ||
| 8 | |||
| 9 | 1. **Data Availability**: Ensure we have all relevant events for repositories we host | ||
| 10 | 2. **Resilience**: Handle relay failures gracefully with backoff and health tracking | ||
| 11 | 3. **Efficiency**: Minimize connections and bandwidth through filter consolidation | ||
| 12 | 4. **Consistency**: Use unified filters for both live sync and catchup | ||
| 13 | |||
| 14 | ## Scale Targets & Upper Bounds | ||
| 15 | |||
| 16 | This design targets the following scale: | ||
| 17 | |||
| 18 | | Metric | Target | Notes | | ||
| 19 | | ----------------------------- | -------- | ----------------------------------------- | | ||
| 20 | | **Repositories** | 1,000 | Repos we host/track | | ||
| 21 | | **Root events per repo** | 50 (avg) | PRs, Issues, Patches per repo | | ||
| 22 | | **Total relays in ecosystem** | 100 | Unique relays across all repos | | ||
| 23 | | **Relays per repo** | 5 (avg) | Relays listed in each repo's announcement | | ||
| 24 | | **Total root events** | ~50,000 | 1,000 repos × 50 events | | ||
| 25 | | **Sync connections** | ~50-100 | Based on relay overlap | | ||
| 26 | |||
| 27 | **Memory Estimate (in-memory HashMaps):** | ||
| 28 | |||
| 29 | - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB | ||
| 30 | - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB | ||
| 31 | - **Total in-memory state**: ~10 MB (well within acceptable limits) | ||
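The memory estimate can be sanity-checked with a little arithmetic. The sketch below counts only the raw event ID payload (32 bytes per ID) and ignores `HashMap`/`HashSet` overhead and key strings; the function names are illustrative and not part of the design. It assumes the worst case where every repo's ID set is copied to every relay that lists it.

```rust
/// Size of a Nostr event ID in bytes (a SHA-256 hash).
const EVENT_ID_BYTES: u64 = 32;

/// Raw bytes of event IDs held by `FollowingRepoRootEvents`:
/// one copy of each root event ID.
fn following_payload_bytes(repos: u64, root_events_per_repo: u64) -> u64 {
    repos * root_events_per_repo * EVENT_ID_BYTES
}

/// Raw bytes of event IDs held by `SyncRelays`, assuming each repo's
/// ID set is stored once per relay that lists the repo.
fn sync_relays_payload_bytes(
    repos: u64,
    root_events_per_repo: u64,
    relays_per_repo: u64,
) -> u64 {
    following_payload_bytes(repos, root_events_per_repo) * relays_per_repo
}
```

With the targets from the table, `following_payload_bytes(1_000, 50)` is 1.6 MB of raw IDs, and `sync_relays_payload_bytes(1_000, 50, 5)` is 8 MB if all five relays per repo are remote. The `SyncRelays` figure is sensitive to the relays-per-repo average, so it is worth measuring in practice.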
| 32 | |||
| 33 | **Upper Bounds (redesign triggers):** | ||
| 34 | |||
| 35 | - **10,000+ repos**: Consider database-backed state instead of in-memory HashMaps | ||
| 36 | - **500+ sync relays**: Consider connection pooling or relay prioritization | ||
| 37 | - **500+ root events per repo**: Consider per-repo pagination in Layer 3 filters | ||
| 38 | - **Sustained >100 events/second**: Consider write batching to database | ||
| 39 | |||
| 40 | Beyond these limits, the in-memory HashMap model may need to evolve to a database-backed approach with lazy loading. | ||
| 41 | |||
| 42 | ## Core Data Structures | ||
| 43 | |||
| 44 | The entire sync filter state is captured in two HashMaps, initialized from database queries at startup: | ||
| 45 | |||
| 46 | ```rust | ||
| 47 | /// Repository root events we're following | ||
| 48 | /// Key: repo addressable reference (e.g., "30617:pubkey:identifier") | ||
| 49 | /// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo | ||
| 50 | /// | ||
| 51 | /// Note: May include a few extra repo refs that aren't in sync_relays. | ||
| 52 | /// This is acceptable - we won't query other relays for them. | ||
| 53 | type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>; | ||
| 54 | |||
| 55 | /// Relays we sync from, including their repos and events | ||
| 56 | /// Key: relay URL | ||
| 57 | /// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service | ||
| 58 | /// | ||
| 59 | /// Note: Bootstrap relay (if configured) is always present and excluded from removal logic. | ||
| 60 | type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>; | ||
| 61 | ``` | ||
| 62 | |||
| 63 | ## Architecture Overview | ||
| 64 | |||
| 65 | ```mermaid | ||
| 66 | flowchart TB | ||
| 67 | subgraph Startup | ||
| 68 | DB[(Database)] | ||
| 69 | Q1[Query kinds 1617/1618/1619/1621] | ||
| 70 | Q2[Query kind 30617] | ||
| 71 | DB --> Q1 --> FRRE[following_repo_root_events] | ||
| 72 | DB --> Q2 --> SR[sync_relays] | ||
| 73 | BR[Bootstrap Relay] --> SR | ||
| 74 | end | ||
| 75 | |||
| 76 | subgraph SyncManager | ||
| 77 | SS[Self-Subscriber] | ||
| 78 | FRRE --> SS | ||
| 79 | SR --> SS | ||
| 80 | end | ||
| 81 | |||
| 82 | subgraph SyncRelays | ||
| 83 | R1[Relay Connection 1] | ||
| 84 | R2[Relay Connection 2] | ||
| 85 | RN[Relay Connection N] | ||
| 86 | end | ||
| 87 | |||
| 88 | SS -->|spawn/update| R1 | ||
| 89 | SS -->|spawn/update| R2 | ||
| 90 | SS -->|spawn/update| RN | ||
| 91 | |||
| 92 | R1 -->|events| AP[Acceptance Policy] | ||
| 93 | R2 -->|events| AP | ||
| 94 | RN -->|events| AP | ||
| 95 | AP -->|store| DB | ||
| 96 | ``` | ||
| 97 | |||
| 98 | ## Module Structure | ||
| 99 | |||
| 100 | The sync module is organized following the pattern used by `src/http/mod.rs` and `src/metrics/mod.rs` where the primary struct lives in `mod.rs`: | ||
| 101 | |||
| 102 | ``` | ||
| 103 | src/sync/ | ||
| 104 | ├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays) | ||
| 105 | ├── self_subscriber.rs # SelfSubscriber struct and batching logic | ||
| 106 | ├── relay_connection.rs # Per-relay WebSocket connection management | ||
| 107 | ├── health.rs # RelayHealthTracker for backoff and dead relay detection | ||
| 108 | └── metrics.rs # SyncMetrics for Prometheus integration | ||
| 109 | ``` | ||
| 110 | |||
| 111 | **Rationale:** The state type aliases (`FollowingRepoRootEvents`, `SyncRelays`) are simple `Arc<RwLock<HashMap<...>>>` wrappers owned by SyncManager. Rather than creating a separate `state.rs` for two type aliases, they are colocated with SyncManager in `mod.rs` to reduce file count while maintaining clarity. | ||
| 112 | |||
| 113 | ## Design Decision: No Jitter | ||
| 114 | |||
| 115 | We considered adding jitter to prevent thundering herd scenarios when: | ||
| 116 | |||
| 117 | - Multiple relay connections initialize simultaneously | ||
| 118 | - Batched updates affect multiple relays | ||
| 119 | - Filter consolidation triggers across connections | ||
| 120 | |||
| 121 | **Decision: No jitter implemented.** | ||
| 122 | |||
| 123 | **Rationale:** | ||
| 124 | |||
| 125 | - Our GRASP server should handle the load of simultaneous operations | ||
| 126 | - Jitter would lead to more orphan filters (filters added one at a time rather than atomically) | ||
| 127 | - Jitter creates inefficiency - partial subscriptions miss events during the stagger window | ||
| 128 | - The batching window (5s) already provides natural smoothing without the downsides | ||
| 129 | |||
| 130 | ## Health Tracking & Backoff | ||
| 131 | |||
| 132 | ```rust | ||
| 133 | /// Health state machine for relay connections | ||
| 134 | enum HealthState { | ||
| 135 | Healthy, // Connected and working | ||
| 136 | Backoff(u32), // Failed, attempt count for exponential backoff | ||
| 137 | Dead, // 24h+ continuous failures | ||
| 138 | } | ||
| 139 | |||
| 140 | impl RelayHealthTracker { | ||
| 141 | /// Backoff durations: | ||
| 142 | /// - Attempt 1: 5s | ||
| 143 | /// - Attempt 2: 10s | ||
| 144 | /// - Attempt 3: 20s | ||
| 145 | /// - Attempt 4: 40s | ||
| 146 | /// - ... exponential up to 1h max | ||
| 147 | /// - After reaching 1h, continue hourly until 24h total failure time | ||
| 148 | /// - After 24h: marked Dead, retry once per 24h | ||
| 149 | fn get_backoff(&self, relay_url: &str) -> Duration; | ||
| 150 | } | ||
| 151 | ``` | ||
| 152 | |||
| 153 | | State | Retry Behavior | | ||
| 154 | | ----------- | -------------------------------------------- | | ||
| 155 | | **Healthy** | Immediate reconnect on disconnect | | ||
| 156 | | **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) | | ||
| 157 | **Backoff (capped)** | Once per hour after hitting the 1h cap, until 24h of total failure time | ||
| 158 | | **Dead** | After 24h total failures, retry once per 24h | | ||
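The schedule in the table can be expressed as a pure function. This is a minimal sketch using only the standard library; `backoff_delay` and its parameters are hypothetical names, and it assumes the tracker knows both the consecutive failure count and how long the relay has been failing.

```rust
use std::time::Duration;

const BASE_BACKOFF: Duration = Duration::from_secs(5);
const MAX_BACKOFF: Duration = Duration::from_secs(60 * 60);
const DEAD_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
const DEAD_RETRY: Duration = Duration::from_secs(24 * 60 * 60);

/// Delay before the next reconnect attempt.
///
/// `attempt` is the 1-based count of consecutive failures (0 = healthy).
/// `failing_for` is the time elapsed since the first failure in the streak.
fn backoff_delay(attempt: u32, failing_for: Duration) -> Duration {
    if attempt == 0 {
        // Healthy: reconnect immediately.
        return Duration::ZERO;
    }
    if failing_for >= DEAD_AFTER {
        // Dead: retry once per 24h.
        return DEAD_RETRY;
    }
    // 5s * 2^(attempt - 1); the shift is clamped so it cannot overflow.
    let exp = (attempt - 1).min(31);
    let secs = BASE_BACKOFF.as_secs().saturating_mul(1u64 << exp);
    // Capped at 1h, which yields the hourly retry phase.
    Duration::from_secs(secs).min(MAX_BACKOFF)
}
```

Keeping the schedule in a pure function makes it easy to unit-test independently of any connection logic.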
| 159 | |||
| 160 | ## Startup Initialization | ||
| 161 | |||
| 162 | At startup, two database queries initialize the sync state: | ||
| 163 | |||
| 164 | ```rust | ||
| 165 | impl SyncManager { | ||
| 166 | async fn initialize_from_database(&mut self) -> Result<()> { | ||
| 167 | // Initialize bootstrap relay if configured (never removed) | ||
| 168 | if let Some(bootstrap_url) = &self.config.bootstrap_relay_url { | ||
| 169 | self.sync_relays.write().await.insert( | ||
| 170 | bootstrap_url.clone(), | ||
| 171 | HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only) | ||
| 172 | ); | ||
| 173 | } | ||
| 174 | |||
| 175 | // Query 1: Build following_repo_root_events | ||
| 176 | // Find all 1617/1618/1619/1621 events and extract their repo references | ||
| 177 | let root_events = self.database | ||
| 178 | .query(Filter::new().kinds([ | ||
| 179 | Kind::GitPatch, // 1617 | ||
| 180 | Kind::Custom(1618), // PRs | ||
| 181 | Kind::Custom(1619), // PR updates | ||
| 182 | Kind::Custom(1621), // Issues | ||
| 183 | ])) | ||
| 184 | .await?; | ||
| 185 | |||
| 186 | for event in root_events { | ||
| 187 | // An event may have multiple 'a' tags pointing to different repos | ||
| 188 | let repo_refs = self.extract_all_repo_refs(&event); | ||
| 189 | for repo_ref in repo_refs { | ||
| 190 | self.following_repo_root_events | ||
| 191 | .write().await | ||
| 192 | .entry(repo_ref) | ||
| 193 | .or_default() | ||
| 194 | .insert(event.id); | ||
| 195 | } | ||
| 196 | } | ||
| 197 | |||
| 198 | // Query 2: Build sync_relays from kind 30617 announcements | ||
| 199 | let announcements = self.database | ||
| 200 | .query(Filter::new().kind(Kind::Custom(30617))) | ||
| 201 | .await?; | ||
| 202 | |||
| 203 | for event in announcements { | ||
| 204 | let repo_ref = self.build_repo_ref(&event); | ||
| 205 | let relay_urls = self.extract_relay_urls(&event); | ||
| 206 | |||
| 207 | // Only track repos that list BOTH a remote relay AND our service | ||
| 208 | if self.lists_our_service(&event) { | ||
| 209 | for relay_url in relay_urls { | ||
| 210 | if !self.is_own_relay(&relay_url) { | ||
| 211 | // Get events for this repo from following_repo_root_events | ||
| 212 | let events = self.following_repo_root_events | ||
| 213 | .read().await | ||
| 214 | .get(&repo_ref) | ||
| 215 | .cloned() | ||
| 216 | .unwrap_or_default(); | ||
| 217 | |||
| 218 | self.sync_relays | ||
| 219 | .write().await | ||
| 220 | .entry(relay_url) | ||
| 221 | .or_default() | ||
| 222 | .insert(repo_ref.clone(), events); | ||
| 223 | } | ||
| 224 | } | ||
| 225 | } | ||
| 226 | } | ||
| 227 | |||
| 228 | Ok(()) | ||
| 229 | } | ||
| 230 | |||
| 231 | /// Extract ALL repo refs from an event (it may tag multiple repos) | ||
| 232 | fn extract_all_repo_refs(&self, event: &Event) -> Vec<String> { | ||
| 233 | event.tags.iter() | ||
| 234 | .filter_map(|tag| { | ||
| 235 | let tag_vec = tag.clone().to_vec(); | ||
| 236 | if tag_vec.len() >= 2 && tag_vec[0] == "a" { | ||
| 237 | // Validate it's a 30617 reference | ||
| 238 | if tag_vec[1].starts_with("30617:") { | ||
| 239 | Some(tag_vec[1].clone()) | ||
| 240 | } else { | ||
| 241 | None | ||
| 242 | } | ||
| 243 | } else { | ||
| 244 | None | ||
| 245 | } | ||
| 246 | }) | ||
| 247 | .collect() | ||
| 248 | } | ||
| 249 | } | ||
| 250 | ``` | ||
| 251 | |||
| 252 | ## Self-Subscriber: Event-Driven Updates | ||
| 253 | |||
| 254 | A single self-subscriber watches for new events from **our own relay** and updates the HashMaps. | ||
| 255 | |||
| 256 | **Important:** The self-subscriber does NOT subscribe to kind 30618, because 30618 events never change the sync filters. They are synced from remote relays only (via the Layer 1 filter on sync relay connections). | ||
| 257 | |||
| 258 | ### Batching Strategy | ||
| 259 | |||
| 260 | The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires. | ||
| 261 | |||
| 262 | **Important:** Once the batch timer starts, it does NOT reset when additional events arrive. The batch will fire exactly 5 seconds after the first event, regardless of how many subsequent events are queued. This ensures predictable latency and prevents indefinite batching during high-activity periods. | ||
| 263 | |||
| 264 | ```rust | ||
| 265 | impl SelfSubscriber { | ||
| 266 | async fn run(&mut self) { | ||
| 267 | // Subscribe to our own relay for relevant kinds | ||
| 268 | // Note: 30618 NOT included - synced from remote relays only | ||
| 269 | let filter = Filter::new() | ||
| 270 | .kinds([ | ||
| 271 | Kind::Custom(30617), // Repository announcements | ||
| 272 | Kind::GitPatch, // 1617 Patches | ||
| 273 | Kind::Custom(1618), // PRs | ||
| 274 | Kind::Custom(1619), // PR updates | ||
| 275 | Kind::Custom(1621), // Issues | ||
| 276 | ]); | ||
| 277 | |||
| 278 | let mut pending_updates: Vec<PendingUpdate> = Vec::new(); | ||
| 279 | let mut batch_deadline: Option<Instant> = None; | ||
| 280 | |||
| 281 | loop { | ||
| 282 | let timeout = batch_deadline | ||
| 283 | .map(|d| d.saturating_duration_since(Instant::now())) | ||
| 284 | .unwrap_or(Duration::MAX); | ||
| 285 | |||
| 286 | tokio::select! { | ||
| 287 | Some(event) = self.event_receiver.recv() => { | ||
| 288 | pending_updates.push(self.classify_update(&event)); | ||
| 289 | |||
| 290 | // Start batch timer on first event | ||
| 291 | if batch_deadline.is_none() { | ||
| 292 | batch_deadline = Some(Instant::now() + Duration::from_secs(5)); | ||
| 293 | } | ||
| 294 | } | ||
| 295 | _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => { | ||
| 296 | // Batch window elapsed - apply all pending updates | ||
| 297 | self.apply_batched_updates(pending_updates.drain(..).collect()).await; | ||
| 298 | batch_deadline = None; | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
| 302 | } | ||
| 303 | |||
| 304 | fn classify_update(&self, event: &Event) -> PendingUpdate { | ||
| 305 | match event.kind.as_u16() { | ||
| 306 | 30617 => PendingUpdate::NewAnnouncement(event.clone()), | ||
| 307 | 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()), | ||
| 308 | _ => PendingUpdate::None, | ||
| 309 | } | ||
| 310 | } | ||
| 311 | } | ||
| 312 | ``` | ||
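The non-resetting timer can be checked without an async runtime. The sketch below is a synchronous model, not the implementation: `batch_flush_times` is a hypothetical helper that takes event arrival times in milliseconds (assumed sorted ascending) and returns the times at which batches would fire. An event arriving at or after an open deadline is treated as starting a new batch.

```rust
/// Returns the flush time of each batch for the given arrival times.
///
/// The deadline is set by the first event of a batch and is never
/// extended by later events in the same batch.
fn batch_flush_times(arrivals_ms: &[u64], window_ms: u64) -> Vec<u64> {
    let mut flushes = Vec::new();
    let mut deadline: Option<u64> = None;

    for &t in arrivals_ms {
        match deadline {
            // Event joins the open batch; the deadline is NOT reset.
            Some(d) if t < d => {}
            // No open batch, or the previous one has already fired.
            _ => {
                if let Some(d) = deadline {
                    flushes.push(d);
                }
                deadline = Some(t + window_ms);
            }
        }
    }

    // The last open batch still fires at its deadline.
    if let Some(d) = deadline {
        flushes.push(d);
    }
    flushes
}
```

For arrivals at 0s, 1s, 4s, 6s and 12s with a 5s window, the model fires at 5s, 11s and 17s: the event at 4s waits only one second, but no event ever waits longer than the window.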
| 313 | |||
| 314 | ### Applying Batched Updates | ||
| 315 | |||
| 316 | When the batch window closes, we process all pending updates together: | ||
| 317 | |||
| 318 | ```rust | ||
| 319 | /// Batched updates grouped by relay | ||
| 320 | struct RelayUpdateBatch { | ||
| 321 | /// New repo refs to subscribe to (Layer 2) | ||
| 322 | new_repo_refs: HashSet<String>, | ||
| 323 | /// New event IDs to subscribe to (Layer 3) | ||
| 324 | new_event_ids: HashSet<EventId>, | ||
| 325 | /// Whether this is a newly discovered relay | ||
| 326 | is_new_relay: bool, | ||
| 327 | } | ||
| 328 | |||
| 329 | impl SelfSubscriber { | ||
| 330 | async fn apply_batched_updates(&mut self, updates: Vec<PendingUpdate>) { | ||
| 331 | // Step 1: Process all updates and update HashMaps | ||
| 332 | // Build batched actions per relay | ||
| 333 | let mut relay_batches: HashMap<String, RelayUpdateBatch> = HashMap::new(); | ||
| 334 | |||
| 335 | for update in updates { | ||
| 336 | match update { | ||
| 337 | PendingUpdate::NewAnnouncement(event) => { | ||
| 338 | self.process_announcement(&event, &mut relay_batches).await; | ||
| 339 | } | ||
| 340 | PendingUpdate::NewRootEvent(event) => { | ||
| 341 | self.process_root_event(&event, &mut relay_batches).await; | ||
| 342 | } | ||
| 343 | PendingUpdate::None => {} | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | // Step 2: Apply batched updates to each relay | ||
| 348 | for (relay_url, batch) in relay_batches { | ||
| 349 | self.apply_batch_to_relay(&relay_url, batch).await; | ||
| 350 | } | ||
| 351 | |||
| 352 | // Step 3: Check for relay removal (repos removed from announcements) | ||
| 353 | self.check_relay_removal().await; | ||
| 354 | } | ||
| 355 | |||
| 356 | async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) { | ||
| 357 | if batch.is_new_relay { | ||
| 358 | // Spawn new relay connection with full filters | ||
| 359 | self.spawn_sync_relay(relay_url.to_string()).await; | ||
| 360 | return; | ||
| 361 | } | ||
| 362 | |||
| 363 | // Build incremental filters for new content (NO since - get historical) | ||
| 364 | let incremental_filters = self.build_incremental_filters(&batch); | ||
| 365 | |||
| 366 | if incremental_filters.is_empty() { | ||
| 367 | return; | ||
| 368 | } | ||
| 369 | |||
| 370 | // Check if we need to consolidate | ||
| 371 | let current_filter_count = self.get_filter_count_for_relay(relay_url).await; | ||
| 372 | let new_filter_count = current_filter_count + incremental_filters.len(); | ||
| 373 | |||
| 374 | // Note: 70 is a conservative threshold that may need tuning based on | ||
| 375 | // production observations. It was chosen to trigger consolidation earlier | ||
| 376 | // than v1's 150, but optimal value depends on relay behavior. | ||
| 377 | if new_filter_count > 70 { | ||
| 378 | // Consolidate: add incremental filters first (no since), wait for EOSE, | ||
| 379 | // then close all and resubscribe with consolidated filters (with since) | ||
| 380 | self.consolidate_relay_subscription(relay_url, incremental_filters).await; | ||
| 381 | } else { | ||
| 382 | // Just add incremental filters (no since - to get historical events) | ||
| 383 | self.send_filters_to_relay(relay_url, incremental_filters).await; | ||
| 384 | } | ||
| 385 | } | ||
| 386 | |||
| 387 | fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec<Filter> { | ||
| 388 | let mut filters = Vec::new(); | ||
| 389 | |||
| 390 | // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags) | ||
| 391 | if !batch.new_repo_refs.is_empty() { | ||
| 392 | let refs: Vec<String> = batch.new_repo_refs.iter().cloned().collect(); | ||
| 393 | for chunk in refs.chunks(100) { | ||
| 394 | // All kinds with lowercase 'a' tag | ||
| 395 | filters.push( | ||
| 396 | Filter::new() | ||
| 397 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) | ||
| 398 | ); | ||
| 399 | // All kinds with uppercase 'A' tag | ||
| 400 | filters.push( | ||
| 401 | Filter::new() | ||
| 402 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) | ||
| 403 | ); | ||
| 404 | // All kinds with 'q' tag (quote) | ||
| 405 | filters.push( | ||
| 406 | Filter::new() | ||
| 407 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 408 | ); | ||
| 409 | } | ||
| 410 | } | ||
| 411 | |||
| 412 | // Layer 3: New event IDs | ||
| 413 | if !batch.new_event_ids.is_empty() { | ||
| 414 | let ids: Vec<String> = batch.new_event_ids.iter() | ||
| 415 | .map(|id| id.to_hex()) | ||
| 416 | .collect(); | ||
| 417 | for chunk in ids.chunks(100) { | ||
| 418 | filters.push( | ||
| 419 | Filter::new() | ||
| 420 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) | ||
| 421 | ); | ||
| 422 | filters.push( | ||
| 423 | Filter::new() | ||
| 424 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) | ||
| 425 | ); | ||
| 426 | filters.push( | ||
| 427 | Filter::new() | ||
| 428 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 429 | ); | ||
| 430 | } | ||
| 431 | } | ||
| 432 | |||
| 433 | filters | ||
| 434 | } | ||
| 435 | } | ||
| 436 | ``` | ||
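`process_root_event` is not shown above. One possible shape for its bookkeeping is sketched here with plain `String` IDs standing in for `EventId`; the function name `record_root_event`, its signature, and the type aliases are assumptions for illustration only. It updates both maps and returns, per relay, the event IDs that are new and therefore need a Layer 3 incremental filter.

```rust
use std::collections::{HashMap, HashSet};

// Stand-ins for the real types; event IDs are hex strings here.
type Following = HashMap<String, HashSet<String>>;
type SyncRelays = HashMap<String, HashMap<String, HashSet<String>>>;

/// Records a root event under every repo it tags and returns
/// relay URL -> newly added event IDs for that relay.
fn record_root_event(
    following: &mut Following,
    sync_relays: &mut SyncRelays,
    event_id: &str,
    repo_refs: &[String],
) -> HashMap<String, HashSet<String>> {
    let mut per_relay: HashMap<String, HashSet<String>> = HashMap::new();

    for repo_ref in repo_refs {
        following
            .entry(repo_ref.clone())
            .or_default()
            .insert(event_id.to_string());

        // Only relays that already track this repo receive the new ID.
        for (relay_url, repos) in sync_relays.iter_mut() {
            if let Some(ids) = repos.get_mut(repo_ref) {
                // `insert` returns false for duplicates, so a replayed
                // event does not produce another filter.
                if ids.insert(event_id.to_string()) {
                    per_relay
                        .entry(relay_url.clone())
                        .or_default()
                        .insert(event_id.to_string());
                }
            }
        }
    }
    per_relay
}
```

A relay with an empty repo map, such as the bootstrap relay, never appears in the result, which matches its Layer 1 only role.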
| 437 | |||
| 438 | ### Consolidation Strategy | ||
| 439 | |||
| 440 | When consolidating, we need a two-phase approach: | ||
| 441 | |||
| 442 | 1. First, subscribe with incremental filters (no `since`) to get any historical events we missed | ||
| 443 | 2. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`) | ||
| 444 | |||
| 445 | ```rust | ||
| 446 | async fn consolidate_relay_subscription( | ||
| 447 | &mut self, | ||
| 448 | relay_url: &str, | ||
| 449 | incremental_filters: Vec<Filter>, | ||
| 450 | ) { | ||
| 451 | // Phase 1: Add incremental filters WITHOUT since to catch up on new content | ||
| 452 | // These filters are for new repo_refs / event_ids we just discovered | ||
| 453 | let phase1_sub_id = self.send_filters_to_relay_and_wait_eose( | ||
| 454 | relay_url, | ||
| 455 | incremental_filters | ||
| 456 | ).await; | ||
| 457 | |||
| 458 | // Phase 2: After EOSE, consolidate everything | ||
| 459 | // Close ALL existing subscriptions for this relay | ||
| 460 | self.close_all_subscriptions(relay_url).await; | ||
| 461 | |||
| 462 | // Build fresh consolidated filters using current HashMap state | ||
| 463 | let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await; | ||
| 464 | |||
| 465 | // Resubscribe with since = now - 15 minutes | ||
| 466 | let since = Timestamp::now() - 900; | ||
| 467 | let filters_with_since: Vec<Filter> = consolidated_filters | ||
| 468 | .into_iter() | ||
| 469 | .map(|f| f.since(since)) | ||
| 470 | .collect(); | ||
| 471 | |||
| 472 | self.send_filters_to_relay(relay_url, filters_with_since).await; | ||
| 473 | } | ||
| 474 | ``` | ||
| 475 | |||
| 476 | ## Daily Full Catchup | ||
| 477 | |||
| 478 | To capture events that may have taken longer than 15 minutes to propagate through the nostr network, we perform a daily full catchup: | ||
| 479 | |||
| 480 | ```rust | ||
| 481 | impl SyncManager { | ||
| 482 | /// Runs approximately every 24 hours per relay connection | ||
| 483 | async fn daily_catchup(&mut self, relay_url: &str) { | ||
| 484 | // Close all current subscriptions for this relay | ||
| 485 | self.close_all_subscriptions(relay_url).await; | ||
| 486 | |||
| 487 | // Rebuild fresh filters from current HashMap state | ||
| 488 | let filters = self.build_three_layer_filters_for_relay(relay_url).await; | ||
| 489 | |||
| 490 | // Subscribe WITHOUT since filter to get full historical sync | ||
| 491 | for filter in filters { | ||
| 492 | self.subscribe_to_relay(relay_url, filter).await; | ||
| 493 | } | ||
| 494 | |||
| 495 | // After EOSE, switch back to live mode with since filter | ||
| 496 | self.wait_for_eose(relay_url).await; | ||
| 497 | |||
| 498 | // Re-add since filter for ongoing live sync | ||
| 499 | let since = Timestamp::now() - 900; // 15 minutes ago | ||
| 500 | self.resubscribe_with_since(relay_url, since).await; | ||
| 501 | } | ||
| 502 | } | ||
| 503 | ``` | ||
| 504 | |||
| 505 | **Rationale:** The 15-minute lookback on reconnection covers typical nostr event propagation delays, but some events may take longer. Rather than widening the window (which would cause more duplicate processing), we run a daily full catchup to ensure nothing is missed. This adds minimal complexity while providing comprehensive coverage. | ||
| 506 | |||
| 507 | ## Sync Relay Connections | ||
| 508 | |||
| 509 | Each sync relay connection uses the three-layer filter strategy: | ||
| 510 | |||
| 511 | ```rust | ||
| 512 | impl SyncRelayConnection { | ||
| 513 | async fn start(&mut self) { | ||
| 514 | loop { | ||
| 515 | match self.connect_and_subscribe().await { | ||
| 516 | Ok(()) => { | ||
| 517 | // Record successful connection as a wall-clock Timestamp (an Instant cannot be turned into a `since` value) | ||
| 518 | self.last_successful_connection = Timestamp::now(); | ||
| 519 | self.health_tracker.record_success(&self.url); | ||
| 520 | |||
| 521 | // Run event loop until disconnect | ||
| 522 | self.run_event_loop().await; | ||
| 523 | } | ||
| 524 | Err(_e) => { | ||
| 525 | self.health_tracker.record_failure(&self.url); | ||
| 526 | } | ||
| 527 | } | ||
| 528 | |||
| 529 | // Reconnect with backoff and since filter | ||
| 530 | let backoff = self.health_tracker.get_backoff(&self.url); | ||
| 531 | tokio::time::sleep(backoff).await; | ||
| 532 | |||
| 533 | // On reconnect, use since = last_successful - 15 minutes | ||
| 534 | self.reconnect_since = Some( | ||
| 535 | self.last_successful_connection - 900 | ||
| 536 | ); | ||
| 537 | } | ||
| 538 | } | ||
| 539 | |||
| 540 | async fn connect_and_subscribe(&mut self) -> Result<()> { | ||
| 541 | self.client.connect().await?; | ||
| 542 | |||
| 543 | let filters = self.build_three_layer_filters().await; | ||
| 544 | |||
| 545 | // Apply since filter if reconnecting | ||
| 546 | let filters = if let Some(since) = self.reconnect_since { | ||
| 547 | filters.into_iter().map(|f| f.since(since)).collect() | ||
| 548 | } else { | ||
| 549 | filters | ||
| 550 | }; | ||
| 551 | |||
| 552 | for filter in filters { | ||
| 553 | self.client.subscribe(filter, None).await?; | ||
| 554 | } | ||
| 555 | |||
| 556 | Ok(()) | ||
| 557 | } | ||
| 558 | } | ||
| 559 | ``` | ||
| 560 | |||
| 561 | ## Three-Layer Filter Strategy | ||
| 562 | |||
| 563 | ```rust | ||
| 564 | impl SyncRelayConnection { | ||
| 565 | async fn build_three_layer_filters(&self) -> Vec<Filter> { | ||
| 566 | let mut filters = Vec::new(); | ||
| 567 | |||
| 568 | // Get repos for this relay | ||
| 569 | let repos = self.sync_relays.read().await | ||
| 570 | .get(&self.url) | ||
| 571 | .cloned() | ||
| 572 | .unwrap_or_default(); | ||
| 573 | |||
| 574 | // Layer 1: Announcements (kinds 30617 + 30618) | ||
| 575 | // Note: 30618 is ONLY synced from remote relays, not self-subscribed | ||
| 576 | // Always included even if relay has no repos (bootstrap relay case) | ||
| 577 | filters.push( | ||
| 578 | Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) | ||
| 579 | ); | ||
| 580 | |||
| 581 | // Layer 2: Events tagging repos with 'a' tags (ALL kinds) | ||
| 582 | // Batched per 100 repo refs | ||
| 583 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 584 | for chunk in repo_refs.chunks(100) { | ||
| 585 | filters.push( | ||
| 586 | Filter::new() | ||
| 587 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) | ||
| 588 | ); | ||
| 589 | filters.push( | ||
| 590 | Filter::new() | ||
| 591 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) | ||
| 592 | ); | ||
| 593 | filters.push( | ||
| 594 | Filter::new() | ||
| 595 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 596 | ); | ||
| 597 | } | ||
| 598 | |||
| 599 | // Layer 3: Events tagging root events (batch per 100 event IDs) | ||
| 600 | let all_event_ids: HashSet<EventId> = repos.values() | ||
| 601 | .flat_map(|ids| ids.iter().cloned()) | ||
| 602 | .collect(); | ||
| 603 | |||
| 604 | let event_id_strs: Vec<String> = all_event_ids | ||
| 605 | .iter() | ||
| 606 | .map(|id| id.to_hex()) | ||
| 607 | .collect(); | ||
| 608 | |||
| 609 | for chunk in event_id_strs.chunks(100) { | ||
| 610 | filters.push( | ||
| 611 | Filter::new() | ||
| 612 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) | ||
| 613 | ); | ||
| 614 | filters.push( | ||
| 615 | Filter::new() | ||
| 616 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) | ||
| 617 | ); | ||
| 618 | filters.push( | ||
| 619 | Filter::new() | ||
| 620 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 621 | ); | ||
| 622 | } | ||
| 623 | |||
| 624 | filters | ||
| 625 | } | ||
| 626 | } | ||
| 627 | ``` | ||
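The number of filters produced by the three layers follows directly from the chunking: one Layer 1 filter, plus three filters per chunk of 100 repo refs, plus three per chunk of 100 event IDs. The helper below is a hypothetical sketch of that arithmetic, useful for reasoning about the consolidation threshold.

```rust
const CHUNK_SIZE: usize = 100;
const FILTERS_PER_CHUNK: usize = 3; // Layer 2: a/A/q, Layer 3: e/E/q

/// Number of chunks needed for `n` items (ceiling division).
fn chunk_count(n: usize) -> usize {
    (n + CHUNK_SIZE - 1) / CHUNK_SIZE
}

/// Total filters built for one relay by the three-layer strategy.
fn three_layer_filter_count(repo_refs: usize, event_ids: usize) -> usize {
    1 // Layer 1 is always present
        + FILTERS_PER_CHUNK * chunk_count(repo_refs)
        + FILTERS_PER_CHUNK * chunk_count(event_ids)
}
```

Under the scale targets (1,000 repos × 5 relays spread over 100 relays, so about 50 repos and 2,500 root events per relay), `three_layer_filter_count(50, 2_500)` gives 79. That suggests a fully consolidated filter set can itself sit above the 70-filter threshold at target scale, which is one reason the threshold may need tuning.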
| 628 | |||
| 629 | ## Relay Removal | ||
| 630 | |||
| 631 | ```rust | ||
| 632 | async fn check_relay_removal(&mut self) { | ||
| 633 | let relays_to_check: Vec<String> = self.sync_relays.read().await | ||
| 634 | .keys() | ||
| 635 | .cloned() | ||
| 636 | .collect(); | ||
| 637 | |||
| 638 | for relay_url in relays_to_check { | ||
| 639 | // Never remove bootstrap relay | ||
| 640 | if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() { | ||
| 641 | continue; | ||
| 642 | } | ||
| 643 | |||
| 644 | // Check if relay has any repos left | ||
| 645 | let should_remove = { | ||
| 646 | let sync_relays = self.sync_relays.read().await; | ||
| 647 | sync_relays.get(&relay_url) | ||
| 648 | .map(|repos| repos.is_empty()) | ||
| 649 | .unwrap_or(true) | ||
| 650 | }; | ||
| 651 | |||
| 652 | if should_remove { | ||
| 653 | // Remove from HashMap | ||
| 654 | self.sync_relays.write().await.remove(&relay_url); | ||
| 655 | |||
| 656 | // Close connection | ||
| 657 | self.close_relay_connection(&relay_url).await; | ||
| 658 | } | ||
| 659 | } | ||
| 660 | } | ||
| 661 | ``` | ||
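The removal check above takes a read lock and then a separate write lock per relay. As an alternative sketch, the same decision can be made in a single pass over the map with `HashMap::retain`, which the caller could run under one write lock. `remove_empty_relays` and the `RelayRepos` alias are illustrative names, with `String` standing in for `EventId`.

```rust
use std::collections::{HashMap, HashSet};

type RelayRepos = HashMap<String, HashMap<String, HashSet<String>>>;

/// Drops relays with no repos left, except the bootstrap relay.
/// Returns the removed URLs (sorted) so their connections can be closed.
fn remove_empty_relays(
    sync_relays: &mut RelayRepos,
    bootstrap_url: Option<&str>,
) -> Vec<String> {
    let mut removed = Vec::new();

    sync_relays.retain(|url, repos| {
        let is_bootstrap = Some(url.as_str()) == bootstrap_url;
        let keep = is_bootstrap || !repos.is_empty();
        if !keep {
            removed.push(url.clone());
        }
        keep
    });

    removed.sort();
    removed
}
```

A repo whose event set is empty still counts as a repo here, so its relay is kept; only relays whose repo map is empty are removed.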
| 662 | |||
| 663 | ## Prometheus Metrics (Same as v1) | ||
| 664 | |||
| 665 | | Metric | Type | Labels | Description | | ||
| 666 | | ------------------------------------- | ------- | ------------- | ----------------------- | | ||
| 667 | | `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 | | ||
| 668 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome | | ||
| 669 | | `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 | | ||
| 670 | | `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures | | ||
| 671 | | `ngit_sync_events_total` | Counter | source | Events by source type | | ||
| 672 | | `ngit_sync_gap_events_total` | Counter | relay | Gap events filled | | ||
| 673 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | | ||
| 674 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected | | ||
| 675 | | `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count | | ||
| 676 | |||
| 677 | ## Module Structure (Simplified) | ||
| 678 | |||
| 679 | ``` | ||
| 680 | src/sync/ | ||
| 681 | ├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays) | ||
| 682 | ├── self_subscriber.rs # SelfSubscriber + batching logic | ||
| 683 | ├── relay_connection.rs # Per-relay WebSocket + filters | ||
| 684 | ├── health.rs # RelayHealthTracker (reuse from v1) | ||
| 685 | └── metrics.rs # SyncMetrics (reuse from v1) | ||
| 686 | ``` | ||
| 687 | |||
| 688 | > **Note:** SyncManager and state type aliases are colocated in `mod.rs` following the pattern of `src/http/mod.rs` (HttpService) and `src/metrics/mod.rs` (Metrics). See the earlier "Module Structure" section for rationale. | ||
| 689 | |||
| 690 | ## Comparison: v1 vs v2 | ||
| 691 | |||
| 692 | | Aspect | v1 (Current) | v2 (Simplified) | | ||
| 693 | | ------------------- | ------------------------------------------------------------------ | --------------------------------------------- | | ||
| 694 | | **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB | | ||
| 695 | | **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe | | ||
| 696 | | **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) | | ||
| 697 | | **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters | | ||
| 698 | | **Batching** | Per 100 tags | Per 100 tags | | ||
| 699 | | **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min | | ||
| 700 | | **Jitter** | Startup jitter | None (see design decision) | | ||
| 701 | | **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed | | ||
| 702 | | **1621 (Issues)** | Not included | Included with 1617/1618/1619 | | ||
| 703 | | **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags | | ||
| 704 | | **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily | | ||
| 705 | |||
| 706 | ## Key Design Decisions | ||
| 707 | |||
| 708 | 1. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database | ||
| 709 | 2. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them | ||
| 710 | 3. **Batched Filter Updates**: 5-second window that starts on first event (timer does NOT reset on subsequent events) | ||
| 711 | 4. **Uniform Reconnection**: Always use `since = last_successful - 15min` | ||
| 712 | 5. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns | ||
| 713 | 6. **Bootstrap Relay Protected**: Never removed from sync_relays - ensures at least one sync connection exists even when no repositories currently list our service (cold start / recovery scenario) | ||
| 714 | 7. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed | ||
| 715 | 8. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation (conservative value that may need tuning based on production observation) | ||
| 716 | 9. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters | ||
| 717 | 10. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds | ||
| 718 | 11. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since | ||
| 719 | 12. **Multiple Repo Refs**: Handle events that tag multiple repos correctly | ||
| 720 | 13. **Daily Full Catchup**: Periodic sync restart without `since` filter (~24h) to catch slow-propagating events | ||
| 721 | 14. **Dual DB Queries at Startup**: Separate queries for root events and announcements. Could be combined into a single query, but some relays cache by kind which may make separate queries more efficient. Trade-off deferred for future optimization. | ||
| 722 | |||
| 723 | --- | ||
| 724 | |||
| 725 | ## Detailed Flow Diagram | ||
| 726 | |||
| 727 | ```mermaid | ||
| 728 | sequenceDiagram | ||
| 729 | participant DB as Database | ||
| 730 | participant SM as SyncManager | ||
| 731 | participant SS as Self-Subscriber | ||
| 732 | participant RC as RelayConnection | ||
| 733 | |||
| 734 | Note over SM: Startup | ||
| 735 | SM->>SM: Add bootstrap relay to sync_relays | ||
| 736 | SM->>DB: Query kinds 1617/1618/1619/1621 | ||
| 737 | DB-->>SM: Root events | ||
| 738 | SM->>SM: Build following_repo_root_events | ||
| 739 | SM->>SM: Handle multi-repo events | ||
| 740 | |||
| 741 | SM->>DB: Query kind 30617 | ||
| 742 | DB-->>SM: Announcements | ||
| 743 | SM->>SM: Build sync_relays | ||
| 744 | |||
| 745 | SM->>RC: Spawn connections for each relay | ||
| 746 | RC->>RC: Build 3-layer filters from sync_relays | ||
| 747 | RC->>RC: Connect and subscribe | ||
| 748 | |||
| 749 | Note over SS: Event-Driven Updates | ||
| 750 | SS->>SS: Subscribe to 30617/1617/1618/1619/1621 | ||
| 751 | SS->>SS: Receive event - start 5s batch timer | ||
| 752 | SS->>SS: Collect more events in batch window | ||
| 753 | SS->>SS: Batch window closes | ||
| 754 | SS->>SM: Apply batched updates to HashMaps | ||
| 755 | |||
| 756 | alt New Relay Discovered | ||
| 757 | SM->>RC: Spawn new connection | ||
| 758 | else New Content for Existing Relay | ||
| 759 | alt Under 70 filter limit | ||
| 760 | SM->>RC: Add incremental filter - no since | ||
| 761 | else Over 70 filter limit | ||
| 762 | SM->>RC: Add incremental filters - no since | ||
| 763 | RC-->>SM: EOSE received | ||
| 764 | SM->>RC: Close all subscriptions | ||
| 765 | SM->>RC: Resubscribe consolidated - with since | ||
| 766 | end | ||
| 767 | else Relay Has No More Repos | ||
| 768 | alt Is Bootstrap Relay | ||
| 769 | SM->>SM: Keep connection - Layer 1 only | ||
| 770 | else Not Bootstrap Relay | ||
| 771 | SM->>RC: Close connection | ||
| 772 | end | ||
| 773 | end | ||
| 774 | |||
| 775 | Note over RC: Connection Lifecycle | ||
| 776 | RC->>RC: Process incoming events | ||
| 777 | RC->>DB: Store via acceptance policy | ||
| 778 | |||
| 779 | RC->>RC: Connection drops | ||
| 780 | RC->>RC: Wait backoff - 5s to 1h exponential | ||
| 781 | RC->>RC: Reconnect with since = last_success - 15min | ||
| 782 | |||
| 783 | Note over RC: If failures continue 24h | ||
| 784 | RC->>RC: Mark dead - retry once per 24h | ||
| 785 | ``` | ||