diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-08 19:41:16 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-08 19:41:16 +0000 |
| commit | 20388e4e76864195860dfab81a5b80725184e7c9 (patch) | |
| tree | 9a429d6f6b2adf0d88b85165adb523d664456367 /docs/explanation/grasp-02-proactive-sync-v2.md | |
| parent | 9f594fadf2a1d5bfda0ab027f2b3cf7a247900ec (diff) | |
redesign sync - document only
Diffstat (limited to 'docs/explanation/grasp-02-proactive-sync-v2.md')
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync-v2.md | 704 |
1 files changed, 704 insertions, 0 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md new file mode 100644 index 0000000..faa9255 --- /dev/null +++ b/docs/explanation/grasp-02-proactive-sync-v2.md | |||
| @@ -0,0 +1,704 @@ | |||
| 1 | # GRASP-02: Proactive Sync v2 - Simplified Design | ||
| 2 | |||
| 3 | ## Overview | ||
| 4 | |||
| 5 | This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription. | ||
| 6 | |||
| 7 | ## Goals (Same as v1) | ||
| 8 | |||
| 9 | 1. **Data Availability**: Ensure we have all relevant events for repositories we host | ||
| 10 | 2. **Resilience**: Handle relay failures gracefully with backoff and health tracking | ||
| 11 | 3. **Efficiency**: Minimize connections and bandwidth through filter consolidation | ||
| 12 | 4. **Consistency**: Use unified filters for both live sync and catchup | ||
| 13 | |||
| 14 | ## Core Data Structures | ||
| 15 | |||
| 16 | The entire sync filter state is captured in two HashMaps, initialized from database queries at startup: | ||
| 17 | |||
| 18 | ```rust | ||
| 19 | /// Repository root events we're following | ||
| 20 | /// Key: repo addressable reference (e.g., "30617:pubkey:identifier") | ||
| 21 | /// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo | ||
| 22 | /// | ||
| 23 | /// Note: May include a few extra repo refs that aren't in sync_relays. | ||
| 24 | /// This is acceptable - we won't query other relays for them. | ||
| 25 | type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>; | ||
| 26 | |||
| 27 | /// Relays we sync from, including their repos and events | ||
| 28 | /// Key: relay URL | ||
| 29 | /// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service | ||
| 30 | /// | ||
| 31 | /// Note: Bootstrap relay (if configured) is always present and excluded from removal logic. | ||
| 32 | type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>; | ||
| 33 | ``` | ||
| 34 | |||
| 35 | ## Architecture Overview | ||
| 36 | |||
| 37 | ```mermaid | ||
| 38 | flowchart TB | ||
| 39 | subgraph Startup | ||
| 40 | DB[(Database)] | ||
| 41 | Q1[Query kinds 1617/1618/1619/1621] | ||
| 42 | Q2[Query kind 30617] | ||
| 43 | DB --> Q1 --> FRRE[following_repo_root_events] | ||
| 44 | DB --> Q2 --> SR[sync_relays] | ||
| 45 | BR[Bootstrap Relay] --> SR | ||
| 46 | end | ||
| 47 | |||
| 48 | subgraph SyncManager | ||
| 49 | SS[Self-Subscriber] | ||
| 50 | FRRE --> SS | ||
| 51 | SR --> SS | ||
| 52 | end | ||
| 53 | |||
| 54 | subgraph SyncRelays | ||
| 55 | R1[Relay Connection 1] | ||
| 56 | R2[Relay Connection 2] | ||
| 57 | RN[Relay Connection N] | ||
| 58 | end | ||
| 59 | |||
| 60 | SS -->|spawn/update| R1 | ||
| 61 | SS -->|spawn/update| R2 | ||
| 62 | SS -->|spawn/update| RN | ||
| 63 | |||
| 64 | R1 -->|events| AP[Acceptance Policy] | ||
| 65 | R2 -->|events| AP | ||
| 66 | RN -->|events| AP | ||
| 67 | AP -->|store| DB | ||
| 68 | ``` | ||
| 69 | |||
| 70 | ## Design Decision: No Jitter | ||
| 71 | |||
| 72 | We considered adding jitter to prevent thundering herd scenarios when: | ||
| 73 | |||
| 74 | - Multiple relay connections initialize simultaneously | ||
| 75 | - Batched updates affect multiple relays | ||
| 76 | - Filter consolidation triggers across connections | ||
| 77 | |||
| 78 | **Decision: No jitter implemented.** | ||
| 79 | |||
| 80 | **Rationale:** | ||
| 81 | |||
| 82 | - Our GRASP server should handle the load of simultaneous operations | ||
| 83 | - Jitter would lead to more orphan filters (filters added one at a time rather than atomically) | ||
| 84 | - Jitter creates inefficiency - partial subscriptions miss events during the stagger window | ||
| 85 | - The batching window (5s) already provides natural smoothing without the downsides | ||
| 86 | |||
| 87 | ## Health Tracking & Backoff | ||
| 88 | |||
| 89 | ```rust | ||
| 90 | /// Health state machine for relay connections | ||
| 91 | enum HealthState { | ||
| 92 | Healthy, // Connected and working | ||
| 93 | Backoff(u32), // Failed, attempt count for exponential backoff | ||
| 94 | Dead, // 24h+ continuous failures | ||
| 95 | } | ||
| 96 | |||
| 97 | impl RelayHealthTracker { | ||
| 98 | /// Backoff durations: | ||
| 99 | /// - Attempt 1: 5s | ||
| 100 | /// - Attempt 2: 10s | ||
| 101 | /// - Attempt 3: 20s | ||
| 102 | /// - Attempt 4: 40s | ||
| 103 | /// - ... exponential up to 1h max | ||
| 104 | /// - After reaching 1h, continue hourly until 24h total failure time | ||
| 105 | /// - After 24h: marked Dead, retry once per 24h | ||
| 106 | fn get_backoff(&self, relay_url: &str) -> Duration; | ||
| 107 | } | ||
| 108 | ``` | ||
| 109 | |||
| 110 | | State | Retry Behavior | | ||
| 111 | | ----------- | -------------------------------------------- | | ||
| 112 | | **Healthy** | Immediate reconnect on disconnect | | ||
| 113 | | **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) | | ||
| 114 | | **Hourly** | Once per hour after hitting 1h cap | | ||
| 115 | | **Dead** | After 24h total failures, retry once per 24h | | ||
| 116 | |||
| 117 | ## Startup Initialization | ||
| 118 | |||
| 119 | At startup, two database queries initialize the sync state: | ||
| 120 | |||
| 121 | ```rust | ||
| 122 | impl SyncManager { | ||
| 123 | async fn initialize_from_database(&mut self) -> Result<()> { | ||
| 124 | // Initialize bootstrap relay if configured (never removed) | ||
| 125 | if let Some(bootstrap_url) = &self.config.bootstrap_relay_url { | ||
| 126 | self.sync_relays.write().await.insert( | ||
| 127 | bootstrap_url.clone(), | ||
| 128 | HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only) | ||
| 129 | ); | ||
| 130 | } | ||
| 131 | |||
| 132 | // Query 1: Build following_repo_root_events | ||
| 133 | // Find all 1617/1618/1619/1621 events and extract their repo references | ||
| 134 | let root_events = self.database | ||
| 135 | .query(Filter::new().kinds([ | ||
| 136 | Kind::GitPatch, // 1617 | ||
| 137 | Kind::Custom(1618), // PRs | ||
| 138 | Kind::Custom(1619), // PR updates | ||
| 139 | Kind::Custom(1621), // Issues | ||
| 140 | ])) | ||
| 141 | .await?; | ||
| 142 | |||
| 143 | for event in root_events { | ||
| 144 | // An event may have multiple 'a' tags pointing to different repos | ||
| 145 | let repo_refs = self.extract_all_repo_refs(&event); | ||
| 146 | for repo_ref in repo_refs { | ||
| 147 | self.following_repo_root_events | ||
| 148 | .write().await | ||
| 149 | .entry(repo_ref) | ||
| 150 | .or_default() | ||
| 151 | .insert(event.id); | ||
| 152 | } | ||
| 153 | } | ||
| 154 | |||
| 155 | // Query 2: Build sync_relays from kind 30617 announcements | ||
| 156 | let announcements = self.database | ||
| 157 | .query(Filter::new().kind(Kind::Custom(30617))) | ||
| 158 | .await?; | ||
| 159 | |||
| 160 | for event in announcements { | ||
| 161 | let repo_ref = self.build_repo_ref(&event); | ||
| 162 | let relay_urls = self.extract_relay_urls(&event); | ||
| 163 | |||
| 164 | // Only track repos that list BOTH a remote relay AND our service | ||
| 165 | if self.lists_our_service(&event) { | ||
| 166 | for relay_url in relay_urls { | ||
| 167 | if !self.is_own_relay(&relay_url) { | ||
| 168 | // Get events for this repo from following_repo_root_events | ||
| 169 | let events = self.following_repo_root_events | ||
| 170 | .read().await | ||
| 171 | .get(&repo_ref) | ||
| 172 | .cloned() | ||
| 173 | .unwrap_or_default(); | ||
| 174 | |||
| 175 | self.sync_relays | ||
| 176 | .write().await | ||
| 177 | .entry(relay_url) | ||
| 178 | .or_default() | ||
| 179 | .insert(repo_ref.clone(), events); | ||
| 180 | } | ||
| 181 | } | ||
| 182 | } | ||
| 183 | } | ||
| 184 | |||
| 185 | Ok(()) | ||
| 186 | } | ||
| 187 | |||
| 188 | /// Extract ALL repo refs from an event (it may tag multiple repos) | ||
| 189 | fn extract_all_repo_refs(&self, event: &Event) -> Vec<String> { | ||
| 190 | event.tags.iter() | ||
| 191 | .filter_map(|tag| { | ||
| 192 | let tag_vec = tag.clone().to_vec(); | ||
| 193 | if tag_vec.len() >= 2 && tag_vec[0] == "a" { | ||
| 194 | // Validate it's a 30617 reference | ||
| 195 | if tag_vec[1].starts_with("30617:") { | ||
| 196 | Some(tag_vec[1].clone()) | ||
| 197 | } else { | ||
| 198 | None | ||
| 199 | } | ||
| 200 | } else { | ||
| 201 | None | ||
| 202 | } | ||
| 203 | }) | ||
| 204 | .collect() | ||
| 205 | } | ||
| 206 | } | ||
| 207 | ``` | ||
| 208 | |||
| 209 | ## Self-Subscriber: Event-Driven Updates | ||
| 210 | |||
| 211 | A single self-subscriber watches for new events from **our own relay** and updates the HashMaps. | ||
| 212 | |||
| 213 | **Important:** The self-subscriber does NOT subscribe to kind 30618 as this would never lead to refreshing the sync filters. Those events are synced from remote relays only (via Layer 1 filter on sync relay connections). | ||
| 214 | |||
| 215 | ### Batching Strategy | ||
| 216 | |||
| 217 | The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires. | ||
| 218 | |||
| 219 | ```rust | ||
| 220 | impl SelfSubscriber { | ||
| 221 | async fn run(&self) { | ||
| 222 | // Subscribe to our own relay for relevant kinds | ||
| 223 | // Note: 30618 NOT included - synced from remote relays only | ||
| 224 | let filter = Filter::new() | ||
| 225 | .kinds([ | ||
| 226 | Kind::Custom(30617), // Repository announcements | ||
| 227 | Kind::GitPatch, // 1617 Patches | ||
| 228 | Kind::Custom(1618), // PRs | ||
| 229 | Kind::Custom(1619), // PR updates | ||
| 230 | Kind::Custom(1621), // Issues | ||
| 231 | ]); | ||
| 232 | |||
| 233 | let mut pending_updates: Vec<PendingUpdate> = Vec::new(); | ||
| 234 | let mut batch_deadline: Option<Instant> = None; | ||
| 235 | |||
| 236 | loop { | ||
| 237 | let timeout = batch_deadline | ||
| 238 | .map(|d| d.saturating_duration_since(Instant::now())) | ||
| 239 | .unwrap_or(Duration::MAX); | ||
| 240 | |||
| 241 | tokio::select! { | ||
| 242 | Some(event) = self.event_receiver.recv() => { | ||
| 243 | pending_updates.push(self.classify_update(&event)); | ||
| 244 | |||
| 245 | // Start batch timer on first event | ||
| 246 | if batch_deadline.is_none() { | ||
| 247 | batch_deadline = Some(Instant::now() + Duration::from_secs(5)); | ||
| 248 | } | ||
| 249 | } | ||
| 250 | _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => { | ||
| 251 | // Batch window elapsed - apply all pending updates | ||
| 252 | self.apply_batched_updates(pending_updates.drain(..).collect()).await; | ||
| 253 | batch_deadline = None; | ||
| 254 | } | ||
| 255 | } | ||
| 256 | } | ||
| 257 | } | ||
| 258 | |||
| 259 | fn classify_update(&self, event: &Event) -> PendingUpdate { | ||
| 260 | match event.kind.as_u16() { | ||
| 261 | 30617 => PendingUpdate::NewAnnouncement(event.clone()), | ||
| 262 | 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()), | ||
| 263 | _ => PendingUpdate::None, | ||
| 264 | } | ||
| 265 | } | ||
| 266 | } | ||
| 267 | ``` | ||
| 268 | |||
| 269 | ### Applying Batched Updates | ||
| 270 | |||
| 271 | When the batch window closes, we process all pending updates together: | ||
| 272 | |||
| 273 | ```rust | ||
| 274 | /// Batched updates grouped by relay | ||
| 275 | struct RelayUpdateBatch { | ||
| 276 | /// New repo refs to subscribe to (Layer 2) | ||
| 277 | new_repo_refs: HashSet<String>, | ||
| 278 | /// New event IDs to subscribe to (Layer 3) | ||
| 279 | new_event_ids: HashSet<EventId>, | ||
| 280 | /// Whether this is a newly discovered relay | ||
| 281 | is_new_relay: bool, | ||
| 282 | } | ||
| 283 | |||
| 284 | impl SelfSubscriber { | ||
| 285 | async fn apply_batched_updates(&mut self, updates: Vec<PendingUpdate>) { | ||
| 286 | // Step 1: Process all updates and update HashMaps | ||
| 287 | // Build batched actions per relay | ||
| 288 | let mut relay_batches: HashMap<String, RelayUpdateBatch> = HashMap::new(); | ||
| 289 | |||
| 290 | for update in updates { | ||
| 291 | match update { | ||
| 292 | PendingUpdate::NewAnnouncement(event) => { | ||
| 293 | self.process_announcement(&event, &mut relay_batches).await; | ||
| 294 | } | ||
| 295 | PendingUpdate::NewRootEvent(event) => { | ||
| 296 | self.process_root_event(&event, &mut relay_batches).await; | ||
| 297 | } | ||
| 298 | PendingUpdate::None => {} | ||
| 299 | } | ||
| 300 | } | ||
| 301 | |||
| 302 | // Step 2: Apply batched updates to each relay | ||
| 303 | for (relay_url, batch) in relay_batches { | ||
| 304 | self.apply_batch_to_relay(&relay_url, batch).await; | ||
| 305 | } | ||
| 306 | |||
| 307 | // Step 3: Check for relay removal (repos removed from announcements) | ||
| 308 | self.check_relay_removal().await; | ||
| 309 | } | ||
| 310 | |||
| 311 | async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) { | ||
| 312 | if batch.is_new_relay { | ||
| 313 | // Spawn new relay connection with full filters | ||
| 314 | self.spawn_sync_relay(relay_url.to_string()).await; | ||
| 315 | return; | ||
| 316 | } | ||
| 317 | |||
| 318 | // Build incremental filters for new content (NO since - get historical) | ||
| 319 | let incremental_filters = self.build_incremental_filters(&batch); | ||
| 320 | |||
| 321 | if incremental_filters.is_empty() { | ||
| 322 | return; | ||
| 323 | } | ||
| 324 | |||
| 325 | // Check if we need to consolidate | ||
| 326 | let current_filter_count = self.get_filter_count_for_relay(relay_url).await; | ||
| 327 | let new_filter_count = current_filter_count + incremental_filters.len(); | ||
| 328 | |||
| 329 | if new_filter_count > 70 { | ||
| 330 | // Consolidate: add incremental filters first (no since), wait for EOSE, | ||
| 331 | // then close all and resubscribe with consolidated filters (with since) | ||
| 332 | self.consolidate_relay_subscription(relay_url, incremental_filters).await; | ||
| 333 | } else { | ||
| 334 | // Just add incremental filters (no since - to get historical events) | ||
| 335 | self.send_filters_to_relay(relay_url, incremental_filters).await; | ||
| 336 | } | ||
| 337 | } | ||
| 338 | |||
| 339 | fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec<Filter> { | ||
| 340 | let mut filters = Vec::new(); | ||
| 341 | |||
| 342 | // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags) | ||
| 343 | if !batch.new_repo_refs.is_empty() { | ||
| 344 | let refs: Vec<String> = batch.new_repo_refs.iter().cloned().collect(); | ||
| 345 | for chunk in refs.chunks(100) { | ||
| 346 | // All kinds with lowercase 'a' tag | ||
| 347 | filters.push( | ||
| 348 | Filter::new() | ||
| 349 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) | ||
| 350 | ); | ||
| 351 | // All kinds with uppercase 'A' tag | ||
| 352 | filters.push( | ||
| 353 | Filter::new() | ||
| 354 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) | ||
| 355 | ); | ||
| 356 | // All kinds with 'q' tag (quote) | ||
| 357 | filters.push( | ||
| 358 | Filter::new() | ||
| 359 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 360 | ); | ||
| 361 | } | ||
| 362 | } | ||
| 363 | |||
| 364 | // Layer 3: New event IDs | ||
| 365 | if !batch.new_event_ids.is_empty() { | ||
| 366 | let ids: Vec<String> = batch.new_event_ids.iter() | ||
| 367 | .map(|id| id.to_hex()) | ||
| 368 | .collect(); | ||
| 369 | for chunk in ids.chunks(100) { | ||
| 370 | filters.push( | ||
| 371 | Filter::new() | ||
| 372 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) | ||
| 373 | ); | ||
| 374 | filters.push( | ||
| 375 | Filter::new() | ||
| 376 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) | ||
| 377 | ); | ||
| 378 | filters.push( | ||
| 379 | Filter::new() | ||
| 380 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 381 | ); | ||
| 382 | } | ||
| 383 | } | ||
| 384 | |||
| 385 | filters | ||
| 386 | } | ||
| 387 | } | ||
| 388 | ``` | ||
| 389 | |||
| 390 | ### Consolidation Strategy | ||
| 391 | |||
| 392 | When consolidating, we need a two-phase approach: | ||
| 393 | |||
| 394 | 1. First, subscribe with incremental filters (no `since`) to get any historical events we missed | ||
| 395 | 2. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`) | ||
| 396 | |||
| 397 | ```rust | ||
| 398 | async fn consolidate_relay_subscription( | ||
| 399 | &mut self, | ||
| 400 | relay_url: &str, | ||
| 401 | incremental_filters: Vec<Filter>, | ||
| 402 | ) { | ||
| 403 | // Phase 1: Add incremental filters WITHOUT since to catch up on new content | ||
| 404 | // These filters are for new repo_refs / event_ids we just discovered | ||
| 405 | let phase1_sub_id = self.send_filters_to_relay_and_wait_eose( | ||
| 406 | relay_url, | ||
| 407 | incremental_filters | ||
| 408 | ).await; | ||
| 409 | |||
| 410 | // Phase 2: After EOSE, consolidate everything | ||
| 411 | // Close ALL existing subscriptions for this relay | ||
| 412 | self.close_all_subscriptions(relay_url).await; | ||
| 413 | |||
| 414 | // Build fresh consolidated filters using current HashMap state | ||
| 415 | let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await; | ||
| 416 | |||
| 417 | // Resubscribe with since = now - 15 minutes | ||
| 418 | let since = Timestamp::now() - 900; | ||
| 419 | let filters_with_since: Vec<Filter> = consolidated_filters | ||
| 420 | .into_iter() | ||
| 421 | .map(|f| f.since(since)) | ||
| 422 | .collect(); | ||
| 423 | |||
| 424 | self.send_filters_to_relay(relay_url, filters_with_since).await; | ||
| 425 | } | ||
| 426 | ``` | ||
| 427 | |||
| 428 | ## Sync Relay Connections | ||
| 429 | |||
| 430 | Each sync relay connection uses the three-layer filter strategy: | ||
| 431 | |||
| 432 | ```rust | ||
| 433 | impl SyncRelayConnection { | ||
| 434 | async fn start(&mut self) { | ||
| 435 | loop { | ||
| 436 | match self.connect_and_subscribe().await { | ||
| 437 | Ok(()) => { | ||
| 438 | // Record successful connection | ||
| 439 | self.last_successful_connection = Instant::now(); | ||
| 440 | self.health_tracker.record_success(&self.url); | ||
| 441 | |||
| 442 | // Run event loop until disconnect | ||
| 443 | self.run_event_loop().await; | ||
| 444 | } | ||
| 445 | Err(e) => { | ||
| 446 | self.health_tracker.record_failure(&self.url); | ||
| 447 | } | ||
| 448 | } | ||
| 449 | |||
| 450 | // Reconnect with backoff and since filter | ||
| 451 | let backoff = self.health_tracker.get_backoff(&self.url); | ||
| 452 | tokio::time::sleep(backoff).await; | ||
| 453 | |||
| 454 | // On reconnect, use since = last_successful - 15 minutes | ||
| 455 | self.reconnect_since = Some( | ||
| 456 | Timestamp::from(self.last_successful_connection - Duration::from_secs(900)) | ||
| 457 | ); | ||
| 458 | } | ||
| 459 | } | ||
| 460 | |||
| 461 | async fn connect_and_subscribe(&mut self) -> Result<()> { | ||
| 462 | self.client.connect().await?; | ||
| 463 | |||
| 464 | let filters = self.build_three_layer_filters().await; | ||
| 465 | |||
| 466 | // Apply since filter if reconnecting | ||
| 467 | let filters = if let Some(since) = self.reconnect_since { | ||
| 468 | filters.into_iter().map(|f| f.since(since)).collect() | ||
| 469 | } else { | ||
| 470 | filters | ||
| 471 | }; | ||
| 472 | |||
| 473 | for filter in filters { | ||
| 474 | self.client.subscribe(filter, None).await?; | ||
| 475 | } | ||
| 476 | |||
| 477 | Ok(()) | ||
| 478 | } | ||
| 479 | } | ||
| 480 | ``` | ||
| 481 | |||
| 482 | ## Three-Layer Filter Strategy | ||
| 483 | |||
| 484 | ```rust | ||
| 485 | impl SyncRelayConnection { | ||
| 486 | async fn build_three_layer_filters(&self) -> Vec<Filter> { | ||
| 487 | let mut filters = Vec::new(); | ||
| 488 | |||
| 489 | // Get repos for this relay | ||
| 490 | let repos = self.sync_relays.read().await | ||
| 491 | .get(&self.url) | ||
| 492 | .cloned() | ||
| 493 | .unwrap_or_default(); | ||
| 494 | |||
| 495 | // Layer 1: Announcements (kinds 30617 + 30618) | ||
| 496 | // Note: 30618 is ONLY synced from remote relays, not self-subscribed | ||
| 497 | // Always included even if relay has no repos (bootstrap relay case) | ||
| 498 | filters.push( | ||
| 499 | Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) | ||
| 500 | ); | ||
| 501 | |||
| 502 | // Layer 2: Events tagging repos with 'a' tags (ALL kinds) | ||
| 503 | // Batched per 100 repo refs | ||
| 504 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 505 | for chunk in repo_refs.chunks(100) { | ||
| 506 | filters.push( | ||
| 507 | Filter::new() | ||
| 508 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) | ||
| 509 | ); | ||
| 510 | filters.push( | ||
| 511 | Filter::new() | ||
| 512 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) | ||
| 513 | ); | ||
| 514 | filters.push( | ||
| 515 | Filter::new() | ||
| 516 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 517 | ); | ||
| 518 | } | ||
| 519 | |||
| 520 | // Layer 3: Events tagging root events (batch per 100 event IDs) | ||
| 521 | let all_event_ids: HashSet<EventId> = repos.values() | ||
| 522 | .flat_map(|ids| ids.iter().cloned()) | ||
| 523 | .collect(); | ||
| 524 | |||
| 525 | let event_id_strs: Vec<String> = all_event_ids | ||
| 526 | .iter() | ||
| 527 | .map(|id| id.to_hex()) | ||
| 528 | .collect(); | ||
| 529 | |||
| 530 | for chunk in event_id_strs.chunks(100) { | ||
| 531 | filters.push( | ||
| 532 | Filter::new() | ||
| 533 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) | ||
| 534 | ); | ||
| 535 | filters.push( | ||
| 536 | Filter::new() | ||
| 537 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) | ||
| 538 | ); | ||
| 539 | filters.push( | ||
| 540 | Filter::new() | ||
| 541 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 542 | ); | ||
| 543 | } | ||
| 544 | |||
| 545 | filters | ||
| 546 | } | ||
| 547 | } | ||
| 548 | ``` | ||
| 549 | |||
| 550 | ## Relay Removal | ||
| 551 | |||
| 552 | ```rust | ||
| 553 | async fn check_relay_removal(&mut self) { | ||
| 554 | let relays_to_check: Vec<String> = self.sync_relays.read().await | ||
| 555 | .keys() | ||
| 556 | .cloned() | ||
| 557 | .collect(); | ||
| 558 | |||
| 559 | for relay_url in relays_to_check { | ||
| 560 | // Never remove bootstrap relay | ||
| 561 | if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() { | ||
| 562 | continue; | ||
| 563 | } | ||
| 564 | |||
| 565 | // Check if relay has any repos left | ||
| 566 | let should_remove = { | ||
| 567 | let sync_relays = self.sync_relays.read().await; | ||
| 568 | sync_relays.get(&relay_url) | ||
| 569 | .map(|repos| repos.is_empty()) | ||
| 570 | .unwrap_or(true) | ||
| 571 | }; | ||
| 572 | |||
| 573 | if should_remove { | ||
| 574 | // Remove from HashMap | ||
| 575 | self.sync_relays.write().await.remove(&relay_url); | ||
| 576 | |||
| 577 | // Close connection | ||
| 578 | self.close_relay_connection(&relay_url).await; | ||
| 579 | } | ||
| 580 | } | ||
| 581 | } | ||
| 582 | ``` | ||
| 583 | |||
| 584 | ## Prometheus Metrics (Same as v1) | ||
| 585 | |||
| 586 | | Metric | Type | Labels | Description | | ||
| 587 | | ------------------------------------- | ------- | ------------- | ----------------------- | | ||
| 588 | | `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 | | ||
| 589 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome | | ||
| 590 | | `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 | | ||
| 591 | | `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures | | ||
| 592 | | `ngit_sync_events_total` | Counter | source | Events by source type | | ||
| 593 | | `ngit_sync_gap_events_total` | Counter | relay | Gap events filled | | ||
| 594 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | | ||
| 595 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected | | ||
| 596 | | `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count | | ||
| 597 | |||
| 598 | ## Module Structure (Simplified) | ||
| 599 | |||
| 600 | ``` | ||
| 601 | src/sync/ | ||
| 602 | ├── mod.rs # Module exports, constants | ||
| 603 | ├── manager.rs # SyncManager - orchestrates sync | ||
| 604 | ├── state.rs # FollowingRepoRootEvents + SyncRelays HashMaps | ||
| 605 | ├── self_subscriber.rs # Self-subscriber + batching logic | ||
| 606 | ├── relay_connection.rs # Per-relay WebSocket + filters | ||
| 607 | ├── health.rs # RelayHealthTracker (reuse from v1) | ||
| 608 | └── metrics.rs # SyncMetrics (reuse from v1) | ||
| 609 | ``` | ||
| 610 | |||
| 611 | ## Comparison: v1 vs v2 | ||
| 612 | |||
| 613 | | Aspect | v1 (Current) | v2 (Simplified) | | ||
| 614 | | ------------------- | ------------------------------------------------------------------ | --------------------------------------------- | | ||
| 615 | | **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB | | ||
| 616 | | **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe | | ||
| 617 | | **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) | | ||
| 618 | | **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters | | ||
| 619 | | **Batching** | Per 100 tags | Per 100 tags | | ||
| 620 | | **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min | | ||
| 621 | | **Jitter** | Startup jitter | None (see design decision) | | ||
| 622 | | **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed | | ||
| 623 | | **1621 (Issues)** | Not included | Included with 1617/1618/1619 | | ||
| 624 | | **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags | | ||
| 625 | | **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily | | ||
| 626 | |||
| 627 | ## Key Design Decisions | ||
| 628 | |||
| 629 | 1. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database | ||
| 630 | 2. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them | ||
| 631 | 3. **Batched Filter Updates**: 5-second window that starts on first event (not fixed interval) | ||
| 632 | 4. **Uniform Reconnection**: Always use `since = last_successful - 15min` | ||
| 633 | 5. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns | ||
| 634 | 6. **Bootstrap Relay Protected**: Never removed from sync_relays; subscribes with Layer 1 even if empty | ||
| 635 | 7. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed | ||
| 636 | 8. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation | ||
| 637 | 9. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters | ||
| 638 | 10. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds | ||
| 639 | 11. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since | ||
| 640 | 12. **Multiple Repo Refs**: Handle events that tag multiple repos correctly | ||
| 641 | |||
| 642 | --- | ||
| 643 | |||
| 644 | ## Detailed Flow Diagram | ||
| 645 | |||
| 646 | ```mermaid | ||
| 647 | sequenceDiagram | ||
| 648 | participant DB as Database | ||
| 649 | participant SM as SyncManager | ||
| 650 | participant SS as Self-Subscriber | ||
| 651 | participant RC as RelayConnection | ||
| 652 | |||
| 653 | Note over SM: Startup | ||
| 654 | SM->>SM: Add bootstrap relay to sync_relays | ||
| 655 | SM->>DB: Query kinds 1617/1618/1619/1621 | ||
| 656 | DB-->>SM: Root events | ||
| 657 | SM->>SM: Build following_repo_root_events | ||
| 658 | SM->>SM: Handle multi-repo events | ||
| 659 | |||
| 660 | SM->>DB: Query kind 30617 | ||
| 661 | DB-->>SM: Announcements | ||
| 662 | SM->>SM: Build sync_relays | ||
| 663 | |||
| 664 | SM->>RC: Spawn connections for each relay | ||
| 665 | RC->>RC: Build 3-layer filters from sync_relays | ||
| 666 | RC->>RC: Connect and subscribe | ||
| 667 | |||
| 668 | Note over SS: Event-Driven Updates | ||
| 669 | SS->>SS: Subscribe to 30617/1617/1618/1619/1621 | ||
| 670 | SS->>SS: Receive event - start 5s batch timer | ||
| 671 | SS->>SS: Collect more events in batch window | ||
| 672 | SS->>SS: Batch window closes | ||
| 673 | SS->>SM: Apply batched updates to HashMaps | ||
| 674 | |||
| 675 | alt New Relay Discovered | ||
| 676 | SM->>RC: Spawn new connection | ||
| 677 | else New Content for Existing Relay | ||
| 678 | alt Under 70 filter limit | ||
| 679 | SM->>RC: Add incremental filter - no since | ||
| 680 | else Over 70 filter limit | ||
| 681 | SM->>RC: Add incremental filters - no since | ||
| 682 | RC-->>SM: EOSE received | ||
| 683 | SM->>RC: Close all subscriptions | ||
| 684 | SM->>RC: Resubscribe consolidated - with since | ||
| 685 | end | ||
| 686 | else Relay Has No More Repos | ||
| 687 | alt Is Bootstrap Relay | ||
| 688 | SM->>SM: Keep connection - Layer 1 only | ||
| 689 | else Not Bootstrap Relay | ||
| 690 | SM->>RC: Close connection | ||
| 691 | end | ||
| 692 | end | ||
| 693 | |||
| 694 | Note over RC: Connection Lifecycle | ||
| 695 | RC->>RC: Process incoming events | ||
| 696 | RC->>DB: Store via acceptance policy | ||
| 697 | |||
| 698 | RC->>RC: Connection drops | ||
| 699 | RC->>RC: Wait backoff - 5s to 1h exponential | ||
| 700 | RC->>RC: Reconnect with since = last_success - 15min | ||
| 701 | |||
| 702 | Note over RC: If failures continue 24h | ||
| 703 | RC->>RC: Mark dead - retry once per 24h | ||
| 704 | ``` | ||