diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 14:32:01 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 14:32:01 +0000 |
| commit | 18ad93f8d0b8ce172c9c227385a21af66a507950 (patch) | |
| tree | 275ada806570a2105f4e75388a565f61276209c8 /docs | |
| parent | 4e5a937a4ef5288e702ba2bae3daf2a78398b690 (diff) | |
docs: remove old grasp-02 design doc versions
Diffstat (limited to 'docs')
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync-v2.md | 785 | ||||
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync-v3.md | 871 | ||||
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync-v4.md | 1330 | ||||
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync.md | 1811 |
4 files changed, 1106 insertions, 3691 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync-v2.md b/docs/explanation/grasp-02-proactive-sync-v2.md deleted file mode 100644 index 311e93c..0000000 --- a/docs/explanation/grasp-02-proactive-sync-v2.md +++ /dev/null | |||
| @@ -1,785 +0,0 @@ | |||
| 1 | # GRASP-02: Proactive Sync v2 - Simplified Design | ||
| 2 | |||
| 3 | ## Overview | ||
| 4 | |||
| 5 | This document presents a simplified redesign of the proactive sync module. The key insight is that **all sync filters can be derived from two database queries**, with incremental updates via self-subscription. | ||
| 6 | |||
| 7 | ## Goals (Same as v1) | ||
| 8 | |||
| 9 | 1. **Data Availability**: Ensure we have all relevant events for repositories we host | ||
| 10 | 2. **Resilience**: Handle relay failures gracefully with backoff and health tracking | ||
| 11 | 3. **Efficiency**: Minimize connections and bandwidth through filter consolidation | ||
| 12 | 4. **Consistency**: Use unified filters for both live sync and catchup | ||
| 13 | |||
| 14 | ## Scale Targets & Upper Bounds | ||
| 15 | |||
| 16 | This design targets the following scale: | ||
| 17 | |||
| 18 | | Metric | Target | Notes | | ||
| 19 | | ----------------------------- | -------- | ----------------------------------------- | | ||
| 20 | | **Repositories** | 1,000 | Repos we host/track | | ||
| 21 | | **Root events per repo** | 50 (avg) | PRs, Issues, Patches per repo | | ||
| 22 | | **Total relays in ecosystem** | 100 | Unique relays across all repos | | ||
| 23 | | **Relays per repo** | 5 (avg) | Relays listed in each repo's announcement | | ||
| 24 | | **Total root events** | ~50,000 | 1,000 repos × 50 events | | ||
| 25 | | **Sync connections** | ~50-100 | Based on relay overlap | | ||
| 26 | |||
| 27 | **Memory Estimate (in-memory HashMaps):** | ||
| 28 | |||
| 29 | - `FollowingRepoRootEvents`: ~1,000 entries × 50 EventIds = ~3-5 MB | ||
| 30 | - `SyncRelays`: ~100 entries × varying repo counts = ~2-3 MB | ||
| 31 | - **Total in-memory state**: ~10 MB (well within acceptable limits) | ||
| 32 | |||
| 33 | **Upper Bounds (redesign triggers):** | ||
| 34 | |||
| 35 | - **10,000+ repos**: Consider database-backed state instead of in-memory HashMaps | ||
| 36 | - **500+ sync relays**: Consider connection pooling or relay prioritization | ||
| 37 | - **500+ root events per repo**: Consider per-repo pagination in Layer 3 filters | ||
| 38 | - **Sustained >100 events/second**: Consider write batching to database | ||
| 39 | |||
| 40 | Beyond these limits, the in-memory HashMap model may need to evolve to a database-backed approach with lazy loading. | ||
| 41 | |||
| 42 | ## Core Data Structures | ||
| 43 | |||
| 44 | The entire sync filter state is captured in two HashMaps, initialized from database queries at startup: | ||
| 45 | |||
| 46 | ```rust | ||
| 47 | /// Repository root events we're following | ||
| 48 | /// Key: repo addressable reference (e.g., "30617:pubkey:identifier") | ||
| 49 | /// Value: Set of event IDs (kinds 1617, 1618, 1619, 1621) that tag this repo | ||
| 50 | /// | ||
| 51 | /// Note: May include a few extra repo refs that aren't in sync_relays. | ||
| 52 | /// This is acceptable - we won't query other relays for them. | ||
| 53 | type FollowingRepoRootEvents = Arc<RwLock<HashMap<String, HashSet<EventId>>>>; | ||
| 54 | |||
| 55 | /// Relays we sync from, including their repos and events | ||
| 56 | /// Key: relay URL | ||
| 57 | /// Value: Map of repo_ref -> event IDs for repos that list both this relay AND our service | ||
| 58 | /// | ||
| 59 | /// Note: Bootstrap relay (if configured) is always present and excluded from removal logic. | ||
| 60 | type SyncRelays = Arc<RwLock<HashMap<String, HashMap<String, HashSet<EventId>>>>>; | ||
| 61 | ``` | ||
| 62 | |||
| 63 | ## Architecture Overview | ||
| 64 | |||
| 65 | ```mermaid | ||
| 66 | flowchart TB | ||
| 67 | subgraph Startup | ||
| 68 | DB[(Database)] | ||
| 69 | Q1[Query kinds 1617/1618/1619/1621] | ||
| 70 | Q2[Query kind 30617] | ||
| 71 | DB --> Q1 --> FRRE[following_repo_root_events] | ||
| 72 | DB --> Q2 --> SR[sync_relays] | ||
| 73 | BR[Bootstrap Relay] --> SR | ||
| 74 | end | ||
| 75 | |||
| 76 | subgraph SyncManager | ||
| 77 | SS[Self-Subscriber] | ||
| 78 | FRRE --> SS | ||
| 79 | SR --> SS | ||
| 80 | end | ||
| 81 | |||
| 82 | subgraph SyncRelays | ||
| 83 | R1[Relay Connection 1] | ||
| 84 | R2[Relay Connection 2] | ||
| 85 | RN[Relay Connection N] | ||
| 86 | end | ||
| 87 | |||
| 88 | SS -->|spawn/update| R1 | ||
| 89 | SS -->|spawn/update| R2 | ||
| 90 | SS -->|spawn/update| RN | ||
| 91 | |||
| 92 | R1 -->|events| AP[Acceptance Policy] | ||
| 93 | R2 -->|events| AP | ||
| 94 | RN -->|events| AP | ||
| 95 | AP -->|store| DB | ||
| 96 | ``` | ||
| 97 | |||
| 98 | ## Module Structure | ||
| 99 | |||
| 100 | The sync module is organized following the pattern used by `src/http/mod.rs` and `src/metrics/mod.rs` where the primary struct lives in `mod.rs`: | ||
| 101 | |||
| 102 | ``` | ||
| 103 | src/sync/ | ||
| 104 | ├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays) | ||
| 105 | ├── self_subscriber.rs # SelfSubscriber struct and batching logic | ||
| 106 | ├── relay_connection.rs # Per-relay WebSocket connection management | ||
| 107 | ├── health.rs # RelayHealthTracker for backoff and dead relay detection | ||
| 108 | └── metrics.rs # SyncMetrics for Prometheus integration | ||
| 109 | ``` | ||
| 110 | |||
| 111 | **Rationale:** The state type aliases (`FollowingRepoRootEvents`, `SyncRelays`) are simple `Arc<RwLock<HashMap<...>>>` wrappers owned by SyncManager. Rather than creating a separate `state.rs` for two type aliases, they are colocated with SyncManager in `mod.rs` to reduce file count while maintaining clarity. | ||
| 112 | |||
| 113 | ## Design Decision: No Jitter | ||
| 114 | |||
| 115 | We considered adding jitter to prevent thundering herd scenarios when: | ||
| 116 | |||
| 117 | - Multiple relay connections initialize simultaneously | ||
| 118 | - Batched updates affect multiple relays | ||
| 119 | - Filter consolidation triggers across connections | ||
| 120 | |||
| 121 | **Decision: No jitter implemented.** | ||
| 122 | |||
| 123 | **Rationale:** | ||
| 124 | |||
| 125 | - Our GRASP server should handle the load of simultaneous operations | ||
| 126 | - Jitter would lead to more orphan filters (filters added one at a time rather than atomically) | ||
| 127 | - Jitter creates inefficiency - partial subscriptions miss events during the stagger window | ||
| 128 | - The batching window (5s) already provides natural smoothing without the downsides | ||
| 129 | |||
| 130 | ## Health Tracking & Backoff | ||
| 131 | |||
| 132 | ```rust | ||
| 133 | /// Health state machine for relay connections | ||
| 134 | enum HealthState { | ||
| 135 | Healthy, // Connected and working | ||
| 136 | Backoff(u32), // Failed, attempt count for exponential backoff | ||
| 137 | Dead, // 24h+ continuous failures | ||
| 138 | } | ||
| 139 | |||
| 140 | impl RelayHealthTracker { | ||
| 141 | /// Backoff durations: | ||
| 142 | /// - Attempt 1: 5s | ||
| 143 | /// - Attempt 2: 10s | ||
| 144 | /// - Attempt 3: 20s | ||
| 145 | /// - Attempt 4: 40s | ||
| 146 | /// - ... exponential up to 1h max | ||
| 147 | /// - After reaching 1h, continue hourly until 24h total failure time | ||
| 148 | /// - After 24h: marked Dead, retry once per 24h | ||
| 149 | fn get_backoff(&self, relay_url: &str) -> Duration; | ||
| 150 | } | ||
| 151 | ``` | ||
| 152 | |||
| 153 | | State | Retry Behavior | | ||
| 154 | | ----------- | -------------------------------------------- | | ||
| 155 | | **Healthy** | Immediate reconnect on disconnect | | ||
| 156 | | **Backoff** | 5s → 10s → 20s → ... → 1h max (exponential) | | ||
| 157 | | **Hourly** | Once per hour after hitting 1h cap | | ||
| 158 | | **Dead** | After 24h total failures, retry once per 24h | | ||
| 159 | |||
| 160 | ## Startup Initialization | ||
| 161 | |||
| 162 | At startup, two database queries initialize the sync state: | ||
| 163 | |||
| 164 | ```rust | ||
| 165 | impl SyncManager { | ||
| 166 | async fn initialize_from_database(&mut self) -> Result<()> { | ||
| 167 | // Initialize bootstrap relay if configured (never removed) | ||
| 168 | if let Some(bootstrap_url) = &self.config.bootstrap_relay_url { | ||
| 169 | self.sync_relays.write().await.insert( | ||
| 170 | bootstrap_url.clone(), | ||
| 171 | HashMap::new() // Repos potentially populated below but may stay empty (Layer 1 only) | ||
| 172 | ); | ||
| 173 | } | ||
| 174 | |||
| 175 | // Query 1: Build following_repo_root_events | ||
| 176 | // Find all 1617/1618/1619/1621 events and extract their repo references | ||
| 177 | let root_events = self.database | ||
| 178 | .query(Filter::new().kinds([ | ||
| 179 | Kind::GitPatch, // 1617 | ||
| 180 | Kind::Custom(1618), // PRs | ||
| 181 | Kind::Custom(1619), // PR updates | ||
| 182 | Kind::Custom(1621), // Issues | ||
| 183 | ])) | ||
| 184 | .await?; | ||
| 185 | |||
| 186 | for event in root_events { | ||
| 187 | // An event may have multiple 'a' tags pointing to different repos | ||
| 188 | let repo_refs = self.extract_all_repo_refs(&event); | ||
| 189 | for repo_ref in repo_refs { | ||
| 190 | self.following_repo_root_events | ||
| 191 | .write().await | ||
| 192 | .entry(repo_ref) | ||
| 193 | .or_default() | ||
| 194 | .insert(event.id); | ||
| 195 | } | ||
| 196 | } | ||
| 197 | |||
| 198 | // Query 2: Build sync_relays from kind 30617 announcements | ||
| 199 | let announcements = self.database | ||
| 200 | .query(Filter::new().kind(Kind::Custom(30617))) | ||
| 201 | .await?; | ||
| 202 | |||
| 203 | for event in announcements { | ||
| 204 | let repo_ref = self.build_repo_ref(&event); | ||
| 205 | let relay_urls = self.extract_relay_urls(&event); | ||
| 206 | |||
| 207 | // Only track repos that list BOTH a remote relay AND our service | ||
| 208 | if self.lists_our_service(&event) { | ||
| 209 | for relay_url in relay_urls { | ||
| 210 | if !self.is_own_relay(&relay_url) { | ||
| 211 | // Get events for this repo from following_repo_root_events | ||
| 212 | let events = self.following_repo_root_events | ||
| 213 | .read().await | ||
| 214 | .get(&repo_ref) | ||
| 215 | .cloned() | ||
| 216 | .unwrap_or_default(); | ||
| 217 | |||
| 218 | self.sync_relays | ||
| 219 | .write().await | ||
| 220 | .entry(relay_url) | ||
| 221 | .or_default() | ||
| 222 | .insert(repo_ref.clone(), events); | ||
| 223 | } | ||
| 224 | } | ||
| 225 | } | ||
| 226 | } | ||
| 227 | |||
| 228 | Ok(()) | ||
| 229 | } | ||
| 230 | |||
| 231 | /// Extract ALL repo refs from an event (it may tag multiple repos) | ||
| 232 | fn extract_all_repo_refs(&self, event: &Event) -> Vec<String> { | ||
| 233 | event.tags.iter() | ||
| 234 | .filter_map(|tag| { | ||
| 235 | let tag_vec = tag.clone().to_vec(); | ||
| 236 | if tag_vec.len() >= 2 && tag_vec[0] == "a" { | ||
| 237 | // Validate it's a 30617 reference | ||
| 238 | if tag_vec[1].starts_with("30617:") { | ||
| 239 | Some(tag_vec[1].clone()) | ||
| 240 | } else { | ||
| 241 | None | ||
| 242 | } | ||
| 243 | } else { | ||
| 244 | None | ||
| 245 | } | ||
| 246 | }) | ||
| 247 | .collect() | ||
| 248 | } | ||
| 249 | } | ||
| 250 | ``` | ||
| 251 | |||
| 252 | ## Self-Subscriber: Event-Driven Updates | ||
| 253 | |||
| 254 | A single self-subscriber watches for new events from **our own relay** and updates the HashMaps. | ||
| 255 | |||
| 256 | **Important:** The self-subscriber does NOT subscribe to kind 30618 as this would never lead to refreshing the sync filters. Those events are synced from remote relays only (via Layer 1 filter on sync relay connections). | ||
| 257 | |||
| 258 | ### Batching Strategy | ||
| 259 | |||
| 260 | The batch timer **starts only when the first event arrives**, not on a fixed interval. This prevents the scenario where an event arriving at second 4 of a 5-second interval only gets 1 second before the batch fires. | ||
| 261 | |||
| 262 | **Important:** Once the batch timer starts, it does NOT reset when additional events arrive. The batch will fire exactly 5 seconds after the first event, regardless of how many subsequent events are queued. This ensures predictable latency and prevents indefinite batching during high-activity periods. | ||
| 263 | |||
| 264 | ```rust | ||
| 265 | impl SelfSubscriber { | ||
| 266 | async fn run(&self) { | ||
| 267 | // Subscribe to our own relay for relevant kinds | ||
| 268 | // Note: 30618 NOT included - synced from remote relays only | ||
| 269 | let filter = Filter::new() | ||
| 270 | .kinds([ | ||
| 271 | Kind::Custom(30617), // Repository announcements | ||
| 272 | Kind::GitPatch, // 1617 Patches | ||
| 273 | Kind::Custom(1618), // PRs | ||
| 274 | Kind::Custom(1619), // PR updates | ||
| 275 | Kind::Custom(1621), // Issues | ||
| 276 | ]); | ||
| 277 | |||
| 278 | let mut pending_updates: Vec<PendingUpdate> = Vec::new(); | ||
| 279 | let mut batch_deadline: Option<Instant> = None; | ||
| 280 | |||
| 281 | loop { | ||
| 282 | let timeout = batch_deadline | ||
| 283 | .map(|d| d.saturating_duration_since(Instant::now())) | ||
| 284 | .unwrap_or(Duration::MAX); | ||
| 285 | |||
| 286 | tokio::select! { | ||
| 287 | Some(event) = self.event_receiver.recv() => { | ||
| 288 | pending_updates.push(self.classify_update(&event)); | ||
| 289 | |||
| 290 | // Start batch timer on first event | ||
| 291 | if batch_deadline.is_none() { | ||
| 292 | batch_deadline = Some(Instant::now() + Duration::from_secs(5)); | ||
| 293 | } | ||
| 294 | } | ||
| 295 | _ = tokio::time::sleep(timeout), if batch_deadline.is_some() => { | ||
| 296 | // Batch window elapsed - apply all pending updates | ||
| 297 | self.apply_batched_updates(pending_updates.drain(..).collect()).await; | ||
| 298 | batch_deadline = None; | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
| 302 | } | ||
| 303 | |||
| 304 | fn classify_update(&self, event: &Event) -> PendingUpdate { | ||
| 305 | match event.kind.as_u16() { | ||
| 306 | 30617 => PendingUpdate::NewAnnouncement(event.clone()), | ||
| 307 | 1617 | 1618 | 1619 | 1621 => PendingUpdate::NewRootEvent(event.clone()), | ||
| 308 | _ => PendingUpdate::None, | ||
| 309 | } | ||
| 310 | } | ||
| 311 | } | ||
| 312 | ``` | ||
| 313 | |||
| 314 | ### Applying Batched Updates | ||
| 315 | |||
| 316 | When the batch window closes, we process all pending updates together: | ||
| 317 | |||
| 318 | ```rust | ||
| 319 | /// Batched updates grouped by relay | ||
| 320 | struct RelayUpdateBatch { | ||
| 321 | /// New repo refs to subscribe to (Layer 2) | ||
| 322 | new_repo_refs: HashSet<String>, | ||
| 323 | /// New event IDs to subscribe to (Layer 3) | ||
| 324 | new_event_ids: HashSet<EventId>, | ||
| 325 | /// Whether this is a newly discovered relay | ||
| 326 | is_new_relay: bool, | ||
| 327 | } | ||
| 328 | |||
| 329 | impl SelfSubscriber { | ||
| 330 | async fn apply_batched_updates(&mut self, updates: Vec<PendingUpdate>) { | ||
| 331 | // Step 1: Process all updates and update HashMaps | ||
| 332 | // Build batched actions per relay | ||
| 333 | let mut relay_batches: HashMap<String, RelayUpdateBatch> = HashMap::new(); | ||
| 334 | |||
| 335 | for update in updates { | ||
| 336 | match update { | ||
| 337 | PendingUpdate::NewAnnouncement(event) => { | ||
| 338 | self.process_announcement(&event, &mut relay_batches).await; | ||
| 339 | } | ||
| 340 | PendingUpdate::NewRootEvent(event) => { | ||
| 341 | self.process_root_event(&event, &mut relay_batches).await; | ||
| 342 | } | ||
| 343 | PendingUpdate::None => {} | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | // Step 2: Apply batched updates to each relay | ||
| 348 | for (relay_url, batch) in relay_batches { | ||
| 349 | self.apply_batch_to_relay(&relay_url, batch).await; | ||
| 350 | } | ||
| 351 | |||
| 352 | // Step 3: Check for relay removal (repos removed from announcements) | ||
| 353 | self.check_relay_removal().await; | ||
| 354 | } | ||
| 355 | |||
| 356 | async fn apply_batch_to_relay(&mut self, relay_url: &str, batch: RelayUpdateBatch) { | ||
| 357 | if batch.is_new_relay { | ||
| 358 | // Spawn new relay connection with full filters | ||
| 359 | self.spawn_sync_relay(relay_url.to_string()).await; | ||
| 360 | return; | ||
| 361 | } | ||
| 362 | |||
| 363 | // Build incremental filters for new content (NO since - get historical) | ||
| 364 | let incremental_filters = self.build_incremental_filters(&batch); | ||
| 365 | |||
| 366 | if incremental_filters.is_empty() { | ||
| 367 | return; | ||
| 368 | } | ||
| 369 | |||
| 370 | // Check if we need to consolidate | ||
| 371 | let current_filter_count = self.get_filter_count_for_relay(relay_url).await; | ||
| 372 | let new_filter_count = current_filter_count + incremental_filters.len(); | ||
| 373 | |||
| 374 | // Note: 70 is a conservative threshold that may need tuning based on | ||
| 375 | // production observations. It was chosen to trigger consolidation earlier | ||
| 376 | // than v1's 150, but optimal value depends on relay behavior. | ||
| 377 | if new_filter_count > 70 { | ||
| 378 | // Consolidate: add incremental filters first (no since), wait for EOSE, | ||
| 379 | // then close all and resubscribe with consolidated filters (with since) | ||
| 380 | self.consolidate_relay_subscription(relay_url, incremental_filters).await; | ||
| 381 | } else { | ||
| 382 | // Just add incremental filters (no since - to get historical events) | ||
| 383 | self.send_filters_to_relay(relay_url, incremental_filters).await; | ||
| 384 | } | ||
| 385 | } | ||
| 386 | |||
| 387 | fn build_incremental_filters(&self, batch: &RelayUpdateBatch) -> Vec<Filter> { | ||
| 388 | let mut filters = Vec::new(); | ||
| 389 | |||
| 390 | // Layer 2: New repo refs (for ALL kinds that tag repos with 'a' tags) | ||
| 391 | if !batch.new_repo_refs.is_empty() { | ||
| 392 | let refs: Vec<String> = batch.new_repo_refs.iter().cloned().collect(); | ||
| 393 | for chunk in refs.chunks(100) { | ||
| 394 | // All kinds with lowercase 'a' tag | ||
| 395 | filters.push( | ||
| 396 | Filter::new() | ||
| 397 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) | ||
| 398 | ); | ||
| 399 | // All kinds with uppercase 'A' tag | ||
| 400 | filters.push( | ||
| 401 | Filter::new() | ||
| 402 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) | ||
| 403 | ); | ||
| 404 | // All kinds with 'q' tag (quote) | ||
| 405 | filters.push( | ||
| 406 | Filter::new() | ||
| 407 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 408 | ); | ||
| 409 | } | ||
| 410 | } | ||
| 411 | |||
| 412 | // Layer 3: New event IDs | ||
| 413 | if !batch.new_event_ids.is_empty() { | ||
| 414 | let ids: Vec<String> = batch.new_event_ids.iter() | ||
| 415 | .map(|id| id.to_hex()) | ||
| 416 | .collect(); | ||
| 417 | for chunk in ids.chunks(100) { | ||
| 418 | filters.push( | ||
| 419 | Filter::new() | ||
| 420 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) | ||
| 421 | ); | ||
| 422 | filters.push( | ||
| 423 | Filter::new() | ||
| 424 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) | ||
| 425 | ); | ||
| 426 | filters.push( | ||
| 427 | Filter::new() | ||
| 428 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 429 | ); | ||
| 430 | } | ||
| 431 | } | ||
| 432 | |||
| 433 | filters | ||
| 434 | } | ||
| 435 | } | ||
| 436 | ``` | ||
| 437 | |||
| 438 | ### Consolidation Strategy | ||
| 439 | |||
| 440 | When consolidating, we need a two-phase approach: | ||
| 441 | |||
| 442 | 1. First, subscribe with incremental filters (no `since`) to get any historical events we missed | ||
| 443 | 2. After receiving EOSE, close all subscriptions and resubscribe with consolidated filters (with `since`) | ||
| 444 | |||
| 445 | ```rust | ||
| 446 | async fn consolidate_relay_subscription( | ||
| 447 | &mut self, | ||
| 448 | relay_url: &str, | ||
| 449 | incremental_filters: Vec<Filter>, | ||
| 450 | ) { | ||
| 451 | // Phase 1: Add incremental filters WITHOUT since to catch up on new content | ||
| 452 | // These filters are for new repo_refs / event_ids we just discovered | ||
| 453 | let phase1_sub_id = self.send_filters_to_relay_and_wait_eose( | ||
| 454 | relay_url, | ||
| 455 | incremental_filters | ||
| 456 | ).await; | ||
| 457 | |||
| 458 | // Phase 2: After EOSE, consolidate everything | ||
| 459 | // Close ALL existing subscriptions for this relay | ||
| 460 | self.close_all_subscriptions(relay_url).await; | ||
| 461 | |||
| 462 | // Build fresh consolidated filters using current HashMap state | ||
| 463 | let consolidated_filters = self.build_three_layer_filters_for_relay(relay_url).await; | ||
| 464 | |||
| 465 | // Resubscribe with since = now - 15 minutes | ||
| 466 | let since = Timestamp::now() - 900; | ||
| 467 | let filters_with_since: Vec<Filter> = consolidated_filters | ||
| 468 | .into_iter() | ||
| 469 | .map(|f| f.since(since)) | ||
| 470 | .collect(); | ||
| 471 | |||
| 472 | self.send_filters_to_relay(relay_url, filters_with_since).await; | ||
| 473 | } | ||
| 474 | ``` | ||
| 475 | |||
| 476 | ## Daily Full Catchup | ||
| 477 | |||
| 478 | To capture events that may have taken longer than 15 minutes to propagate through the nostr network, we perform a daily full catchup: | ||
| 479 | |||
| 480 | ```rust | ||
| 481 | impl SyncManager { | ||
| 482 | /// Runs approximately every 24 hours per relay connection | ||
| 483 | async fn daily_catchup(&mut self, relay_url: &str) { | ||
| 484 | // Close all current subscriptions for this relay | ||
| 485 | self.close_all_subscriptions(relay_url).await; | ||
| 486 | |||
| 487 | // Rebuild fresh filters from current HashMap state | ||
| 488 | let filters = self.build_three_layer_filters_for_relay(relay_url).await; | ||
| 489 | |||
| 490 | // Subscribe WITHOUT since filter to get full historical sync | ||
| 491 | for filter in filters { | ||
| 492 | self.subscribe_to_relay(relay_url, filter).await; | ||
| 493 | } | ||
| 494 | |||
| 495 | // After EOSE, switch back to live mode with since filter | ||
| 496 | self.wait_for_eose(relay_url).await; | ||
| 497 | |||
| 498 | // Re-add since filter for ongoing live sync | ||
| 499 | let since = Timestamp::now() - 900; // 15 minutes ago | ||
| 500 | self.resubscribe_with_since(relay_url, since).await; | ||
| 501 | } | ||
| 502 | } | ||
| 503 | ``` | ||
| 504 | |||
| 505 | **Rationale:** The 15-minute reconnection window is standard for nostr event propagation, but some events may take longer. Rather than increasing the window (which would cause more duplicate processing), we do a daily full catchup to ensure nothing is missed. This adds minimal complexity while providing comprehensive coverage. | ||
| 506 | |||
| 507 | ## Sync Relay Connections | ||
| 508 | |||
| 509 | Each sync relay connection uses the three-layer filter strategy: | ||
| 510 | |||
| 511 | ```rust | ||
| 512 | impl SyncRelayConnection { | ||
| 513 | async fn start(&mut self) { | ||
| 514 | loop { | ||
| 515 | match self.connect_and_subscribe().await { | ||
| 516 | Ok(()) => { | ||
| 517 | // Record successful connection | ||
| 518 | self.last_successful_connection = Instant::now(); | ||
| 519 | self.health_tracker.record_success(&self.url); | ||
| 520 | |||
| 521 | // Run event loop until disconnect | ||
| 522 | self.run_event_loop().await; | ||
| 523 | } | ||
| 524 | Err(e) => { | ||
| 525 | self.health_tracker.record_failure(&self.url); | ||
| 526 | } | ||
| 527 | } | ||
| 528 | |||
| 529 | // Reconnect with backoff and since filter | ||
| 530 | let backoff = self.health_tracker.get_backoff(&self.url); | ||
| 531 | tokio::time::sleep(backoff).await; | ||
| 532 | |||
| 533 | // On reconnect, use since = last_successful - 15 minutes | ||
| 534 | self.reconnect_since = Some( | ||
| 535 | Timestamp::from(self.last_successful_connection - Duration::from_secs(900)) | ||
| 536 | ); | ||
| 537 | } | ||
| 538 | } | ||
| 539 | |||
| 540 | async fn connect_and_subscribe(&mut self) -> Result<()> { | ||
| 541 | self.client.connect().await?; | ||
| 542 | |||
| 543 | let filters = self.build_three_layer_filters().await; | ||
| 544 | |||
| 545 | // Apply since filter if reconnecting | ||
| 546 | let filters = if let Some(since) = self.reconnect_since { | ||
| 547 | filters.into_iter().map(|f| f.since(since)).collect() | ||
| 548 | } else { | ||
| 549 | filters | ||
| 550 | }; | ||
| 551 | |||
| 552 | for filter in filters { | ||
| 553 | self.client.subscribe(filter, None).await?; | ||
| 554 | } | ||
| 555 | |||
| 556 | Ok(()) | ||
| 557 | } | ||
| 558 | } | ||
| 559 | ``` | ||
| 560 | |||
| 561 | ## Three-Layer Filter Strategy | ||
| 562 | |||
| 563 | ```rust | ||
| 564 | impl SyncRelayConnection { | ||
| 565 | async fn build_three_layer_filters(&self) -> Vec<Filter> { | ||
| 566 | let mut filters = Vec::new(); | ||
| 567 | |||
| 568 | // Get repos for this relay | ||
| 569 | let repos = self.sync_relays.read().await | ||
| 570 | .get(&self.url) | ||
| 571 | .cloned() | ||
| 572 | .unwrap_or_default(); | ||
| 573 | |||
| 574 | // Layer 1: Announcements (kinds 30617 + 30618) | ||
| 575 | // Note: 30618 is ONLY synced from remote relays, not self-subscribed | ||
| 576 | // Always included even if relay has no repos (bootstrap relay case) | ||
| 577 | filters.push( | ||
| 578 | Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) | ||
| 579 | ); | ||
| 580 | |||
| 581 | // Layer 2: Events tagging repos with 'a' tags (ALL kinds) | ||
| 582 | // Batched per 100 repo refs | ||
| 583 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 584 | for chunk in repo_refs.chunks(100) { | ||
| 585 | filters.push( | ||
| 586 | Filter::new() | ||
| 587 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk.to_vec()) | ||
| 588 | ); | ||
| 589 | filters.push( | ||
| 590 | Filter::new() | ||
| 591 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk.to_vec()) | ||
| 592 | ); | ||
| 593 | filters.push( | ||
| 594 | Filter::new() | ||
| 595 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 596 | ); | ||
| 597 | } | ||
| 598 | |||
| 599 | // Layer 3: Events tagging root events (batch per 100 event IDs) | ||
| 600 | let all_event_ids: HashSet<EventId> = repos.values() | ||
| 601 | .flat_map(|ids| ids.iter().cloned()) | ||
| 602 | .collect(); | ||
| 603 | |||
| 604 | let event_id_strs: Vec<String> = all_event_ids | ||
| 605 | .iter() | ||
| 606 | .map(|id| id.to_hex()) | ||
| 607 | .collect(); | ||
| 608 | |||
| 609 | for chunk in event_id_strs.chunks(100) { | ||
| 610 | filters.push( | ||
| 611 | Filter::new() | ||
| 612 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk.to_vec()) | ||
| 613 | ); | ||
| 614 | filters.push( | ||
| 615 | Filter::new() | ||
| 616 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk.to_vec()) | ||
| 617 | ); | ||
| 618 | filters.push( | ||
| 619 | Filter::new() | ||
| 620 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk.to_vec()) | ||
| 621 | ); | ||
| 622 | } | ||
| 623 | |||
| 624 | filters | ||
| 625 | } | ||
| 626 | } | ||
| 627 | ``` | ||
| 628 | |||
| 629 | ## Relay Removal | ||
| 630 | |||
| 631 | ```rust | ||
| 632 | async fn check_relay_removal(&mut self) { | ||
| 633 | let relays_to_check: Vec<String> = self.sync_relays.read().await | ||
| 634 | .keys() | ||
| 635 | .cloned() | ||
| 636 | .collect(); | ||
| 637 | |||
| 638 | for relay_url in relays_to_check { | ||
| 639 | // Never remove bootstrap relay | ||
| 640 | if Some(relay_url.as_str()) == self.config.bootstrap_relay_url.as_deref() { | ||
| 641 | continue; | ||
| 642 | } | ||
| 643 | |||
| 644 | // Check if relay has any repos left | ||
| 645 | let should_remove = { | ||
| 646 | let sync_relays = self.sync_relays.read().await; | ||
| 647 | sync_relays.get(&relay_url) | ||
| 648 | .map(|repos| repos.is_empty()) | ||
| 649 | .unwrap_or(true) | ||
| 650 | }; | ||
| 651 | |||
| 652 | if should_remove { | ||
| 653 | // Remove from HashMap | ||
| 654 | self.sync_relays.write().await.remove(&relay_url); | ||
| 655 | |||
| 656 | // Close connection | ||
| 657 | self.close_relay_connection(&relay_url).await; | ||
| 658 | } | ||
| 659 | } | ||
| 660 | } | ||
| 661 | ``` | ||
| 662 | |||
| 663 | ## Prometheus Metrics (Same as v1) | ||
| 664 | |||
| 665 | | Metric | Type | Labels | Description | | ||
| 666 | | ------------------------------------- | ------- | ------------- | ----------------------- | | ||
| 667 | | `ngit_sync_relay_connected` | Gauge | relay | Connection status 1/0 | | ||
| 668 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome | | ||
| 669 | | `ngit_sync_relay_status` | Gauge | relay | Health state 1/2/3 | | ||
| 670 | | `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures | | ||
| 671 | | `ngit_sync_events_total` | Counter | source | Events by source type | | ||
| 672 | | `ngit_sync_gap_events_total` | Counter | relay | Gap events filled | | ||
| 673 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | | ||
| 674 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected | | ||
| 675 | | `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count | | ||
| 676 | |||
| 677 | ## Module Structure (Simplified) | ||
| 678 | |||
| 679 | ``` | ||
| 680 | src/sync/ | ||
| 681 | ├── mod.rs # SyncManager + state types (FollowingRepoRootEvents, SyncRelays) | ||
| 682 | ├── self_subscriber.rs # SelfSubscriber + batching logic | ||
| 683 | ├── relay_connection.rs # Per-relay WebSocket + filters | ||
| 684 | ├── health.rs # RelayHealthTracker (reuse from v1) | ||
| 685 | └── metrics.rs # SyncMetrics (reuse from v1) | ||
| 686 | ``` | ||
| 687 | |||
| 688 | > **Note:** SyncManager and state type aliases are colocated in `mod.rs` following the pattern of `src/http/mod.rs` (HttpService) and `src/metrics/mod.rs` (Metrics). See the earlier "Module Structure" section for rationale. | ||
| 689 | |||
| 690 | ## Comparison: v1 vs v2 | ||
| 691 | |||
| 692 | | Aspect | v1 (Current) | v2 (Simplified) | | ||
| 693 | | ------------------- | ------------------------------------------------------------------ | --------------------------------------------- | | ||
| 694 | | **State Model** | Spread across FilterService, SubscriptionManager, ConnectionState | Two HashMaps derived from DB | | ||
| 695 | | **Relay Discovery** | Multiple paths: bootstrap, DB query, self-subscribe, remote events | Single path: DB init + self-subscribe | | ||
| 696 | | **Filter Updates** | Dynamic per-event subscription adds | Batched updates (5s window, starts on event) | | ||
| 697 | | **Consolidation** | Per-connection at 150 filters | Per-connection at 70 filters | | ||
| 698 | | **Batching** | Per 100 tags | Per 100 tags | | ||
| 699 | | **Reconnection** | Various backoff strategies | Unified: since = last_success - 15min | | ||
| 700 | | **Jitter** | Startup jitter | None (see design decision) | | ||
| 701 | | **30618 Handling** | Synced everywhere | Remote relays only, not self-subscribed | | ||
| 702 | | **1621 (Issues)** | Not included | Included with 1617/1618/1619 | | ||
| 703 | | **Layer 2 Scope** | Specific NIP-34 kinds | ALL kinds with 'a' tags | | ||
| 704 | | **Health Backoff** | Variable | 5s → exp → 1h max → hourly → dead@24h → daily | | ||
| 705 | |||
| 706 | ## Key Design Decisions | ||
| 707 | |||
| 708 | 1. **Single Source of Truth**: Two HashMaps represent all sync state, initialized from database | ||
| 709 | 2. **Event-Driven Updates**: Self-subscriber updates HashMaps; relay connections read from them | ||
| 710 | 3. **Batched Filter Updates**: 5-second window that starts on first event (timer does NOT reset on subsequent events) | ||
| 711 | 4. **Uniform Reconnection**: Always use `since = last_successful - 15min` | ||
| 712 | 5. **No Jitter**: Trade-offs not worth it - orphan filters and inefficiency outweigh thundering herd concerns | ||
| 713 | 6. **Bootstrap Relay Protected**: Never removed from sync_relays - ensures at least one sync connection exists even when no repositories currently list our service (cold start / recovery scenario) | ||
| 714 | 7. **30618 Remote-Only**: Maintainer state synced from remote relays, not self-subscribed | ||
| 715 | 8. **70 Filter Consolidation Threshold**: Lower than v1's 150 for earlier consolidation (conservative value that may need tuning based on production observation) | ||
| 716 | 9. **100-Tag Batching**: Consistent batch size for Layer 2 and Layer 3 filters | ||
| 717 | 10. **Layer 2 All Kinds**: Subscribe to ALL events with 'a' tags, not just NIP-34 kinds | ||
| 718 | 11. **Two-Phase Consolidation**: Incremental filters WITHOUT since first, then consolidated WITH since | ||
| 719 | 12. **Multiple Repo Refs**: Handle events that tag multiple repos correctly | ||
| 720 | 13. **Daily Full Catchup**: Periodic sync restart without `since` filter (~24h) to catch slow-propagating events | ||
| 721 | 14. **Dual DB Queries at Startup**: Separate queries for root events and announcements. Could be combined into a single query, but some relays cache by kind which may make separate queries more efficient. Trade-off deferred for future optimization. | ||
| 722 | |||
| 723 | --- | ||
| 724 | |||
| 725 | ## Detailed Flow Diagram | ||
| 726 | |||
| 727 | ```mermaid | ||
| 728 | sequenceDiagram | ||
| 729 | participant DB as Database | ||
| 730 | participant SM as SyncManager | ||
| 731 | participant SS as Self-Subscriber | ||
| 732 | participant RC as RelayConnection | ||
| 733 | |||
| 734 | Note over SM: Startup | ||
| 735 | SM->>SM: Add bootstrap relay to sync_relays | ||
| 736 | SM->>DB: Query kinds 1617/1618/1619/1621 | ||
| 737 | DB-->>SM: Root events | ||
| 738 | SM->>SM: Build following_repo_root_events | ||
| 739 | SM->>SM: Handle multi-repo events | ||
| 740 | |||
| 741 | SM->>DB: Query kind 30617 | ||
| 742 | DB-->>SM: Announcements | ||
| 743 | SM->>SM: Build sync_relays | ||
| 744 | |||
| 745 | SM->>RC: Spawn connections for each relay | ||
| 746 | RC->>RC: Build 3-layer filters from sync_relays | ||
| 747 | RC->>RC: Connect and subscribe | ||
| 748 | |||
| 749 | Note over SS: Event-Driven Updates | ||
| 750 | SS->>SS: Subscribe to 30617/1617/1618/1619/1621 | ||
| 751 | SS->>SS: Receive event - start 5s batch timer | ||
| 752 | SS->>SS: Collect more events in batch window | ||
| 753 | SS->>SS: Batch window closes | ||
| 754 | SS->>SM: Apply batched updates to HashMaps | ||
| 755 | |||
| 756 | alt New Relay Discovered | ||
| 757 | SM->>RC: Spawn new connection | ||
| 758 | else New Content for Existing Relay | ||
| 759 | alt Under 70 filter limit | ||
| 760 | SM->>RC: Add incremental filter - no since | ||
| 761 | else Over 70 filter limit | ||
| 762 | SM->>RC: Add incremental filters - no since | ||
| 763 | RC-->>SM: EOSE received | ||
| 764 | SM->>RC: Close all subscriptions | ||
| 765 | SM->>RC: Resubscribe consolidated - with since | ||
| 766 | end | ||
| 767 | else Relay Has No More Repos | ||
| 768 | alt Is Bootstrap Relay | ||
| 769 | SM->>SM: Keep connection - Layer 1 only | ||
| 770 | else Not Bootstrap Relay | ||
| 771 | SM->>RC: Close connection | ||
| 772 | end | ||
| 773 | end | ||
| 774 | |||
| 775 | Note over RC: Connection Lifecycle | ||
| 776 | RC->>RC: Process incoming events | ||
| 777 | RC->>DB: Store via acceptance policy | ||
| 778 | |||
| 779 | RC->>RC: Connection drops | ||
| 780 | RC->>RC: Wait backoff - 5s to 1h exponential | ||
| 781 | RC->>RC: Reconnect with since = last_success - 15min | ||
| 782 | |||
| 783 | Note over RC: If failures continue 24h | ||
| 784 | RC->>RC: Mark dead - retry once per 24h | ||
| 785 | ``` | ||
diff --git a/docs/explanation/grasp-02-proactive-sync-v3.md b/docs/explanation/grasp-02-proactive-sync-v3.md deleted file mode 100644 index 30b3102..0000000 --- a/docs/explanation/grasp-02-proactive-sync-v3.md +++ /dev/null | |||
| @@ -1,871 +0,0 @@ | |||
| 1 | # GRASP-02: Proactive Sync v3 - Event-Driven Design | ||
| 2 | |||
| 3 | ## Overview | ||
| 4 | |||
| 5 | This document presents v3 of the proactive sync design. Key principles: | ||
| 6 | |||
| 7 | 1. **Self-subscription as the only mechanism** - No database initialization at startup | ||
| 8 | 2. **Batch-based pending tracking** - Each batch confirms independently | ||
| 9 | 3. **Single action type** - AddFilters only, auto-spawn connections | ||
| 10 | 4. **Three-way state model** - RepoSyncIndex (want) → PendingSyncIndex (in-flight) → RelaySyncIndex (confirmed) | ||
| 11 | |||
| 12 | --- | ||
| 13 | |||
| 14 | ## Data Model | ||
| 15 | |||
| 16 | ### RepoSyncIndex (Source of Truth) | ||
| 17 | |||
| 18 | ```rust | ||
| 19 | /// What we WANT to sync - derived from events received via self-subscription. | ||
| 20 | /// Updated immediately when self-subscriber batch fires. | ||
| 21 | /// Key: repo addressable ref ("30617:pubkey:identifier") | ||
| 22 | pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>; | ||
| 23 | |||
| 24 | #[derive(Debug, Clone, Default)] | ||
| 25 | pub struct RepoSyncNeeds { | ||
| 26 | /// Relay URLs listed in this repo's 30617 announcement | ||
| 27 | pub relays: HashSet<String>, | ||
| 28 | /// Root event IDs (1617/1618/1619/1621) that reference this repo | ||
| 29 | pub root_events: HashSet<EventId>, | ||
| 30 | } | ||
| 31 | ``` | ||
| 32 | |||
| 33 | ### RelaySyncIndex (Confirmed State + Connection) | ||
| 34 | |||
| 35 | ```rust | ||
| 36 | /// What we've CONFIRMED syncing - includes connection state for integrated lifecycle. | ||
| 37 | /// Key: relay URL | ||
| 38 | pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>; | ||
| 39 | |||
| 40 | /// Connection status for a relay | ||
| 41 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 42 | pub enum ConnectionStatus { | ||
| 43 | /// Not currently connected | ||
| 44 | Disconnected, | ||
| 45 | /// Connection attempt in progress | ||
| 46 | Connecting, | ||
| 47 | /// Successfully connected and subscribed | ||
| 48 | Connected, | ||
| 49 | } | ||
| 50 | |||
| 51 | /// Complete state for a single relay - combines sync needs with connection lifecycle | ||
| 52 | #[derive(Debug)] | ||
| 53 | pub struct RelayState { | ||
| 54 | /// Repos we've confirmed syncing from this relay | ||
| 55 | pub repos: HashSet<String>, | ||
| 56 | /// Root events we've confirmed tracking | ||
| 57 | pub root_events: HashSet<EventId>, | ||
| 58 | /// If true, never disconnect this relay | ||
| 59 | pub is_bootstrap: bool, | ||
| 60 | /// Current connection status | ||
| 61 | pub connection_status: ConnectionStatus, | ||
| 62 | /// When we last successfully connected (for since filter on reconnect) | ||
| 63 | pub last_connected: Option<Timestamp>, | ||
| 64 | /// When we disconnected (for 15-minute state retention rule) | ||
| 65 | pub disconnected_at: Option<Timestamp>, | ||
| 66 | /// The active connection (None if disconnected) | ||
| 67 | pub connection: Option<RelayConnection>, | ||
| 68 | } | ||
| 69 | |||
| 70 | impl RelayState { | ||
| 71 | /// Check if state should be cleared based on 15-minute rule | ||
| 72 | pub fn should_clear_state(&self) -> bool { | ||
| 73 | match self.disconnected_at { | ||
| 74 | Some(disconnected) => { | ||
| 75 | let now = Timestamp::now(); | ||
| 76 | now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes | ||
| 77 | } | ||
| 78 | None => false, // Still connected or never connected | ||
| 79 | } | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Clear repos and root_events (called when reconnect takes > 15 minutes) | ||
| 83 | pub fn clear_sync_state(&mut self) { | ||
| 84 | self.repos.clear(); | ||
| 85 | self.root_events.clear(); | ||
| 86 | } | ||
| 87 | } | ||
| 88 | ``` | ||
| 89 | |||
| 90 | ### PendingSyncIndex (In-Flight Batches) | ||
| 91 | |||
| 92 | ```rust | ||
| 93 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. | ||
| 94 | /// Each batch has its own ID and can confirm independently. | ||
| 95 | /// Key: relay URL | ||
| 96 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | ||
| 97 | |||
| 98 | #[derive(Debug, Clone)] | ||
| 99 | pub struct PendingBatch { | ||
| 100 | /// Unique ID for this batch (for debugging/logging) | ||
| 101 | pub batch_id: u64, | ||
| 102 | /// The items this batch is syncing | ||
| 103 | pub items: PendingItems, | ||
| 104 | /// Subscription IDs that must ALL receive EOSE before confirming | ||
| 105 | pub outstanding_subs: HashSet<SubscriptionId>, | ||
| 106 | } | ||
| 107 | |||
| 108 | #[derive(Debug, Clone, Default)] | ||
| 109 | pub struct PendingItems { | ||
| 110 | pub repos: HashSet<String>, | ||
| 111 | pub root_events: HashSet<EventId>, | ||
| 112 | } | ||
| 113 | ``` | ||
| 114 | |||
| 115 | --- | ||
| 116 | |||
| 117 | ## State Flow | ||
| 118 | |||
| 119 | ```mermaid | ||
| 120 | flowchart TB | ||
| 121 | subgraph Input | ||
| 122 | SS[SelfSubscriber] | ||
| 123 | OWN[Own Relay] | ||
| 124 | end | ||
| 125 | |||
| 126 | subgraph RepoSyncIndex - Want | ||
| 127 | RSI[HashMap of Repo to Relays+Events] | ||
| 128 | end | ||
| 129 | |||
| 130 | subgraph Derived Target | ||
| 131 | DT[derive_relay_targets fn] | ||
| 132 | TGT[Per-relay: repos + events we should sync] | ||
| 133 | end | ||
| 134 | |||
| 135 | subgraph PendingSyncIndex - In Flight | ||
| 136 | PSI[Vec of PendingBatch per relay] | ||
| 137 | end | ||
| 138 | |||
| 139 | subgraph RelaySyncIndex - State + Connection | ||
| 140 | RLI[RelayState per relay] | ||
| 141 | CONN[connection: Option of RelayConnection] | ||
| 142 | STATUS[connection_status: Connected/Disconnected/Connecting] | ||
| 143 | REPOS[repos + root_events] | ||
| 144 | end | ||
| 145 | |||
| 146 | SS -->|subscribe| OWN | ||
| 147 | OWN -->|events| SS | ||
| 148 | SS -->|batch fires| RSI | ||
| 149 | RSI --> DT | ||
| 150 | DT --> TGT | ||
| 151 | TGT -->|diff: target - pending - confirmed| DIFF[Compute new items] | ||
| 152 | PSI --> DIFF | ||
| 153 | RLI --> DIFF | ||
| 154 | DIFF -->|skip if disconnected| CHECK{Connected?} | ||
| 155 | CHECK -->|yes| AF[AddFilters] | ||
| 156 | CHECK -->|no| QUEUE[Queued in RelayState.repos] | ||
| 157 | AF -->|subscribe| CONN | ||
| 158 | AF -->|create batch| PSI | ||
| 159 | CONN -->|EOSE| PSI | ||
| 160 | PSI -->|batch complete| REPOS | ||
| 161 | CONN -->|disconnect event| DISC[Mark Disconnected + set disconnected_at] | ||
| 162 | DISC -->|reconnect| RECONN[On Reconnect] | ||
| 163 | RECONN -->|check 15min rule| RULE{disconnected > 15min?} | ||
| 164 | RULE -->|yes| CLEAR[Clear repos/root_events] | ||
| 165 | RULE -->|no| RETAIN[Keep retained state] | ||
| 166 | CLEAR --> REGEN[Regenerate AddFilters from RepoSyncIndex] | ||
| 167 | RETAIN --> RESUB[Resubscribe with since filter] | ||
| 168 | ``` | ||
| 169 | |||
| 170 | ### Connection Lifecycle Integration | ||
| 171 | |||
| 172 | The `RelayState` struct now owns both the connection and sync state: | ||
| 173 | |||
| 174 | ```rust | ||
| 175 | // On disconnect (detected via RelayPoolNotification::Shutdown or handle_notifications returning) | ||
| 176 | fn handle_disconnect(&mut self, relay_url: &str) { | ||
| 177 | if let Some(state) = self.relay_sync_index.write().await.get_mut(relay_url) { | ||
| 178 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 179 | state.disconnected_at = Some(Timestamp::now()); | ||
| 180 | state.connection = None; | ||
| 181 | |||
| 182 | // Clear any pending batches for this relay | ||
| 183 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 184 | } | ||
| 185 | } | ||
| 186 | |||
| 187 | // On reconnect | ||
| 188 | async fn handle_reconnect(&mut self, relay_url: &str) -> Result<(), Error> { | ||
| 189 | let mut index = self.relay_sync_index.write().await; | ||
| 190 | let state = index.get_mut(relay_url).ok_or("Relay not in index")?; | ||
| 191 | |||
| 192 | // Apply 15-minute state retention rule | ||
| 193 | if state.should_clear_state() { | ||
| 194 | tracing::info!("Reconnect after >15min for {}, clearing state", relay_url); | ||
| 195 | state.clear_sync_state(); | ||
| 196 | } | ||
| 197 | |||
| 198 | // Create new connection | ||
| 199 | state.connection_status = ConnectionStatus::Connecting; | ||
| 200 | let connection = RelayConnection::new(relay_url.to_string()); | ||
| 201 | |||
| 202 | // Connect with since filter if we have last_connected | ||
| 203 | let since = state.last_connected.map(|ts| { | ||
| 204 | Timestamp::from(ts.as_u64().saturating_sub(900)) // -15 min buffer | ||
| 205 | }); | ||
| 206 | |||
| 207 | connection.connect_and_subscribe_with_since(since).await?; | ||
| 208 | |||
| 209 | state.connection = Some(connection); | ||
| 210 | state.connection_status = ConnectionStatus::Connected; | ||
| 211 | state.last_connected = Some(Timestamp::now()); | ||
| 212 | state.disconnected_at = None; | ||
| 213 | |||
| 214 | drop(index); // Release lock | ||
| 215 | |||
| 216 | // Regenerate AddFilters from current state (either retained or fresh from RepoSyncIndex) | ||
| 217 | self.regenerate_filters_for_relay(relay_url).await; | ||
| 218 | |||
| 219 | Ok(()) | ||
| 220 | } | ||
| 221 | |||
| 222 | /// Regenerate AddFilters for a relay after reconnection | ||
| 223 | async fn regenerate_filters_for_relay(&mut self, relay_url: &str) { | ||
| 224 | let repo_index = self.repo_sync_index.read().await; | ||
| 225 | let targets = derive_relay_targets(&repo_index); | ||
| 226 | |||
| 227 | if let Some(target) = targets.get(relay_url) { | ||
| 228 | // Build filters for everything this relay should sync | ||
| 229 | let filters = build_filters(&target.repos, &target.root_events); | ||
| 230 | |||
| 231 | // Create and process AddFilters action | ||
| 232 | let action = AddFilters { | ||
| 233 | relay_url: relay_url.to_string(), | ||
| 234 | repos: target.repos.clone(), | ||
| 235 | root_events: target.root_events.clone(), | ||
| 236 | filters, | ||
| 237 | }; | ||
| 238 | |||
| 239 | self.handle_add_filters(action).await; | ||
| 240 | } | ||
| 241 | } | ||
| 242 | ``` | ||
| 243 | |||
| 244 | --- | ||
| 245 | |||
| 246 | ## Action Type | ||
| 247 | |||
| 248 | ```rust | ||
| 249 | /// Action sent from SelfSubscriber to SyncManager. | ||
| 250 | /// SyncManager auto-spawns relay connections if they don't exist. | ||
| 251 | pub struct AddFilters { | ||
| 252 | pub relay_url: String, | ||
| 253 | /// Items this action covers (for pending tracking) | ||
| 254 | pub repos: HashSet<String>, | ||
| 255 | pub root_events: HashSet<EventId>, | ||
| 256 | /// Pre-batched filters (each with <= 100 tags) | ||
| 257 | pub filters: Vec<Filter>, | ||
| 258 | } | ||
| 259 | ``` | ||
| 260 | |||
| 261 | --- | ||
| 262 | |||
| 263 | ## Core Algorithms | ||
| 264 | |||
| 265 | ### 1. derive_relay_targets | ||
| 266 | |||
| 267 | Transform RepoSyncIndex into per-relay sync targets: | ||
| 268 | |||
| 269 | ```rust | ||
| 270 | fn derive_relay_targets( | ||
| 271 | repo_index: &HashMap<String, RepoSyncNeeds> | ||
| 272 | ) -> HashMap<String, RelaySyncNeeds> { | ||
| 273 | let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); | ||
| 274 | |||
| 275 | for (repo_ref, needs) in repo_index { | ||
| 276 | for relay_url in &needs.relays { | ||
| 277 | let target = targets.entry(relay_url.clone()).or_default(); | ||
| 278 | target.repos.insert(repo_ref.clone()); | ||
| 279 | target.root_events.extend(needs.root_events.iter().cloned()); | ||
| 280 | } | ||
| 281 | } | ||
| 282 | |||
| 283 | targets | ||
| 284 | } | ||
| 285 | ``` | ||
| 286 | |||
| 287 | ### 2. compute_actions (Three-Way Diff) | ||
| 288 | |||
| 289 | ```rust | ||
| 290 | fn compute_actions( | ||
| 291 | targets: &HashMap<String, RelaySyncNeeds>, | ||
| 292 | pending: &HashMap<String, Vec<PendingBatch>>, | ||
| 293 | confirmed: &HashMap<String, RelayState>, | ||
| 294 | ) -> Vec<AddFilters> { | ||
| 295 | let mut actions = Vec::new(); | ||
| 296 | |||
| 297 | for (relay_url, target) in targets { | ||
| 298 | // Skip disconnected relays - they'll get AddFilters on reconnect | ||
| 299 | if let Some(state) = confirmed.get(relay_url) { | ||
| 300 | if state.connection_status != ConnectionStatus::Connected { | ||
| 301 | continue; | ||
| 302 | } | ||
| 303 | } | ||
| 304 | |||
| 305 | // Collect all pending items for this relay | ||
| 306 | let pending_repos: HashSet<_> = pending.get(relay_url) | ||
| 307 | .map(|batches| batches.iter() | ||
| 308 | .flat_map(|b| b.items.repos.iter().cloned()) | ||
| 309 | .collect()) | ||
| 310 | .unwrap_or_default(); | ||
| 311 | let pending_events: HashSet<_> = pending.get(relay_url) | ||
| 312 | .map(|batches| batches.iter() | ||
| 313 | .flat_map(|b| b.items.root_events.iter().cloned()) | ||
| 314 | .collect()) | ||
| 315 | .unwrap_or_default(); | ||
| 316 | |||
| 317 | // Collect confirmed items for this relay | ||
| 318 | let confirmed_repos = confirmed.get(relay_url) | ||
| 319 | .map(|c| &c.repos) | ||
| 320 | .unwrap_or(&HashSet::new()); | ||
| 321 | let confirmed_events = confirmed.get(relay_url) | ||
| 322 | .map(|c| &c.root_events) | ||
| 323 | .unwrap_or(&HashSet::new()); | ||
| 324 | |||
| 325 | // New = target - pending - confirmed | ||
| 326 | let new_repos: HashSet<_> = target.repos.iter() | ||
| 327 | .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) | ||
| 328 | .cloned() | ||
| 329 | .collect(); | ||
| 330 | let new_events: HashSet<_> = target.root_events.iter() | ||
| 331 | .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) | ||
| 332 | .cloned() | ||
| 333 | .collect(); | ||
| 334 | |||
| 335 | if !new_repos.is_empty() || !new_events.is_empty() { | ||
| 336 | let filters = build_filters(&new_repos, &new_events); | ||
| 337 | actions.push(AddFilters { | ||
| 338 | relay_url: relay_url.clone(), | ||
| 339 | repos: new_repos, | ||
| 340 | root_events: new_events, | ||
| 341 | filters, | ||
| 342 | }); | ||
| 343 | } | ||
| 344 | } | ||
| 345 | |||
| 346 | actions | ||
| 347 | } | ||
| 348 | ``` | ||
| 349 | |||
| 350 | ### 3. handle_add_filters (SyncManager) | ||
| 351 | |||
| 352 | ```rust | ||
| 353 | impl SyncManager { | ||
| 354 | async fn handle_add_filters(&mut self, action: AddFilters) { | ||
| 355 | let AddFilters { relay_url, repos, root_events, filters } = action; | ||
| 356 | |||
| 357 | // Auto-spawn connection if needed | ||
| 358 | if !self.connections.contains_key(&relay_url) { | ||
| 359 | self.spawn_connection(&relay_url).await; | ||
| 360 | } | ||
| 361 | |||
| 362 | let conn = self.connections.get(&relay_url).unwrap(); | ||
| 363 | |||
| 364 | // Subscribe and collect subscription IDs | ||
| 365 | // nostr-sdk 0.44: subscribe returns Output<Vec<SubscriptionId>> | ||
| 366 | // since we're only subscribed to one relay per connection | ||
| 367 | let mut sub_ids = HashSet::new(); | ||
| 368 | for filter in filters { | ||
| 369 | // cloned filter for each subscription call | ||
| 370 | match conn.client.subscribe(filter, None).await { | ||
| 371 | Ok(output) => { | ||
| 372 | // Output contains subscription IDs for each relay | ||
| 373 | for sub_id in output.val { | ||
| 374 | sub_ids.insert(sub_id); | ||
| 375 | } | ||
| 376 | } | ||
| 377 | Err(e) => { | ||
| 378 | tracing::warn!("Failed to subscribe: {}", e); | ||
| 379 | } | ||
| 380 | } | ||
| 381 | } | ||
| 382 | |||
| 383 | // Create pending batch | ||
| 384 | let batch = PendingBatch { | ||
| 385 | batch_id: self.next_batch_id(), | ||
| 386 | items: PendingItems { repos, root_events }, | ||
| 387 | outstanding_subs: sub_ids, | ||
| 388 | }; | ||
| 389 | |||
| 390 | // Add to pending index | ||
| 391 | self.pending_sync_index.write().await | ||
| 392 | .entry(relay_url) | ||
| 393 | .or_default() | ||
| 394 | .push(batch); | ||
| 395 | } | ||
| 396 | } | ||
| 397 | ``` | ||
| 398 | |||
| 399 | ### 4. handle_eose (Batch Completion) | ||
| 400 | |||
| 401 | ```rust | ||
| 402 | impl SyncManager { | ||
| 403 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { | ||
| 404 | let mut pending = self.pending_sync_index.write().await; | ||
| 405 | |||
| 406 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 407 | // Find which batch this subscription belongs to | ||
| 408 | for batch in batches.iter_mut() { | ||
| 409 | if batch.outstanding_subs.remove(&sub_id) { | ||
| 410 | // Check if batch is now complete | ||
| 411 | if batch.outstanding_subs.is_empty() { | ||
| 412 | // Move items to confirmed | ||
| 413 | let items = batch.items.clone(); | ||
| 414 | drop(pending); // Release lock before acquiring another | ||
| 415 | |||
| 416 | let mut confirmed = self.relay_sync_index.write().await; | ||
| 417 | let relay_confirmed = confirmed | ||
| 418 | .entry(relay_url.to_string()) | ||
| 419 | .or_default(); | ||
| 420 | relay_confirmed.repos.extend(items.repos); | ||
| 421 | relay_confirmed.root_events.extend(items.root_events); | ||
| 422 | |||
| 423 | tracing::info!( | ||
| 424 | "Batch {} complete for {} - confirmed {} repos, {} events", | ||
| 425 | batch.batch_id, relay_url, | ||
| 426 | items.repos.len(), items.root_events.len() | ||
| 427 | ); | ||
| 428 | } | ||
| 429 | break; | ||
| 430 | } | ||
| 431 | } | ||
| 432 | |||
| 433 | // Clean up completed batches | ||
| 434 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 435 | batches.retain(|b| !b.outstanding_subs.is_empty()); | ||
| 436 | } | ||
| 437 | } | ||
| 438 | } | ||
| 439 | } | ||
| 440 | ``` | ||
| 441 | |||
| 442 | --- | ||
| 443 | |||
| 444 | ## Self-Subscriber Flow | ||
| 445 | |||
| 446 | ### State Tracking | ||
| 447 | |||
| 448 | ```rust | ||
| 449 | pub struct SelfSubscriber { | ||
| 450 | own_relay_url: String, | ||
| 451 | relay_domain: String, | ||
| 452 | repo_sync_index: RepoSyncIndex, | ||
| 453 | pending_sync_index: PendingSyncIndex, | ||
| 454 | relay_sync_index: RelaySyncIndex, | ||
| 455 | action_tx: mpsc::Sender<AddFilters>, | ||
| 456 | /// Timestamp of last successful connection - used for since filter on reconnection | ||
| 457 | last_connected: Option<Timestamp>, | ||
| 458 | /// Is this the first connection attempt since startup? | ||
| 459 | is_initial_connect: bool, | ||
| 460 | } | ||
| 461 | ``` | ||
| 462 | |||
| 463 | ### On Startup | ||
| 464 | |||
| 465 | ```rust | ||
| 466 | impl SelfSubscriber { | ||
| 467 | async fn run(mut self) { | ||
| 468 | // Connect to own relay | ||
| 469 | let client = Client::new(Keys::generate()); | ||
| 470 | client.add_relay(&self.own_relay_url).await?; | ||
| 471 | client.connect().await; | ||
| 472 | |||
| 473 | // Track connection time | ||
| 474 | self.last_connected = Some(Timestamp::now()); | ||
| 475 | |||
| 476 | // Subscribe WITHOUT since filter (get all historical) on first connect | ||
| 477 | let filter = Filter::new().kinds([ | ||
| 478 | Kind::Custom(30617), // Repository announcements | ||
| 479 | Kind::GitPatch, // 1617 | ||
| 480 | Kind::Custom(1618), // PRs | ||
| 481 | Kind::Custom(1619), // PR updates | ||
| 482 | Kind::GitIssue, // 1621 | ||
| 483 | ]); | ||
| 484 | |||
| 485 | client.subscribe(filter, None).await?; | ||
| 486 | self.is_initial_connect = false; | ||
| 487 | |||
| 488 | // Run event loop with batching | ||
| 489 | self.event_loop(&client).await; | ||
| 490 | } | ||
| 491 | } | ||
| 492 | ``` | ||
| 493 | |||
| 494 | ### On Reconnection | ||
| 495 | |||
| 496 | ```rust | ||
| 497 | impl SelfSubscriber { | ||
| 498 | async fn reconnect(&mut self, client: &Client) -> Result<(), Error> { | ||
| 499 | // Reconnect to own relay | ||
| 500 | client.connect().await; | ||
| 501 | |||
| 502 | // On reconnection ONLY, use since filter based on last_connected | ||
| 503 | let since = match self.last_connected { | ||
| 504 | Some(ts) => Timestamp::from(ts.as_u64().saturating_sub(900)), // -15 minutes buffer | ||
| 505 | None => Timestamp::from(0), // Shouldn't happen, but fall back to full sync | ||
| 506 | }; | ||
| 507 | |||
| 508 | // Update last_connected AFTER computing since | ||
| 509 | self.last_connected = Some(Timestamp::now()); | ||
| 510 | |||
| 511 | let filter = Filter::new() | ||
| 512 | .kinds([ | ||
| 513 | Kind::Custom(30617), | ||
| 514 | Kind::GitPatch, | ||
| 515 | Kind::Custom(1618), | ||
| 516 | Kind::Custom(1619), | ||
| 517 | Kind::GitIssue, | ||
| 518 | ]) | ||
| 519 | .since(since); | ||
| 520 | |||
| 521 | client.subscribe(filter, None).await?; | ||
| 522 | Ok(()) | ||
| 523 | } | ||
| 524 | } | ||
| 525 | ``` | ||
| 526 | |||
| 527 | ### Batching Logic | ||
| 528 | |||
| 529 | ```rust | ||
| 530 | impl SelfSubscriber { | ||
| 531 | async fn event_loop(&self, client: &Client) { | ||
| 532 | let mut pending_events: Vec<Event> = Vec::new(); | ||
| 533 | let mut batch_timer: Option<Instant> = None; | ||
| 534 | let batch_window = Duration::from_secs(5); | ||
| 535 | |||
| 536 | loop { | ||
| 537 | let timeout = batch_timer | ||
| 538 | .map(|t| batch_window.saturating_sub(t.elapsed())) | ||
| 539 | .unwrap_or(Duration::from_secs(60)); | ||
| 540 | |||
| 541 | tokio::select! { | ||
| 542 | notification = client.notifications().recv() => { | ||
| 543 | if let Ok(RelayPoolNotification::Event { event, .. }) = notification { | ||
| 544 | pending_events.push(*event); | ||
| 545 | |||
| 546 | // Start timer on first event (does NOT reset) | ||
| 547 | if batch_timer.is_none() { | ||
| 548 | batch_timer = Some(Instant::now()); | ||
| 549 | } | ||
| 550 | } | ||
| 551 | } | ||
| 552 | _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { | ||
| 553 | // Batch window elapsed | ||
| 554 | self.process_batch(pending_events.drain(..).collect()).await; | ||
| 555 | batch_timer = None; | ||
| 556 | } | ||
| 557 | } | ||
| 558 | } | ||
| 559 | } | ||
| 560 | |||
| 561 | async fn process_batch(&self, events: Vec<Event>) { | ||
| 562 | // 1. Update RepoSyncIndex | ||
| 563 | for event in events { | ||
| 564 | match event.kind.as_u16() { | ||
| 565 | 30617 => self.handle_announcement(&event).await, | ||
| 566 | 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, | ||
| 567 | _ => {} | ||
| 568 | } | ||
| 569 | } | ||
| 570 | |||
| 571 | // 2. Derive targets and compute actions | ||
| 572 | let repo_index = self.repo_sync_index.read().await; | ||
| 573 | let targets = derive_relay_targets(&repo_index); | ||
| 574 | |||
| 575 | let pending = self.pending_sync_index.read().await; | ||
| 576 | let confirmed = self.relay_sync_index.read().await; | ||
| 577 | |||
| 578 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 579 | |||
| 580 | drop(repo_index); | ||
| 581 | drop(pending); | ||
| 582 | drop(confirmed); | ||
| 583 | |||
| 584 | // 3. Send actions to SyncManager | ||
| 585 | for action in actions { | ||
| 586 | let _ = self.action_tx.send(action).await; | ||
| 587 | } | ||
| 588 | } | ||
| 589 | } | ||
| 590 | ``` | ||
| 591 | |||
| 592 | --- | ||
| 593 | |||
| 594 | ## Bootstrap Relay | ||
| 595 | |||
| 596 | ```rust | ||
| 597 | impl SyncManager { | ||
| 598 | async fn initialize_bootstrap(&mut self) { | ||
| 599 | if let Some(url) = &self.config.bootstrap_relay_url { | ||
| 600 | // Pre-mark as bootstrap (never removed) | ||
| 601 | self.relay_sync_index.write().await.insert( | ||
| 602 | url.clone(), | ||
| 603 | RelaySyncNeeds { | ||
| 604 | repos: HashSet::new(), | ||
| 605 | root_events: HashSet::new(), | ||
| 606 | is_bootstrap: true, | ||
| 607 | } | ||
| 608 | ); | ||
| 609 | |||
| 610 | // Send Layer 1 filter | ||
| 611 | let filters = vec![ | ||
| 612 | Filter::new().kinds([Kind::Custom(30617), Kind::Custom(30618)]) | ||
| 613 | ]; | ||
| 614 | |||
| 615 | self.handle_add_filters(AddFilters { | ||
| 616 | relay_url: url.clone(), | ||
| 617 | repos: HashSet::new(), // Layer 1 doesn't track specific repos | ||
| 618 | root_events: HashSet::new(), | ||
| 619 | filters, | ||
| 620 | }).await; | ||
| 621 | } | ||
| 622 | } | ||
| 623 | } | ||
| 624 | ``` | ||
| 625 | |||
| 626 | --- | ||
| 627 | |||
| 628 | ## Disconnect Handling | ||
| 629 | |||
| 630 | Direct in SyncManager (not via action): | ||
| 631 | |||
| 632 | ```rust | ||
| 633 | impl SyncManager { | ||
| 634 | async fn check_disconnects(&mut self) { | ||
| 635 | let confirmed = self.relay_sync_index.read().await; | ||
| 636 | |||
| 637 | for (relay_url, state) in confirmed.iter() { | ||
| 638 | if state.is_bootstrap { | ||
| 639 | continue; // Never disconnect bootstrap | ||
| 640 | } | ||
| 641 | |||
| 642 | if state.repos.is_empty() && state.root_events.is_empty() { | ||
| 643 | // No repos - disconnect | ||
| 644 | self.disconnect_relay(relay_url).await; | ||
| 645 | } | ||
| 646 | } | ||
| 647 | } | ||
| 648 | |||
| 649 | async fn disconnect_relay(&mut self, relay_url: &str) { | ||
| 650 | self.relay_sync_index.write().await.remove(relay_url); | ||
| 651 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 652 | |||
| 653 | if let Some(conn) = self.connections.remove(relay_url) { | ||
| 654 | conn.disconnect().await; | ||
| 655 | } | ||
| 656 | } | ||
| 657 | } | ||
| 658 | ``` | ||
| 659 | |||
| 660 | --- | ||
| 661 | |||
| 662 | ## Relay Connection Lifecycle | ||
| 663 | |||
| 664 | ### State Machine for External Relays | ||
| 665 | |||
| 666 | ```mermaid | ||
| 667 | stateDiagram-v2 | ||
| 668 | [*] --> Connecting: spawn_connection | ||
| 669 | Connecting --> Connected: success | ||
| 670 | Connecting --> Backoff: failure | ||
| 671 | Connected --> Disconnected: connection lost | ||
| 672 | Connected --> [*]: intentional disconnect | ||
| 673 | Disconnected --> Backoff: record_failure | ||
| 674 | Backoff --> Connecting: backoff elapsed | ||
| 675 | Backoff --> Dead: 24h continuous failures | ||
| 676 | Dead --> Connecting: daily retry | ||
| 677 | ``` | ||
| 678 | |||
| 679 | ### Health Integration | ||
| 680 | |||
| 681 | Uses `RelayHealthTracker` from [`src/sync/health.rs`](../../src/sync/health.rs): | ||
| 682 | |||
| 683 | ```rust | ||
| 684 | impl SyncManager { | ||
| 685 | /// Spawn a connection with health tracking | ||
| 686 | async fn spawn_connection(&mut self, relay_url: &str) { | ||
| 687 | // Check if we should attempt connection | ||
| 688 | if !self.health_tracker.should_attempt_connection(relay_url) { | ||
| 689 | let remaining = self.health_tracker.get_remaining_backoff(relay_url); | ||
| 690 | tracing::debug!( | ||
| 691 | "Skipping connection to {} - backoff {:?}", | ||
| 692 | relay_url, | ||
| 693 | remaining | ||
| 694 | ); | ||
| 695 | return; | ||
| 696 | } | ||
| 697 | |||
| 698 | match self.try_connect(relay_url).await { | ||
| 699 | Ok(conn) => { | ||
| 700 | self.health_tracker.record_success(relay_url); | ||
| 701 | self.connections.insert(relay_url.to_string(), conn); | ||
| 702 | } | ||
| 703 | Err(e) => { | ||
| 704 | self.health_tracker.record_failure(relay_url); | ||
| 705 | tracing::warn!("Connection to {} failed: {}", relay_url, e); | ||
| 706 | } | ||
| 707 | } | ||
| 708 | } | ||
| 709 | } | ||
| 710 | ``` | ||
| 711 | |||
| 712 | ### Reconnection Loop | ||
| 713 | |||
| 714 | Each relay connection runs its own reconnection loop: | ||
| 715 | |||
| 716 | ```rust | ||
| 717 | impl RelayConnection { | ||
| 718 | async fn run_with_reconnection( | ||
| 719 | mut self, | ||
| 720 | health_tracker: Arc<RelayHealthTracker>, | ||
| 721 | event_tx: mpsc::Sender<RelayEvent>, | ||
| 722 | ) { | ||
| 723 | loop { | ||
| 724 | // Check backoff before attempting | ||
| 725 | if !health_tracker.should_attempt_connection(&self.url) { | ||
| 726 | if let Some(remaining) = health_tracker.get_remaining_backoff(&self.url) { | ||
| 727 | tokio::time::sleep(remaining).await; | ||
| 728 | continue; | ||
| 729 | } | ||
| 730 | } | ||
| 731 | |||
| 732 | // Attempt connection | ||
| 733 | match self.connect_and_subscribe().await { | ||
| 734 | Ok(()) => { | ||
| 735 | health_tracker.record_success(&self.url); | ||
| 736 | |||
| 737 | // Track when we connected for since filter on reconnect | ||
| 738 | let connected_at = Timestamp::now(); | ||
| 739 | |||
| 740 | // Run event loop until disconnection | ||
| 741 | self.run_event_loop(&event_tx).await; | ||
| 742 | |||
| 743 | // Connection lost - will reconnect with since filter | ||
| 744 | health_tracker.record_failure(&self.url); | ||
| 745 | |||
| 746 | // On reconnect, use since = connected_at - 15 minutes | ||
| 747 | self.set_reconnect_since(connected_at); | ||
| 748 | } | ||
| 749 | Err(e) => { | ||
| 750 | health_tracker.record_failure(&self.url); | ||
| 751 | tracing::warn!("Connection to {} failed: {}", self.url, e); | ||
| 752 | } | ||
| 753 | } | ||
| 754 | |||
| 755 | // Get backoff duration and wait | ||
| 756 | let state = health_tracker.get_state(&self.url); | ||
| 757 | if state == HealthState::Dead { | ||
| 758 | // Dead relays retry once per 24 hours | ||
| 759 | tokio::time::sleep(Duration::from_secs(24 * 3600)).await; | ||
| 760 | } | ||
| 761 | // Otherwise, loop will check should_attempt_connection | ||
| 762 | } | ||
| 763 | } | ||
| 764 | } | ||
| 765 | ``` | ||
| 766 | |||
| 767 | ### Backoff Configuration | ||
| 768 | |||
| 769 | From existing [`RelayHealthTracker`](../../src/sync/health.rs:91): | ||
| 770 | |||
| 771 | | Parameter | Value | Notes | | ||
| 772 | |-----------|-------|-------| | ||
| 773 | | Base backoff | 5 seconds | First failure | | ||
| 774 | | Backoff multiplier | 2x | Exponential increase | | ||
| 775 | | Max backoff | 1 hour (configurable) | `sync_max_backoff_secs` | | ||
| 776 | | Dead threshold | 24 hours | Continuous failures | | ||
| 777 | | Dead retry interval | 24 hours | Once per day | | ||
| 778 | |||
| 779 | --- | ||
| 780 | |||
| 781 | ## Consolidation | ||
| 782 | |||
| 783 | ### Threshold-Based (70 filters) | ||
| 784 | |||
| 785 | ```rust | ||
| 786 | impl SyncManager { | ||
| 787 | async fn maybe_consolidate(&mut self, relay_url: &str) { | ||
| 788 | let filter_count = self.get_filter_count(relay_url).await; | ||
| 789 | |||
| 790 | if filter_count > 70 { | ||
| 791 | self.consolidate(relay_url).await; | ||
| 792 | } | ||
| 793 | } | ||
| 794 | |||
| 795 | async fn consolidate(&mut self, relay_url: &str) { | ||
| 796 | // 1. Wait for all pending batches to complete | ||
| 797 | self.wait_pending_complete(relay_url).await; | ||
| 798 | |||
| 799 | // 2. Close all subscriptions | ||
| 800 | self.close_all_subs(relay_url).await; | ||
| 801 | |||
| 802 | // 3. Rebuild filters from confirmed state | ||
| 803 | let confirmed = self.relay_sync_index.read().await; | ||
| 804 | let state = confirmed.get(relay_url)?; | ||
| 805 | let filters = build_filters(&state.repos, &state.root_events); | ||
| 806 | |||
| 807 | // 4. Resubscribe with since = now - 15 minutes | ||
| 808 | let since = Timestamp::now() - 900; | ||
| 809 | for filter in filters { | ||
| 810 | self.subscribe(relay_url, filter.since(since)).await; | ||
| 811 | } | ||
| 812 | } | ||
| 813 | } | ||
| 814 | ``` | ||
| 815 | |||
| 816 | ### Daily Timer (23-25h Random) | ||
| 817 | |||
| 818 | ```rust | ||
| 819 | impl SyncManager { | ||
| 820 | async fn run_daily_consolidation(&self) { | ||
| 821 | loop { | ||
| 822 | let hours = 23 + rand::random::<f64>() * 2.0; | ||
| 823 | tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; | ||
| 824 | |||
| 825 | for relay_url in self.connections.keys() { | ||
| 826 | self.consolidate(relay_url).await; | ||
| 827 | } | ||
| 828 | } | ||
| 829 | } | ||
| 830 | } | ||
| 831 | ``` | ||
| 832 | |||
| 833 | --- | ||
| 834 | |||
| 835 | ## Key Design Decisions | ||
| 836 | |||
| 837 | | Decision | Choice | Rationale | | ||
| 838 | |----------|--------|-----------| | ||
| 839 | | Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | | ||
| 840 | | Since filter | Only on reconnection | Initial subscribe gets full history | | ||
| 841 | | Pending tracking | Per-batch with batch ID | Independent confirmation, no blocking | | ||
| 842 | | EOSE requirement | All subs in batch must complete | Single repo may need multiple filter subs | | ||
| 843 | | Action type | Struct not enum | Only one action type needed | | ||
| 844 | | Relay spawning | Auto-spawn on AddFilters | Simplifies action logic | | ||
| 845 | | Disconnect | Direct in SyncManager | Not worth an action type | | ||
| 846 | | Consolidation | 70 filters + daily timer | Threshold for growth, timer for staleness | | ||
| 847 | | Timestamps | In-memory only | Not critical for correctness | | ||
| 848 | | Health tracking | Reuse existing RelayHealthTracker | Already implements exponential backoff, dead relay detection | | ||
| 849 | | Reconnection backoff | Exponential to 1h max | Prevents hammering failed relays | | ||
| 850 | | Dead relay policy | 24h threshold, daily retry | Balance between giving up and resource waste | | ||
| 851 | | last_connected tracking | Per-connection in-memory | Enables 15-minute buffer on reconnect | | ||
| 852 | | Connection ownership | Inside RelayState | Ties connection lifecycle to sync state, simpler than separate maps | | ||
| 853 | | State retention rule | Clear if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | | ||
| 854 | | Skip disconnected | compute_actions skips disconnected | Prevents queuing AddFilters for offline relays | | ||
| 855 | | Reconnect triggers | handle_notifications returns or Shutdown | nostr-sdk signals disconnect via event loop exit | | ||
| 856 | | On-reconnect flow | Regenerate AddFilters from RepoSyncIndex | Fresh subscriptions for what we actually need | | ||
| 857 | |||
| 858 | --- | ||
| 859 | |||
| 860 | ## Module Structure | ||
| 861 | |||
| 862 | ``` | ||
| 863 | src/sync/ | ||
| 864 | ├── mod.rs # SyncManager, main loop | ||
| 865 | ├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types | ||
| 866 | ├── actions.rs # AddFilters struct, compute_actions | ||
| 867 | ├── self_subscriber.rs # SelfSubscriber, batching logic | ||
| 868 | ├── relay_connection.rs # Per-relay WebSocket connection | ||
| 869 | ├── consolidation.rs # Consolidation logic, daily timer | ||
| 870 | ├── health.rs # Health tracking (reuse from v2) | ||
| 871 | └── metrics.rs # Prometheus metrics (reuse from v2) \ No newline at end of file | ||
diff --git a/docs/explanation/grasp-02-proactive-sync-v4.md b/docs/explanation/grasp-02-proactive-sync-v4.md deleted file mode 100644 index dd508b3..0000000 --- a/docs/explanation/grasp-02-proactive-sync-v4.md +++ /dev/null | |||
| @@ -1,1330 +0,0 @@ | |||
| 1 | # GRASP-02: Proactive Sync v4 - Health & Reconnection Design | ||
| 2 | |||
| 3 | ## Overview | ||
| 4 | |||
| 5 | This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles: | ||
| 6 | |||
| 7 | 1. **Self-subscription as the only mechanism** - No database initialization at startup | ||
| 8 | 2. **compute_actions as single decision point** - Determines what NEW subscriptions to create | ||
| 9 | 3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) | ||
| 10 | 4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch | ||
| 11 | 5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary | ||
| 12 | |||
| 13 | --- | ||
| 14 | |||
| 15 | ## Data Model | ||
| 16 | |||
| 17 | ### RepoSyncIndex (Source of Truth) | ||
| 18 | |||
| 19 | ```rust | ||
| 20 | /// What we WANT to sync - derived from events received via self-subscription. | ||
| 21 | /// Updated immediately when self-subscriber batch fires. | ||
| 22 | /// Key: repo addressable ref - 30617:pubkey:identifier | ||
| 23 | pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>; | ||
| 24 | |||
| 25 | #[derive(Debug, Clone, Default)] | ||
| 26 | pub struct RepoSyncNeeds { | ||
| 27 | /// Relay URLs listed in this repo's 30617 announcement | ||
| 28 | pub relays: HashSet<String>, | ||
| 29 | /// Root event IDs - 1617/1618/1619/1621 - that reference this repo | ||
| 30 | pub root_events: HashSet<EventId>, | ||
| 31 | } | ||
| 32 | ``` | ||
| 33 | |||
| 34 | ### RelaySyncIndex (Confirmed State + Connection) | ||
| 35 | |||
| 36 | ```rust | ||
| 37 | /// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. | ||
| 38 | /// Key: relay URL | ||
| 39 | pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>; | ||
| 40 | |||
| 41 | /// Connection status for a relay | ||
| 42 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 43 | pub enum ConnectionStatus { | ||
| 44 | /// Not currently connected | ||
| 45 | Disconnected, | ||
| 46 | /// Connection attempt in progress | ||
| 47 | Connecting, | ||
| 48 | /// Successfully connected and subscribed | ||
| 49 | Connected, | ||
| 50 | } | ||
| 51 | |||
| 52 | /// Complete state for a single relay - combines sync needs with connection lifecycle | ||
| 53 | #[derive(Debug)] | ||
| 54 | pub struct RelayState { | ||
| 55 | /// Repos we have confirmed syncing from this relay | ||
| 56 | pub repos: HashSet<String>, | ||
| 57 | /// Root events we have confirmed tracking | ||
| 58 | pub root_events: HashSet<EventId>, | ||
| 59 | /// If true, never disconnect this relay | ||
| 60 | pub is_bootstrap: bool, | ||
| 61 | /// Current connection status | ||
| 62 | pub connection_status: ConnectionStatus, | ||
| 63 | /// When we last successfully connected - used for since filter on reconnect | ||
| 64 | pub last_connected: Option<Timestamp>, | ||
| 65 | /// When we disconnected - for 15-minute state retention rule | ||
| 66 | pub disconnected_at: Option<Timestamp>, | ||
| 67 | } | ||
| 68 | |||
| 69 | impl RelayState { | ||
| 70 | /// Check if state should be cleared based on 15-minute rule | ||
| 71 | pub fn should_clear_state(&self) -> bool { | ||
| 72 | match self.disconnected_at { | ||
| 73 | Some(disconnected) => { | ||
| 74 | let now = Timestamp::now(); | ||
| 75 | now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes | ||
| 76 | } | ||
| 77 | None => false, // Still connected or never connected | ||
| 78 | } | ||
| 79 | } | ||
| 80 | |||
| 81 | /// Clear repos and root_events - called when reconnect takes > 15 minutes | ||
| 82 | pub fn clear_sync_state(&mut self) { | ||
| 83 | self.repos.clear(); | ||
| 84 | self.root_events.clear(); | ||
| 85 | } | ||
| 86 | } | ||
| 87 | ``` | ||
| 88 | |||
| 89 | ### PendingSyncIndex (In-Flight Batches) | ||
| 90 | |||
| 91 | ```rust | ||
| 92 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. | ||
| 93 | /// Each batch has its own ID and can confirm independently. | ||
| 94 | /// Key: relay URL | ||
| 95 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | ||
| 96 | |||
| 97 | #[derive(Debug, Clone)] | ||
| 98 | pub struct PendingBatch { | ||
| 99 | /// Unique ID for this batch - for debugging/logging | ||
| 100 | pub batch_id: u64, | ||
| 101 | /// The items this batch is syncing | ||
| 102 | pub items: PendingItems, | ||
| 103 | /// Subscription IDs that must ALL receive EOSE before confirming | ||
| 104 | pub outstanding_subs: HashSet<SubscriptionId>, | ||
| 105 | } | ||
| 106 | |||
| 107 | #[derive(Debug, Clone, Default)] | ||
| 108 | pub struct PendingItems { | ||
| 109 | pub repos: HashSet<String>, | ||
| 110 | pub root_events: HashSet<EventId>, | ||
| 111 | } | ||
| 112 | ``` | ||
| 113 | |||
| 114 | --- | ||
| 115 | |||
| 116 | ## Connection Lifecycle State Machine | ||
| 117 | |||
| 118 | ```mermaid | ||
| 119 | stateDiagram-v2 | ||
| 120 | [*] --> Disconnected: discover relay via RepoSyncIndex | ||
| 121 | Disconnected --> Connecting: AddFilters triggers spawn_connection | ||
| 122 | Connecting --> Connected: success | ||
| 123 | Connecting --> Disconnected: failure + record in health tracker | ||
| 124 | Connected --> Disconnected: connection lost | ||
| 125 | Connected --> [*]: intentional disconnect via check_disconnects | ||
| 126 | |||
| 127 | note right of Disconnected: disconnected_at set for 15min rule | ||
| 128 | note right of Connected: last_connected tracked for since filter | ||
| 129 | ``` | ||
| 130 | |||
| 131 | --- | ||
| 132 | |||
| 133 | ## Flow Scenarios | ||
| 134 | |||
| 135 | ### Scenario 1: Initial Connect via handle_connect_or_reconnect | ||
| 136 | |||
| 137 | ```mermaid | ||
| 138 | flowchart TB | ||
| 139 | START[Startup] --> SS[Self-subscribe to own relay] | ||
| 140 | SS --> |no since filter| EVENTS[Receive historical events] | ||
| 141 | EVENTS --> RSI[Update RepoSyncIndex] | ||
| 142 | RSI --> DT[derive_relay_targets] | ||
| 143 | DT --> CA[compute_actions with targets and empty confirmed] | ||
| 144 | CA --> AF[AddFilters for each relay] | ||
| 145 | AF --> SPAWN{Relay connected?} | ||
| 146 | SPAWN --> |no| CONN[spawn_connection] | ||
| 147 | CONN --> HC[handle_connect_or_reconnect] | ||
| 148 | SPAWN --> |yes| SUB | ||
| 149 | |||
| 150 | subgraph handle_connect_or_reconnect - Fresh Sync | ||
| 151 | HC --> CHECK_FRESH{is_fresh_sync?} | ||
| 152 | CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] | ||
| 153 | L1 --> RCA[recompute_actions_for_relay] | ||
| 154 | end | ||
| 155 | |||
| 156 | RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] | ||
| 157 | SUB --> PB[Create PendingBatch] | ||
| 158 | PB --> EOSE[Wait for EOSE] | ||
| 159 | EOSE --> CONFIRM[Move items to confirmed repos/root_events] | ||
| 160 | ``` | ||
| 161 | |||
| 162 | **Key points:** | ||
| 163 | |||
| 164 | - No `since` filter on initial connect - get full history | ||
| 165 | - `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` | ||
| 166 | - Layer 1: `build_announcement_filter(None)` - subscribed immediately without since | ||
| 167 | - Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking | ||
| 168 | |||
| 169 | ### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes | ||
| 170 | |||
| 171 | ```mermaid | ||
| 172 | flowchart TB | ||
| 173 | DISC[Connection lost] --> MARK[Set disconnected_at = now] | ||
| 174 | MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] | ||
| 175 | CLEAR_PEND --> WAIT[Wait for reconnection] | ||
| 176 | WAIT --> RECONN[Connection restored] | ||
| 177 | RECONN --> HC[handle_connect_or_reconnect] | ||
| 178 | |||
| 179 | subgraph handle_connect_or_reconnect - Quick Reconnect | ||
| 180 | HC --> CHECK{is_fresh_sync?} | ||
| 181 | CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] | ||
| 182 | SINCE --> L1[build_announcement_filter - with since] | ||
| 183 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | ||
| 184 | L23 --> RCA[recompute_actions_for_relay] | ||
| 185 | end | ||
| 186 | |||
| 187 | RCA --> AF[AddFilters for new items only] | ||
| 188 | AF --> SUB[Subscribe] | ||
| 189 | SUB --> PB[Create PendingBatch] | ||
| 190 | PB --> EOSE[Wait for EOSE] | ||
| 191 | EOSE --> EXTEND[Extend confirmed state] | ||
| 192 | ``` | ||
| 193 | |||
| 194 | **Key points:** | ||
| 195 | |||
| 196 | - PendingSyncIndex cleared on disconnect (not reconnect) | ||
| 197 | - `handle_connect_or_reconnect`: | ||
| 198 | 1. `build_announcement_filter(Some(since))` - Layer 1 with since | ||
| 199 | 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since | ||
| 200 | 3. `recompute_actions_for_relay` - check for new items | ||
| 201 | - since = last_connected - 15min ensures we catch events during disconnection | ||
| 202 | |||
| 203 | ### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes | ||
| 204 | |||
| 205 | ```mermaid | ||
| 206 | flowchart TB | ||
| 207 | RECONN[Connection restored] --> HC[handle_connect_or_reconnect] | ||
| 208 | |||
| 209 | subgraph handle_connect_or_reconnect - Stale Reconnect | ||
| 210 | HC --> CHECK{is_fresh_sync?} | ||
| 211 | CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] | ||
| 212 | CLEAR --> L1[build_announcement_filter - no since] | ||
| 213 | L1 --> RCA[recompute_actions_for_relay] | ||
| 214 | end | ||
| 215 | |||
| 216 | RCA --> CA[compute_actions with empty confirmed] | ||
| 217 | CA --> AF[AddFilters for everything] | ||
| 218 | AF --> SUB[Subscribe - no since filter] | ||
| 219 | SUB --> PB[Create PendingBatch] | ||
| 220 | PB --> EOSE[Wait for EOSE] | ||
| 221 | EOSE --> CONFIRM[Populate confirmed state fresh] | ||
| 222 | ``` | ||
| 223 | |||
| 224 | **Key points:** | ||
| 225 | |||
| 226 | - `should_clear_state()` returns true → triggers fresh sync | ||
| 227 | - Same path as initial connect after clearing state | ||
| 228 | - Layer 1: `build_announcement_filter(None)` - full history | ||
| 229 | - Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything | ||
| 230 | |||
| 231 | ### Scenario 4: Consolidation - Triggered on Filter Add | ||
| 232 | |||
| 233 | ```mermaid | ||
| 234 | flowchart TB | ||
| 235 | AF[handle_add_filters called] --> COUNT{current + new > 70?} | ||
| 236 | COUNT --> |yes| CONSOLIDATE[consolidate] | ||
| 237 | CONSOLIDATE --> WAIT_PEND[wait_pending_complete] | ||
| 238 | WAIT_PEND --> CLOSE[unsubscribe_all] | ||
| 239 | CLOSE --> SINCE[since = now - 15min] | ||
| 240 | SINCE --> L1[build_announcement_filter - with since] | ||
| 241 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | ||
| 242 | COUNT --> |no| SUB[Subscribe new filters] | ||
| 243 | SUB --> PB[Create PendingBatch] | ||
| 244 | ``` | ||
| 245 | |||
| 246 | **Key points:** | ||
| 247 | |||
| 248 | - Consolidation checked in `handle_add_filters` BEFORE adding new filters | ||
| 249 | - After closing all subscriptions, re-subscribe: | ||
| 250 | 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since | ||
| 251 | 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since | ||
| 252 | - `since = now - 15min` prevents re-fetching old events | ||
| 253 | - Keeps confirmed state, just reduces filter count | ||
| 254 | |||
| 255 | ### Scenario 5: Daily Timer - 23 to 25h Random | ||
| 256 | |||
| 257 | ```mermaid | ||
| 258 | flowchart TB | ||
| 259 | DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] | ||
| 260 | CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] | ||
| 261 | CLEAR_PEND --> CLEAR_STATE[clear_sync_state] | ||
| 262 | CLEAR_STATE --> L1[build_announcement_filter - no since] | ||
| 263 | L1 --> RCA[recompute_actions_for_relay] | ||
| 264 | RCA --> CA[compute_actions with empty confirmed] | ||
| 265 | CA --> AF[AddFilters for everything] | ||
| 266 | AF --> SUB[Subscribe - no since filter] | ||
| 267 | SUB --> PB[Create PendingBatch] | ||
| 268 | PB --> EOSE[Wait for EOSE] | ||
| 269 | EOSE --> CONFIRM[Repopulate confirmed state] | ||
| 270 | ``` | ||
| 271 | |||
| 272 | **Key points:** | ||
| 273 | |||
| 274 | - Daily timer is a full fresh sync, NOT consolidation | ||
| 275 | - Clears both PendingSyncIndex and confirmed state | ||
| 276 | - Layer 1: `build_announcement_filter(None)` - full history | ||
| 277 | - Layer 2+3: via compute_actions with empty confirmed - full history | ||
| 278 | - Detects any state drift accumulated over 24 hours | ||
| 279 | |||
| 280 | --- | ||
| 281 | |||
| 282 | ## Core Algorithms | ||
| 283 | |||
| 284 | ### 1. derive_relay_targets | ||
| 285 | |||
| 286 | Transform RepoSyncIndex into per-relay sync targets: | ||
| 287 | |||
| 288 | ```rust | ||
| 289 | /// Inverts RepoSyncIndex to get per-relay view | ||
| 290 | fn derive_relay_targets( | ||
| 291 | repo_index: &HashMap<String, RepoSyncNeeds> | ||
| 292 | ) -> HashMap<String, RelaySyncNeeds> { | ||
| 293 | let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); | ||
| 294 | |||
| 295 | for (repo_ref, needs) in repo_index { | ||
| 296 | for relay_url in &needs.relays { | ||
| 297 | let target = targets.entry(relay_url.clone()).or_default(); | ||
| 298 | target.repos.insert(repo_ref.clone()); | ||
| 299 | target.root_events.extend(needs.root_events.iter().cloned()); | ||
| 300 | } | ||
| 301 | } | ||
| 302 | |||
| 303 | targets | ||
| 304 | } | ||
| 305 | ``` | ||
| 306 | |||
| 307 | ### 2. compute_actions (Three-Way Diff) | ||
| 308 | |||
| 309 | **This is the ONLY decision point for what NEW subscriptions to create.** | ||
| 310 | |||
| 311 | ```rust | ||
| 312 | /// Computes AddFilters for items that are: | ||
| 313 | /// - In targets (what we want) | ||
| 314 | /// - NOT in pending (already in-flight) | ||
| 315 | /// - NOT in confirmed (already confirmed) | ||
| 316 | fn compute_actions( | ||
| 317 | targets: &HashMap<String, RelaySyncNeeds>, | ||
| 318 | pending: &HashMap<String, Vec<PendingBatch>>, | ||
| 319 | confirmed: &HashMap<String, RelayState>, | ||
| 320 | ) -> Vec<AddFilters> { | ||
| 321 | let mut actions = Vec::new(); | ||
| 322 | |||
| 323 | for (relay_url, target) in targets { | ||
| 324 | // Skip disconnected relays - they will get AddFilters on reconnect | ||
| 325 | if let Some(state) = confirmed.get(relay_url) { | ||
| 326 | if state.connection_status != ConnectionStatus::Connected { | ||
| 327 | continue; | ||
| 328 | } | ||
| 329 | } | ||
| 330 | |||
| 331 | // Collect all pending items for this relay | ||
| 332 | let pending_repos: HashSet<_> = pending.get(relay_url) | ||
| 333 | .map(|batches| batches.iter() | ||
| 334 | .flat_map(|b| b.items.repos.iter().cloned()) | ||
| 335 | .collect()) | ||
| 336 | .unwrap_or_default(); | ||
| 337 | let pending_events: HashSet<_> = pending.get(relay_url) | ||
| 338 | .map(|batches| batches.iter() | ||
| 339 | .flat_map(|b| b.items.root_events.iter().cloned()) | ||
| 340 | .collect()) | ||
| 341 | .unwrap_or_default(); | ||
| 342 | |||
| 343 | // Collect confirmed items for this relay | ||
| 344 | let confirmed_repos = confirmed.get(relay_url) | ||
| 345 | .map(|c| &c.repos) | ||
| 346 | .unwrap_or(&HashSet::new()); | ||
| 347 | let confirmed_events = confirmed.get(relay_url) | ||
| 348 | .map(|c| &c.root_events) | ||
| 349 | .unwrap_or(&HashSet::new()); | ||
| 350 | |||
| 351 | // New = target - pending - confirmed | ||
| 352 | let new_repos: HashSet<_> = target.repos.iter() | ||
| 353 | .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) | ||
| 354 | .cloned() | ||
| 355 | .collect(); | ||
| 356 | let new_events: HashSet<_> = target.root_events.iter() | ||
| 357 | .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) | ||
| 358 | .cloned() | ||
| 359 | .collect(); | ||
| 360 | |||
| 361 | if !new_repos.is_empty() || !new_events.is_empty() { | ||
| 362 | let filters = build_filters(&new_repos, &new_events); | ||
| 363 | actions.push(AddFilters { | ||
| 364 | relay_url: relay_url.clone(), | ||
| 365 | repos: new_repos, | ||
| 366 | root_events: new_events, | ||
| 367 | filters, | ||
| 368 | }); | ||
| 369 | } | ||
| 370 | } | ||
| 371 | |||
| 372 | actions | ||
| 373 | } | ||
| 374 | ``` | ||
| 375 | |||
| 376 | ### 3. Filter Building Functions (Three-Layer Strategy) | ||
| 377 | |||
| 378 | The filter strategy uses three layers: | ||
| 379 | |||
| 380 | - **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation | ||
| 381 | - **Layer 2**: Events tagging our repos | ||
| 382 | - **Layer 3**: Events tagging our root events | ||
| 383 | |||
| 384 | **Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). | ||
| 385 | |||
| 386 | ```rust | ||
| 387 | /// Layer 1: Announcements filter (kinds 30617 + 30618) | ||
| 388 | /// Subscribed ONCE on connect - NOT included in consolidation rebuilds. | ||
| 389 | /// Note: 30618 is ONLY synced from remote relays, not self-subscribed. | ||
| 390 | fn build_announcement_filter(since: Option<Timestamp>) -> Filter { | ||
| 391 | let filter = Filter::new().kinds([ | ||
| 392 | Kind::Custom(30617), // Repository announcements | ||
| 393 | Kind::Custom(30618), // Maintainer lists | ||
| 394 | ]); | ||
| 395 | |||
| 396 | match since { | ||
| 397 | Some(ts) => filter.since(ts), | ||
| 398 | None => filter, | ||
| 399 | } | ||
| 400 | } | ||
| 401 | |||
| 402 | /// Layer 2: Events tagging one of our repos | ||
| 403 | /// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. | ||
| 404 | /// Batched per 100 repo refs. | ||
| 405 | fn tagged_one_of_our_repo_event_filters( | ||
| 406 | repos: &HashSet<String>, | ||
| 407 | since: Option<Timestamp>, | ||
| 408 | ) -> Vec<Filter> { | ||
| 409 | let mut filters = Vec::new(); | ||
| 410 | let repo_refs: Vec<_> = repos.iter().collect(); | ||
| 411 | |||
| 412 | for chunk in repo_refs.chunks(100) { | ||
| 413 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); | ||
| 414 | |||
| 415 | // Lowercase 'a' tag - standard addressable reference | ||
| 416 | let mut f1 = Filter::new() | ||
| 417 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); | ||
| 418 | // Uppercase 'A' tag - some clients use this | ||
| 419 | let mut f2 = Filter::new() | ||
| 420 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone()); | ||
| 421 | // Quote 'q' tag - NIP-10 quote references to addressable events | ||
| 422 | let mut f3 = Filter::new() | ||
| 423 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); | ||
| 424 | |||
| 425 | if let Some(ts) = since { | ||
| 426 | f1 = f1.since(ts); | ||
| 427 | f2 = f2.since(ts); | ||
| 428 | f3 = f3.since(ts); | ||
| 429 | } | ||
| 430 | |||
| 431 | filters.push(f1); | ||
| 432 | filters.push(f2); | ||
| 433 | filters.push(f3); | ||
| 434 | } | ||
| 435 | |||
| 436 | filters | ||
| 437 | } | ||
| 438 | |||
| 439 | /// Layer 3: Events tagging one of our root events | ||
| 440 | /// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. | ||
| 441 | /// Batched per 100 event IDs. | ||
| 442 | fn tagged_one_of_our_root_event_filters( | ||
| 443 | root_events: &HashSet<EventId>, | ||
| 444 | since: Option<Timestamp>, | ||
| 445 | ) -> Vec<Filter> { | ||
| 446 | let mut filters = Vec::new(); | ||
| 447 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); | ||
| 448 | |||
| 449 | for chunk in event_ids.chunks(100) { | ||
| 450 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); | ||
| 451 | |||
| 452 | // Lowercase 'e' tag - standard event reference | ||
| 453 | let mut f1 = Filter::new() | ||
| 454 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); | ||
| 455 | // Uppercase 'E' tag - some clients use this | ||
| 456 | let mut f2 = Filter::new() | ||
| 457 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone()); | ||
| 458 | // Quote 'q' tag - NIP-10 quote references to events | ||
| 459 | let mut f3 = Filter::new() | ||
| 460 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); | ||
| 461 | |||
| 462 | if let Some(ts) = since { | ||
| 463 | f1 = f1.since(ts); | ||
| 464 | f2 = f2.since(ts); | ||
| 465 | f3 = f3.since(ts); | ||
| 466 | } | ||
| 467 | |||
| 468 | filters.push(f1); | ||
| 469 | filters.push(f2); | ||
| 470 | filters.push(f3); | ||
| 471 | } | ||
| 472 | |||
| 473 | filters | ||
| 474 | } | ||
| 475 | |||
| 476 | /// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) | ||
| 477 | /// Used by: | ||
| 478 | /// - compute_actions for incremental subscriptions | ||
| 479 | /// - consolidation rebuilds (Layer 1 remains active) | ||
| 480 | fn build_layer2_and_layer3_filters( | ||
| 481 | repos: &HashSet<String>, | ||
| 482 | root_events: &HashSet<EventId>, | ||
| 483 | since: Option<Timestamp>, | ||
| 484 | ) -> Vec<Filter> { | ||
| 485 | let mut filters = Vec::new(); | ||
| 486 | filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); | ||
| 487 | filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); | ||
| 488 | filters | ||
| 489 | } | ||
| 490 | ``` | ||
| 491 | |||
| 492 | **Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently. | ||
| 493 | |||
| 494 | ### 4. handle_add_filters (SyncManager) | ||
| 495 | |||
| 496 | ```rust | ||
| 497 | impl SyncManager { | ||
| 498 | async fn handle_add_filters(&mut self, action: AddFilters) { | ||
| 499 | let AddFilters { relay_url, repos, root_events, filters } = action; | ||
| 500 | |||
| 501 | // Auto-spawn connection if needed | ||
| 502 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); | ||
| 503 | match state { | ||
| 504 | None => { | ||
| 505 | // New relay discovered - create entry and spawn connection | ||
| 506 | self.relay_sync_index.write().await.insert( | ||
| 507 | relay_url.clone(), | ||
| 508 | RelayState { | ||
| 509 | repos: HashSet::new(), | ||
| 510 | root_events: HashSet::new(), | ||
| 511 | is_bootstrap: false, | ||
| 512 | connection_status: ConnectionStatus::Connecting, | ||
| 513 | last_connected: None, | ||
| 514 | disconnected_at: None, | ||
| 515 | connection: None, | ||
| 516 | } | ||
| 517 | ); | ||
| 518 | self.spawn_connection(&relay_url).await; | ||
| 519 | return; // Subscriptions will happen on connection success | ||
| 520 | } | ||
| 521 | Some(state) if state.connection_status != ConnectionStatus::Connected => { | ||
| 522 | // Not connected - subscriptions will happen on connection success | ||
| 523 | return; | ||
| 524 | } | ||
| 525 | Some(_) => { | ||
| 526 | // Already connected - proceed with subscription | ||
| 527 | } | ||
| 528 | } | ||
| 529 | |||
| 530 | // Subscribe and collect subscription IDs | ||
| 531 | let conn = self.connections.get(&relay_url).unwrap(); | ||
| 532 | let mut sub_ids = HashSet::new(); | ||
| 533 | |||
| 534 | for filter in filters { | ||
| 535 | match conn.client.subscribe(filter, None).await { | ||
| 536 | Ok(output) => { | ||
| 537 | for sub_id in output.val { | ||
| 538 | sub_ids.insert(sub_id); | ||
| 539 | } | ||
| 540 | } | ||
| 541 | Err(e) => { | ||
| 542 | tracing::warn!("Failed to subscribe: {}", e); | ||
| 543 | } | ||
| 544 | } | ||
| 545 | } | ||
| 546 | |||
| 547 | // Create pending batch | ||
| 548 | let batch = PendingBatch { | ||
| 549 | batch_id: self.next_batch_id(), | ||
| 550 | items: PendingItems { repos, root_events }, | ||
| 551 | outstanding_subs: sub_ids, | ||
| 552 | }; | ||
| 553 | |||
| 554 | // Add to pending index | ||
| 555 | self.pending_sync_index.write().await | ||
| 556 | .entry(relay_url) | ||
| 557 | .or_default() | ||
| 558 | .push(batch); | ||
| 559 | } | ||
| 560 | } | ||
| 561 | ``` | ||
| 562 | |||
| 563 | ### 5. handle_disconnect | ||
| 564 | |||
| 565 | ```rust | ||
| 566 | impl SyncManager { | ||
| 567 | /// Called when connection to a relay is lost | ||
| 568 | async fn handle_disconnect(&mut self, relay_url: &str) { | ||
| 569 | let mut index = self.relay_sync_index.write().await; | ||
| 570 | |||
| 571 | if let Some(state) = index.get_mut(relay_url) { | ||
| 572 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 573 | state.disconnected_at = Some(Timestamp::now()); | ||
| 574 | state.connection = None; | ||
| 575 | } | ||
| 576 | |||
| 577 | // Clear pending batches - these items were not confirmed | ||
| 578 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 579 | |||
| 580 | // Remove from active connections map | ||
| 581 | self.connections.remove(relay_url); | ||
| 582 | |||
| 583 | // Health tracker records failure for backoff | ||
| 584 | self.health_tracker.record_failure(relay_url); | ||
| 585 | } | ||
| 586 | } | ||
| 587 | ``` | ||
| 588 | |||
| 589 | ### 6. handle_connect_or_reconnect (Unified) | ||
| 590 | |||
| 591 | This method handles BOTH initial connection AND reconnection with unified logic: | ||
| 592 | |||
| 593 | ```rust | ||
| 594 | impl SyncManager { | ||
| 595 | /// Called when connection to a relay succeeds - handles both initial connect and reconnect. | ||
| 596 | /// | ||
| 597 | /// Decision tree: | ||
| 598 | /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history | ||
| 599 | /// - Quick reconnect (<15min): since = last_connected - 15min | ||
| 600 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | ||
| 601 | let mut index = self.relay_sync_index.write().await; | ||
| 602 | let state = match index.get_mut(relay_url) { | ||
| 603 | Some(s) => s, | ||
| 604 | None => return, // Relay was removed while disconnected | ||
| 605 | }; | ||
| 606 | |||
| 607 | // Determine if this is a fresh sync or quick reconnect | ||
| 608 | let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); | ||
| 609 | let last_connected = state.last_connected; | ||
| 610 | |||
| 611 | if is_fresh_sync && state.last_connected.is_some() { | ||
| 612 | // Stale reconnect (>15min) - clear state | ||
| 613 | tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); | ||
| 614 | state.clear_sync_state(); | ||
| 615 | } | ||
| 616 | |||
| 617 | // Update connection state | ||
| 618 | state.connection_status = ConnectionStatus::Connected; | ||
| 619 | state.last_connected = Some(Timestamp::now()); | ||
| 620 | state.disconnected_at = None; | ||
| 621 | |||
| 622 | // Record success in health tracker | ||
| 623 | self.health_tracker.record_success(relay_url); | ||
| 624 | |||
| 625 | drop(index); // Release lock | ||
| 626 | |||
| 627 | let conn = match self.connections.get(relay_url) { | ||
| 628 | Some(c) => c, | ||
| 629 | None => return, | ||
| 630 | }; | ||
| 631 | |||
| 632 | if is_fresh_sync { | ||
| 633 | // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions | ||
| 634 | |||
| 635 | // Step 1: Subscribe Layer 1 (announcements) without since | ||
| 636 | let layer1 = build_announcement_filter(None); | ||
| 637 | let _ = conn.client.subscribe(layer1, None).await; | ||
| 638 | |||
| 639 | // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) | ||
| 640 | self.recompute_actions_for_relay(relay_url).await; | ||
| 641 | } else { | ||
| 642 | // Quick reconnect: Layer 1 with since, Layer 2+3 with since | ||
| 643 | let since = last_connected | ||
| 644 | .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) | ||
| 645 | .unwrap_or(Timestamp::from(0)); | ||
| 646 | |||
| 647 | // Step 1: Subscribe Layer 1 (announcements) with since | ||
| 648 | let layer1 = build_announcement_filter(Some(since)); | ||
| 649 | let _ = conn.client.subscribe(layer1, None).await; | ||
| 650 | |||
| 651 | // Step 2: Rebuild Layer 2+3 for confirmed items with since | ||
| 652 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | ||
| 653 | |||
| 654 | // Step 3: Check for NEW items via compute_actions | ||
| 655 | self.recompute_actions_for_relay(relay_url).await; | ||
| 656 | } | ||
| 657 | } | ||
| 658 | |||
| 659 | /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). | ||
| 660 | /// Used by: | ||
| 661 | /// - Quick reconnect: rebuild confirmed items with since filter | ||
| 662 | /// - Consolidation: close and rebuild with since filter | ||
| 663 | async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) { | ||
| 664 | let confirmed = self.relay_sync_index.read().await; | ||
| 665 | let state = match confirmed.get(relay_url) { | ||
| 666 | Some(s) => s, | ||
| 667 | None => return, | ||
| 668 | }; | ||
| 669 | |||
| 670 | // Build Layer 2+3 filters WITH since | ||
| 671 | let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); | ||
| 672 | drop(confirmed); | ||
| 673 | |||
| 674 | // Subscribe directly - no PendingBatch for catch-up (items already confirmed) | ||
| 675 | let conn = match self.connections.get(relay_url) { | ||
| 676 | Some(c) => c, | ||
| 677 | None => return, | ||
| 678 | }; | ||
| 679 | |||
| 680 | for filter in filters { | ||
| 681 | let _ = conn.client.subscribe(filter, None).await; | ||
| 682 | } | ||
| 683 | } | ||
| 684 | |||
| 685 | /// Rerun compute_actions for a specific relay and process resulting AddFilters. | ||
| 686 | /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. | ||
| 687 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { | ||
| 688 | let repo_index = self.repo_sync_index.read().await; | ||
| 689 | let targets = derive_relay_targets(&repo_index); | ||
| 690 | drop(repo_index); | ||
| 691 | |||
| 692 | // Filter to just this relay | ||
| 693 | let target = match targets.get(relay_url) { | ||
| 694 | Some(t) => t.clone(), | ||
| 695 | None => return, // No repos reference this relay anymore | ||
| 696 | }; | ||
| 697 | |||
| 698 | let pending = self.pending_sync_index.read().await; | ||
| 699 | let confirmed = self.relay_sync_index.read().await; | ||
| 700 | |||
| 701 | let mut single_relay_targets = HashMap::new(); | ||
| 702 | single_relay_targets.insert(relay_url.to_string(), target); | ||
| 703 | |||
| 704 | let actions = compute_actions(&single_relay_targets, &pending, &confirmed); | ||
| 705 | |||
| 706 | drop(pending); | ||
| 707 | drop(confirmed); | ||
| 708 | |||
| 709 | // Process AddFilters | ||
| 710 | for action in actions { | ||
| 711 | self.handle_add_filters(action).await; | ||
| 712 | } | ||
| 713 | } | ||
| 714 | } | ||
| 715 | ``` | ||
| 716 | |||
| 717 | ### 7. Daily Timer | ||
| 718 | |||
| 719 | ```rust | ||
| 720 | impl SyncManager { | ||
| 721 | async fn run_daily_timer(&self) { | ||
| 722 | loop { | ||
| 723 | // Random 23-25 hours | ||
| 724 | let hours = 23.0 + rand::random::<f64>() * 2.0; | ||
| 725 | tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; | ||
| 726 | |||
| 727 | let relay_urls: Vec<_> = self.relay_sync_index.read().await | ||
| 728 | .keys() | ||
| 729 | .cloned() | ||
| 730 | .collect(); | ||
| 731 | |||
| 732 | for relay_url in relay_urls { | ||
| 733 | self.daily_sync(&relay_url).await; | ||
| 734 | } | ||
| 735 | } | ||
| 736 | } | ||
| 737 | |||
| 738 | /// Perform daily fresh sync for a relay | ||
| 739 | async fn daily_sync(&mut self, relay_url: &str) { | ||
| 740 | tracing::info!("Daily sync triggered for {}", relay_url); | ||
| 741 | |||
| 742 | // Close all subscriptions | ||
| 743 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 744 | conn.client.unsubscribe_all().await; | ||
| 745 | } | ||
| 746 | |||
| 747 | // Clear PendingSyncIndex | ||
| 748 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 749 | |||
| 750 | // Clear confirmed state - triggers fresh sync | ||
| 751 | { | ||
| 752 | let mut index = self.relay_sync_index.write().await; | ||
| 753 | if let Some(state) = index.get_mut(relay_url) { | ||
| 754 | state.clear_sync_state(); | ||
| 755 | } | ||
| 756 | } | ||
| 757 | |||
| 758 | // Recompute actions - will generate AddFilters for everything | ||
| 759 | self.recompute_actions_for_relay(relay_url).await; | ||
| 760 | } | ||
| 761 | } | ||
| 762 | ``` | ||
| 763 | |||
| 764 | ### 8. Consolidation (Threshold-Based, Triggered on Add) | ||
| 765 | |||
| 766 | Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active. | ||
| 767 | |||
| 768 | ```rust | ||
| 769 | impl SyncManager { | ||
| 770 | /// Check filter count and consolidate if needed. | ||
| 771 | /// Called from handle_add_filters BEFORE adding new filters. | ||
| 772 | async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { | ||
| 773 | let current_count = self.get_filter_count(relay_url).await; | ||
| 774 | |||
| 775 | if current_count + new_filter_count > 70 { | ||
| 776 | self.consolidate(relay_url).await; | ||
| 777 | } | ||
| 778 | } | ||
| 779 | |||
| 780 | /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. | ||
| 781 | /// Does NOT clear state - just reduces filter count. | ||
| 782 | async fn consolidate(&mut self, relay_url: &str) { | ||
| 783 | tracing::info!("Consolidating filters for {} (count > 70)", relay_url); | ||
| 784 | |||
| 785 | // Wait for all pending batches to complete first | ||
| 786 | self.wait_pending_complete(relay_url).await; | ||
| 787 | |||
| 788 | // Close Layer 2+3 subscriptions only - Layer 1 remains active | ||
| 789 | // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately | ||
| 790 | // For simplicity, we close all and re-add Layer 1 | ||
| 791 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 792 | conn.client.unsubscribe_all().await; | ||
| 793 | } | ||
| 794 | |||
| 795 | // Re-subscribe Layer 1 with since (maintains announcements stream) | ||
| 796 | let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); | ||
| 797 | let conn = self.connections.get(relay_url).unwrap(); | ||
| 798 | let layer1 = build_announcement_filter(Some(since)); | ||
| 799 | let _ = conn.client.subscribe(layer1, None).await; | ||
| 800 | |||
| 801 | // Rebuild Layer 2+3 only | ||
| 802 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | ||
| 803 | } | ||
| 804 | } | ||
| 805 | ``` | ||
| 806 | |||
| 807 | **Updated handle_add_filters to check consolidation:** | ||
| 808 | |||
| 809 | ```rust | ||
| 810 | impl SyncManager { | ||
| 811 | async fn handle_add_filters(&mut self, action: AddFilters) { | ||
| 812 | let AddFilters { relay_url, repos, root_events, filters } = action; | ||
| 813 | |||
| 814 | // Auto-spawn connection if needed (unchanged) | ||
| 815 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); | ||
| 816 | match state { | ||
| 817 | None => { | ||
| 818 | // New relay discovered - create entry and spawn connection | ||
| 819 | self.relay_sync_index.write().await.insert( | ||
| 820 | relay_url.clone(), | ||
| 821 | RelayState { | ||
| 822 | repos: HashSet::new(), | ||
| 823 | root_events: HashSet::new(), | ||
| 824 | is_bootstrap: false, | ||
| 825 | connection_status: ConnectionStatus::Connecting, | ||
| 826 | last_connected: None, | ||
| 827 | disconnected_at: None, | ||
| 828 | connection: None, | ||
| 829 | } | ||
| 830 | ); | ||
| 831 | self.spawn_connection(&relay_url).await; | ||
| 832 | return; // Subscriptions will happen on connection success | ||
| 833 | } | ||
| 834 | Some(state) if state.connection_status != ConnectionStatus::Connected => { | ||
| 835 | return; // Not connected - subscriptions will happen on connection success | ||
| 836 | } | ||
| 837 | Some(_) => { | ||
| 838 | // Already connected - proceed | ||
| 839 | } | ||
| 840 | } | ||
| 841 | |||
| 842 | // CHECK CONSOLIDATION BEFORE ADDING | ||
| 843 | self.maybe_consolidate(&relay_url, filters.len()).await; | ||
| 844 | |||
| 845 | // Subscribe and collect subscription IDs | ||
| 846 | let conn = self.connections.get(&relay_url).unwrap(); | ||
| 847 | let mut sub_ids = HashSet::new(); | ||
| 848 | |||
| 849 | for filter in filters { | ||
| 850 | match conn.client.subscribe(filter, None).await { | ||
| 851 | Ok(output) => { | ||
| 852 | for sub_id in output.val { | ||
| 853 | sub_ids.insert(sub_id); | ||
| 854 | } | ||
| 855 | } | ||
| 856 | Err(e) => { | ||
| 857 | tracing::warn!("Failed to subscribe: {}", e); | ||
| 858 | } | ||
| 859 | } | ||
| 860 | } | ||
| 861 | |||
| 862 | // Create pending batch (unchanged) | ||
| 863 | let batch = PendingBatch { | ||
| 864 | batch_id: self.next_batch_id(), | ||
| 865 | items: PendingItems { repos, root_events }, | ||
| 866 | outstanding_subs: sub_ids, | ||
| 867 | }; | ||
| 868 | |||
| 869 | self.pending_sync_index.write().await | ||
| 870 | .entry(relay_url) | ||
| 871 | .or_default() | ||
| 872 | .push(batch); | ||
| 873 | } | ||
| 874 | } | ||
| 875 | ``` | ||
| 876 | |||
| 877 | --- | ||
| 878 | |||
| 879 | ## Disconnect (Relay Removal) Handling | ||
| 880 | |||
| 881 | ```rust | ||
| 882 | impl SyncManager { | ||
| 883 | /// Periodically check for relays that should be disconnected | ||
| 884 | async fn check_disconnects(&mut self) { | ||
| 885 | let confirmed = self.relay_sync_index.read().await; | ||
| 886 | let relays_to_disconnect: Vec<_> = confirmed.iter() | ||
| 887 | .filter(|(_, state)| { | ||
| 888 | !state.is_bootstrap && | ||
| 889 | state.repos.is_empty() && | ||
| 890 | state.root_events.is_empty() | ||
| 891 | }) | ||
| 892 | .map(|(url, _)| url.clone()) | ||
| 893 | .collect(); | ||
| 894 | drop(confirmed); | ||
| 895 | |||
| 896 | for relay_url in relays_to_disconnect { | ||
| 897 | self.disconnect_relay(&relay_url).await; | ||
| 898 | } | ||
| 899 | } | ||
| 900 | |||
| 901 | async fn disconnect_relay(&mut self, relay_url: &str) { | ||
| 902 | tracing::info!("Disconnecting relay {} (no repos)", relay_url); | ||
| 903 | |||
| 904 | self.relay_sync_index.write().await.remove(relay_url); | ||
| 905 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 906 | |||
| 907 | if let Some(conn) = self.connections.remove(relay_url) { | ||
| 908 | let _ = conn.client.disconnect().await; | ||
| 909 | } | ||
| 910 | } | ||
| 911 | } | ||
| 912 | ``` | ||
| 913 | |||
| 914 | --- | ||
| 915 | |||
| 916 | ## State Flow Summary | ||
| 917 | |||
| 918 | ```mermaid | ||
| 919 | flowchart TB | ||
| 920 | subgraph Input | ||
| 921 | SS[SelfSubscriber] | ||
| 922 | OWN[Own Relay] | ||
| 923 | end | ||
| 924 | |||
| 925 | subgraph RepoSyncIndex - What We Want | ||
| 926 | RSI[HashMap: Repo to Relays+Events] | ||
| 927 | end | ||
| 928 | |||
| 929 | subgraph Derived Target | ||
| 930 | DT[derive_relay_targets fn] | ||
| 931 | TGT[Per-relay: repos + events we should sync] | ||
| 932 | end | ||
| 933 | |||
| 934 | subgraph compute_actions - Decision Point | ||
| 935 | CA[Three-way diff: target - pending - confirmed] | ||
| 936 | end | ||
| 937 | |||
| 938 | subgraph PendingSyncIndex - In Flight | ||
| 939 | PSI[Vec PendingBatch per relay] | ||
| 940 | end | ||
| 941 | |||
| 942 | subgraph RelaySyncIndex - Confirmed State | ||
| 943 | RLI[RelayState per relay] | ||
| 944 | CONN[connection_status] | ||
| 945 | REPOS[repos + root_events] | ||
| 946 | TIMES[last_connected + disconnected_at] | ||
| 947 | end | ||
| 948 | |||
| 949 | SS -->|subscribe| OWN | ||
| 950 | OWN -->|events| SS | ||
| 951 | SS -->|batch fires| RSI | ||
| 952 | RSI --> DT | ||
| 953 | DT --> TGT | ||
| 954 | TGT --> CA | ||
| 955 | PSI --> CA | ||
| 956 | RLI --> CA | ||
| 957 | CA -->|Layer 2+3 new items| AF[AddFilters] | ||
| 958 | AF -->|check filter count| CONSOL{count + new > 70?} | ||
| 959 | CONSOL -->|yes| CONSOLIDATE[consolidate] | ||
| 960 | CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] | ||
| 961 | L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] | ||
| 962 | CONSOL -->|no| SUB[subscribe] | ||
| 963 | AF -->|spawn if needed| CONN | ||
| 964 | SUB --> PSI | ||
| 965 | PSI -->|EOSE| REPOS | ||
| 966 | |||
| 967 | CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] | ||
| 968 | DISC -->|any reconnect| HC[handle_connect_or_reconnect] | ||
| 969 | |||
| 970 | subgraph handle_connect_or_reconnect | ||
| 971 | HC --> FRESH_CHECK{is_fresh_sync?} | ||
| 972 | FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] | ||
| 973 | FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] | ||
| 974 | L1_FRESH --> RCA1[recompute_actions_for_relay] | ||
| 975 | L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] | ||
| 976 | L23_QUICK --> RCA2[recompute_actions_for_relay] | ||
| 977 | end | ||
| 978 | ``` | ||
| 979 | |||
| 980 | --- | ||
| 981 | |||
| 982 | ## Key Design Decisions | ||
| 983 | |||
| 984 | | Decision | Choice | Rationale | | ||
| 985 | | -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | | ||
| 986 | | Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | | ||
| 987 | | Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | | ||
| 988 | | Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | | ||
| 989 | | Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | | ||
| 990 | | Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | | ||
| 991 | | Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | | ||
| 992 | | Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | | ||
| 993 | | compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | | ||
| 994 | | Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | | ||
| 995 | | Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | | ||
| 996 | | Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | | ||
| 997 | | Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | | ||
| 998 | | Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | | ||
| 999 | | 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | | ||
| 1000 | | Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | | ||
| 1001 | | Connection spawning | Via AddFilters handler | Single path for new relay discovery | | ||
| 1002 | | Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | | ||
| 1003 | |||
| 1004 | --- | ||
| 1005 | |||
| 1006 | ## Module Structure | ||
| 1007 | |||
| 1008 | ``` | ||
| 1009 | src/sync/ | ||
| 1010 | ├── mod.rs # SyncManager, main loop | ||
| 1011 | ├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types | ||
| 1012 | ├── actions.rs # AddFilters struct, compute_actions, build_filters | ||
| 1013 | ├── self_subscriber.rs # SelfSubscriber, batching logic | ||
| 1014 | ├── relay_connection.rs # Per-relay WebSocket connection | ||
| 1015 | ├── consolidation.rs # Consolidation logic, daily timer | ||
| 1016 | ├── health.rs # Health tracking (reuse from v2) | ||
| 1017 | └── metrics.rs # Prometheus metrics (reuse from v2) | ||
| 1018 | ``` | ||
| 1019 | |||
| 1020 | --- | ||
| 1021 | |||
| 1022 | ## Comparison: v3 vs v4 | ||
| 1023 | |||
| 1024 | | Aspect | v3 | v4 | | ||
| 1025 | | ------------------------ | ----------------------------------------- | --------------------------------------------- | | ||
| 1026 | | Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | | ||
| 1027 | | Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | | ||
| 1028 | | Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | | ||
| 1029 | | Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | | ||
| 1030 | | Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | | ||
| 1031 | | Since filter application | Applied in handle_reconnect | build_all_filters with optional since | | ||
| 1032 | | PSI clearing | On disconnect | On disconnect (confirmed) | | ||
| 1033 | | Daily timer | Consolidation-style | Fresh sync (different from consolidation) | | ||
| 1034 | |||
| 1035 | --- | ||
| 1036 | |||
| 1037 | ## Self-Subscriber Flow | ||
| 1038 | |||
| 1039 | The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions. | ||
| 1040 | |||
| 1041 | ### State Tracking | ||
| 1042 | |||
| 1043 | ```rust | ||
| 1044 | pub struct SelfSubscriber { | ||
| 1045 | own_relay_url: String, | ||
| 1046 | relay_domain: String, | ||
| 1047 | repo_sync_index: RepoSyncIndex, | ||
| 1048 | pending_sync_index: PendingSyncIndex, | ||
| 1049 | relay_sync_index: RelaySyncIndex, | ||
| 1050 | action_tx: mpsc::Sender<AddFilters>, | ||
| 1051 | /// Timestamp of last successful connection - used for since filter on reconnection | ||
| 1052 | last_connected: Option<Timestamp>, | ||
| 1053 | /// The active client connection | ||
| 1054 | client: Option<Client>, | ||
| 1055 | } | ||
| 1056 | ``` | ||
| 1057 | |||
| 1058 | ### On Startup / Reconnect (Unified) | ||
| 1059 | |||
| 1060 | Both initial startup and reconnection use the same `connect_and_subscribe` method: | ||
| 1061 | |||
| 1062 | ```rust | ||
| 1063 | impl SelfSubscriber { | ||
| 1064 | async fn run(mut self) { | ||
| 1065 | loop { | ||
| 1066 | // Connect or reconnect | ||
| 1067 | if let Err(e) = self.connect_and_subscribe().await { | ||
| 1068 | tracing::warn!("Connection failed: {}, will retry", e); | ||
| 1069 | tokio::time::sleep(Duration::from_secs(5)).await; | ||
| 1070 | continue; | ||
| 1071 | } | ||
| 1072 | |||
| 1073 | // Run event loop until disconnection | ||
| 1074 | self.event_loop().await; | ||
| 1075 | |||
| 1076 | // Loop will retry connection | ||
| 1077 | } | ||
| 1078 | } | ||
| 1079 | |||
| 1080 | async fn connect_and_subscribe(&mut self) -> Result<(), Error> { | ||
| 1081 | let client = Client::new(Keys::generate()); | ||
| 1082 | client.add_relay(&self.own_relay_url).await?; | ||
| 1083 | client.connect().await; | ||
| 1084 | |||
| 1085 | // Build filter - add since only on reconnect | ||
| 1086 | let filter = Filter::new().kinds([ | ||
| 1087 | Kind::Custom(30617), // Repository announcements | ||
| 1088 | Kind::GitPatch, // 1617 | ||
| 1089 | Kind::Custom(1618), // PRs | ||
| 1090 | Kind::Custom(1619), // PR updates | ||
| 1091 | Kind::GitIssue, // 1621 | ||
| 1092 | ]); | ||
| 1093 | |||
| 1094 | let filter = if let Some(ts) = self.last_connected { | ||
| 1095 | // Reconnection: use since filter | ||
| 1096 | let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer | ||
| 1097 | filter.since(since) | ||
| 1098 | } else { | ||
| 1099 | // Initial connect: no since filter - get full history | ||
| 1100 | filter | ||
| 1101 | }; | ||
| 1102 | |||
| 1103 | // Update last_connected AFTER computing since | ||
| 1104 | self.last_connected = Some(Timestamp::now()); | ||
| 1105 | |||
| 1106 | client.subscribe(filter, None).await?; | ||
| 1107 | self.client = Some(client); | ||
| 1108 | Ok(()) | ||
| 1109 | } | ||
| 1110 | } | ||
| 1111 | ``` | ||
| 1112 | |||
| 1113 | ### Event Loop with Batching | ||
| 1114 | |||
| 1115 | ```rust | ||
| 1116 | impl SelfSubscriber { | ||
| 1117 | async fn event_loop(&mut self) { | ||
| 1118 | let client = self.client.as_ref().unwrap(); | ||
| 1119 | let mut pending_events: Vec<Event> = Vec::new(); | ||
| 1120 | let mut batch_timer: Option<Instant> = None; | ||
| 1121 | let batch_window = Duration::from_secs(5); | ||
| 1122 | |||
| 1123 | loop { | ||
| 1124 | let timeout = batch_timer | ||
| 1125 | .map(|t| batch_window.saturating_sub(t.elapsed())) | ||
| 1126 | .unwrap_or(Duration::from_secs(60)); | ||
| 1127 | |||
| 1128 | tokio::select! { | ||
| 1129 | notification = client.notifications().recv() => { | ||
| 1130 | match notification { | ||
| 1131 | Ok(RelayPoolNotification::Event { event, .. }) => { | ||
| 1132 | pending_events.push(*event); | ||
| 1133 | |||
| 1134 | // Start timer on first event - does NOT reset | ||
| 1135 | if batch_timer.is_none() { | ||
| 1136 | batch_timer = Some(Instant::now()); | ||
| 1137 | } | ||
| 1138 | } | ||
| 1139 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 1140 | // Connection lost | ||
| 1141 | break; | ||
| 1142 | } | ||
| 1143 | _ => {} | ||
| 1144 | } | ||
| 1145 | } | ||
| 1146 | _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { | ||
| 1147 | // Batch window elapsed | ||
| 1148 | self.process_batch(pending_events.drain(..).collect()).await; | ||
| 1149 | batch_timer = None; | ||
| 1150 | } | ||
| 1151 | } | ||
| 1152 | } | ||
| 1153 | } | ||
| 1154 | |||
| 1155 | async fn process_batch(&self, events: Vec<Event>) { | ||
| 1156 | // 1. Update RepoSyncIndex | ||
| 1157 | for event in events { | ||
| 1158 | match event.kind.as_u16() { | ||
| 1159 | 30617 => self.handle_announcement(&event).await, | ||
| 1160 | 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, | ||
| 1161 | _ => {} | ||
| 1162 | } | ||
| 1163 | } | ||
| 1164 | |||
| 1165 | // 2. Derive targets and compute actions | ||
| 1166 | let repo_index = self.repo_sync_index.read().await; | ||
| 1167 | let targets = derive_relay_targets(&repo_index); | ||
| 1168 | |||
| 1169 | let pending = self.pending_sync_index.read().await; | ||
| 1170 | let confirmed = self.relay_sync_index.read().await; | ||
| 1171 | |||
| 1172 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 1173 | |||
| 1174 | drop(repo_index); | ||
| 1175 | drop(pending); | ||
| 1176 | drop(confirmed); | ||
| 1177 | |||
| 1178 | // 3. Send actions to SyncManager | ||
| 1179 | for action in actions { | ||
| 1180 | let _ = self.action_tx.send(action).await; | ||
| 1181 | } | ||
| 1182 | } | ||
| 1183 | |||
| 1184 | async fn handle_announcement(&self, event: &Event) { | ||
| 1185 | // Extract repo_ref from event - 30617:pubkey:identifier | ||
| 1186 | let d_tag = event.tags.iter() | ||
| 1187 | .find_map(|tag| { | ||
| 1188 | if tag.kind() == TagKind::D { | ||
| 1189 | tag.content().map(|s| s.to_string()) | ||
| 1190 | } else { | ||
| 1191 | None | ||
| 1192 | } | ||
| 1193 | }) | ||
| 1194 | .unwrap_or_default(); | ||
| 1195 | |||
| 1196 | let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); | ||
| 1197 | |||
| 1198 | // Extract relay URLs from 'r' tags | ||
| 1199 | let relays: HashSet<String> = event.tags.iter() | ||
| 1200 | .filter_map(|tag| { | ||
| 1201 | if tag.kind() == TagKind::Relay { | ||
| 1202 | tag.content().map(|s| s.to_string()) | ||
| 1203 | } else { | ||
| 1204 | None | ||
| 1205 | } | ||
| 1206 | }) | ||
| 1207 | .collect(); | ||
| 1208 | |||
| 1209 | // Update RepoSyncIndex | ||
| 1210 | let mut index = self.repo_sync_index.write().await; | ||
| 1211 | let needs = index.entry(repo_ref).or_default(); | ||
| 1212 | needs.relays = relays; | ||
| 1213 | } | ||
| 1214 | |||
| 1215 | async fn handle_root_event(&self, event: &Event) { | ||
| 1216 | // Extract repo_ref from 'a' tag | ||
| 1217 | let repo_ref = event.tags.iter() | ||
| 1218 | .find_map(|tag| { | ||
| 1219 | if tag.kind() == TagKind::A { | ||
| 1220 | tag.content().map(|s| s.to_string()) | ||
| 1221 | } else { | ||
| 1222 | None | ||
| 1223 | } | ||
| 1224 | }); | ||
| 1225 | |||
| 1226 | if let Some(repo_ref) = repo_ref { | ||
| 1227 | let mut index = self.repo_sync_index.write().await; | ||
| 1228 | let needs = index.entry(repo_ref).or_default(); | ||
| 1229 | needs.root_events.insert(event.id); | ||
| 1230 | } | ||
| 1231 | } | ||
| 1232 | } | ||
| 1233 | ``` | ||
| 1234 | |||
| 1235 | --- | ||
| 1236 | |||
| 1237 | ## Implementation Notes | ||
| 1238 | |||
| 1239 | This section documents the actual implementation details as of December 2024 (Phases 1-10 complete). | ||
| 1240 | |||
| 1241 | ### Architectural Decisions During Implementation | ||
| 1242 | |||
| 1243 | **Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup: | ||
| 1244 | |||
| 1245 | ```rust | ||
| 1246 | // 7. Wrap self in Arc<Mutex> for sharing with timer task | ||
| 1247 | let sync_manager = Arc::new(Mutex::new(self)); | ||
| 1248 | ``` | ||
| 1249 | |||
| 1250 | This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. | ||
| 1251 | |||
| 1252 | **Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: | ||
| 1253 | |||
| 1254 | - `DashMap` for thread-safe concurrent access without external locking | ||
| 1255 | - Three states: `Healthy`, `Degraded`, `Dead` | ||
| 1256 | - Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff | ||
| 1257 | - Dead threshold: 24 hours of continuous failures | ||
| 1258 | - Dead relay retry: Once per 24 hours | ||
| 1259 | |||
| 1260 | ### Implementation Constants | ||
| 1261 | |||
| 1262 | | Constant | Value | Purpose | | ||
| 1263 | | --------------------------------- | ---------- | ------------------------------------------------ | | ||
| 1264 | | `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | | ||
| 1265 | | `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | | ||
| 1266 | | `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | | ||
| 1267 | | `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | | ||
| 1268 | | `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | | ||
| 1269 | | `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | | ||
| 1270 | |||
| 1271 | ### Daily Timer Randomization | ||
| 1272 | |||
| 1273 | The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running: | ||
| 1274 | |||
| 1275 | ```rust | ||
| 1276 | let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0; | ||
| 1277 | ``` | ||
| 1278 | |||
| 1279 | ### Bootstrap Relay Protection | ||
| 1280 | |||
| 1281 | Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out: | ||
| 1282 | |||
| 1283 | ```rust | ||
| 1284 | .filter(|(_, state)| { | ||
| 1285 | !state.is_bootstrap && | ||
| 1286 | state.repos.is_empty() && | ||
| 1287 | state.root_events.is_empty() | ||
| 1288 | }) | ||
| 1289 | ``` | ||
| 1290 | |||
| 1291 | ### Graceful Shutdown | ||
| 1292 | |||
| 1293 | Shutdown uses a tokio broadcast channel for coordinated termination: | ||
| 1294 | |||
| 1295 | ```rust | ||
| 1296 | let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); | ||
| 1297 | ``` | ||
| 1298 | |||
| 1299 | Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop. | ||
| 1300 | |||
| 1301 | ### Actual Module Structure | ||
| 1302 | |||
| 1303 | The implemented module structure differs from the original spec: | ||
| 1304 | |||
| 1305 | ``` | ||
| 1306 | src/sync/ | ||
| 1307 | ├── mod.rs # SyncManager, main loop, index types, metrics | ||
| 1308 | ├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters | ||
| 1309 | ├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters | ||
| 1310 | ├── health.rs # RelayHealthTracker, HealthState, exponential backoff | ||
| 1311 | ├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling | ||
| 1312 | └── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic | ||
| 1313 | ``` | ||
| 1314 | |||
| 1315 | Key differences from spec: | ||
| 1316 | |||
| 1317 | - No separate `state.rs` - types are defined in `mod.rs` | ||
| 1318 | - No separate `actions.rs` - moved to `algorithms.rs` | ||
| 1319 | - No separate `consolidation.rs` - consolidation logic in `mod.rs` | ||
| 1320 | - No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs` | ||
| 1321 | |||
| 1322 | ### Deviations from Original v4 Spec | ||
| 1323 | |||
| 1324 | 1. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`. | ||
| 1325 | |||
| 1326 | 2. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct. | ||
| 1327 | |||
| 1328 | 3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. | ||
| 1329 | |||
| 1330 | 4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. | ||
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md index c07f07c..dd508b3 100644 --- a/docs/explanation/grasp-02-proactive-sync.md +++ b/docs/explanation/grasp-02-proactive-sync.md | |||
| @@ -1,929 +1,1330 @@ | |||
| 1 | # GRASP-02: Proactive Sync - Design Document | 1 | # GRASP-02: Proactive Sync v4 - Health & Reconnection Design |
| 2 | 2 | ||
| 3 | ## Overview | 3 | ## Overview |
| 4 | 4 | ||
| 5 | GRASP-02 Proactive Sync enables ngit-grasp to maintain live WebSocket connections to other relays listed in repository announcement events, synchronizing NIP-34 related events using both **live sync** (real-time subscriptions) and **negentropy catchup** (NIP-77 set reconciliation). | 5 | This document presents v4 of the proactive sync design, refining the connection lifecycle and reconnection patterns. Key principles: |
| 6 | 6 | ||
| 7 | This document covers **event syncing only**. Git data syncing is out of scope for this phase. | 7 | 1. **Self-subscription as the only mechanism** - No database initialization at startup |
| 8 | 2. **compute_actions as single decision point** - Determines what NEW subscriptions to create | ||
| 9 | 3. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) | ||
| 10 | 4. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch | ||
| 11 | 5. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary | ||
| 8 | 12 | ||
| 9 | ## Goals | 13 | --- |
| 10 | 14 | ||
| 11 | 1. **Data Availability**: Ensure we have all relevant events for repositories we host | 15 | ## Data Model |
| 12 | 2. **Resilience**: Handle relay failures gracefully with backoff and health tracking | ||
| 13 | 3. **Efficiency**: Minimize connections and bandwidth through filter consolidation | ||
| 14 | 4. **Consistency**: Use unified filters for both live sync and negentropy catchup | ||
| 15 | 16 | ||
| 16 | ## Architecture Overview | 17 | ### RepoSyncIndex (Source of Truth) |
| 17 | 18 | ||
| 18 | ```mermaid | 19 | ```rust |
| 19 | flowchart TB | 20 | /// What we WANT to sync - derived from events received via self-subscription. |
| 20 | subgraph ngit-grasp | 21 | /// Updated immediately when self-subscriber batch fires. |
| 21 | subgraph SyncManager | 22 | /// Key: repo addressable ref - 30617:pubkey:identifier |
| 22 | SS[Self-Subscriber] | 23 | pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>; |
| 23 | RC[Remote Connections] | 24 | |
| 24 | end | 25 | #[derive(Debug, Clone, Default)] |
| 25 | WS[WebSocket Server] | 26 | pub struct RepoSyncNeeds { |
| 26 | FS[FilterService] | 27 | /// Relay URLs listed in this repo's 30617 announcement |
| 27 | RH[RelayHealthTracker] | 28 | pub relays: HashSet<String>, |
| 28 | DB[(Database)] | 29 | /// Root event IDs - 1617/1618/1619/1621 - that reference this repo |
| 29 | AP[AcceptancePolicy] | 30 | pub root_events: HashSet<EventId>, |
| 30 | MET[Prometheus Metrics] | 31 | } |
| 31 | end | 32 | ``` |
| 32 | 33 | ||
| 33 | subgraph External Relays | 34 | ### RelaySyncIndex (Confirmed State + Connection) |
| 34 | R1[relay.example.com] | ||
| 35 | R2[other-grasp.io] | ||
| 36 | R3[nostr.land] | ||
| 37 | end | ||
| 38 | 35 | ||
| 39 | WS -->|broadcasts events| SS | 36 | ```rust |
| 40 | SS -->|discovers relays| RC | 37 | /// What we have CONFIRMED syncing - includes connection state for integrated lifecycle. |
| 41 | RC -->|builds filters| FS | 38 | /// Key: relay URL |
| 42 | RC -->|tracks health| RH | 39 | pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>; |
| 43 | RC -->|stores events| DB | 40 | |
| 44 | RC -->|validates| AP | 41 | /// Connection status for a relay |
| 42 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 43 | pub enum ConnectionStatus { | ||
| 44 | /// Not currently connected | ||
| 45 | Disconnected, | ||
| 46 | /// Connection attempt in progress | ||
| 47 | Connecting, | ||
| 48 | /// Successfully connected and subscribed | ||
| 49 | Connected, | ||
| 50 | } | ||
| 51 | |||
| 52 | /// Complete state for a single relay - combines sync needs with connection lifecycle | ||
| 53 | #[derive(Debug)] | ||
| 54 | pub struct RelayState { | ||
| 55 | /// Repos we have confirmed syncing from this relay | ||
| 56 | pub repos: HashSet<String>, | ||
| 57 | /// Root events we have confirmed tracking | ||
| 58 | pub root_events: HashSet<EventId>, | ||
| 59 | /// If true, never disconnect this relay | ||
| 60 | pub is_bootstrap: bool, | ||
| 61 | /// Current connection status | ||
| 62 | pub connection_status: ConnectionStatus, | ||
| 63 | /// When we last successfully connected - used for since filter on reconnect | ||
| 64 | pub last_connected: Option<Timestamp>, | ||
| 65 | /// When we disconnected - for 15-minute state retention rule | ||
| 66 | pub disconnected_at: Option<Timestamp>, | ||
| 67 | } | ||
| 45 | 68 | ||
| 46 | RC <-->|WebSocket + NEG| R1 | 69 | impl RelayState { |
| 47 | RC <-->|WebSocket + NEG| R2 | 70 | /// Check if state should be cleared based on 15-minute rule |
| 48 | RC <-->|WebSocket + NEG| R3 | 71 | pub fn should_clear_state(&self) -> bool { |
| 72 | match self.disconnected_at { | ||
| 73 | Some(disconnected) => { | ||
| 74 | let now = Timestamp::now(); | ||
| 75 | now.as_u64().saturating_sub(disconnected.as_u64()) > 900 // 15 minutes | ||
| 76 | } | ||
| 77 | None => false, // Still connected or never connected | ||
| 78 | } | ||
| 79 | } | ||
| 49 | 80 | ||
| 50 | RH -->|exposes state| MET | 81 | /// Clear repos and root_events - called when reconnect takes > 15 minutes |
| 82 | pub fn clear_sync_state(&mut self) { | ||
| 83 | self.repos.clear(); | ||
| 84 | self.root_events.clear(); | ||
| 85 | } | ||
| 86 | } | ||
| 51 | ``` | 87 | ``` |
| 52 | 88 | ||
| 53 | **Key Insight: Self-Subscribe Architecture** | 89 | ### PendingSyncIndex (In-Flight Batches) |
| 54 | 90 | ||
| 55 | The SyncManager uses a "self-subscribe" pattern for relay discovery. Rather than polling the database periodically, it connects to its own WebSocket server as a client and subscribes to kind 30617 events. When new announcements are saved (from any source), the self-subscriber receives them instantly and can spawn connections to newly discovered relays. | 91 | ```rust |
| 92 | /// Tracks batches of subscriptions that are in-flight, awaiting EOSE. | ||
| 93 | /// Each batch has its own ID and can confirm independently. | ||
| 94 | /// Key: relay URL | ||
| 95 | pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | ||
| 96 | |||
| 97 | #[derive(Debug, Clone)] | ||
| 98 | pub struct PendingBatch { | ||
| 99 | /// Unique ID for this batch - for debugging/logging | ||
| 100 | pub batch_id: u64, | ||
| 101 | /// The items this batch is syncing | ||
| 102 | pub items: PendingItems, | ||
| 103 | /// Subscription IDs that must ALL receive EOSE before confirming | ||
| 104 | pub outstanding_subs: HashSet<SubscriptionId>, | ||
| 105 | } | ||
| 56 | 106 | ||
| 57 | ## Connection Management | 107 | #[derive(Debug, Clone, Default)] |
| 108 | pub struct PendingItems { | ||
| 109 | pub repos: HashSet<String>, | ||
| 110 | pub root_events: HashSet<EventId>, | ||
| 111 | } | ||
| 112 | ``` | ||
| 58 | 113 | ||
| 59 | ### Relay Discovery | 114 | --- |
| 60 | 115 | ||
| 61 | Relays to connect to are discovered using a **self-subscribe architecture** rather than periodic polling. The SyncManager connects to its own relay as a client and subscribes to kind 30617 (repository announcement) events. When a new announcement is saved to the database (from direct submission or sync), the self-subscriber receives it immediately and discovers new relays to connect to. | 116 | ## Connection Lifecycle State Machine |
| 62 | 117 | ||
| 63 | ```mermaid | 118 | ```mermaid |
| 64 | flowchart LR | 119 | stateDiagram-v2 |
| 65 | subgraph Relay | 120 | [*] --> Disconnected: discover relay via RepoSyncIndex |
| 66 | WS[WebSocket Server] | 121 | Disconnected --> Connecting: AddFilters triggers spawn_connection |
| 67 | DB[(Database)] | 122 | Connecting --> Connected: success |
| 68 | end | 123 | Connecting --> Disconnected: failure + record in health tracker |
| 69 | 124 | Connected --> Disconnected: connection lost | |
| 70 | subgraph SyncManager | 125 | Connected --> [*]: intentional disconnect via check_disconnects |
| 71 | SS[Self-Subscribe Client] | 126 | |
| 72 | RC[Remote Connections] | 127 | note right of Disconnected: disconnected_at set for 15min rule |
| 73 | end | 128 | note right of Connected: last_connected tracked for since filter |
| 74 | |||
| 75 | WS -->|broadcast| SS | ||
| 76 | SS -->|extract relay URLs| RC | ||
| 77 | RC -->|sync events| WS | ||
| 78 | ``` | 129 | ``` |
| 79 | 130 | ||
| 80 | **Why Self-Subscribe vs Polling?** | 131 | --- |
| 81 | 132 | ||
| 82 | | Approach | Latency | Complexity | Resource Use | | 133 | ## Flow Scenarios |
| 83 | |----------|---------|------------|--------------| | ||
| 84 | | Self-Subscribe | Instant | Low | Minimal (1 WS connection) | | ||
| 85 | | Periodic Polling | 30s+ delay | Higher | DB queries every N seconds | | ||
| 86 | 134 | ||
| 87 | The self-subscribe approach provides: | 135 | ### Scenario 1: Initial Connect via handle_connect_or_reconnect |
| 88 | - **Immediate discovery**: New relays discovered instantly when announcement saved | ||
| 89 | - **No polling overhead**: No periodic database queries | ||
| 90 | - **Simple architecture**: Reuses existing WebSocket infrastructure | ||
| 91 | 136 | ||
| 92 | **Implementation Pattern:** | 137 | ```mermaid |
| 138 | flowchart TB | ||
| 139 | START[Startup] --> SS[Self-subscribe to own relay] | ||
| 140 | SS --> |no since filter| EVENTS[Receive historical events] | ||
| 141 | EVENTS --> RSI[Update RepoSyncIndex] | ||
| 142 | RSI --> DT[derive_relay_targets] | ||
| 143 | DT --> CA[compute_actions with targets and empty confirmed] | ||
| 144 | CA --> AF[AddFilters for each relay] | ||
| 145 | AF --> SPAWN{Relay connected?} | ||
| 146 | SPAWN --> |no| CONN[spawn_connection] | ||
| 147 | CONN --> HC[handle_connect_or_reconnect] | ||
| 148 | SPAWN --> |yes| SUB | ||
| 149 | |||
| 150 | subgraph handle_connect_or_reconnect - Fresh Sync | ||
| 151 | HC --> CHECK_FRESH{is_fresh_sync?} | ||
| 152 | CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] | ||
| 153 | L1 --> RCA[recompute_actions_for_relay] | ||
| 154 | end | ||
| 93 | 155 | ||
| 94 | ```rust | 156 | RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] |
| 95 | // In SyncManager::run() | 157 | SUB --> PB[Create PendingBatch] |
| 96 | let self_client = Client::default(); | 158 | PB --> EOSE[Wait for EOSE] |
| 97 | self_client.add_relay(&own_relay_url).await?; | 159 | EOSE --> CONFIRM[Move items to confirmed repos/root_events] |
| 98 | self_client.connect().await; | ||
| 99 | |||
| 100 | let filter = Filter::new().kind(Kind::Custom(30617)); | ||
| 101 | self_client.subscribe(filter, None).await?; | ||
| 102 | |||
| 103 | // Handle notifications - when announcement arrives, extract relay URLs | ||
| 104 | client.handle_notifications(|notification| async { | ||
| 105 | if let RelayPoolNotification::Event { event, .. } = notification { | ||
| 106 | let new_urls = filter_service.extract_relay_urls_from_event(&event); | ||
| 107 | for url in new_urls { | ||
| 108 | if !active_relays.contains(&url) && !is_own_relay(&url) { | ||
| 109 | spawn_connection(url, tx.clone(), filter_service.clone()); | ||
| 110 | } | ||
| 111 | } | ||
| 112 | } | ||
| 113 | Ok(false) // Continue processing | ||
| 114 | }); | ||
| 115 | ``` | 160 | ``` |
| 116 | 161 | ||
| 117 | **Startup Discovery:** At startup, existing announcements in the database are queried once to discover initial relays. After startup, all discovery is event-driven via self-subscribe. | 162 | **Key points:** |
| 118 | 163 | ||
| 119 | **Reconnection:** The self-subscriber has built-in exponential backoff reconnection (1s → 60s max) to handle temporary disconnections from our own relay. | 164 | - No `since` filter on initial connect - get full history |
| 165 | - `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` | ||
| 166 | - Layer 1: `build_announcement_filter(None)` - subscribed immediately without since | ||
| 167 | - Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking | ||
| 120 | 168 | ||
| 121 | ### Connection Lifecycle | 169 | ### Scenario 2: Quick Reconnect via handle_connect_or_reconnect - less than 15 minutes |
| 122 | 170 | ||
| 123 | ```mermaid | 171 | ```mermaid |
| 124 | stateDiagram-v2 | 172 | flowchart TB |
| 125 | [*] --> Connecting: startup/new relay | 173 | DISC[Connection lost] --> MARK[Set disconnected_at = now] |
| 126 | Connecting --> Connected: success | 174 | MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] |
| 127 | Connecting --> Backoff: failure | 175 | CLEAR_PEND --> WAIT[Wait for reconnection] |
| 128 | Connected --> Disconnected: connection lost | 176 | WAIT --> RECONN[Connection restored] |
| 129 | Disconnected --> Backoff: reconnect failed | 177 | RECONN --> HC[handle_connect_or_reconnect] |
| 130 | Backoff --> Connecting: backoff timer expires | 178 | |
| 131 | Backoff --> Dead: 24h continuous failures | 179 | subgraph handle_connect_or_reconnect - Quick Reconnect |
| 132 | Dead --> Connecting: daily retry timer | 180 | HC --> CHECK{is_fresh_sync?} |
| 133 | Connected --> Updating: filter change | 181 | CHECK --> |no - last_connected exists AND <15min| SINCE[since = last_connected - 15min] |
| 134 | Updating --> Connected: complete | 182 | SINCE --> L1[build_announcement_filter - with since] |
| 183 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | ||
| 184 | L23 --> RCA[recompute_actions_for_relay] | ||
| 185 | end | ||
| 186 | |||
| 187 | RCA --> AF[AddFilters for new items only] | ||
| 188 | AF --> SUB[Subscribe] | ||
| 189 | SUB --> PB[Create PendingBatch] | ||
| 190 | PB --> EOSE[Wait for EOSE] | ||
| 191 | EOSE --> EXTEND[Extend confirmed state] | ||
| 135 | ``` | 192 | ``` |
| 136 | 193 | ||
| 137 | ### Health Tracking & Backoff | 194 | **Key points:** |
| 138 | 195 | ||
| 139 | | State | Behavior | | 196 | - PendingSyncIndex cleared on disconnect (not reconnect) |
| 140 | | ----------- | --------------------------------------------------- | | 197 | - `handle_connect_or_reconnect`: |
| 141 | | **Healthy** | Normal operation, immediate reconnect on disconnect | | 198 | 1. `build_announcement_filter(Some(since))` - Layer 1 with since |
| 142 | | **Backoff** | Exponential backoff: 5s → 10s → 20s → ... → 1h max | | 199 | 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since |
| 143 | | **Dead** | 24h of continuous failures, retry once per day | | 200 | 3. `recompute_actions_for_relay` - check for new items |
| 201 | - since = last_connected - 15min ensures we catch events during disconnection | ||
| 144 | 202 | ||
| 145 | Health state is **kept in-memory** using a `DashMap` for lock-free concurrent access: | 203 | ### Scenario 3: Stale Reconnect via handle_connect_or_reconnect - greater than 15 minutes |
| 146 | 204 | ||
| 147 | ```rust | 205 | ```mermaid |
| 148 | /// In-memory relay health tracking (NOT persisted to database) | 206 | flowchart TB |
| 149 | /// | 207 | RECONN[Connection restored] --> HC[handle_connect_or_reconnect] |
| 150 | /// Design rationale: For <100 relays, persistence adds complexity without | ||
| 151 | /// significant benefit. Conservative initial backoff on restart avoids | ||
| 152 | /// thundering herd issues. | ||
| 153 | struct RelayHealthTracker { | ||
| 154 | health: DashMap<RelayUrl, RelayHealth>, | ||
| 155 | metrics: SyncMetrics, // Prometheus metrics for operator visibility | ||
| 156 | } | ||
| 157 | 208 | ||
| 158 | struct RelayHealth { | 209 | subgraph handle_connect_or_reconnect - Stale Reconnect |
| 159 | url: RelayUrl, | 210 | HC --> CHECK{is_fresh_sync?} |
| 160 | status: RelayStatus, // Healthy, Backoff, Dead | 211 | CHECK --> |yes - disconnected >15min| CLEAR[clear_sync_state] |
| 161 | consecutive_failures: u32, | 212 | CLEAR --> L1[build_announcement_filter - no since] |
| 162 | last_failure_at: Option<Instant>, | 213 | L1 --> RCA[recompute_actions_for_relay] |
| 163 | last_success_at: Option<Instant>, | 214 | end |
| 164 | next_retry_at: Instant, | ||
| 165 | } | ||
| 166 | 215 | ||
| 167 | enum RelayStatus { | 216 | RCA --> CA[compute_actions with empty confirmed] |
| 168 | Healthy, | 217 | CA --> AF[AddFilters for everything] |
| 169 | Backoff { attempt: u32 }, // backoff = min(5 * 2^attempt, 3600) seconds | 218 | AF --> SUB[Subscribe - no since filter] |
| 170 | Dead, // retry in 24h | 219 | SUB --> PB[Create PendingBatch] |
| 171 | } | 220 | PB --> EOSE[Wait for EOSE] |
| 221 | EOSE --> CONFIRM[Populate confirmed state fresh] | ||
| 172 | ``` | 222 | ``` |
| 173 | 223 | ||
| 174 | ### Restart Behavior (Graceful Degradation) | 224 | **Key points:** |
| 175 | 225 | ||
| 176 | On restart, all relay health state is reset. To avoid thundering herd: | 226 | - `should_clear_state()` returns true → triggers fresh sync |
| 227 | - Same path as initial connect after clearing state | ||
| 228 | - Layer 1: `build_announcement_filter(None)` - full history | ||
| 229 | - Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything | ||
| 177 | 230 | ||
| 178 | 1. **Conservative initial backoff**: Start with 5s delay (not immediate) for all relays | 231 | ### Scenario 4: Consolidation - Triggered on Filter Add |
| 179 | 2. **Staggered connection attempts**: Add random jitter (0-2s) per relay | ||
| 180 | 3. **Health rebuilds organically**: Relays prove themselves healthy through successful connections | ||
| 181 | 232 | ||
| 182 | ```rust | 233 | ```mermaid |
| 183 | impl RelayHealthTracker { | 234 | flowchart TB |
| 184 | fn new(metrics: SyncMetrics) -> Self { | 235 | AF[handle_add_filters called] --> COUNT{current + new > 70?} |
| 185 | Self { | 236 | COUNT --> |yes| CONSOLIDATE[consolidate] |
| 186 | health: DashMap::new(), | 237 | CONSOLIDATE --> WAIT_PEND[wait_pending_complete] |
| 187 | metrics, | 238 | WAIT_PEND --> CLOSE[unsubscribe_all] |
| 188 | } | 239 | CLOSE --> SINCE[since = now - 15min] |
| 189 | } | 240 | SINCE --> L1[build_announcement_filter - with since] |
| 241 | L1 --> L23[rebuild_layer2_and_layer3 - with since] | ||
| 242 | COUNT --> |no| SUB[Subscribe new filters] | ||
| 243 | SUB --> PB[Create PendingBatch] | ||
| 244 | ``` | ||
| 190 | 245 | ||
| 191 | /// Called on startup for each discovered relay | 246 | **Key points:** |
| 192 | fn initialize_relay(&self, url: RelayUrl) { | ||
| 193 | self.health.insert(url.clone(), RelayHealth { | ||
| 194 | url, | ||
| 195 | status: RelayStatus::Backoff { attempt: 0 }, // Start conservative | ||
| 196 | consecutive_failures: 0, | ||
| 197 | last_failure_at: None, | ||
| 198 | last_success_at: None, | ||
| 199 | next_retry_at: Instant::now() + Self::initial_backoff_with_jitter(), | ||
| 200 | }); | ||
| 201 | } | ||
| 202 | 247 | ||
| 203 | fn initial_backoff_with_jitter() -> Duration { | 248 | - Consolidation checked in `handle_add_filters` BEFORE adding new filters |
| 204 | Duration::from_secs(5) + Duration::from_millis(rand::random::<u64>() % 2000) | 249 | - After closing all subscriptions, re-subscribe: |
| 205 | } | 250 | 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since |
| 206 | } | 251 | 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since |
| 252 | - `since = now - 15min` prevents re-fetching old events | ||
| 253 | - Keeps confirmed state, just reduces filter count | ||
| 254 | |||
| 255 | ### Scenario 5: Daily Timer - 23 to 25h Random | ||
| 256 | |||
| 257 | ```mermaid | ||
| 258 | flowchart TB | ||
| 259 | DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] | ||
| 260 | CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] | ||
| 261 | CLEAR_PEND --> CLEAR_STATE[clear_sync_state] | ||
| 262 | CLEAR_STATE --> L1[build_announcement_filter - no since] | ||
| 263 | L1 --> RCA[recompute_actions_for_relay] | ||
| 264 | RCA --> CA[compute_actions with empty confirmed] | ||
| 265 | CA --> AF[AddFilters for everything] | ||
| 266 | AF --> SUB[Subscribe - no since filter] | ||
| 267 | SUB --> PB[Create PendingBatch] | ||
| 268 | PB --> EOSE[Wait for EOSE] | ||
| 269 | EOSE --> CONFIRM[Repopulate confirmed state] | ||
| 207 | ``` | 270 | ``` |
| 208 | 271 | ||
| 209 | **Trade-off**: We lose knowledge of chronically failing relays across restarts. This is acceptable because: | 272 | **Key points:** |
| 273 | |||
| 274 | - Daily timer is a full fresh sync, NOT consolidation | ||
| 275 | - Clears both PendingSyncIndex and confirmed state | ||
| 276 | - Layer 1: `build_announcement_filter(None)` - full history | ||
| 277 | - Layer 2+3: via compute_actions with empty confirmed - full history | ||
| 278 | - Detects any state drift accumulated over 24 hours | ||
| 210 | 279 | ||
| 211 | - Scale is small (<100 relays) | 280 | --- |
| 212 | - Conservative initial backoff prevents hammering bad relays | ||
| 213 | - Prometheus metrics preserve historical health data for operators | ||
| 214 | 281 | ||
| 215 | ## Filter Strategy | 282 | ## Core Algorithms |
| 216 | 283 | ||
| 217 | ### Unified Filters for Live Sync and Negentropy | 284 | ### 1. derive_relay_targets |
| 218 | 285 | ||
| 219 | The same filter logic is used for both live subscriptions and negentropy reconciliation: | 286 | Transform RepoSyncIndex into per-relay sync targets: |
| 220 | 287 | ||
| 221 | ```mermaid | 288 | ```rust |
| 222 | flowchart LR | 289 | /// Inverts RepoSyncIndex to get per-relay view |
| 223 | subgraph Filter Layers | 290 | fn derive_relay_targets( |
| 224 | F1[Layer 1: All 30617+30618] | 291 | repo_index: &HashMap<String, RepoSyncNeeds> |
| 225 | F2[Layer 2: Events tagging repos via A/a/q] | 292 | ) -> HashMap<String, RelaySyncNeeds> { |
| 226 | F3[Layer 3: Events tagging PRs/Issues via E/e/q] | 293 | let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); |
| 227 | end | 294 | |
| 295 | for (repo_ref, needs) in repo_index { | ||
| 296 | for relay_url in &needs.relays { | ||
| 297 | let target = targets.entry(relay_url.clone()).or_default(); | ||
| 298 | target.repos.insert(repo_ref.clone()); | ||
| 299 | target.root_events.extend(needs.root_events.iter().cloned()); | ||
| 300 | } | ||
| 301 | } | ||
| 228 | 302 | ||
| 229 | F1 -->|client-side| AP[Acceptance Policy] | 303 | targets |
| 230 | F2 -->|server-side| Relay | 304 | } |
| 231 | F3 -->|server-side| Relay | ||
| 232 | ``` | 305 | ``` |
| 233 | 306 | ||
| 234 | ### Layer 1: Repository Announcements & States | 307 | ### 2. compute_actions (Three-Way Diff) |
| 235 | 308 | ||
| 236 | Get ALL kind 30617 and 30618 events with unified `since` timestamp, then filter client-side through acceptance policy: | 309 | **This is the ONLY decision point for what NEW subscriptions to create.** |
| 237 | 310 | ||
| 238 | ```rust | 311 | ```rust |
| 239 | // Use same since filter as other layers for consistency | 312 | /// Computes AddFilters for items that are: |
| 240 | let layer1_filter = Filter::new() | 313 | /// - In targets (what we want) |
| 241 | .kinds([Kind::from(30617), Kind::from(30618)]) | 314 | /// - NOT in pending (already in-flight) |
| 242 | .since(since_timestamp); // Unified with Layer 2/3 | 315 | /// - NOT in confirmed (already confirmed) |
| 316 | fn compute_actions( | ||
| 317 | targets: &HashMap<String, RelaySyncNeeds>, | ||
| 318 | pending: &HashMap<String, Vec<PendingBatch>>, | ||
| 319 | confirmed: &HashMap<String, RelayState>, | ||
| 320 | ) -> Vec<AddFilters> { | ||
| 321 | let mut actions = Vec::new(); | ||
| 322 | |||
| 323 | for (relay_url, target) in targets { | ||
| 324 | // Skip disconnected relays - they will get AddFilters on reconnect | ||
| 325 | if let Some(state) = confirmed.get(relay_url) { | ||
| 326 | if state.connection_status != ConnectionStatus::Connected { | ||
| 327 | continue; | ||
| 328 | } | ||
| 329 | } | ||
| 330 | |||
| 331 | // Collect all pending items for this relay | ||
| 332 | let pending_repos: HashSet<_> = pending.get(relay_url) | ||
| 333 | .map(|batches| batches.iter() | ||
| 334 | .flat_map(|b| b.items.repos.iter().cloned()) | ||
| 335 | .collect()) | ||
| 336 | .unwrap_or_default(); | ||
| 337 | let pending_events: HashSet<_> = pending.get(relay_url) | ||
| 338 | .map(|batches| batches.iter() | ||
| 339 | .flat_map(|b| b.items.root_events.iter().cloned()) | ||
| 340 | .collect()) | ||
| 341 | .unwrap_or_default(); | ||
| 342 | |||
| 343 | // Collect confirmed items for this relay | ||
| 344 | let confirmed_repos = confirmed.get(relay_url) | ||
| 345 | .map(|c| &c.repos) | ||
| 346 | .unwrap_or(&HashSet::new()); | ||
| 347 | let confirmed_events = confirmed.get(relay_url) | ||
| 348 | .map(|c| &c.root_events) | ||
| 349 | .unwrap_or(&HashSet::new()); | ||
| 350 | |||
| 351 | // New = target - pending - confirmed | ||
| 352 | let new_repos: HashSet<_> = target.repos.iter() | ||
| 353 | .filter(|r| !pending_repos.contains(*r) && !confirmed_repos.contains(*r)) | ||
| 354 | .cloned() | ||
| 355 | .collect(); | ||
| 356 | let new_events: HashSet<_> = target.root_events.iter() | ||
| 357 | .filter(|e| !pending_events.contains(*e) && !confirmed_events.contains(*e)) | ||
| 358 | .cloned() | ||
| 359 | .collect(); | ||
| 360 | |||
| 361 | if !new_repos.is_empty() || !new_events.is_empty() { | ||
| 362 | let filters = build_filters(&new_repos, &new_events); | ||
| 363 | actions.push(AddFilters { | ||
| 364 | relay_url: relay_url.clone(), | ||
| 365 | repos: new_repos, | ||
| 366 | root_events: new_events, | ||
| 367 | filters, | ||
| 368 | }); | ||
| 369 | } | ||
| 370 | } | ||
| 371 | |||
| 372 | actions | ||
| 373 | } | ||
| 243 | ``` | 374 | ``` |
| 244 | 375 | ||
| 245 | **Client-side validation**: Only store events that pass our [`Nip34WritePolicy`](src/nostr/builder.rs:51). | 376 | ### 3. Filter Building Functions (Three-Layer Strategy) |
| 246 | 377 | ||
| 247 | ### Layer 2: Events Tagging Repositories | 378 | The filter strategy uses three layers: |
| 248 | 379 | ||
| 249 | For repo announcements **that list BOTH this relay AND our service**: | 380 | - **Layer 1**: Announcements (30617/30618) - subscribed ONCE on connect, NOT rebuilt during consolidation |
| 381 | - **Layer 2**: Events tagging our repos | ||
| 382 | - **Layer 3**: Events tagging our root events | ||
| 383 | |||
| 384 | **Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). | ||
| 250 | 385 | ||
| 251 | ```rust | 386 | ```rust |
| 252 | // Build addressable references: 30617:<pubkey>:<identifier> | 387 | /// Layer 1: Announcements filter (kinds 30617 + 30618) |
| 253 | let repo_refs: Vec<String> = announcements | 388 | /// Subscribed ONCE on connect - NOT included in consolidation rebuilds. |
| 254 | .iter() | 389 | /// Note: 30618 is ONLY synced from remote relays, not self-subscribed. |
| 255 | .filter(|a| a.relays.contains(&this_relay) && a.lists_service(&our_domain)) | 390 | fn build_announcement_filter(since: Option<Timestamp>) -> Filter { |
| 256 | .map(|a| format!("30617:{}:{}", a.pubkey.to_hex(), a.identifier)) | 391 | let filter = Filter::new().kinds([ |
| 257 | .collect(); | 392 | Kind::Custom(30617), // Repository announcements |
| 258 | 393 | Kind::Custom(30618), // Maintainer lists | |
| 259 | let layer2_filter = Filter::new() | 394 | ]); |
| 260 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_refs.clone()) | 395 | |
| 261 | .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo_refs)); | 396 | match since { |
| 262 | ``` | 397 | Some(ts) => filter.since(ts), |
| 398 | None => filter, | ||
| 399 | } | ||
| 400 | } | ||
| 263 | 401 | ||
| 264 | ### Layer 3: Events Tagging Issues/PRs/Patches | 402 | /// Layer 2: Events tagging one of our repos |
| 403 | /// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. | ||
| 404 | /// Batched per 100 repo refs. | ||
| 405 | fn tagged_one_of_our_repo_event_filters( | ||
| 406 | repos: &HashSet<String>, | ||
| 407 | since: Option<Timestamp>, | ||
| 408 | ) -> Vec<Filter> { | ||
| 409 | let mut filters = Vec::new(); | ||
| 410 | let repo_refs: Vec<_> = repos.iter().collect(); | ||
| 411 | |||
| 412 | for chunk in repo_refs.chunks(100) { | ||
| 413 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); | ||
| 414 | |||
| 415 | // Lowercase 'a' tag - standard addressable reference | ||
| 416 | let mut f1 = Filter::new() | ||
| 417 | .custom_tag(SingleLetterTag::lowercase(Alphabet::A), chunk_vec.clone()); | ||
| 418 | // Uppercase 'A' tag - some clients use this | ||
| 419 | let mut f2 = Filter::new() | ||
| 420 | .custom_tag(SingleLetterTag::uppercase(Alphabet::A), chunk_vec.clone()); | ||
| 421 | // Quote 'q' tag - NIP-10 quote references to addressable events | ||
| 422 | let mut f3 = Filter::new() | ||
| 423 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); | ||
| 424 | |||
| 425 | if let Some(ts) = since { | ||
| 426 | f1 = f1.since(ts); | ||
| 427 | f2 = f2.since(ts); | ||
| 428 | f3 = f3.since(ts); | ||
| 429 | } | ||
| 265 | 430 | ||
| 266 | For events that reference PRs, Patches, or Issues from repos we track: | 431 | filters.push(f1); |
| 432 | filters.push(f2); | ||
| 433 | filters.push(f3); | ||
| 434 | } | ||
| 267 | 435 | ||
| 268 | ```rust | 436 | filters |
| 269 | // Collect event IDs of PRs, Patches, Issues we've stored | 437 | } |
| 270 | let tagged_event_ids: Vec<EventId> = database | ||
| 271 | .query(Filter::new().kinds([1618, 1619, 1621, 1622, 1630])) // PR, PR Update, Issue, Patch, etc. | ||
| 272 | .iter() | ||
| 273 | .filter(|e| references_tracked_repo(e, &announcements)) | ||
| 274 | .map(|e| e.id) | ||
| 275 | .collect(); | ||
| 276 | |||
| 277 | let layer3_filter = Filter::new() | ||
| 278 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), tagged_event_ids.clone()) | ||
| 279 | .or(Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::Q), tagged_event_ids)); | ||
| 280 | ``` | ||
| 281 | 438 | ||
| 282 | ### Filter Size Management | 439 | /// Layer 3: Events tagging one of our root events |
| 440 | /// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. | ||
| 441 | /// Batched per 100 event IDs. | ||
| 442 | fn tagged_one_of_our_root_event_filters( | ||
| 443 | root_events: &HashSet<EventId>, | ||
| 444 | since: Option<Timestamp>, | ||
| 445 | ) -> Vec<Filter> { | ||
| 446 | let mut filters = Vec::new(); | ||
| 447 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); | ||
| 448 | |||
| 449 | for chunk in event_ids.chunks(100) { | ||
| 450 | let chunk_vec: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect(); | ||
| 451 | |||
| 452 | // Lowercase 'e' tag - standard event reference | ||
| 453 | let mut f1 = Filter::new() | ||
| 454 | .custom_tag(SingleLetterTag::lowercase(Alphabet::E), chunk_vec.clone()); | ||
| 455 | // Uppercase 'E' tag - some clients use this | ||
| 456 | let mut f2 = Filter::new() | ||
| 457 | .custom_tag(SingleLetterTag::uppercase(Alphabet::E), chunk_vec.clone()); | ||
| 458 | // Quote 'q' tag - NIP-10 quote references to events | ||
| 459 | let mut f3 = Filter::new() | ||
| 460 | .custom_tag(SingleLetterTag::lowercase(Alphabet::Q), chunk_vec); | ||
| 461 | |||
| 462 | if let Some(ts) = since { | ||
| 463 | f1 = f1.since(ts); | ||
| 464 | f2 = f2.since(ts); | ||
| 465 | f3 = f3.since(ts); | ||
| 466 | } | ||
| 283 | 467 | ||
| 284 | When the tag list exceeds a threshold, split into batches: | 468 | filters.push(f1); |
| 469 | filters.push(f2); | ||
| 470 | filters.push(f3); | ||
| 471 | } | ||
| 285 | 472 | ||
| 286 | ```rust | 473 | filters |
| 287 | const MAX_TAGS_PER_FILTER: usize = 100; | 474 | } |
| 288 | 475 | ||
| 289 | fn build_filters(tag_values: Vec<String>) -> Vec<Filter> { | 476 | /// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) |
| 290 | tag_values | 477 | /// Used by: |
| 291 | .chunks(MAX_TAGS_PER_FILTER) | 478 | /// - compute_actions for incremental subscriptions |
| 292 | .map(|chunk| Filter::new().custom_tag(tag, chunk.to_vec())) | 479 | /// - consolidation rebuilds (Layer 1 remains active) |
| 293 | .collect() | 480 | fn build_layer2_and_layer3_filters( |
| 481 | repos: &HashSet<String>, | ||
| 482 | root_events: &HashSet<EventId>, | ||
| 483 | since: Option<Timestamp>, | ||
| 484 | ) -> Vec<Filter> { | ||
| 485 | let mut filters = Vec::new(); | ||
| 486 | filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); | ||
| 487 | filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); | ||
| 488 | filters | ||
| 294 | } | 489 | } |
| 295 | ``` | 490 | ``` |
| 296 | 491 | ||
| 297 | **Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch. | 492 | **Note**: There is no `build_all_filters` function. Layer 1 is subscribed separately on connect, and Layer 2+3 are managed independently. |
| 298 | 493 | ||
| 299 | ### Filter Generation vs. Policy Validation | 494 | ### 4. handle_add_filters (SyncManager) |
| 300 | 495 | ||
| 301 | The filter strategy and acceptance policies serve **different purposes** even though they share conceptual knowledge: | 496 | ```rust |
| 497 | impl SyncManager { | ||
| 498 | async fn handle_add_filters(&mut self, action: AddFilters) { | ||
| 499 | let AddFilters { relay_url, repos, root_events, filters } = action; | ||
| 500 | |||
| 501 | // Auto-spawn connection if needed | ||
| 502 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); | ||
| 503 | match state { | ||
| 504 | None => { | ||
| 505 | // New relay discovered - create entry and spawn connection | ||
| 506 | self.relay_sync_index.write().await.insert( | ||
| 507 | relay_url.clone(), | ||
| 508 | RelayState { | ||
| 509 | repos: HashSet::new(), | ||
| 510 | root_events: HashSet::new(), | ||
| 511 | is_bootstrap: false, | ||
| 512 | connection_status: ConnectionStatus::Connecting, | ||
| 513 | last_connected: None, | ||
| 514 | disconnected_at: None, | ||
| 515 | connection: None, | ||
| 516 | } | ||
| 517 | ); | ||
| 518 | self.spawn_connection(&relay_url).await; | ||
| 519 | return; // Subscriptions will happen on connection success | ||
| 520 | } | ||
| 521 | Some(state) if state.connection_status != ConnectionStatus::Connected => { | ||
| 522 | // Not connected - subscriptions will happen on connection success | ||
| 523 | return; | ||
| 524 | } | ||
| 525 | Some(_) => { | ||
| 526 | // Already connected - proceed with subscription | ||
| 527 | } | ||
| 528 | } | ||
| 302 | 529 | ||
| 303 | | Concern | Filters | Policies | | 530 | // Subscribe and collect subscription IDs |
| 304 | |---------|---------|----------| | 531 | let conn = self.connections.get(&relay_url).unwrap(); |
| 305 | | **Direction** | What to request FROM remote relays | What to accept INTO local database | | 532 | let mut sub_ids = HashSet::new(); |
| 306 | | **Input** | Stored events (announcements, PRs, etc.) | Single incoming event | | 533 | |
| 307 | | **Output** | Filter specification | Accept/Reject decision | | 534 | for filter in filters { |
| 535 | match conn.client.subscribe(filter, None).await { | ||
| 536 | Ok(output) => { | ||
| 537 | for sub_id in output.val { | ||
| 538 | sub_ids.insert(sub_id); | ||
| 539 | } | ||
| 540 | } | ||
| 541 | Err(e) => { | ||
| 542 | tracing::warn!("Failed to subscribe: {}", e); | ||
| 543 | } | ||
| 544 | } | ||
| 545 | } | ||
| 308 | 546 | ||
| 309 | The modular sub-policies ([`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24), [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25), etc.) encode knowledge about event kinds and tag types, but this knowledge is applied differently: | 547 | // Create pending batch |
| 548 | let batch = PendingBatch { | ||
| 549 | batch_id: self.next_batch_id(), | ||
| 550 | items: PendingItems { repos, root_events }, | ||
| 551 | outstanding_subs: sub_ids, | ||
| 552 | }; | ||
| 310 | 553 | ||
| 311 | - **In filters**: We enumerate **all** addressable refs (`30617:pubkey:id`) from stored announcements | 554 | // Add to pending index |
| 312 | - **In policies**: [`RelatedEventPolicy::check_references()`](../../src/nostr/policy/related.rs:39) checks if incoming event references **any** accepted event | 555 | self.pending_sync_index.write().await |
| 556 | .entry(relay_url) | ||
| 557 | .or_default() | ||
| 558 | .push(batch); | ||
| 559 | } | ||
| 560 | } | ||
| 561 | ``` | ||
| 313 | 562 | ||
| 314 | Because of this fundamental difference, filter generation logic stays in `src/sync/filter.rs` rather than being delegated to policy modules. Both share the understanding of NIP-34 event relationships, but they answer different questions. | 563 | ### 5. handle_disconnect |
| 315 | 564 | ||
| 316 | ## Subscription Updates | 565 | ```rust |
| 566 | impl SyncManager { | ||
| 567 | /// Called when connection to a relay is lost | ||
| 568 | async fn handle_disconnect(&mut self, relay_url: &str) { | ||
| 569 | let mut index = self.relay_sync_index.write().await; | ||
| 570 | |||
| 571 | if let Some(state) = index.get_mut(relay_url) { | ||
| 572 | state.connection_status = ConnectionStatus::Disconnected; | ||
| 573 | state.disconnected_at = Some(Timestamp::now()); | ||
| 574 | state.connection = None; | ||
| 575 | } | ||
| 317 | 576 | ||
| 318 | ### Dynamic Subscription Management | 577 | // Clear pending batches - these items were not confirmed |
| 578 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 319 | 579 | ||
| 320 | When new events arrive that affect our filter criteria: | 580 | // Remove from active connections map |
| 581 | self.connections.remove(relay_url); | ||
| 321 | 582 | ||
| 322 | ```mermaid | 583 | // Health tracker records failure for backoff |
| 323 | sequenceDiagram | 584 | self.health_tracker.record_failure(relay_url); |
| 324 | participant LocalRelay | 585 | } |
| 325 | participant SyncManager | 586 | } |
| 326 | participant RemoteRelay | ||
| 327 | |||
| 328 | LocalRelay->>SyncManager: New PR event accepted | ||
| 329 | SyncManager->>SyncManager: Extract event ID | ||
| 330 | SyncManager->>SyncManager: Build new filter for E/e/q tags | ||
| 331 | SyncManager->>RemoteRelay: REQ with new filter | ||
| 332 | RemoteRelay-->>SyncManager: Events matching new filter | ||
| 333 | ``` | 587 | ``` |
| 334 | 588 | ||
| 335 | **Events that trigger subscription updates**: | 589 | ### 6. handle_connect_or_reconnect (Unified) |
| 336 | 590 | ||
| 337 | - New repository announcement accepted (adds to Layer 2) | 591 | This method handles BOTH initial connection AND reconnection with unified logic: |
| 338 | - New PR/Issue/Patch accepted (adds to Layer 3) | ||
| 339 | 592 | ||
| 340 | ### When to Consolidate | 593 | ```rust |
| 594 | impl SyncManager { | ||
| 595 | /// Called when connection to a relay succeeds - handles both initial connect and reconnect. | ||
| 596 | /// | ||
| 597 | /// Decision tree: | ||
| 598 | /// - Fresh sync (no last_connected OR disconnected >15min): No since filter, full history | ||
| 599 | /// - Quick reconnect (<15min): since = last_connected - 15min | ||
| 600 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | ||
| 601 | let mut index = self.relay_sync_index.write().await; | ||
| 602 | let state = match index.get_mut(relay_url) { | ||
| 603 | Some(s) => s, | ||
| 604 | None => return, // Relay was removed while disconnected | ||
| 605 | }; | ||
| 341 | 606 | ||
| 342 | Track subscription count per connection: | 607 | // Determine if this is a fresh sync or quick reconnect |
| 608 | let is_fresh_sync = state.last_connected.is_none() || state.should_clear_state(); | ||
| 609 | let last_connected = state.last_connected; | ||
| 343 | 610 | ||
| 344 | ```rust | 611 | if is_fresh_sync && state.last_connected.is_some() { |
| 345 | struct ConnectionState { | 612 | // Stale reconnect (>15min) - clear state |
| 346 | relay_url: RelayUrl, | 613 | tracing::info!("Reconnect after >15min for {}, clearing state for fresh sync", relay_url); |
| 347 | subscriptions: Vec<SubscriptionId>, | 614 | state.clear_sync_state(); |
| 348 | total_filter_count: usize, | 615 | } |
| 349 | } | ||
| 350 | 616 | ||
| 351 | impl ConnectionState { | 617 | // Update connection state |
| 352 | fn should_consolidate(&self) -> bool { | 618 | state.connection_status = ConnectionStatus::Connected; |
| 353 | self.total_filter_count > 150 | 619 | state.last_connected = Some(Timestamp::now()); |
| 354 | } | 620 | state.disconnected_at = None; |
| 355 | 621 | ||
| 356 | async fn consolidate(&mut self) { | 622 | // Record success in health tracker |
| 357 | // Close all subscriptions | 623 | self.health_tracker.record_success(relay_url); |
| 358 | // Rebuild from scratch with current database state | ||
| 359 | } | ||
| 360 | } | ||
| 361 | ``` | ||
| 362 | 624 | ||
| 363 | ## Negentropy Catchup | 625 | drop(index); // Release lock |
| 364 | 626 | ||
| 365 | ### NIP-77 Reconciliation Protocol | 627 | let conn = match self.connections.get(relay_url) { |
| 628 | Some(c) => c, | ||
| 629 | None => return, | ||
| 630 | }; | ||
| 366 | 631 | ||
| 367 | Negentropy enables efficient set reconciliation - discovering which events we're missing without transferring full event lists. | 632 | if is_fresh_sync { |
| 633 | // Fresh sync: Layer 1 without since, Layer 2+3 handled by compute_actions | ||
| 368 | 634 | ||
| 369 | ### Timing | 635 | // Step 1: Subscribe Layer 1 (announcements) without since |
| 636 | let layer1 = build_announcement_filter(None); | ||
| 637 | let _ = conn.client.subscribe(layer1, None).await; | ||
| 370 | 638 | ||
| 371 | | Trigger | Behavior | | 639 | // Step 2: compute_actions will handle Layer 2+3 (with since=None in build) |
| 372 | | ------------------- | -------------------------------------------------------------------- | | 640 | self.recompute_actions_for_relay(relay_url).await; |
| 373 | | **Initial startup** | Warm-up delay, staggered if many filters, initializes daily schedule | | 641 | } else { |
| 374 | | **After reconnect** | Delay to avoid rate limiting, limited to events from last 3 days | | 642 | // Quick reconnect: Layer 1 with since, Layer 2+3 with since |
| 375 | | **Daily** | Staggered batches, max 100 tagged events per filter | | 643 | let since = last_connected |
| 644 | .map(|ts| Timestamp::from(ts.as_u64().saturating_sub(900))) | ||
| 645 | .unwrap_or(Timestamp::from(0)); | ||
| 376 | 646 | ||
| 377 | ### Startup Flow | 647 | // Step 1: Subscribe Layer 1 (announcements) with since |
| 648 | let layer1 = build_announcement_filter(Some(since)); | ||
| 649 | let _ = conn.client.subscribe(layer1, None).await; | ||
| 378 | 650 | ||
| 379 | ```mermaid | 651 | // Step 2: Rebuild Layer 2+3 for confirmed items with since |
| 380 | sequenceDiagram | 652 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; |
| 381 | participant Server | ||
| 382 | participant SyncManager | ||
| 383 | participant Relay | ||
| 384 | |||
| 385 | Server->>SyncManager: Start | ||
| 386 | SyncManager->>SyncManager: Wait warm-up delay | ||
| 387 | SyncManager->>SyncManager: Build batched filters | ||
| 388 | |||
| 389 | loop For each relay with stagger delay | ||
| 390 | SyncManager->>Relay: NEG-OPEN with filter batch 1 | ||
| 391 | Relay-->>SyncManager: NEG-MSG with differences | ||
| 392 | SyncManager->>Relay: NEG-MSG response | ||
| 393 | Note over SyncManager,Relay: Reconciliation rounds | ||
| 394 | Relay-->>SyncManager: NEG-CLOSE or events | ||
| 395 | SyncManager->>SyncManager: Validate + store events | ||
| 396 | |||
| 397 | alt More batches | ||
| 398 | SyncManager->>SyncManager: Wait stagger delay | ||
| 399 | SyncManager->>Relay: NEG-OPEN with next batch | ||
| 400 | end | ||
| 401 | end | ||
| 402 | 653 | ||
| 403 | SyncManager->>SyncManager: Schedule daily catchup | 654 | // Step 3: Check for NEW items via compute_actions |
| 404 | ``` | 655 | self.recompute_actions_for_relay(relay_url).await; |
| 656 | } | ||
| 657 | } | ||
| 405 | 658 | ||
| 406 | ### Reconnection Catchup | 659 | /// Rebuild Layer 2+3 subscriptions only (NOT Layer 1). |
| 660 | /// Used by: | ||
| 661 | /// - Quick reconnect: rebuild confirmed items with since filter | ||
| 662 | /// - Consolidation: close and rebuild with since filter | ||
| 663 | async fn rebuild_layer2_and_layer3(&mut self, relay_url: &str, since: Option<Timestamp>) { | ||
| 664 | let confirmed = self.relay_sync_index.read().await; | ||
| 665 | let state = match confirmed.get(relay_url) { | ||
| 666 | Some(s) => s, | ||
| 667 | None => return, | ||
| 668 | }; | ||
| 407 | 669 | ||
| 408 | After connection reestablished: | 670 | // Build Layer 2+3 filters WITH since |
| 671 | let filters = build_layer2_and_layer3_filters(&state.repos, &state.root_events, since); | ||
| 672 | drop(confirmed); | ||
| 409 | 673 | ||
| 410 | ```rust | 674 | // Subscribe directly - no PendingBatch for catch-up (items already confirmed) |
| 411 | async fn catchup_after_reconnect(&self, relay: &RelayUrl) { | 675 | let conn = match self.connections.get(relay_url) { |
| 412 | // Delay to avoid immediate disconnect for too many requests | 676 | Some(c) => c, |
| 413 | tokio::time::sleep(RECONNECT_CATCHUP_DELAY).await; | 677 | None => return, |
| 678 | }; | ||
| 679 | |||
| 680 | for filter in filters { | ||
| 681 | let _ = conn.client.subscribe(filter, None).await; | ||
| 682 | } | ||
| 683 | } | ||
| 684 | |||
| 685 | /// Rerun compute_actions for a specific relay and process resulting AddFilters. | ||
| 686 | /// compute_actions builds Layer 2+3 filters for NEW items not yet in confirmed state. | ||
| 687 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { | ||
| 688 | let repo_index = self.repo_sync_index.read().await; | ||
| 689 | let targets = derive_relay_targets(&repo_index); | ||
| 690 | drop(repo_index); | ||
| 691 | |||
| 692 | // Filter to just this relay | ||
| 693 | let target = match targets.get(relay_url) { | ||
| 694 | Some(t) => t.clone(), | ||
| 695 | None => return, // No repos reference this relay anymore | ||
| 696 | }; | ||
| 414 | 697 | ||
| 415 | // Only catch up on recent events (last 3 days) | 698 | let pending = self.pending_sync_index.read().await; |
| 416 | let since = Timestamp::now() - Duration::from_secs(3 * 24 * 60 * 60); | 699 | let confirmed = self.relay_sync_index.read().await; |
| 417 | 700 | ||
| 418 | let filters = self.build_filters_for_relay(relay) | 701 | let mut single_relay_targets = HashMap::new(); |
| 419 | .into_iter() | 702 | single_relay_targets.insert(relay_url.to_string(), target); |
| 420 | .map(|f| f.since(since)) | ||
| 421 | .collect(); | ||
| 422 | 703 | ||
| 423 | self.run_negentropy(relay, filters).await; | 704 | let actions = compute_actions(&single_relay_targets, &pending, &confirmed); |
| 705 | |||
| 706 | drop(pending); | ||
| 707 | drop(confirmed); | ||
| 708 | |||
| 709 | // Process AddFilters | ||
| 710 | for action in actions { | ||
| 711 | self.handle_add_filters(action).await; | ||
| 712 | } | ||
| 713 | } | ||
| 424 | } | 714 | } |
| 425 | ``` | 715 | ``` |
| 426 | 716 | ||
| 427 | ### Daily Catchup Schedule | 717 | ### 7. Daily Timer |
| 428 | 718 | ||
| 429 | ```rust | 719 | ```rust |
| 430 | // Daily catchup runs at consistent time, staggered across relays | 720 | impl SyncManager { |
| 431 | async fn schedule_daily_catchup(&self) { | 721 | async fn run_daily_timer(&self) { |
| 432 | let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); | 722 | loop { |
| 723 | // Random 23-25 hours | ||
| 724 | let hours = 23.0 + rand::random::<f64>() * 2.0; | ||
| 725 | tokio::time::sleep(Duration::from_secs_f64(hours * 3600.0)).await; | ||
| 726 | |||
| 727 | let relay_urls: Vec<_> = self.relay_sync_index.read().await | ||
| 728 | .keys() | ||
| 729 | .cloned() | ||
| 730 | .collect(); | ||
| 731 | |||
| 732 | for relay_url in relay_urls { | ||
| 733 | self.daily_sync(&relay_url).await; | ||
| 734 | } | ||
| 735 | } | ||
| 736 | } | ||
| 433 | 737 | ||
| 434 | loop { | 738 | /// Perform daily fresh sync for a relay |
| 435 | interval.tick().await; | 739 | async fn daily_sync(&mut self, relay_url: &str) { |
| 740 | tracing::info!("Daily sync triggered for {}", relay_url); | ||
| 436 | 741 | ||
| 437 | for (i, relay) in self.healthy_relays().enumerate() { | 742 | // Close all subscriptions |
| 438 | // Stagger: 5 minute delay between relays | 743 | if let Some(conn) = self.connections.get(relay_url) { |
| 439 | tokio::time::sleep(Duration::from_secs(i as u64 * 300)).await; | 744 | conn.client.unsubscribe_all().await; |
| 745 | } | ||
| 440 | 746 | ||
| 441 | // Batch filters to max 100 tagged events each | 747 | // Clear PendingSyncIndex |
| 442 | let batches = self.build_batched_filters(&relay, 100); | 748 | self.pending_sync_index.write().await.remove(relay_url); |
| 443 | 749 | ||
| 444 | for batch in batches { | 750 | // Clear confirmed state - triggers fresh sync |
| 445 | self.run_negentropy(&relay, batch).await; | 751 | { |
| 446 | tokio::time::sleep(Duration::from_secs(60)).await; // 1 min between batches | 752 | let mut index = self.relay_sync_index.write().await; |
| 753 | if let Some(state) = index.get_mut(relay_url) { | ||
| 754 | state.clear_sync_state(); | ||
| 447 | } | 755 | } |
| 448 | } | 756 | } |
| 757 | |||
| 758 | // Recompute actions - will generate AddFilters for everything | ||
| 759 | self.recompute_actions_for_relay(relay_url).await; | ||
| 449 | } | 760 | } |
| 450 | } | 761 | } |
| 451 | ``` | 762 | ``` |
| 452 | 763 | ||
| 453 | ## Event Processing | 764 | ### 8. Consolidation (Threshold-Based, Triggered on Add) |
| 454 | 765 | ||
| 455 | ### Acceptance Policy | 766 | Consolidation is checked when adding new subscriptions, not periodically. **Key insight**: Consolidation only closes and rebuilds Layer 2+3 - Layer 1 remains active. |
| 456 | 767 | ||
| 457 | All synced events go through our acceptance policy, reusing the same [`Nip34WritePolicy`](../../src/nostr/builder.rs:36) validation logic used for direct client submissions. | 768 | ```rust |
| 769 | impl SyncManager { | ||
| 770 | /// Check filter count and consolidate if needed. | ||
| 771 | /// Called from handle_add_filters BEFORE adding new filters. | ||
| 772 | async fn maybe_consolidate(&mut self, relay_url: &str, new_filter_count: usize) { | ||
| 773 | let current_count = self.get_filter_count(relay_url).await; | ||
| 458 | 774 | ||
| 459 | #### Design: Reusing admit_event() | 775 | if current_count + new_filter_count > 70 { |
| 776 | self.consolidate(relay_url).await; | ||
| 777 | } | ||
| 778 | } | ||
| 460 | 779 | ||
| 461 | The [`WritePolicy::admit_event()`](../../src/nostr/builder.rs:256-269) trait method takes a `SocketAddr` parameter designed for client connections: | 780 | /// Consolidate filters - only rebuilds Layer 2+3, Layer 1 stays active. |
| 781 | /// Does NOT clear state - just reduces filter count. | ||
| 782 | async fn consolidate(&mut self, relay_url: &str) { | ||
| 783 | tracing::info!("Consolidating filters for {} (count > 70)", relay_url); | ||
| 462 | 784 | ||
| 463 | ```rust | 785 | // Wait for all pending batches to complete first |
| 464 | // From nostr-relay-builder WritePolicy trait | 786 | self.wait_pending_complete(relay_url).await; |
| 465 | fn admit_event<'a>( | 787 | |
| 466 | &'a self, | 788 | // Close Layer 2+3 subscriptions only - Layer 1 remains active |
| 467 | event: &'a Event, | 789 | // NOTE: In practice, we close all then re-add Layer 1, or track sub IDs separately |
| 468 | _addr: &'a SocketAddr, // Unused in our implementation | 790 | // For simplicity, we close all and re-add Layer 1 |
| 469 | ) -> BoxedFuture<'a, PolicyResult>; | 791 | if let Some(conn) = self.connections.get(relay_url) { |
| 792 | conn.client.unsubscribe_all().await; | ||
| 793 | } | ||
| 794 | |||
| 795 | // Re-subscribe Layer 1 with since (maintains announcements stream) | ||
| 796 | let since = Timestamp::from(Timestamp::now().as_u64().saturating_sub(900)); | ||
| 797 | let conn = self.connections.get(relay_url).unwrap(); | ||
| 798 | let layer1 = build_announcement_filter(Some(since)); | ||
| 799 | let _ = conn.client.subscribe(layer1, None).await; | ||
| 800 | |||
| 801 | // Rebuild Layer 2+3 only | ||
| 802 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | ||
| 803 | } | ||
| 804 | } | ||
| 470 | ``` | 805 | ``` |
| 471 | 806 | ||
| 472 | For synced events from remote relays, we pass a **synthetic localhost address** since: | 807 | **Updated handle_add_filters to check consolidation:** |
| 473 | 1. The `_addr` parameter is currently unused in our [`Nip34WritePolicy`](../../src/nostr/builder.rs:259) | ||
| 474 | 2. All meaningful validation is done by the modular sub-policies (see below) | ||
| 475 | 3. This allows reusing 100% of the existing validation logic | ||
| 476 | 808 | ||
| 477 | ```rust | 809 | ```rust |
| 478 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 810 | impl SyncManager { |
| 479 | 811 | async fn handle_add_filters(&mut self, action: AddFilters) { | |
| 480 | /// Synthetic address for synced events (not from a direct client connection) | 812 | let AddFilters { relay_url, repos, root_events, filters } = action; |
| 481 | const SYNC_SOURCE_ADDR: SocketAddr = SocketAddr::new( | 813 | |
| 482 | IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), | 814 | // Auto-spawn connection if needed (unchanged) |
| 483 | 0 | 815 | let state = self.relay_sync_index.read().await.get(&relay_url).cloned(); |
| 484 | ); | 816 | match state { |
| 485 | 817 | None => { | |
| 486 | async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { | 818 | // New relay discovered - create entry and spawn connection |
| 487 | // Apply our Nip34WritePolicy using synthetic address | 819 | self.relay_sync_index.write().await.insert( |
| 488 | // The SocketAddr is unused - all validation is by the modular sub-policies | 820 | relay_url.clone(), |
| 489 | let result = self.acceptance_policy | 821 | RelayState { |
| 490 | .admit_event(&event, &SYNC_SOURCE_ADDR) | 822 | repos: HashSet::new(), |
| 491 | .await; | 823 | root_events: HashSet::new(), |
| 492 | 824 | is_bootstrap: false, | |
| 493 | match result { | 825 | connection_status: ConnectionStatus::Connecting, |
| 494 | PolicyResult::Accept => { | 826 | last_connected: None, |
| 495 | self.database.save_event(&event).await?; | 827 | disconnected_at: None, |
| 496 | tracing::debug!( | 828 | connection: None, |
| 497 | "Accepted synced event {} from {}", | 829 | } |
| 498 | event.id.to_hex(), | 830 | ); |
| 499 | source_relay | 831 | self.spawn_connection(&relay_url).await; |
| 500 | ); | 832 | return; // Subscriptions will happen on connection success |
| 501 | self.trigger_subscription_updates(&event).await; | 833 | } |
| 834 | Some(state) if state.connection_status != ConnectionStatus::Connected => { | ||
| 835 | return; // Not connected - subscriptions will happen on connection success | ||
| 836 | } | ||
| 837 | Some(_) => { | ||
| 838 | // Already connected - proceed | ||
| 839 | } | ||
| 502 | } | 840 | } |
| 503 | PolicyResult::Reject(reason) => { | 841 | |
| 504 | tracing::debug!( | 842 | // CHECK CONSOLIDATION BEFORE ADDING |
| 505 | "Rejected synced event {} from {}: {}", | 843 | self.maybe_consolidate(&relay_url, filters.len()).await; |
| 506 | event.id.to_hex(), | 844 | |
| 507 | source_relay, | 845 | // Subscribe and collect subscription IDs |
| 508 | reason | 846 | let conn = self.connections.get(&relay_url).unwrap(); |
| 509 | ); | 847 | let mut sub_ids = HashSet::new(); |
| 848 | |||
| 849 | for filter in filters { | ||
| 850 | match conn.client.subscribe(filter, None).await { | ||
| 851 | Ok(output) => { | ||
| 852 | for sub_id in output.val { | ||
| 853 | sub_ids.insert(sub_id); | ||
| 854 | } | ||
| 855 | } | ||
| 856 | Err(e) => { | ||
| 857 | tracing::warn!("Failed to subscribe: {}", e); | ||
| 858 | } | ||
| 859 | } | ||
| 510 | } | 860 | } |
| 511 | } | ||
| 512 | 861 | ||
| 513 | Ok(()) | 862 | // Create pending batch (unchanged) |
| 863 | let batch = PendingBatch { | ||
| 864 | batch_id: self.next_batch_id(), | ||
| 865 | items: PendingItems { repos, root_events }, | ||
| 866 | outstanding_subs: sub_ids, | ||
| 867 | }; | ||
| 868 | |||
| 869 | self.pending_sync_index.write().await | ||
| 870 | .entry(relay_url) | ||
| 871 | .or_default() | ||
| 872 | .push(batch); | ||
| 873 | } | ||
| 514 | } | 874 | } |
| 515 | ``` | 875 | ``` |
| 516 | 876 | ||
| 517 | #### Modular Sub-Policies | 877 | --- |
| 518 | |||
| 519 | The [`Nip34WritePolicy`](../../src/nostr/builder.rs:36-42) delegates to specialized sub-policies in [`src/nostr/policy/`](../../src/nostr/policy/mod.rs:1-41): | ||
| 520 | 878 | ||
| 521 | | Sub-Policy | Kinds | Responsibility | | 879 | ## Disconnect (Relay Removal) Handling |
| 522 | |------------|-------|----------------| | ||
| 523 | | [`AnnouncementPolicy`](../../src/nostr/policy/announcement.rs:24-27) | 30617 | Validates service listing, maintainer exception, creates bare repos | | ||
| 524 | | [`StatePolicy`](../../src/nostr/policy/state.rs:43-46) | 30618 | Validates state structure, aligns git refs with authorized state | | ||
| 525 | | [`PrEventPolicy`](../../src/nostr/policy/pr_event.rs) | 1618, 1619 | Validates PR/PR Update events, manages refs/nostr/* | | ||
| 526 | | [`RelatedEventPolicy`](../../src/nostr/policy/related.rs:25-29) | All others | Checks forward/backward references to accepted repos/events | | ||
| 527 | 880 | ||
| 528 | All sub-policies share a common [`PolicyContext`](../../src/nostr/policy/mod.rs:22-27) containing: | 881 | ```rust |
| 529 | - `domain`: Our service domain for validation | 882 | impl SyncManager { |
| 530 | - `database`: For querying existing events | 883 | /// Periodically check for relays that should be disconnected |
| 531 | - `git_data_path`: For git operations | 884 | async fn check_disconnects(&mut self) { |
| 885 | let confirmed = self.relay_sync_index.read().await; | ||
| 886 | let relays_to_disconnect: Vec<_> = confirmed.iter() | ||
| 887 | .filter(|(_, state)| { | ||
| 888 | !state.is_bootstrap && | ||
| 889 | state.repos.is_empty() && | ||
| 890 | state.root_events.is_empty() | ||
| 891 | }) | ||
| 892 | .map(|(url, _)| url.clone()) | ||
| 893 | .collect(); | ||
| 894 | drop(confirmed); | ||
| 895 | |||
| 896 | for relay_url in relays_to_disconnect { | ||
| 897 | self.disconnect_relay(&relay_url).await; | ||
| 898 | } | ||
| 899 | } | ||
| 532 | 900 | ||
| 533 | #### Why Not Call Sub-Policies Directly? | 901 | async fn disconnect_relay(&mut self, relay_url: &str) { |
| 902 | tracing::info!("Disconnecting relay {} (no repos)", relay_url); | ||
| 534 | 903 | ||
| 535 | While we could bypass `admit_event()` and call sub-policies directly: | 904 | self.relay_sync_index.write().await.remove(relay_url); |
| 905 | self.pending_sync_index.write().await.remove(relay_url); | ||
| 536 | 906 | ||
| 537 | ```rust | 907 | if let Some(conn) = self.connections.remove(relay_url) { |
| 538 | // Alternative: Direct sub-policy calls (NOT recommended) | 908 | let _ = conn.client.disconnect().await; |
| 539 | match event.kind.as_u16() { | 909 | } |
| 540 | 30617 => self.announcement_policy.validate(&event).await, | 910 | } |
| 541 | 30618 => self.state_policy.validate(&event), | ||
| 542 | 1618 | 1619 => self.pr_event_policy.validate_nostr_ref(&event).await, | ||
| 543 | _ => self.related_event_policy.check_references(&event).await, | ||
| 544 | } | 911 | } |
| 545 | ``` | 912 | ``` |
| 546 | 913 | ||
| 547 | This is **not recommended** because: | 914 | --- |
| 548 | 1. Duplicates the kind-routing logic from [`admit_event()`](../../src/nostr/builder.rs:261-268) | ||
| 549 | 2. Misses important post-validation steps (e.g., `handle_announcement()` also calls `ensure_bare_repository()`) | ||
| 550 | 3. Creates maintenance burden when policy logic changes | ||
| 551 | 915 | ||
| 552 | ## Module Structure | 916 | ## State Flow Summary |
| 553 | 917 | ||
| 554 | ### New `src/sync/` Module | 918 | ```mermaid |
| 919 | flowchart TB | ||
| 920 | subgraph Input | ||
| 921 | SS[SelfSubscriber] | ||
| 922 | OWN[Own Relay] | ||
| 923 | end | ||
| 555 | 924 | ||
| 556 | ``` | 925 | subgraph RepoSyncIndex - What We Want |
| 557 | src/ | 926 | RSI[HashMap: Repo to Relays+Events] |
| 558 | ├── sync/ | 927 | end |
| 559 | │ ├── mod.rs # Module exports | ||
| 560 | │ ├── manager.rs # SyncManager - main coordinator | ||
| 561 | │ ├── connection.rs # Per-relay connection handling | ||
| 562 | │ ├── filter.rs # Filter building and batching | ||
| 563 | │ ├── health.rs # RelayHealth tracking | ||
| 564 | │ ├── negentropy.rs # NIP-77 reconciliation logic | ||
| 565 | │ └── subscription.rs # Dynamic subscription management | ||
| 566 | ├── nostr/ | ||
| 567 | │ └── ... (existing) | ||
| 568 | └── ... | ||
| 569 | ``` | ||
| 570 | 928 | ||
| 571 | ### Integration with Main Binary | 929 | subgraph Derived Target |
| 930 | DT[derive_relay_targets fn] | ||
| 931 | TGT[Per-relay: repos + events we should sync] | ||
| 932 | end | ||
| 572 | 933 | ||
| 573 | ```rust | 934 | subgraph compute_actions - Decision Point |
| 574 | // In main.rs | 935 | CA[Three-way diff: target - pending - confirmed] |
| 575 | async fn main() -> Result<()> { | 936 | end |
| 576 | // ... existing setup ... | ||
| 577 | 937 | ||
| 578 | // Start sync manager as background task | 938 | subgraph PendingSyncIndex - In Flight |
| 579 | let sync_manager = SyncManager::new( | 939 | PSI[Vec PendingBatch per relay] |
| 580 | database.clone(), | 940 | end |
| 581 | config.domain.clone(), | ||
| 582 | ); | ||
| 583 | 941 | ||
| 584 | tokio::spawn(async move { | 942 | subgraph RelaySyncIndex - Confirmed State |
| 585 | sync_manager.run().await | 943 | RLI[RelayState per relay] |
| 586 | }); | 944 | CONN[connection_status] |
| 945 | REPOS[repos + root_events] | ||
| 946 | TIMES[last_connected + disconnected_at] | ||
| 947 | end | ||
| 587 | 948 | ||
| 588 | // ... rest of server startup ... | 949 | SS -->|subscribe| OWN |
| 589 | } | 950 | OWN -->|events| SS |
| 951 | SS -->|batch fires| RSI | ||
| 952 | RSI --> DT | ||
| 953 | DT --> TGT | ||
| 954 | TGT --> CA | ||
| 955 | PSI --> CA | ||
| 956 | RLI --> CA | ||
| 957 | CA -->|Layer 2+3 new items| AF[AddFilters] | ||
| 958 | AF -->|check filter count| CONSOL{count + new > 70?} | ||
| 959 | CONSOL -->|yes| CONSOLIDATE[consolidate] | ||
| 960 | CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] | ||
| 961 | L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] | ||
| 962 | CONSOL -->|no| SUB[subscribe] | ||
| 963 | AF -->|spawn if needed| CONN | ||
| 964 | SUB --> PSI | ||
| 965 | PSI -->|EOSE| REPOS | ||
| 966 | |||
| 967 | CONN -->|disconnect| DISC[Clear PSI + set disconnected_at] | ||
| 968 | DISC -->|any reconnect| HC[handle_connect_or_reconnect] | ||
| 969 | |||
| 970 | subgraph handle_connect_or_reconnect | ||
| 971 | HC --> FRESH_CHECK{is_fresh_sync?} | ||
| 972 | FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since] | ||
| 973 | FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since] | ||
| 974 | L1_FRESH --> RCA1[recompute_actions_for_relay] | ||
| 975 | L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since] | ||
| 976 | L23_QUICK --> RCA2[recompute_actions_for_relay] | ||
| 977 | end | ||
| 590 | ``` | 978 | ``` |
| 591 | 979 | ||
| 592 | ## Metrics & Observability | 980 | --- |
| 981 | |||
| 982 | ## Key Design Decisions | ||
| 983 | |||
| 984 | | Decision | Choice | Rationale | | ||
| 985 | | -------------------------- | -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | | ||
| 986 | | Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | | ||
| 987 | | Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | | ||
| 988 | | Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | | ||
| 989 | | Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | | ||
| 990 | | Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | | ||
| 991 | | Layer 2+3 tags | tagged_one_of_our_repo_event_filters, tagged_one_of_our_root_event_filters | Descriptive names, uses a/A/q for repos, e/E/q for events | | ||
| 992 | | Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | | ||
| 993 | | compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | | ||
| 994 | | Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | | ||
| 995 | | Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | | ||
| 996 | | Consolidation Layer 1 | Re-subscribe with since after unsubscribe_all | Maintains announcement stream | | ||
| 997 | | Consolidation Layer 2+3 | rebuild_layer2_and_layer3 with since | Shared logic with quick_reconnect | | ||
| 998 | | Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect | | ||
| 999 | | 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions | | ||
| 1000 | | Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift | | ||
| 1001 | | Connection spawning | Via AddFilters handler | Single path for new relay discovery | | ||
| 1002 | | Self-subscriber reconnect | Use since-15min filter | Simpler than immediate RepoSyncIndex updates | | ||
| 593 | 1003 | ||
| 594 | All sync metrics are exposed via Prometheus at `/metrics`. For <100 relays, per-relay labels are acceptable cardinality. | 1004 | --- |
| 595 | 1005 | ||
| 596 | ### Prometheus Metrics | 1006 | ## Module Structure |
| 597 | 1007 | ||
| 598 | ```rust | 1008 | ``` |
| 599 | /// Sync module metrics registered with the global Prometheus registry | 1009 | src/sync/ |
| 600 | pub struct SyncMetrics { | 1010 | ├── mod.rs # SyncManager, main loop |
| 601 | // === Connection Metrics (per relay) === | 1011 | ├── state.rs # RepoSyncIndex, RelaySyncIndex, PendingSyncIndex types |
| 602 | /// Active outbound connections: ngit_sync_relay_connected{relay="wss://..."} | 1012 | ├── actions.rs # AddFilters struct, compute_actions, build_filters |
| 603 | relay_connected: IntGaugeVec, // labels: [relay] | 1013 | ├── self_subscriber.rs # SelfSubscriber, batching logic |
| 1014 | ├── relay_connection.rs # Per-relay WebSocket connection | ||
| 1015 | ├── consolidation.rs # Consolidation logic, daily timer | ||
| 1016 | ├── health.rs # Health tracking (reuse from v2) | ||
| 1017 | └── metrics.rs # Prometheus metrics (reuse from v2) | ||
| 1018 | ``` | ||
| 604 | 1019 | ||
| 605 | /// Connection attempts: ngit_sync_connection_attempts_total{relay="wss://...", result="success|failure"} | 1020 | --- |
| 606 | connection_attempts: CounterVec, // labels: [relay, result] | ||
| 607 | 1021 | ||
| 608 | // === Relay Health Status === | 1022 | ## Comparison: v3 vs v4 |
| 609 | /// Current status: ngit_sync_relay_status{relay="wss://...", status="healthy|backoff|dead"} | ||
| 610 | relay_status: IntGaugeVec, // labels: [relay, status] | ||
| 611 | 1023 | ||
| 612 | /// Consecutive failures: ngit_sync_relay_failures{relay="wss://..."} | 1024 | | Aspect | v3 | v4 | |
| 613 | relay_failures: IntGaugeVec, // labels: [relay] | 1025 | | ------------------------ | ----------------------------------------- | --------------------------------------------- | |
| 1026 | | Connect handling | Separate initial vs reconnect | Unified handle_connect_or_reconnect | | ||
| 1027 | | Layer 1 handling | Mixed with other layers | Separate build_layer1_filter, always included | | ||
| 1028 | | Layer 2+3 tags | Basic a/e tags | Comprehensive a/A/q and e/E/q per v2 | | ||
| 1029 | | Rebuild logic | Duplicated in reconnect and consolidation | Shared rebuild_all_subscriptions method | | ||
| 1030 | | Consolidation trigger | Maybe periodic | On filter add in handle_add_filters | | ||
| 1031 | | Since filter application | Applied in handle_reconnect | build_all_filters with optional since | | ||
| 1032 | | PSI clearing | On disconnect | On disconnect (confirmed) | | ||
| 1033 | | Daily timer | Consolidation-style | Fresh sync (different from consolidation) | | ||
| 614 | 1034 | ||
| 615 | // === Event Source Tracking === | 1035 | --- |
| 616 | /// Events received by source: ngit_sync_events_total{source="direct|live_sync|catchup|daily_catchup"} | ||
| 617 | events_total: CounterVec, // labels: [source] | ||
| 618 | 1036 | ||
| 619 | /// Sync gap events (should have been live synced): ngit_sync_gap_events_total{relay="wss://..."} | 1037 | ## Self-Subscriber Flow |
| 620 | sync_gap_events: CounterVec, // labels: [relay] | ||
| 621 | 1038 | ||
| 622 | // === Aggregate Metrics === | 1039 | The SelfSubscriber connects to the own relay and maintains a subscription to discover repos and events. It batches incoming events and triggers compute_actions. |
| 623 | /// Total relays being tracked | ||
| 624 | relays_tracked_total: IntGauge, | ||
| 625 | 1040 | ||
| 626 | /// Relays currently connected | 1041 | ### State Tracking |
| 627 | relays_connected_total: IntGauge, | ||
| 628 | 1042 | ||
| 629 | /// Relays in dead state | 1043 | ```rust |
| 630 | relays_dead_total: IntGauge, | 1044 | pub struct SelfSubscriber { |
| 1045 | own_relay_url: String, | ||
| 1046 | relay_domain: String, | ||
| 1047 | repo_sync_index: RepoSyncIndex, | ||
| 1048 | pending_sync_index: PendingSyncIndex, | ||
| 1049 | relay_sync_index: RelaySyncIndex, | ||
| 1050 | action_tx: mpsc::Sender<AddFilters>, | ||
| 1051 | /// Timestamp of last successful connection - used for since filter on reconnection | ||
| 1052 | last_connected: Option<Timestamp>, | ||
| 1053 | /// The active client connection | ||
| 1054 | client: Option<Client>, | ||
| 631 | } | 1055 | } |
| 632 | ``` | 1056 | ``` |
| 633 | 1057 | ||
| 634 | ### Metric Definitions | 1058 | ### On Startup / Reconnect (Unified) |
| 635 | |||
| 636 | | Metric | Type | Labels | Description | | ||
| 637 | | ------------------------------------- | ------- | ------------- | ------------------------------------------------------ | | ||
| 638 | | `ngit_sync_relay_connected` | Gauge | relay | 1 if connected, 0 if not | | ||
| 639 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Connection attempt outcomes | | ||
| 640 | | `ngit_sync_relay_status` | Gauge | relay, status | 1 for current status, 0 otherwise | | ||
| 641 | | `ngit_sync_relay_failures` | Gauge | relay | Current consecutive failure count | | ||
| 642 | | `ngit_sync_events_total` | Counter | source | Events received by source type | | ||
| 643 | | `ngit_sync_gap_events_total` | Counter | relay | Events found during catchup that should have been live | | ||
| 644 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered from announcements | | ||
| 645 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected relay count | | ||
| 646 | | `ngit_sync_relays_dead_total` | Gauge | - | Relays marked as dead | | ||
| 647 | 1059 | ||
| 648 | **Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. The `ngit_sync_gap_events_total` metric tracks this per relay. | 1060 | Both initial startup and reconnection use the same `connect_and_subscribe` method: |
| 649 | |||
| 650 | ### Observability Integration | ||
| 651 | 1061 | ||
| 652 | ```rust | 1062 | ```rust |
| 653 | impl SyncManager { | 1063 | impl SelfSubscriber { |
| 654 | fn record_event_received(&self, event: &Event, source: EventSource) { | 1064 | async fn run(mut self) { |
| 655 | match source { | 1065 | loop { |
| 656 | EventSource::DirectSubmission => { | 1066 | // Connect or reconnect |
| 657 | self.metrics.events_total.with_label_values(&["direct"]).inc(); | 1067 | if let Err(e) = self.connect_and_subscribe().await { |
| 1068 | tracing::warn!("Connection failed: {}, will retry", e); | ||
| 1069 | tokio::time::sleep(Duration::from_secs(5)).await; | ||
| 1070 | continue; | ||
| 658 | } | 1071 | } |
| 659 | EventSource::LiveSync(relay) => { | ||
| 660 | self.metrics.events_total.with_label_values(&["live_sync"]).inc(); | ||
| 661 | } | ||
| 662 | EventSource::Catchup(relay) => { | ||
| 663 | // This is a sync gap - we should have gotten it via live sync | ||
| 664 | self.metrics.events_total.with_label_values(&["catchup"]).inc(); | ||
| 665 | self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc(); | ||
| 666 | tracing::warn!( | ||
| 667 | relay = %relay, | ||
| 668 | event_id = %event.id.to_hex(), | ||
| 669 | "Sync gap detected: event found during catchup" | ||
| 670 | ); | ||
| 671 | } | ||
| 672 | EventSource::DailyCatchup(relay) => { | ||
| 673 | // Sustained sync gap - missed by both live sync and initial catchup | ||
| 674 | self.metrics.events_total.with_label_values(&["daily_catchup"]).inc(); | ||
| 675 | self.metrics.sync_gap_events.with_label_values(&[relay.as_str()]).inc(); | ||
| 676 | tracing::error!( | ||
| 677 | relay = %relay, | ||
| 678 | event_id = %event.id.to_hex(), | ||
| 679 | "Sustained sync gap: event found during daily catchup" | ||
| 680 | ); | ||
| 681 | } | ||
| 682 | } | ||
| 683 | } | ||
| 684 | 1072 | ||
| 685 | fn record_connection_attempt(&self, relay: &RelayUrl, success: bool) { | 1073 | // Run event loop until disconnection |
| 686 | let result = if success { "success" } else { "failure" }; | 1074 | self.event_loop().await; |
| 687 | self.metrics.connection_attempts | ||
| 688 | .with_label_values(&[relay.as_str(), result]) | ||
| 689 | .inc(); | ||
| 690 | } | ||
| 691 | 1075 | ||
| 692 | fn update_relay_status(&self, relay: &RelayUrl, status: &RelayStatus) { | 1076 | // Loop will retry connection |
| 693 | // Reset all status labels for this relay | ||
| 694 | for s in ["healthy", "backoff", "dead"] { | ||
| 695 | self.metrics.relay_status | ||
| 696 | .with_label_values(&[relay.as_str(), s]) | ||
| 697 | .set(0); | ||
| 698 | } | 1077 | } |
| 699 | // Set current status | ||
| 700 | let status_label = match status { | ||
| 701 | RelayStatus::Healthy => "healthy", | ||
| 702 | RelayStatus::Backoff { .. } => "backoff", | ||
| 703 | RelayStatus::Dead => "dead", | ||
| 704 | }; | ||
| 705 | self.metrics.relay_status | ||
| 706 | .with_label_values(&[relay.as_str(), status_label]) | ||
| 707 | .set(1); | ||
| 708 | } | 1078 | } |
| 709 | } | ||
| 710 | ``` | ||
| 711 | |||
| 712 | ### Example Grafana Queries | ||
| 713 | 1079 | ||
| 714 | ```promql | 1080 | async fn connect_and_subscribe(&mut self) -> Result<(), Error> { |
| 715 | # Relay health overview - count by status | 1081 | let client = Client::new(Keys::generate()); |
| 716 | sum by (status) (ngit_sync_relay_status == 1) | 1082 | client.add_relay(&self.own_relay_url).await?; |
| 717 | 1083 | client.connect().await; | |
| 718 | # Connection success rate over last hour | 1084 | |
| 719 | sum(rate(ngit_sync_connection_attempts_total{result="success"}[1h])) | 1085 | // Build filter - add since only on reconnect |
| 720 | / sum(rate(ngit_sync_connection_attempts_total[1h])) | 1086 | let filter = Filter::new().kinds([ |
| 721 | 1087 | Kind::Custom(30617), // Repository announcements | |
| 722 | # Sync gap detection - events that should have been live synced | 1088 | Kind::GitPatch, // 1617 |
| 723 | sum(rate(ngit_sync_gap_events_total[1h])) by (relay) | 1089 | Kind::Custom(1618), // PRs |
| 724 | 1090 | Kind::Custom(1619), // PR updates | |
| 725 | # Live sync effectiveness (lower is better - fewer gaps) | 1091 | Kind::GitIssue, // 1621 |
| 726 | sum(rate(ngit_sync_events_total{source=~"catchup|daily_catchup"}[1h])) | 1092 | ]); |
| 727 | / sum(rate(ngit_sync_events_total[1h])) | 1093 | |
| 1094 | let filter = if let Some(ts) = self.last_connected { | ||
| 1095 | // Reconnection: use since filter | ||
| 1096 | let since = Timestamp::from(ts.as_u64().saturating_sub(900)); // -15 min buffer | ||
| 1097 | filter.since(since) | ||
| 1098 | } else { | ||
| 1099 | // Initial connect: no since filter - get full history | ||
| 1100 | filter | ||
| 1101 | }; | ||
| 728 | 1102 | ||
| 729 | # Relays with high failure counts (potential issues) | 1103 | // Update last_connected AFTER computing since |
| 730 | topk(10, ngit_sync_relay_failures) | 1104 | self.last_connected = Some(Timestamp::now()); |
| 731 | 1105 | ||
| 732 | # Alert: relay stuck in dead state | 1106 | client.subscribe(filter, None).await?; |
| 733 | ngit_sync_relay_status{status="dead"} == 1 | 1107 | self.client = Some(client); |
| 1108 | Ok(()) | ||
| 1109 | } | ||
| 1110 | } | ||
| 734 | ``` | 1111 | ``` |
| 735 | 1112 | ||
| 736 | ### Log Levels for Sync Events | 1113 | ### Event Loop with Batching |
| 737 | 1114 | ||
| 738 | | Event | Level | Context | | 1115 | ```rust |
| 739 | | ----------------------- | ----- | ----------------------------- | | 1116 | impl SelfSubscriber { |
| 740 | | Event via live sync | DEBUG | Normal operation | | 1117 | async fn event_loop(&mut self) { |
| 741 | | Event via catchup | WARN | Sync gap detected | | 1118 | let client = self.client.as_ref().unwrap(); |
| 742 | | Event via daily catchup | ERROR | Sustained gap | | 1119 | let mut pending_events: Vec<Event> = Vec::new(); |
| 743 | | Connection established | INFO | Relay URL | | 1120 | let mut batch_timer: Option<Instant> = None; |
| 744 | | Connection failed | WARN | Relay URL, attempt #, backoff | | 1121 | let batch_window = Duration::from_secs(5); |
| 745 | | Relay marked dead | ERROR | Relay URL, failure duration | | 1122 | |
| 746 | | Peer missing events | WARN | Relay URL, repo, count | | 1123 | loop { |
| 1124 | let timeout = batch_timer | ||
| 1125 | .map(|t| batch_window.saturating_sub(t.elapsed())) | ||
| 1126 | .unwrap_or(Duration::from_secs(60)); | ||
| 1127 | |||
| 1128 | tokio::select! { | ||
| 1129 | notification = client.notifications().recv() => { | ||
| 1130 | match notification { | ||
| 1131 | Ok(RelayPoolNotification::Event { event, .. }) => { | ||
| 1132 | pending_events.push(*event); | ||
| 1133 | |||
| 1134 | // Start timer on first event - does NOT reset | ||
| 1135 | if batch_timer.is_none() { | ||
| 1136 | batch_timer = Some(Instant::now()); | ||
| 1137 | } | ||
| 1138 | } | ||
| 1139 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 1140 | // Connection lost | ||
| 1141 | break; | ||
| 1142 | } | ||
| 1143 | _ => {} | ||
| 1144 | } | ||
| 1145 | } | ||
| 1146 | _ = tokio::time::sleep(timeout), if batch_timer.is_some() => { | ||
| 1147 | // Batch window elapsed | ||
| 1148 | self.process_batch(pending_events.drain(..).collect()).await; | ||
| 1149 | batch_timer = None; | ||
| 1150 | } | ||
| 1151 | } | ||
| 1152 | } | ||
| 1153 | } | ||
| 747 | 1154 | ||
| 748 | ## Configuration | 1155 | async fn process_batch(&self, events: Vec<Event>) { |
| 1156 | // 1. Update RepoSyncIndex | ||
| 1157 | for event in events { | ||
| 1158 | match event.kind.as_u16() { | ||
| 1159 | 30617 => self.handle_announcement(&event).await, | ||
| 1160 | 1617 | 1618 | 1619 | 1621 => self.handle_root_event(&event).await, | ||
| 1161 | _ => {} | ||
| 1162 | } | ||
| 1163 | } | ||
| 749 | 1164 | ||
| 750 | ```rust | 1165 | // 2. Derive targets and compute actions |
| 751 | pub struct SyncConfig { | 1166 | let repo_index = self.repo_sync_index.read().await; |
| 752 | /// Warm-up delay before starting initial catchup | 1167 | let targets = derive_relay_targets(&repo_index); |
| 753 | pub startup_delay: Duration, // Default: 30s | ||
| 754 | 1168 | ||
| 755 | /// Delay between filter batches during catchup | 1169 | let pending = self.pending_sync_index.read().await; |
| 756 | pub batch_delay: Duration, // Default: 60s | 1170 | let confirmed = self.relay_sync_index.read().await; |
| 757 | 1171 | ||
| 758 | /// Delay after reconnect before catchup | 1172 | let actions = compute_actions(&targets, &pending, &confirmed); |
| 759 | pub reconnect_delay: Duration, // Default: 10s | ||
| 760 | 1173 | ||
| 761 | /// Maximum events in last N days for reconnect catchup | 1174 | drop(repo_index); |
| 762 | pub reconnect_lookback_days: u32, // Default: 3 | 1175 | drop(pending); |
| 1176 | drop(confirmed); | ||
| 763 | 1177 | ||
| 764 | /// Maximum tagged event IDs per filter | 1178 | // 3. Send actions to SyncManager |
| 765 | pub max_tags_per_filter: usize, // Default: 100 | 1179 | for action in actions { |
| 1180 | let _ = self.action_tx.send(action).await; | ||
| 1181 | } | ||
| 1182 | } | ||
| 766 | 1183 | ||
| 767 | /// Consolidate subscriptions when count exceeds | 1184 | async fn handle_announcement(&self, event: &Event) { |
| 768 | pub max_subscriptions: usize, // Default: 150 | 1185 | // Extract repo_ref from event - 30617:pubkey:identifier |
| 1186 | let d_tag = event.tags.iter() | ||
| 1187 | .find_map(|tag| { | ||
| 1188 | if tag.kind() == TagKind::D { | ||
| 1189 | tag.content().map(|s| s.to_string()) | ||
| 1190 | } else { | ||
| 1191 | None | ||
| 1192 | } | ||
| 1193 | }) | ||
| 1194 | .unwrap_or_default(); | ||
| 1195 | |||
| 1196 | let repo_ref = format!("30617:{}:{}", event.pubkey, d_tag); | ||
| 1197 | |||
| 1198 | // Extract relay URLs from 'r' tags | ||
| 1199 | let relays: HashSet<String> = event.tags.iter() | ||
| 1200 | .filter_map(|tag| { | ||
| 1201 | if tag.kind() == TagKind::Relay { | ||
| 1202 | tag.content().map(|s| s.to_string()) | ||
| 1203 | } else { | ||
| 1204 | None | ||
| 1205 | } | ||
| 1206 | }) | ||
| 1207 | .collect(); | ||
| 1208 | |||
| 1209 | // Update RepoSyncIndex | ||
| 1210 | let mut index = self.repo_sync_index.write().await; | ||
| 1211 | let needs = index.entry(repo_ref).or_default(); | ||
| 1212 | needs.relays = relays; | ||
| 1213 | } | ||
| 769 | 1214 | ||
| 770 | /// Backoff configuration | 1215 | async fn handle_root_event(&self, event: &Event) { |
| 771 | pub max_backoff: Duration, // Default: 1h | 1216 | // Extract repo_ref from 'a' tag |
| 772 | pub dead_threshold: Duration, // Default: 24h | 1217 | let repo_ref = event.tags.iter() |
| 773 | pub dead_retry_interval: Duration, // Default: 24h | 1218 | .find_map(|tag| { |
| 1219 | if tag.kind() == TagKind::A { | ||
| 1220 | tag.content().map(|s| s.to_string()) | ||
| 1221 | } else { | ||
| 1222 | None | ||
| 1223 | } | ||
| 1224 | }); | ||
| 1225 | |||
| 1226 | if let Some(repo_ref) = repo_ref { | ||
| 1227 | let mut index = self.repo_sync_index.write().await; | ||
| 1228 | let needs = index.entry(repo_ref).or_default(); | ||
| 1229 | needs.root_events.insert(event.id); | ||
| 1230 | } | ||
| 1231 | } | ||
| 774 | } | 1232 | } |
| 775 | ``` | 1233 | ``` |
| 776 | 1234 | ||
| 777 | ## Summary | ||
| 778 | |||
| 779 | | Component | Responsibility | | ||
| 780 | | ---------------------- | -------------------------------------------------------------- | | ||
| 781 | | **SyncManager** | Orchestrates connections, triggers catchup, processes events | | ||
| 782 | | **FilterService** | Builds unified filters from database state | | ||
| 783 | | **RelayHealthTracker** | Manages backoff, dead relay detection (in-memory + Prometheus) | | ||
| 784 | | **ConnectionState** | Per-relay WebSocket + subscription management | | ||
| 785 | | **SyncMetrics** | Prometheus metrics for operator visibility | | ||
| 786 | |||
| 787 | ### Key Design Decisions | ||
| 788 | |||
| 789 | 1. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism | ||
| 790 | 2. **Exclude ourselves** from relay list to prevent loops | ||
| 791 | 3. **One connection per relay** with combined filters for efficiency | ||
| 792 | 4. **In-memory health state** with Prometheus metrics for visibility (no database persistence needed for <100 relays) | ||
| 793 | 5. **Graceful degradation on restart** - conservative initial backoff with jitter avoids thundering herd | ||
| 794 | 6. **Staggered catchup** to avoid overwhelming relays - runs immediately at startup after warm-up | ||
| 795 | 7. **Client-side filtering** for 30617/30618, server-side for Layer 2/3 | ||
| 796 | 8. **Dynamic subscription addition** with periodic consolidation | ||
| 797 | 9. **Custom acceptance policy** excluding rate limiting defaults | ||
| 798 | 10. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps, tracked in Prometheus | ||
| 799 | |||
| 800 | --- | 1235 | --- |
| 801 | 1236 | ||
| 802 | ## Implementation Notes (Phase 6) | 1237 | ## Implementation Notes |
| 803 | |||
| 804 | This section documents the final implementation as of Phase 6 (Observability & Production Readiness). | ||
| 805 | 1238 | ||
| 806 | ### What Was Actually Built | 1239 | This section documents the actual implementation details as of December 2024 (Phases 1-10 complete). |
| 807 | 1240 | ||
| 808 | The implementation closely follows the design document with the following completed components: | 1241 | ### Architectural Decisions During Implementation |
| 809 | 1242 | ||
| 810 | #### Phase 1: Basic Sync (commit b167f1b) | 1243 | **Phase 7 Refactoring**: The `SyncManager::run()` method required refactoring to use `Arc<Mutex<SyncManager>>` for shared access. The daily timer and disconnect checker tasks need to access the manager, so `self` is wrapped after initial setup: |
| 811 | - [`SyncManager`](../../src/sync/manager.rs) - Main coordinator for proactive sync | ||
| 812 | - Bootstrap relay sync via `NGIT_SYNC_BOOTSTRAP_RELAY_URL` configuration | ||
| 813 | - Dynamic relay discovery from repository announcements that list our service | ||
| 814 | - Event validation through existing [`Nip34WritePolicy`](../../src/nostr/builder.rs) | ||
| 815 | 1244 | ||
| 816 | #### Phase 2: Three-Layer Filters (commit bf558b0) | 1245 | ```rust |
| 817 | - [`FilterService`](../../src/sync/filter.rs) - Builds three-layer filter strategy | 1246 | // 7. Wrap self in Arc<Mutex> for sharing with timer task |
| 818 | - Layer 1: All kind 30617+30618 (announcements) | 1247 | let sync_manager = Arc::new(Mutex::new(self)); |
| 819 | - Layer 2: A/a tag filters for repository events | 1248 | ``` |
| 820 | - Layer 3: E/e tag filters for related events (PRs, Issues) | ||
| 821 | - Multi-relay discovery from stored announcements | ||
| 822 | |||
| 823 | #### Phase 3: Health Tracking (commit f639ecf) | ||
| 824 | - [`RelayHealthTracker`](../../src/sync/health.rs) - DashMap-based health tracking | ||
| 825 | - Three states: Healthy → Degraded → Dead | ||
| 826 | - Exponential backoff: 5s → 10s → 20s → ... → max (default 1h) | ||
| 827 | - Dead relay detection after 24h continuous failures | ||
| 828 | - Startup jitter (0-10s) to prevent thundering herd | ||
| 829 | 1249 | ||
| 830 | #### Phase 4: Dynamic Subscriptions (commit a19ff57) | 1250 | This allows background tasks (daily timer, disconnect checker) to acquire the lock when needed while the main event loop handles actions from the self-subscriber. |
| 831 | - [`SubscriptionManager`](../../src/sync/subscription.rs) - Per-connection subscription tracking | ||
| 832 | - Dynamic Layer 2 subscriptions when new announcements arrive | ||
| 833 | - Dynamic Layer 3 subscriptions when new PRs/Issues arrive | ||
| 834 | - Filter consolidation at threshold (150 filters) | ||
| 835 | 1251 | ||
| 836 | #### Phase 5: Catchup & Gap Detection (commit 950c2e4) | 1252 | **Health Module**: The health tracking module was adapted from the v3 implementation at `work/sync-v3/health.rs`. The implementation uses: |
| 837 | - [`NegentropyService`](../../src/sync/negentropy.rs) - Gap-filling catchup operations | ||
| 838 | - Startup catchup (configurable delay) | ||
| 839 | - Reconnection catchup (limited lookback) | ||
| 840 | - Daily catchup (not yet implemented - placeholder) | ||
| 841 | 1253 | ||
| 842 | #### Phase 6: Observability (this phase) | 1254 | - `DashMap` for thread-safe concurrent access without external locking |
| 843 | - [`SyncMetrics`](../../src/sync/metrics.rs) - Full Prometheus integration | 1255 | - Three states: `Healthy`, `Degraded`, `Dead` |
| 844 | - Grafana dashboard panels for sync monitoring | 1256 | - Exponential backoff: `base * 2^(failures-1)`, capped at max_backoff |
| 845 | - Documentation updates | 1257 | - Dead threshold: 24 hours of continuous failures |
| 1258 | - Dead relay retry: Once per 24 hours | ||
| 846 | 1259 | ||
| 847 | ### Differences from Original Design | 1260 | ### Implementation Constants |
| 848 | 1261 | ||
| 849 | 1. **Negentropy (NIP-77)**: Simplified gap-filling was used instead of full NIP-77 negentropy reconciliation, as nostr-sdk 0.44 lacks built-in negentropy support. The current implementation uses timestamp-based catchup queries. | 1262 | | Constant | Value | Purpose | |
| 1263 | | --------------------------------- | ---------- | ------------------------------------------------ | | ||
| 1264 | | `CONSOLIDATION_THRESHOLD` | 70 filters | Maximum filters before triggering consolidation | | ||
| 1265 | | `CONSOLIDATION_WAIT_TIMEOUT_SECS` | 30 seconds | Timeout for pending batches during consolidation | | ||
| 1266 | | `QUICK_RECONNECT_WINDOW_SECS` | 15 minutes | Window for quick reconnect vs fresh sync | | ||
| 1267 | | `DISCONNECT_CHECK_INTERVAL_SECS` | 60 seconds | Interval for checking empty relays to disconnect | | ||
| 1268 | | `DEAD_THRESHOLD_HOURS` | 24 hours | Time before relay marked as dead | | ||
| 1269 | | `BASE_BACKOFF_SECS` | 5 seconds | Base duration for exponential backoff | | ||
| 850 | 1270 | ||
| 851 | 2. **Filter Consolidation Threshold**: Set at 150 filters (as designed) based on typical relay filter limits. | 1271 | ### Daily Timer Randomization |
| 852 | 1272 | ||
| 853 | 3. **Health Tracking**: Implemented exactly as designed - in-memory only (not persisted to database), which is acceptable for production as health state rebuilds quickly on restart. | 1273 | The daily timer uses randomization between 23-25 hours to prevent thundering herd effects when multiple ngit-grasp instances are running: |
| 854 | 1274 | ||
| 855 | 4. **Metric Label Strategy**: Used simpler numeric encoding for health status (1=healthy, 2=degraded, 3=dead) instead of multiple label values per relay, reducing cardinality. | 1275 | ```rust |
| 1276 | let hours = 23.0 + rand::thread_rng().gen::<f64>() * 2.0; | ||
| 1277 | ``` | ||
| 856 | 1278 | ||
| 857 | 5. **Event Source Tracking**: Implemented four source types (`live`, `startup`, `reconnect`, `daily`) instead of the original (`direct`, `live_sync`, `catchup`, `daily_catchup`). | 1279 | ### Bootstrap Relay Protection |
| 858 | 1280 | ||
| 859 | ### Three-Layer Filter Strategy (As Implemented) | 1281 | Bootstrap relays are never disconnected by the cleanup system. The `check_disconnects()` method explicitly filters them out: |
| 860 | 1282 | ||
| 1283 | ```rust | ||
| 1284 | .filter(|(_, state)| { | ||
| 1285 | !state.is_bootstrap && | ||
| 1286 | state.repos.is_empty() && | ||
| 1287 | state.root_events.is_empty() | ||
| 1288 | }) | ||
| 861 | ``` | 1289 | ``` |
| 862 | Layer 1: Discovery Layer | ||
| 863 | ├── Query: kinds [30617, 30618] (announcements) | ||
| 864 | ├── Applied: At startup and during sync | ||
| 865 | └── Purpose: Discover all repositories across network | ||
| 866 | |||
| 867 | Layer 2: Repository Events | ||
| 868 | ├── Query: Events with A/a tags pointing to tracked repos | ||
| 869 | ├── Format: A tag = "30617:<pubkey>:<identifier>" | ||
| 870 | ├── Triggered: When new announcement is accepted | ||
| 871 | └── Purpose: Get PRs, issues, patches for repositories | ||
| 872 | |||
| 873 | Layer 3: Related Events | ||
| 874 | ├── Query: Events with E/e tags pointing to tracked PRs/Issues | ||
| 875 | ├── Triggered: When new PR/Issue is accepted | ||
| 876 | └── Purpose: Get comments, reviews, status updates | ||
| 877 | ``` | ||
| 878 | |||
| 879 | ### Prometheus Metrics (As Implemented) | ||
| 880 | 1290 | ||
| 881 | | Metric | Type | Labels | Description | | 1291 | ### Graceful Shutdown |
| 882 | |--------|------|--------|-------------| | ||
| 883 | | `ngit_sync_relay_connected` | Gauge | relay | Connection status (1/0) | | ||
| 884 | | `ngit_sync_connection_attempts_total` | Counter | relay, result | Attempts by outcome | | ||
| 885 | | `ngit_sync_relay_status` | Gauge | relay | Health state (1/2/3) | | ||
| 886 | | `ngit_sync_relay_failures` | Gauge | relay | Consecutive failures | | ||
| 887 | | `ngit_sync_events_total` | Counter | source | Events by source type | | ||
| 888 | | `ngit_sync_gap_events_total` | Counter | relay | Gap events filled | | ||
| 889 | | `ngit_sync_relays_tracked_total` | Gauge | - | Total relays discovered | | ||
| 890 | | `ngit_sync_relays_connected_total` | Gauge | - | Currently connected | | ||
| 891 | | `ngit_sync_relays_dead_total` | Gauge | - | Dead relay count | | ||
| 892 | 1292 | ||
| 893 | ### Configuration Options (As Implemented) | 1293 | Shutdown uses a tokio broadcast channel for coordinated termination: |
| 894 | 1294 | ||
| 895 | All configuration via environment variables or CLI flags: | 1295 | ```rust |
| 1296 | let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); | ||
| 1297 | ``` | ||
| 896 | 1298 | ||
| 897 | | Option | Type | Default | Description | | 1299 | Each background task (self-subscriber, daily timer, disconnect checker) receives its own `broadcast::Receiver` subscription and monitors for the shutdown signal in its main loop. |
| 898 | |--------|------|---------|-------------| | ||
| 899 | | `NGIT_SYNC_BOOTSTRAP_RELAY_URL` | String | None | Bootstrap relay URL for initial sync | | ||
| 900 | | `NGIT_SYNC_MAX_BACKOFF_SECS` | u64 | 3600 | Max backoff delay (seconds) | | ||
| 901 | | `NGIT_SYNC_STARTUP_DELAY_SECS` | u64 | 30 | Catchup delay after startup | | ||
| 902 | | `NGIT_SYNC_RECONNECT_DELAY_SECS` | u64 | 10 | Catchup delay after reconnect | | ||
| 903 | | `NGIT_SYNC_RECONNECT_LOOKBACK_DAYS` | u64 | 3 | Days to look back on reconnect | | ||
| 904 | 1300 | ||
| 905 | **Note:** Additional relays are automatically discovered from repository announcements (kind 30617) that list our service domain. The bootstrap relay provides an initial sync source but is not required - sync will discover relays from stored announcements. | 1301 | ### Actual Module Structure |
| 906 | 1302 | ||
| 907 | ### Module Structure (As Implemented) | 1303 | The implemented module structure differs from the original spec: |
| 908 | 1304 | ||
| 909 | ``` | 1305 | ``` |
| 910 | src/sync/ | 1306 | src/sync/ |
| 911 | ├── mod.rs # Module exports, constants | 1307 | ├── mod.rs # SyncManager, main loop, index types, metrics |
| 912 | ├── manager.rs # SyncManager - orchestrates sync | 1308 | ├── algorithms.rs # derive_relay_targets, compute_actions, AddFilters |
| 913 | ├── connection.rs # SyncConnection - per-relay WebSocket | 1309 | ├── filters.rs # build_announcement_filter, build_layer2_and_layer3_filters |
| 914 | ├── filter.rs # FilterService - three-layer filters | 1310 | ├── health.rs # RelayHealthTracker, HealthState, exponential backoff |
| 915 | ├── health.rs # RelayHealthTracker - health states | 1311 | ├── relay_connection.rs # RelayConnection, RelayEvent, WebSocket handling |
| 916 | ├── metrics.rs # SyncMetrics - Prometheus integration | 1312 | └── self_subscriber.rs # SelfSubscriber, RelayAction, batching logic |
| 917 | ├── negentropy.rs # NegentropyService - gap-filling | ||
| 918 | └── subscription.rs # SubscriptionManager - dynamic subs | ||
| 919 | ``` | 1313 | ``` |
| 920 | 1314 | ||
| 921 | ### Production Readiness Checklist | 1315 | Key differences from spec: |
| 1316 | |||
| 1317 | - No separate `state.rs` - types are defined in `mod.rs` | ||
| 1318 | - No separate `actions.rs` - moved to `algorithms.rs` | ||
| 1319 | - No separate `consolidation.rs` - consolidation logic in `mod.rs` | ||
| 1320 | - No separate `metrics.rs` - `SyncMetrics` defined in `mod.rs` | ||
| 1321 | |||
| 1322 | ### Deviations from Original v4 Spec | ||
| 1323 | |||
| 1324 | 1. **RelayState lacks `connection` field**: The spec showed `connection: Option<RelayConnection>` in `RelayState`, but the implementation stores connections in a separate `HashMap<String, RelayConnection>` in `SyncManager`. | ||
| 1325 | |||
| 1326 | 2. **SelfSubscriber simplified**: The actual implementation uses `RelayAction` enum (SpawnRelay/AddFilters) rather than directly using `AddFilters` struct. | ||
| 1327 | |||
| 1328 | 3. **Consolidation wait_pending_complete**: The spec described a `wait_pending_complete()` method, but the implementation uses a simpler timeout-based approach checking pending batches. | ||
| 922 | 1329 | ||
| 923 | - [x] All metrics exposed at `/metrics` endpoint | 1330 | 4. **Timestamp API**: Uses `Timestamp::now().as_secs()` instead of `.as_u64()` due to nostr-sdk 0.43 API. |
| 924 | - [x] Health state tracking with configurable backoff | ||
| 925 | - [x] Dead relay detection and minimal retry | ||
| 926 | - [x] Startup jitter to prevent thundering herd | ||
| 927 | - [x] Grafana dashboard with sync panels | ||
| 928 | - [x] Configuration documented | ||
| 929 | - [x] Integration tests passing | ||