| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 11:09:49 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 11:09:49 +0000 |
| commit | 9f23a5677ebb2b0b31b31d6ebecb5d65eae10289 (patch) | |
| tree | dea34cde98ae89963f9e9d9eb705a01f6f6fc433 /docs/explanation | |
| parent | fe85c32b82b6c3e81d43c71d6152ed6bb4edf732 (diff) | |
docs: planed GRASP-2 proactive sync of just events
Diffstat (limited to 'docs/explanation')
| -rw-r--r-- | docs/explanation/grasp-02-proactive-sync.md | 565 |
1 files changed, 565 insertions, 0 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
new file mode 100644
index 0000000..250aece
--- /dev/null
+++ b/docs/explanation/grasp-02-proactive-sync.md
| @@ -0,0 +1,565 @@ | |||
| 1 | # GRASP-02: Proactive Sync - Design Document | ||
| 2 | |||
| 3 | ## Overview | ||
| 4 | |||
| 5 | GRASP-02 Proactive Sync enables ngit-grasp to maintain live WebSocket connections to other relays listed in repository announcement events, synchronizing NIP-34 related events using both **live sync** (real-time subscriptions) and **negentropy catchup** (NIP-77 set reconciliation). | ||
| 6 | |||
| 7 | This document covers **event syncing only**. Git data syncing is out of scope for this phase. | ||
| 8 | |||
| 9 | ## Goals | ||
| 10 | |||
| 11 | 1. **Data Availability**: Ensure we have all relevant events for repositories we host | ||
| 12 | 2. **Resilience**: Handle relay failures gracefully with backoff and health tracking | ||
| 13 | 3. **Efficiency**: Minimize connections and bandwidth through filter consolidation | ||
| 14 | 4. **Consistency**: Use unified filters for both live sync and negentropy catchup | ||
| 15 | |||
| 16 | ## Architecture Overview | ||
| 17 | |||
| 18 | ```mermaid | ||
| 19 | flowchart TB | ||
| 20 | subgraph ngit-grasp | ||
| 21 | SM[SyncManager] | ||
| 22 | FS[FilterService] | ||
| 23 | RH[RelayHealthTracker] | ||
| 24 | DB[(Database)] | ||
| 25 | AP[AcceptancePolicy] | ||
| 26 | end | ||
| 27 | |||
| 28 | subgraph External Relays | ||
| 29 | R1[relay.example.com] | ||
| 30 | R2[other-grasp.io] | ||
| 31 | R3[nostr.land] | ||
| 32 | end | ||
| 33 | |||
| 34 | SM -->|builds filters| FS | ||
| 35 | SM -->|tracks health| RH | ||
| 36 | SM -->|stores events| DB | ||
| 37 | SM -->|validates| AP | ||
| 38 | |||
| 39 | SM <-->|WebSocket + NEG| R1 | ||
| 40 | SM <-->|WebSocket + NEG| R2 | ||
| 41 | SM <-->|WebSocket + NEG| R3 | ||
| 42 | |||
| 43 | RH -->|persists state| DB | ||
| 44 | ``` | ||
| 45 | |||
| 46 | ## Connection Management | ||
| 47 | |||
| 48 | ### Relay Discovery | ||
| 49 | |||
| 50 | Relays to connect to are discovered from **all stored repository announcements**: | ||
| 51 | |||
| 52 | ```rust | ||
| 53 | // Pseudocode for relay discovery | ||
| 54 | fn discover_relays(database: &Database, our_domain: &RelayUrl) -> HashSet<RelayUrl> { | ||
| 55 | let announcements = database.query(Filter::new().kind(30617)); | ||
| 56 | let mut relays = HashSet::new(); | ||
| 57 | |||
| 58 | for announcement in announcements { | ||
| 59 | for relay_url in announcement.relays_tags() { | ||
| 60 | if relay_url != our_domain { // Exclude ourselves | ||
| 61 | relays.insert(relay_url); | ||
| 62 | } | ||
| 63 | } | ||
| 64 | } | ||
| 65 | relays | ||
| 66 | } | ||
| 67 | ``` | ||
| 68 | |||
| 69 | ### Connection Lifecycle | ||
| 70 | |||
| 71 | ```mermaid | ||
| 72 | stateDiagram-v2 | ||
| 73 | [*] --> Connecting: startup/new relay | ||
| 74 | Connecting --> Connected: success | ||
| 75 | Connecting --> Backoff: failure | ||
| 76 | Connected --> Disconnected: connection lost | ||
| 77 | Disconnected --> Backoff: reconnect failed | ||
| 78 | Backoff --> Connecting: backoff timer expires | ||
| 79 | Backoff --> Dead: 24h continuous failures | ||
| 80 | Dead --> Connecting: daily retry timer | ||
| 81 | Connected --> Updating: filter change | ||
| 82 | Updating --> Connected: complete | ||
| 83 | ``` | ||
| 84 | |||
| 85 | ### Health Tracking & Backoff | ||
| 86 | |||
| 87 | | State | Behavior | | ||
| 88 | | ----------- | --------------------------------------------------- | | ||
| 89 | | **Healthy** | Normal operation, immediate reconnect on disconnect | | ||
| 90 | | **Backoff** | Exponential backoff: 1s → 2s → 4s → ... → 1h max | | ||
| 91 | | **Dead** | 24h of continuous failures, retry once per day | | ||
| 92 | |||
| 93 | Health state is **persisted to database** to survive restarts: | ||
| 94 | |||
| 95 | ```rust | ||
| 96 | struct RelayHealth { | ||
| 97 | url: RelayUrl, | ||
| 98 | status: RelayStatus, // Healthy, Backoff, Dead | ||
| 99 | consecutive_failures: u32, | ||
| 100 | last_failure_at: Option<Timestamp>, | ||
| 101 | last_success_at: Option<Timestamp>, | ||
| 102 | next_retry_at: Timestamp, | ||
| 103 | } | ||
| 104 | |||
| 105 | enum RelayStatus { | ||
| 106 | Healthy, | ||
| 107 | Backoff { attempt: u32 }, // backoff = min(2^attempt, 3600) seconds | ||
| 108 | Dead, // retry in 24h | ||
| 109 | } | ||
| 110 | ``` | ||
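
The backoff rule in the table and in the `Backoff { attempt }` comment can be sketched as a small pure function. This is a minimal sketch using only the standard library; the names `backoff_delay`, `is_dead` and `MAX_BACKOFF_SECS` are illustrative assumptions, not part of the design above.

```rust
use std::time::Duration;

/// Upper bound on the backoff delay (1 hour), as stated in the health table.
const MAX_BACKOFF_SECS: u64 = 3600;

/// Delay before the next reconnect attempt: min(2^attempt, 3600) seconds.
/// `attempt` starts at 0, so the sequence is 1s, 2s, 4s, ... capped at 1h.
fn backoff_delay(attempt: u32) -> Duration {
    // checked_shl returns None once the shift reaches 64 bits;
    // treat that case as "already at the cap".
    let secs = 1u64
        .checked_shl(attempt)
        .unwrap_or(MAX_BACKOFF_SECS)
        .min(MAX_BACKOFF_SECS);
    Duration::from_secs(secs)
}

/// A relay counts as dead once failures have continued for 24 hours.
fn is_dead(continuous_failure: Duration) -> bool {
    continuous_failure >= Duration::from_secs(24 * 60 * 60)
}
```

With this shape the cap is reached at `attempt = 12` (4096s is clamped to 3600s), so the stored attempt counter can keep growing without overflowing the delay.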
| 111 | |||
| 112 | ## Filter Strategy | ||
| 113 | |||
| 114 | ### Unified Filters for Live Sync and Negentropy | ||
| 115 | |||
| 116 | The same filter logic is used for both live subscriptions and negentropy reconciliation: | ||
| 117 | |||
| 118 | ```mermaid | ||
| 119 | flowchart LR | ||
| 120 | subgraph Filter Layers | ||
| 121 | F1[Layer 1: All 30617+30618] | ||
| 122 | F2[Layer 2: Events tagging repos via A/a/q] | ||
| 123 | F3[Layer 3: Events tagging PRs/Issues via E/e/q] | ||
| 124 | end | ||
| 125 | |||
| 126 | F1 -->|client-side| AP[Acceptance Policy] | ||
| 127 | F2 -->|server-side| Relay | ||
| 128 | F3 -->|server-side| Relay | ||
| 129 | ``` | ||
| 130 | |||
| 131 | ### Layer 1: Repository Announcements & States | ||
| 132 | |||
| 133 | Fetch all kind 30617 and 30618 events using the unified `since` timestamp, then filter them client-side through the acceptance policy: | ||
| 134 | |||
| 135 | ```rust | ||
| 136 | // Use same since filter as other layers for consistency | ||
| 137 | let layer1_filter = Filter::new() | ||
| 138 | .kinds([Kind::from(30617), Kind::from(30618)]) | ||
| 139 | .since(since_timestamp); // Unified with Layer 2/3 | ||
| 140 | ``` | ||
| 141 | |||
| 142 | **Client-side validation**: Only store events that pass our [`Nip34WritePolicy`](src/nostr/builder.rs:51). | ||
| 143 | |||
| 144 | ### Layer 2: Events Tagging Repositories | ||
| 145 | |||
| 146 | For repository announcements that list **both the remote relay and our own service**, request events that tag the repository: | ||
| 147 | |||
| 148 | ```rust | ||
| 149 | // Build addressable references: 30617:<pubkey>:<identifier> | ||
| 150 | let repo_refs: Vec<String> = announcements | ||
| 151 | .iter() | ||
| 152 | .filter(|a| a.relays.contains(&this_relay) && a.lists_service(&our_domain)) | ||
| 153 | .map(|a| format!("30617:{}:{}", a.pubkey.to_hex(), a.identifier)) | ||
| 154 | .collect(); | ||
| 155 | |||
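| 156 | // Filters in one REQ are OR-ed (NIP-01), so use one filter per tag letter (A, a, q) | ||
| 157 | let layer2_filters: Vec<Filter> = [SingleLetterTag::uppercase(Alphabet::A), SingleLetterTag::lowercase(Alphabet::A), SingleLetterTag::lowercase(Alphabet::Q)] | ||
| 158 | .into_iter().map(|tag| Filter::new().custom_tag(tag, repo_refs.clone())).collect(); | ||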
| 159 | ``` | ||
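
The selection and formatting step for Layer 2 can be sketched without nostr-sdk types. This is a minimal sketch: the `Announcement` struct, its fields and `repo_refs_for_relay` are hypothetical simplifications, standing in for whatever the real announcement type exposes.

```rust
/// Hypothetical, simplified view of a stored kind 30617 announcement.
struct Announcement {
    pubkey_hex: String,
    identifier: String,
    /// Relay URLs listed in the announcement.
    relays: Vec<String>,
    /// Whether the announcement lists our own service.
    lists_our_service: bool,
}

/// Build `30617:<pubkey>:<identifier>` references for announcements that
/// list both the given remote relay and our own service.
fn repo_refs_for_relay(announcements: &[Announcement], remote_relay: &str) -> Vec<String> {
    announcements
        .iter()
        .filter(|a| a.lists_our_service && a.relays.iter().any(|r| r == remote_relay))
        .map(|a| format!("30617:{}:{}", a.pubkey_hex, a.identifier))
        .collect()
}
```

Because the result depends on the remote relay, each connection ends up with its own set of repository references rather than one global list.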
| 160 | |||
| 161 | ### Layer 3: Events Tagging Issues/PRs/Patches | ||
| 162 | |||
| 163 | For events that reference PRs, Patches, or Issues from repos we track: | ||
| 164 | |||
| 165 | ```rust | ||
| 166 | // Collect event IDs of PRs, Patches, Issues we've stored | ||
| 167 | let tagged_event_ids: Vec<EventId> = database | ||
| 168 | .query(Filter::new().kinds([1617, 1618, 1619, 1621, 1622, 1630].map(Kind::from))) // Patch, PR, PR Update, Issue, etc. | ||
| 169 | .iter() | ||
| 170 | .filter(|e| references_tracked_repo(e, &announcements)) | ||
| 171 | .map(|e| e.id) | ||
| 172 | .collect(); | ||
| 173 | |||
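| 174 | // Filters in one REQ are OR-ed (NIP-01), so use one filter per tag letter (E, e, q) | ||
| 175 | let layer3_filters: Vec<Filter> = [SingleLetterTag::uppercase(Alphabet::E), SingleLetterTag::lowercase(Alphabet::E), SingleLetterTag::lowercase(Alphabet::Q)] | ||
| 176 | .into_iter().map(|tag| Filter::new().custom_tag(tag, tagged_event_ids.clone())).collect(); | ||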
| 177 | ``` | ||
| 178 | |||
| 179 | ### Filter Size Management | ||
| 180 | |||
| 181 | When the tag list exceeds a threshold, split into batches: | ||
| 182 | |||
| 183 | ```rust | ||
| 184 | const MAX_TAGS_PER_FILTER: usize = 100; | ||
| 185 | |||
| 186 | fn build_filters(tag: SingleLetterTag, tag_values: Vec<String>) -> Vec<Filter> { | ||
| 187 | tag_values | ||
| 188 | .chunks(MAX_TAGS_PER_FILTER) | ||
| 189 | .map(|chunk| Filter::new().custom_tag(tag, chunk.to_vec())) | ||
| 190 | .collect() | ||
| 191 | } | ||
| 192 | ``` | ||
| 193 | |||
| 194 | **Consolidation**: When total filter count exceeds ~150 across a connection, consolidate by rebuilding from scratch. | ||
| 195 | |||
| 196 | ## Subscription Updates | ||
| 197 | |||
| 198 | ### Dynamic Subscription Management | ||
| 199 | |||
| 200 | When new events arrive that affect our filter criteria: | ||
| 201 | |||
| 202 | ```mermaid | ||
| 203 | sequenceDiagram | ||
| 204 | participant LocalRelay | ||
| 205 | participant SyncManager | ||
| 206 | participant RemoteRelay | ||
| 207 | |||
| 208 | LocalRelay->>SyncManager: New PR event accepted | ||
| 209 | SyncManager->>SyncManager: Extract event ID | ||
| 210 | SyncManager->>SyncManager: Build new filter for E/e/q tags | ||
| 211 | SyncManager->>RemoteRelay: REQ with new filter | ||
| 212 | RemoteRelay-->>SyncManager: Events matching new filter | ||
| 213 | ``` | ||
| 214 | |||
| 215 | **Events that trigger subscription updates**: | ||
| 216 | |||
| 217 | - New repository announcement accepted (adds to Layer 2) | ||
| 218 | - New PR/Issue/Patch accepted (adds to Layer 3) | ||
| 219 | |||
| 220 | ### When to Consolidate | ||
| 221 | |||
| 222 | Track subscription count per connection: | ||
| 223 | |||
| 224 | ```rust | ||
| 225 | struct ConnectionState { | ||
| 226 | relay_url: RelayUrl, | ||
| 227 | subscriptions: Vec<SubscriptionId>, | ||
| 228 | total_filter_count: usize, | ||
| 229 | } | ||
| 230 | |||
| 231 | impl ConnectionState { | ||
| 232 | fn should_consolidate(&self) -> bool { | ||
| 233 | self.total_filter_count > 150 | ||
| 234 | } | ||
| 235 | |||
| 236 | async fn consolidate(&mut self) { | ||
| 237 | // Close all subscriptions | ||
| 238 | // Rebuild from scratch with current database state | ||
| 239 | } | ||
| 240 | } | ||
| 241 | ``` | ||
| 242 | |||
| 243 | ## Negentropy Catchup | ||
| 244 | |||
| 245 | ### NIP-77 Reconciliation Protocol | ||
| 246 | |||
| 247 | Negentropy enables efficient set reconciliation - discovering which events we're missing without transferring full event lists. | ||
| 248 | |||
| 249 | ### Timing | ||
| 250 | |||
| 251 | | Trigger | Behavior | | ||
| 252 | | ------------------- | -------------------------------------------------------------------- | | ||
| 253 | | **Initial startup** | Warm-up delay, staggered if many filters, initializes daily schedule | | ||
| 254 | | **After reconnect** | Delay to avoid rate limiting, limited to events from last 3 days | | ||
| 255 | | **Daily** | Staggered batches, max 100 tagged events per filter | | ||
| 256 | |||
| 257 | ### Startup Flow | ||
| 258 | |||
| 259 | ```mermaid | ||
| 260 | sequenceDiagram | ||
| 261 | participant Server | ||
| 262 | participant SyncManager | ||
| 263 | participant Relay | ||
| 264 | |||
| 265 | Server->>SyncManager: Start | ||
| 266 | SyncManager->>SyncManager: Wait warm-up delay | ||
| 267 | SyncManager->>SyncManager: Build batched filters | ||
| 268 | |||
| 269 | loop For each relay with stagger delay | ||
| 270 | SyncManager->>Relay: NEG-OPEN with filter batch 1 | ||
| 271 | Relay-->>SyncManager: NEG-MSG with differences | ||
| 272 | SyncManager->>Relay: NEG-MSG response | ||
| 273 | Note over SyncManager,Relay: Reconciliation rounds | ||
| 274 | SyncManager->>Relay: NEG-CLOSE, then REQ by ID for missing events | ||
| 275 | SyncManager->>SyncManager: Validate + store events | ||
| 276 | |||
| 277 | alt More batches | ||
| 278 | SyncManager->>SyncManager: Wait stagger delay | ||
| 279 | SyncManager->>Relay: NEG-OPEN with next batch | ||
| 280 | end | ||
| 281 | end | ||
| 282 | |||
| 283 | SyncManager->>SyncManager: Schedule daily catchup | ||
| 284 | ``` | ||
| 285 | |||
| 286 | ### Reconnection Catchup | ||
| 287 | |||
| 288 | After a connection is re-established: | ||
| 289 | |||
| 290 | ```rust | ||
| 291 | async fn catchup_after_reconnect(&self, relay: &RelayUrl) { | ||
| 292 | // Delay to avoid immediate disconnect for too many requests | ||
| 293 | tokio::time::sleep(RECONNECT_CATCHUP_DELAY).await; | ||
| 294 | |||
| 295 | // Only catch up on recent events (last 3 days) | ||
| 296 | let since = Timestamp::now() - Duration::from_secs(3 * 24 * 60 * 60); | ||
| 297 | |||
| 298 | let filters = self.build_filters_for_relay(relay) | ||
| 299 | .into_iter() | ||
| 300 | .map(|f| f.since(since)) | ||
| 301 | .collect(); | ||
| 302 | |||
| 303 | self.run_negentropy(relay, filters).await; | ||
| 304 | } | ||
| 305 | ``` | ||
| 306 | |||
| 307 | ### Daily Catchup Schedule | ||
| 308 | |||
| 309 | ```rust | ||
| 310 | // Daily catchup runs at consistent time, staggered across relays | ||
| 311 | async fn schedule_daily_catchup(&self) { | ||
| 312 | let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); | ||
| 313 | |||
| 314 | loop { | ||
| 315 | interval.tick().await; | ||
| 316 | |||
| 317 | for (i, relay) in self.healthy_relays().enumerate() { | ||
| 318 | // Stagger: fixed 5 minute delay between relays (the loop is sequential, so the delay must not scale with i) | ||
| 319 | if i > 0 { tokio::time::sleep(Duration::from_secs(300)).await; } | ||
| 320 | |||
| 321 | // Batch filters to max 100 tagged events each | ||
| 322 | let batches = self.build_batched_filters(&relay, 100); | ||
| 323 | |||
| 324 | for batch in batches { | ||
| 325 | self.run_negentropy(&relay, batch).await; | ||
| 326 | tokio::time::sleep(Duration::from_secs(60)).await; // 1 min between batches | ||
| 327 | } | ||
| 328 | } | ||
| 329 | } | ||
| 330 | } | ||
| 331 | ``` | ||
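
Since the daily cycle runs relays one after another, it is worth checking that the sleeps alone fit inside the 24 hour interval. The sketch below assumes a fixed 5 minute pause between relays and a 1 minute pause after every batch, and ignores the time spent on reconciliation itself; `min_cycle_duration` and `fits_in_a_day` are illustrative names, not part of the design.

```rust
use std::time::Duration;

const RELAY_STAGGER: Duration = Duration::from_secs(300); // 5 minutes between relays
const BATCH_DELAY: Duration = Duration::from_secs(60); // 1 minute after each batch
const DAY: Duration = Duration::from_secs(24 * 60 * 60);

/// Lower bound on the time one daily cycle spends sleeping.
/// `batches_per_relay[i]` is the number of filter batches for relay `i`.
fn min_cycle_duration(batches_per_relay: &[usize]) -> Duration {
    let relays = batches_per_relay.len() as u32;
    let batches: usize = batches_per_relay.iter().sum();
    // One stagger pause between consecutive relays, one pause after every batch.
    RELAY_STAGGER * relays.saturating_sub(1) + BATCH_DELAY * batches as u32
}

/// True when the sleeps alone leave room before the next daily tick.
fn fits_in_a_day(batches_per_relay: &[usize]) -> bool {
    min_cycle_duration(batches_per_relay) < DAY
}
```

For example, 100 relays with 10 batches each need at least 89,700 seconds of pauses, which is more than a day, so a deployment of that size would need shorter delays or parallel catchup.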
| 332 | |||
| 333 | ## Event Processing | ||
| 334 | |||
| 335 | ### Acceptance Policy | ||
| 336 | |||
| 337 | All synced events go through our acceptance policy (same as direct submissions), but **excluding nostr-sdk defaults** like rate limiting: | ||
| 338 | |||
| 339 | ```rust | ||
| 340 | async fn process_synced_event(&self, event: Event, source_relay: &RelayUrl) -> Result<()> { | ||
| 341 | // Skip nostr-sdk's built-in rate limiting - we trust our filter strategy | ||
| 342 | // and don't want to reject valid events just because they arrived quickly | ||
| 343 | |||
| 344 | // Apply our custom Nip34WritePolicy | ||
| 345 | let result = self.acceptance_policy | ||
| 346 | .admit_event(&event, source_relay) | ||
| 347 | .await; | ||
| 348 | |||
| 349 | match result { | ||
| 350 | PolicyResult::Accept => { | ||
| 351 | self.database.save_event(&event).await?; | ||
| 352 | self.trigger_subscription_updates(&event).await; | ||
| 353 | } | ||
| 354 | PolicyResult::Reject(reason) => { | ||
| 355 | tracing::debug!("Rejected synced event {}: {}", event.id.to_hex(), reason); | ||
| 356 | } | ||
| 357 | } | ||
| 358 | |||
| 359 | Ok(()) | ||
| 360 | } | ||
| 361 | ``` | ||
| 362 | |||
| 363 | ## Module Structure | ||
| 364 | |||
| 365 | ### New `src/sync/` Module | ||
| 366 | |||
| 367 | ``` | ||
| 368 | src/ | ||
| 369 | ├── sync/ | ||
| 370 | │ ├── mod.rs # Module exports | ||
| 371 | │ ├── manager.rs # SyncManager - main coordinator | ||
| 372 | │ ├── connection.rs # Per-relay connection handling | ||
| 373 | │ ├── filter.rs # Filter building and batching | ||
| 374 | │ ├── health.rs # RelayHealth tracking | ||
| 375 | │ ├── negentropy.rs # NIP-77 reconciliation logic | ||
| 376 | │ └── subscription.rs # Dynamic subscription management | ||
| 377 | ├── nostr/ | ||
| 378 | │ └── ... (existing) | ||
| 379 | └── ... | ||
| 380 | ``` | ||
| 381 | |||
| 382 | ### Integration with Main Binary | ||
| 383 | |||
| 384 | ```rust | ||
| 385 | // In main.rs | ||
| 386 | async fn main() -> Result<()> { | ||
| 387 | // ... existing setup ... | ||
| 388 | |||
| 389 | // Start sync manager as background task | ||
| 390 | let sync_manager = SyncManager::new( | ||
| 391 | database.clone(), | ||
| 392 | config.domain.clone(), | ||
| 393 | ); | ||
| 394 | |||
| 395 | tokio::spawn(async move { | ||
| 396 | sync_manager.run().await | ||
| 397 | }); | ||
| 398 | |||
| 399 | // ... rest of server startup ... | ||
| 400 | } | ||
| 401 | ``` | ||
| 402 | |||
| 403 | ## Metrics & Observability | ||
| 404 | |||
| 405 | ### Event Source Tracking | ||
| 406 | |||
| 407 | Track provenance of every event to measure live sync effectiveness: | ||
| 408 | |||
| 409 | ```rust | ||
| 410 | enum EventSource { | ||
| 411 | DirectSubmission, // Sent directly to our relay by a user | ||
| 412 | LiveSync(RelayUrl), // Received via live subscription | ||
| 413 | Catchup(RelayUrl), // Discovered during negentropy catchup | ||
| 414 | DailyCatchup(RelayUrl), // Found during daily reconciliation | ||
| 415 | } | ||
| 416 | |||
| 417 | struct SyncMetrics { | ||
| 418 | /// Events by source | ||
| 419 | events_from_direct: Counter, | ||
| 420 | events_from_live_sync: Counter, | ||
| 421 | events_from_catchup: Counter, // Indicates live sync failure | ||
| 422 | events_from_daily_catchup: Counter, // Indicates sustained sync gap | ||
| 423 | |||
| 424 | /// Catchup gap tracking - events found that should have been live synced | ||
| 425 | catchup_gap_total: Counter, | ||
| 426 | catchup_gap_by_relay: HashMap<RelayUrl, Counter>, | ||
| 427 | } | ||
| 428 | ``` | ||
| 429 | |||
| 430 | **Key insight**: Events discovered during catchup or daily reconciliation represent **live sync failures** - we should have received them in real-time. | ||
| 431 | |||
| 432 | ### Peer Reliability Tracking | ||
| 433 | |||
| 434 | Track relay health and data completeness: | ||
| 435 | |||
| 436 | ```rust | ||
| 437 | struct PeerReliability { | ||
| 438 | relay_url: RelayUrl, | ||
| 439 | |||
| 440 | // Connection metrics | ||
| 441 | connection_attempts: u64, | ||
| 442 | connection_failures: u64, | ||
| 443 | total_uptime_seconds: u64, | ||
| 444 | total_downtime_seconds: u64, | ||
| 445 | |||
| 446 | // Event coverage metrics (per repo we track) | ||
| 447 | repos_tracked: HashSet<RepoRef>, | ||
| 448 | missing_events_detected: HashMap<RepoRef, u64>, // Events we have that they don't | ||
| 449 | events_received_from: u64, | ||
| 450 | |||
| 451 | // Calculated scores | ||
| 452 | uptime_percentage: f64, // uptime / (uptime + downtime) | ||
| 453 | event_coverage_score: f64, // ratio of events we have vs what we expect from them | ||
| 454 | } | ||
| 455 | ``` | ||
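
The calculated scores need a guard for relays that have not been observed yet. A minimal sketch of the uptime formula from the struct comment, plus a connection failure rate derived from the two connection counters; the function names and the choice to return `Option<f64>` are assumptions made for illustration.

```rust
/// uptime / (uptime + downtime), as a ratio in 0.0..=1.0.
/// Returns `None` when nothing has been observed yet (or the sum overflows),
/// which avoids a division by zero.
fn uptime_ratio(uptime_secs: u64, downtime_secs: u64) -> Option<f64> {
    let total = uptime_secs.checked_add(downtime_secs)?;
    if total == 0 {
        return None;
    }
    Some(uptime_secs as f64 / total as f64)
}

/// connection_failures / connection_attempts, or `None` before the first attempt.
fn connection_failure_rate(failures: u64, attempts: u64) -> Option<f64> {
    if attempts == 0 {
        return None;
    }
    Some(failures as f64 / attempts as f64)
}
```

Multiplying the ratio by 100 gives the value for a field named `uptime_percentage`; keeping `None` distinct from `0.0` stops a newly discovered relay from looking like a permanently offline one.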
| 456 | |||
| 457 | ### Observability Integration | ||
| 458 | |||
| 459 | ```rust | ||
| 460 | // Prometheus-style metrics | ||
| 461 | impl SyncManager { | ||
| 462 | fn record_event_received(&mut self, event: &Event, source: EventSource) { | ||
| 463 | match source { | ||
| 464 | EventSource::DirectSubmission => { | ||
| 465 | self.metrics.events_from_direct.inc(); | ||
| 466 | } | ||
| 467 | EventSource::LiveSync(relay) => { | ||
| 468 | self.metrics.events_from_live_sync.inc(); | ||
| 469 | self.peer_metrics.entry(relay.clone()).or_default().events_received_from += 1; | ||
| 470 | } | ||
| 471 | EventSource::Catchup(relay) => { | ||
| 472 | // This is a sync gap - we should have gotten it via live sync | ||
| 473 | self.metrics.events_from_catchup.inc(); | ||
| 474 | self.metrics.catchup_gap_total.inc(); | ||
| 475 | self.metrics.catchup_gap_by_relay.entry(relay.clone()).or_default().inc(); | ||
| 476 | tracing::warn!( | ||
| 477 | relay = %relay, | ||
| 478 | event_id = %event.id.to_hex(), | ||
| 479 | "Sync gap detected: event found during catchup" | ||
| 480 | ); | ||
| 481 | } | ||
| 482 | EventSource::DailyCatchup(relay) => { | ||
| 483 | // Sustained sync gap - missed by both live sync and initial catchup | ||
| 484 | self.metrics.events_from_daily_catchup.inc(); | ||
| 485 | tracing::error!( | ||
| 486 | relay = %relay, | ||
| 487 | event_id = %event.id.to_hex(), | ||
| 488 | "Sustained sync gap: event found during daily catchup" | ||
| 489 | ); | ||
| 490 | } | ||
| 491 | } | ||
| 492 | } | ||
| 493 | |||
| 494 | fn record_connection_attempt(&mut self, relay: &RelayUrl, success: bool) { | ||
| 495 | let peer = self.peer_metrics.entry(relay.clone()).or_default(); | ||
| 496 | peer.connection_attempts += 1; | ||
| 497 | if !success { | ||
| 498 | peer.connection_failures += 1; | ||
| 499 | } | ||
| 500 | } | ||
| 501 | } | ||
| 502 | ``` | ||
| 503 | |||
| 504 | ### Log Levels for Sync Events | ||
| 505 | |||
| 506 | | Event | Level | Context | | ||
| 507 | | ----------------------- | ----- | ----------------------------- | | ||
| 508 | | Event via live sync | DEBUG | Normal operation | | ||
| 509 | | Event via catchup | WARN | Sync gap detected | | ||
| 510 | | Event via daily catchup | ERROR | Sustained gap | | ||
| 511 | | Connection established | INFO | Relay URL | | ||
| 512 | | Connection failed | WARN | Relay URL, attempt #, backoff | | ||
| 513 | | Relay marked dead | ERROR | Relay URL, failure duration | | ||
| 514 | | Peer missing events | WARN | Relay URL, repo, count | | ||
| 515 | |||
| 516 | ## Configuration | ||
| 517 | |||
| 518 | ```rust | ||
| 519 | pub struct SyncConfig { | ||
| 520 | /// Warm-up delay before starting initial catchup | ||
| 521 | pub startup_delay: Duration, // Default: 30s | ||
| 522 | |||
| 523 | /// Delay between filter batches during catchup | ||
| 524 | pub batch_delay: Duration, // Default: 60s | ||
| 525 | |||
| 526 | /// Delay after reconnect before catchup | ||
| 527 | pub reconnect_delay: Duration, // Default: 10s | ||
| 528 | |||
| 529 | /// Lookback window, in days, for reconnect catchup | ||
| 530 | pub reconnect_lookback_days: u32, // Default: 3 | ||
| 531 | |||
| 532 | /// Maximum tagged event IDs per filter | ||
| 533 | pub max_tags_per_filter: usize, // Default: 100 | ||
| 534 | |||
| 535 | /// Consolidate subscriptions when the total filter count on a connection exceeds this | ||
| 536 | pub max_subscriptions: usize, // Default: 150 | ||
| 537 | |||
| 538 | /// Backoff configuration | ||
| 539 | pub max_backoff: Duration, // Default: 1h | ||
| 540 | pub dead_threshold: Duration, // Default: 24h | ||
| 541 | pub dead_retry_interval: Duration, // Default: 24h | ||
| 542 | } | ||
| 543 | ``` | ||
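
The defaults listed in the field comments can be captured in a `Default` implementation so callers only override what they need. This is a sketch that repeats the struct so it compiles on its own; the `Default` impl itself is an assumption and is not shown in the design above.

```rust
use std::time::Duration;

pub struct SyncConfig {
    pub startup_delay: Duration,
    pub batch_delay: Duration,
    pub reconnect_delay: Duration,
    pub reconnect_lookback_days: u32,
    pub max_tags_per_filter: usize,
    pub max_subscriptions: usize,
    pub max_backoff: Duration,
    pub dead_threshold: Duration,
    pub dead_retry_interval: Duration,
}

impl Default for SyncConfig {
    /// Values taken from the "Default:" comments in the configuration section.
    fn default() -> Self {
        const HOUR: u64 = 60 * 60;
        Self {
            startup_delay: Duration::from_secs(30),
            batch_delay: Duration::from_secs(60),
            reconnect_delay: Duration::from_secs(10),
            reconnect_lookback_days: 3,
            max_tags_per_filter: 100,
            max_subscriptions: 150,
            max_backoff: Duration::from_secs(HOUR),
            dead_threshold: Duration::from_secs(24 * HOUR),
            dead_retry_interval: Duration::from_secs(24 * HOUR),
        }
    }
}
```

A caller could then write `SyncConfig { batch_delay: Duration::from_secs(5), ..Default::default() }` in tests to shorten delays without restating every field.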
| 544 | |||
| 545 | ## Summary | ||
| 546 | |||
| 547 | | Component | Responsibility | | ||
| 548 | | ---------------------- | ------------------------------------------------------------ | | ||
| 549 | | **SyncManager** | Orchestrates connections, triggers catchup, processes events | | ||
| 550 | | **FilterService** | Builds unified filters from database state | | ||
| 551 | | **RelayHealthTracker** | Manages backoff, dead relay detection, persistence | | ||
| 552 | | **ConnectionState** | Per-relay WebSocket + subscription management | | ||
| 553 | |||
| 554 | ### Key Design Decisions | ||
| 555 | |||
| 556 | 1. **Unified filters** for live sync and negentropy - same criteria, different delivery mechanism | ||
| 557 | 2. **Exclude ourselves** from relay list to prevent loops | ||
| 558 | 3. **One connection per relay** with combined filters for efficiency | ||
| 559 | 4. **Persisted health state** survives restarts | ||
| 560 | 5. **Staggered catchup** to avoid overwhelming relays - the first run starts as soon as the startup warm-up delay has elapsed | ||
| 561 | 6. **Client-side filtering** for 30617/30618, server-side for Layer 2/3 | ||
| 562 | 7. **Dynamic subscription addition** with periodic consolidation | ||
| 563 | 8. **Custom acceptance policy** excluding rate limiting defaults | ||
| 564 | 9. **Catchup as failure signal** - events found during catchup/daily indicate live sync gaps | ||
| 565 | 10. **Peer reliability tracking** - monitor uptime and event coverage per relay | ||