# GRASP-02: Proactive Sync - Design & Implementation

## Overview

Proactively sync Nostr events from the other relays listed in accepted repository announcements.

**Note**: This document covers **relay-to-relay event sync**. For automatic git data fetching when events arrive without their data, see [GRASP-02 Purgatory Git Data Fetching](grasp-02-proactive-sync-purgatory-git-data.md).

Features:

- Fetches all repository announcements from connected relays to discover new repos listing our service
- Discovers and dynamically connects to new relays listed by repository announcements we have accepted (with optional bootstrap relay to get started)
- Fetches events tagging repositories we are interested in, as well as events tagging Issues, Patches and PRs of these repositories
- Supports live sync and historic sync (tries NIP-77 negentropy but falls back to REQ+EOSE with 'until' based pagination)
- Plays nicely with other relays - connection backoff and rate-limiting detection with cooldown
- Does a full reconciliation daily
- Prometheus metrics
- **Triggers purgatory git data sync**: When events arrive via sync, they're enqueued for immediate git data fetching (500ms delay to batch bursts)

Key Architectural Points:

- **Simple data model** for tracking target, pending and actual filter state against relays
- **Self-subscription** enables a deduplicated feed of all accepted events which leads to an updated target sync state
- **Clear separation** between Live sync (using `limit:0`) and Historic Sync (handled via negentropy falling back to REQ+EOSE with 'until' based pagination support)
- **Discovery management**: The nature of discovery inherently leads to a drip feed of root_events (e.g., Repo Announcements, Issues, Patches and PRs) that require additional subscriptions. Without careful management this can lead to large numbers of subscriptions and potentially rate limiting. Mitigation strategies:
  - Self-subscriber waits for 5s to batch updates before creating new filters / subscriptions, allowing time for most events to be received from outstanding subscriptions from connected relays
  - PendingBatch tracks each new set of filters that may require pagination until they are complete
  - Avoid long awaits - recompute desired filters when connection is established to ensure filters are as consolidated as possible
  - Consolidation function keeps the number of live sync subscriptions below relay rate limits (threshold: 70 filters)
- **Quick Reconnect vs Fresh Start**: a reconnect within 15 minutes keeps confirmed state and skips the full reconciliation; a longer disconnect, or relaunching the binary, triggers a fresh start
- **Background timers** handle relay connection health and metrics, handling reconnects after backoff and recovery after rate-limiting

Sections:

- Data Model
- Connection Lifecycle
- Background Tasks
- Live vs Historic Sync
- Triggers and Flow
- Flow Scenarios
- Core Algorithms
- Key Implementation Methods

## Data Model

This simple data model captures which relays we want to connect to, the progress of historic sync, and the active live filters.

This state starts afresh when the binary loads.

### RepoSyncIndex (Source of Truth)

```rust
/// What we WANT to sync - derived from events received via self-subscription.
/// Updated immediately when self-subscriber batch fires.
/// Key: repo addressable ref - 30617:pubkey:identifier
pub type RepoSyncIndex = Arc<RwLock<HashMap<String, RepoSyncNeeds>>>;

#[derive(Debug, Clone, Default)]
pub struct RepoSyncNeeds {
    /// Relay URLs listed in this repo's 30617 announcement
    pub relays: HashSet<String>,
    /// Root event IDs - 1617/1618/1621 - that reference this repo
    pub root_events: HashSet<EventId>,
}
```

### RelaySyncIndex (Confirmed State + Connection)

```rust
/// What we have CONFIRMED syncing - includes connection state for integrated lifecycle.
/// Key: relay URL
pub type RelaySyncIndex = Arc<RwLock<HashMap<String, RelayState>>>;

/// Connection status for a relay
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Not currently connected
    Disconnected,
    /// Connection attempt in progress
    Connecting,
    /// Successfully connected, historic sync in progress
    Syncing,
    /// Successfully connected, historic sync completed
    Connected,
    /// Successfully connected, historic sync had failures but live sync active
    ConnectedHistoricSyncFailures,
}

/// Complete state for a single relay - combines sync needs with connection lifecycle
#[derive(Debug)]
pub struct RelayState {
    /// Repos we have confirmed syncing from this relay
    pub repos: HashSet<String>,
    /// Root events we have confirmed tracking
    pub root_events: HashSet<EventId>,
    /// If true, never disconnect this relay
    pub is_bootstrap: bool,
    /// Current connection status
    pub connection_status: ConnectionStatus,
    /// When we last successfully connected - used for since filter on reconnect
    pub last_connected: Option<Timestamp>,
    /// When we disconnected - for 15-minute state retention rule
    pub disconnected_at: Option<Timestamp>,
    /// Whether announcement filter historic sync has completed for this relay
    /// Used to determine if we can use `since` filter on reconnect for Layer 1
    pub announcements_synced: bool,
    /// Whether initial historic sync has fully completed (all layers)
    /// Used to transition from Syncing -> Connected status
    pub historic_sync_completed: bool,
    /// When historic sync completed (None if never completed or cleared on fresh_start)
    pub historic_sync_completed_at: Option<Timestamp>,
}

impl RelayState {
    /// Check if state should be cleared based on 15-minute rule
    pub fn should_clear_state(&self) -> bool {
        match self.disconnected_at {
            Some(disconnected) => {
                let now = Timestamp::now();
                now.as_secs().saturating_sub(disconnected.as_secs()) > 900 // 15 minutes
            }
            None => false, // Still connected or never connected
        }
    }

    /// Clear repos and root_events - called when reconnect takes > 15 minutes
    pub fn clear_sync_state(&mut self) {
        self.repos.clear();
        self.root_events.clear();
    }
}
```
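
The 15-minute rule above feeds the choice between a quick reconnect and a fresh start. The following is a minimal sketch of that decision, not the project's code: it uses plain `u64` Unix timestamps instead of `Timestamp`, and the `ReconnectStrategy` enum and `choose_strategy` function are hypothetical names introduced for illustration. It also assumes that a relay with no recorded `disconnected_at` is treated as an initial connect.

```rust
/// Seconds of disconnect after which confirmed state is considered stale.
/// Mirrors the 900-second comparison in `should_clear_state`.
const STATE_RETENTION_SECS: u64 = 900;

/// Hypothetical enum; the variant names follow the entry points
/// `fresh_start()` and `quick_reconnect()` described in this document.
#[derive(Debug, PartialEq, Eq)]
pub enum ReconnectStrategy {
    FreshStart,
    QuickReconnect,
}

/// Pick a strategy from plain Unix timestamps (seconds).
/// Assumption: `None` means the relay has no recorded disconnect,
/// which this sketch treats as an initial connect.
pub fn choose_strategy(disconnected_at: Option<u64>, now: u64) -> ReconnectStrategy {
    match disconnected_at {
        // Within the retention window: confirmed state can be reused.
        Some(at) if now.saturating_sub(at) <= STATE_RETENTION_SECS => {
            ReconnectStrategy::QuickReconnect
        }
        // Too long ago, or no recorded disconnect: start from scratch.
        _ => ReconnectStrategy::FreshStart,
    }
}
```

`saturating_sub` keeps the comparison safe if the clock moves backwards between disconnect and reconnect, in which case the elapsed time is treated as zero.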

### PendingSyncIndex (In-Flight Batches)

```rust
/// Method used for synchronization
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMethod {
    /// Traditional REQ+EOSE flow - waits for EOSE on subscriptions
    ReqEose,
    /// NIP-77 negentropy sync - confirms immediately after sync completes
    Negentropy,
}

/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
/// Each batch has its own ID and can confirm independently.
/// Key: relay URL
pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;

/// Pagination state for a subscription in non-Negentropy historic sync
#[derive(Debug, Clone)]
pub struct PaginationState {
    /// Number of events received for this subscription
    pub event_count: usize,
    /// Smallest created_at timestamp seen (for pagination with `until`)
    pub min_created_at: Option<Timestamp>,
    /// Original filter to reconstruct for next page
    pub original_filter: Filter,
}

pub struct PendingBatch {
    /// Unique ID for this batch - for debugging/logging
    pub batch_id: u64,
    /// The items this batch is syncing
    pub items: PendingItems,
    /// Subscription IDs that must ALL receive EOSE before confirming (for ReqEose)
    /// Empty for Negentropy sync method
    pub outstanding_subs: HashSet<SubscriptionId>,
    /// The sync method used for this batch
    pub sync_method: SyncMethod,
    /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy)
    /// Maps subscription ID to its pagination state
    pub pagination_state: HashMap<SubscriptionId, PaginationState>,
}

#[derive(Debug, Clone, Default)]
pub struct PendingItems {
    pub repos: HashSet<String>,
    pub root_events: HashSet<EventId>,
}
```

**Pagination for REQ+EOSE Historic Sync:**

When a relay doesn't support NIP-77 Negentropy, historic sync falls back to traditional REQ+EOSE. To handle large result sets efficiently:

- **`PaginationState`** tracks per-subscription pagination progress
  - `event_count`: Number of events received so far
  - `min_created_at`: Smallest timestamp seen, used to set `until` for next page
  - `original_filter`: Base filter to reconstruct with updated `until` parameter
- **Automatic pagination**: When EOSE is received, if enough events were received to suggest more may exist, the system automatically issues a follow-up request with `until` set to `min_created_at`
- **Completion**: Pagination continues until an EOSE is received with fewer events than expected, indicating the end of results
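
The pagination bookkeeping can be sketched as follows. This is a simplified illustration under stated assumptions: `PageProgress` is a hypothetical stand-in for `PaginationState` that uses `u64` timestamps and omits the filter, and `page_limit` is an assumed threshold for "enough events to suggest more may exist" (the actual threshold is not stated here).

```rust
/// Hypothetical, simplified stand-in for `PaginationState`.
#[derive(Debug, Clone, Default)]
pub struct PageProgress {
    /// Number of events received so far.
    pub event_count: usize,
    /// Smallest `created_at` seen so far.
    pub min_created_at: Option<u64>,
}

impl PageProgress {
    /// Record the `created_at` of one received event.
    pub fn record(&mut self, created_at: u64) {
        self.event_count += 1;
        self.min_created_at = Some(match self.min_created_at {
            Some(min) => min.min(created_at),
            None => created_at,
        });
    }

    /// Called on EOSE. Returns the `until` value for the next page,
    /// or `None` when pagination is complete.
    /// `page_limit` is an assumed threshold, not a value from the source.
    pub fn next_until(&self, page_limit: usize) -> Option<u64> {
        if self.event_count >= page_limit {
            self.min_created_at
        } else {
            None
        }
    }
}
```

Because `until` is inclusive in NIP-01, a follow-up page requested with `until = min_created_at` can return events already seen at that exact timestamp, so the receiver should deduplicate by event ID.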

---

## Connection Lifecycle

### Object vs Connection Lifecycle

**Key Principle**: RelayConnection objects persist forever, WebSocket connections are transient.

- **RelayConnection object**: Created once via `register_relay()`, stored in HashMap permanently
- **WebSocket connection**: Transient, established via `try_connect_relay()`, dies on disconnect
- **Event loop**: Spawned by `handle_connect_or_reconnect()`, must be respawned after every reconnection

### Connection State Machine

```mermaid
stateDiagram-v2
    [*] --> Disconnected: discover relay → register_relay()
    Disconnected --> Connecting: retry_disconnected_relays → try_connect_relay
    Connecting --> Syncing: success → handle_connect_or_reconnect
    Connecting --> Disconnected: failure + record in health tracker
    Syncing --> Connected: all batches succeed → check_and_complete_historic_sync
    Syncing --> ConnectedHistoricSyncFailures: any batch failed → check_and_complete_historic_sync
    Syncing --> Disconnected: connection lost → handle_disconnect
    Connected --> Disconnected: connection lost → handle_disconnect
    ConnectedHistoricSyncFailures --> Disconnected: connection lost → handle_disconnect
    Connected --> [*]: intentional disconnect via check_disconnects
    ConnectedHistoricSyncFailures --> [*]: intentional disconnect via check_disconnects

    note right of Disconnected: disconnected_at set for 15min rule<br/>RelayConnection kept in HashMap
    note right of Connecting: connection attempt with timeout
    note right of Syncing: historic sync in progress<br/>event loop spawned here
    note right of Connected: historic sync complete<br/>last_connected tracked for since filter
    note right of ConnectedHistoricSyncFailures: historic sync had failures (missing events)<br/>live sync active, partial data
```

### Connection Flow Methods

| Method                              | Purpose                      | When Called                       | Actions                                                         |
| ----------------------------------- | ---------------------------- | --------------------------------- | --------------------------------------------------------------- |
| `register_relay()`                  | Initialize relay tracking    | Discovery via RepoSyncIndex       | Creates RelayConnection, stores in HashMap, returns immediately |
| `try_connect_relay()`               | Attempt connection           | Health tracker allows retry       | Calls connection.connect(), sends notification on success       |
| `handle_connect_or_reconnect()`     | Setup after connection       | ConnectNotification received      | Spawns event loop, sets Syncing, decides sync strategy          |
| `check_and_complete_historic_sync()` | Detect sync completion     | After each batch confirmation     | Transitions Syncing → Connected or ConnectedHistoricSyncFailures when no pending batches remain |
| `handle_disconnect()`               | Cleanup after disconnect     | DisconnectNotification received   | Updates state, clears pending, KEEPS RelayConnection            |
| `retry_disconnected_relays()`       | Periodic reconnection        | Every 2s (health & metrics timer) | For each ready relay: try_connect_relay()                       |

### Historic Sync Completion

When a relay first connects, it enters the **Syncing** state and begins historic sync:

1. **Layer 1 (Announcements)**: Generic filter for all repository announcements
2. **Layer 2 (Repo Events)**: Filters for events tagging discovered repositories  
3. **Layer 3 (Root Events)**: Filters for events tagging discovered PRs/Issues/Patches

Each layer creates one or more `PendingBatch` entries tracked in `PendingSyncIndex`. As EOSE messages arrive:

- `handle_eose()` confirms each batch via `confirm_batch()`
- `confirm_batch()` moves items to confirmed state, tracks if batch failed, and calls `check_and_complete_historic_sync()`
- `check_and_complete_historic_sync()` uses a **double-check pattern** to avoid race conditions:
  1. First check: Are there pending batches? If yes, return early
  2. Wait 6 seconds (batch window + buffer) for self-subscriber to process in-flight events
  3. Second check: Have any pending batches appeared during the wait? If yes, return early
  4. If no pending batches after wait:
     - If any batch failed: transition `Syncing` → `ConnectedHistoricSyncFailures`
     - If all batches succeeded: transition `Syncing` → `Connected`
     - Set `historic_sync_completed = true`

**Why the double-check?** There's an async gap between receiving EOSE and the self-subscriber processing events to create Layer 2/3 filters. The 6-second wait (5s batch window + 1s buffer) ensures we don't prematurely mark sync complete while Layer 2/3 batches are being created.

**Batch Failure Tracking**: When negentropy retry protection triggers (relay returns zero requested events on retry), the batch is marked as `failed = true`. This causes the relay to transition to `ConnectedHistoricSyncFailures` instead of `Connected`, signaling that live sync is active but historic sync is incomplete.
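
The double-check pattern can be sketched in a few lines. This is a synchronous illustration only: it assumes a shared pending-batch counter and blocks with `std::thread::sleep`, whereas the real method is presumably async. The `Outcome` enum, the `check_and_complete` function, and its parameters are hypothetical names; `settle` stands in for the 6-second wait.

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Hypothetical result of one completion check.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    StillSyncing,
    Connected,
    ConnectedHistoricSyncFailures,
}

/// Double-check sketch: look, wait for in-flight work to settle, look again.
pub fn check_and_complete(
    pending_batches: &Arc<RwLock<usize>>,
    any_batch_failed: bool,
    settle: Duration,
) -> Outcome {
    // First check: batches are still in flight, so sync is not done.
    if *pending_batches.read().unwrap() > 0 {
        return Outcome::StillSyncing;
    }
    // Give the self-subscriber time to turn received events into new batches.
    thread::sleep(settle);
    // Second check: new Layer 2/3 batches may have appeared during the wait.
    if *pending_batches.read().unwrap() > 0 {
        return Outcome::StillSyncing;
    }
    if any_batch_failed {
        Outcome::ConnectedHistoricSyncFailures
    } else {
        Outcome::Connected
    }
}
```

The read lock is released before sleeping, so other tasks can register new batches during the settle window.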

**Metrics tracking**: The `ngit_sync_relay_connected` metric shows:
- `0` = Disconnected
- `1` = Connecting
- `2` = Syncing (historic sync in progress)
- `3` = Connected (historic sync complete, live sync active)
- `4` = ConnectedHistoricSyncFailures (historic sync had failures, live sync active, partial data)

This allows operators to monitor sync progress and distinguish between "connected but still catching up" vs "fully synced and live" vs "historic sync failures (missing historic data)".
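
The status-to-gauge mapping listed above can be written as a single exhaustive `match`. This is a sketch: the enum is re-declared so the snippet stands alone, and `status_gauge_value` is a hypothetical helper name rather than a function confirmed to exist in the codebase.

```rust
/// Re-declared here so the snippet is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
    Disconnected,
    Connecting,
    Syncing,
    Connected,
    ConnectedHistoricSyncFailures,
}

/// Hypothetical helper: value reported by the `ngit_sync_relay_connected` gauge.
pub fn status_gauge_value(status: ConnectionStatus) -> i64 {
    match status {
        ConnectionStatus::Disconnected => 0,
        ConnectionStatus::Connecting => 1,
        ConnectionStatus::Syncing => 2,
        ConnectionStatus::Connected => 3,
        ConnectionStatus::ConnectedHistoricSyncFailures => 4,
    }
}
```

An exhaustive `match` without a wildcard arm means that adding a new status later fails to compile until a gauge value is assigned to it.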

### Event Loop Lifecycle

**Critical**: Event loops die on disconnect and cannot be reused.

```mermaid
flowchart LR
    CONN[Connection Success] --> SPAWN[handle_connect_or_reconnect<br/>spawns event loop]
    SPAWN --> RUN[run_event_loop active]
    RUN --> DISC[Disconnect detected]
    DISC --> EXIT[Event loop breaks + task exits]
    EXIT --> RETRY[retry_disconnected_relays]
    RETRY --> RECONN[try_connect_relay]
    RECONN --> |success| SPAWN
```

**Why respawn is required**:

- `run_event_loop()` breaks on RelayStatus::Disconnected
- The spawned task completely exits
- Cannot resume terminated task - must spawn fresh
- Happens for both initial connection AND every reconnect

---

## Background Tasks

The sync system uses three background tasks that run continuously:

### 1. Daily Timer (`run_daily_timer`)

**Purpose**: Periodic full reconciliation to detect state drift

**Interval**: Random 23-25 hours (prevents thundering herd)

**Actions**:

- Triggers `daily_sync()` for all connected relays
- Same as `fresh_start()` but without recording disconnect metrics
- Ensures consistency over time
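
One way to produce the randomized 23-25 hour interval is sketched here. It is an illustration only: `daily_interval` is a hypothetical function, and it takes the random sample as a parameter so the snippet needs nothing beyond the standard library. How the real timer draws its randomness is not specified in this document.

```rust
use std::time::Duration;

const MIN_HOURS: u64 = 23;
const MAX_HOURS: u64 = 25;

/// Map an arbitrary random sample onto the inclusive range [23h, 25h].
/// The caller supplies the sample, for example from a random number generator.
pub fn daily_interval(random_sample: u64) -> Duration {
    let min_secs = MIN_HOURS * 3600;
    let span_secs = (MAX_HOURS - MIN_HOURS) * 3600;
    // Modulo by span + 1 so that exactly 25h is reachable.
    Duration::from_secs(min_secs + random_sample % (span_secs + 1))
}
```

Drawing a fresh sample for each cycle keeps multiple instances from drifting into lockstep over time.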

### 2. Health and Metrics Checker (`run_health_and_metrics_checker`)

**Purpose**: Combined health management and metrics updates

**Interval**: 2 seconds

**Actions**:

1. **Disconnect checking**: Calls `check_disconnects()` to remove relays with no repos/events (except bootstrap)
2. **Retry disconnected**: Calls `retry_disconnected_relays()` to attempt reconnection per health tracker backoff
3. **Rate limit recovery**: Calls `check_rate_limit_recovery()` to clear expired rate limits
4. **Metrics update**: Updates Prometheus metrics with current health states

**Why combined**: The 2-second interval provides good responsiveness for health changes while minimizing overhead. All operations are lightweight (index checks, no I/O except actual connection attempts).

### 3. Self-Subscriber (`SelfSubscriber::run`)

**Purpose**: Monitor own relay for repository announcements and root events

**Subscribed kinds**: 30617, 1617, 1618, 1621 (NOT 30618)

**Batching**: 5-second window (configurable via `NGIT_SYNC_BATCH_WINDOW_MS`)

**Flow**:

1. Queue events to `PendingUpdates`
2. Timer fires on a fixed interval (it does not reset when new events arrive)
3. Process batch: update RepoSyncIndex → derive targets → send AddFilters to SyncManager
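
The queue-then-drain behaviour can be sketched as follows. This is a simplified illustration: the fields of `PendingUpdates` are assumptions (its real contents are not shown in this document), identifiers are plain strings, and the `queue` and `take_batch` method names are hypothetical.

```rust
use std::collections::HashSet;
use std::mem;

/// Hypothetical, simplified shape for `PendingUpdates`.
#[derive(Debug, Default)]
pub struct PendingUpdates {
    /// Repo refs from kind 30617 announcements.
    pub repo_refs: HashSet<String>,
    /// IDs of root events (kinds 1617, 1618, 1621).
    pub root_event_ids: HashSet<String>,
}

impl PendingUpdates {
    /// Queue one event. Returns true if it added something new.
    /// Kinds outside the subscribed set are ignored.
    pub fn queue(&mut self, kind: u16, id: &str) -> bool {
        match kind {
            30617 => self.repo_refs.insert(id.to_string()),
            1617 | 1618 | 1621 => self.root_event_ids.insert(id.to_string()),
            _ => false,
        }
    }

    /// Called when the batch timer fires: hand over everything queued
    /// so far and leave an empty queue behind. Returns `None` if idle.
    pub fn take_batch(&mut self) -> Option<PendingUpdates> {
        if self.repo_refs.is_empty() && self.root_event_ids.is_empty() {
            return None;
        }
        Some(mem::take(self))
    }
}
```

Using sets means that duplicate events arriving within one window collapse into a single entry before any filters are built.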

---

## Core Architecture: Live vs Historic Sync

The sync system is built on two fundamental primitives that are clearly separated:

### Sync Primitives

| Primitive         | Purpose                 | Filter Modifier  | Tracking         |
| ----------------- | ----------------------- | ---------------- | ---------------- |
| `sync_live()`     | Ongoing event stream    | `limit: 0`       | Not tracked      |
| `historic_sync()` | Catch up on past events | Optional `since` | PendingSyncIndex |
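
The difference between the two primitives comes down to how a base filter is modified before it is sent. The sketch below illustrates this with a hypothetical `SimpleFilter` struct; the real code uses the `Filter` type shown in the data model, and `live_variant` and `historic_variant` are illustrative names.

```rust
/// Hypothetical, simplified filter with only the fields relevant here.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SimpleFilter {
    pub kinds: Vec<u16>,
    pub since: Option<u64>,
    pub limit: Option<usize>,
}

/// Live variant: `limit: 0` asks the relay for no stored events,
/// so only events arriving after the subscription are delivered.
pub fn live_variant(base: &SimpleFilter) -> SimpleFilter {
    SimpleFilter {
        limit: Some(0),
        ..base.clone()
    }
}

/// Historic variant: optionally narrowed with `since` on a quick reconnect,
/// or left unbounded (`None`) on a fresh start.
pub fn historic_variant(base: &SimpleFilter, since: Option<u64>) -> SimpleFilter {
    SimpleFilter {
        since,
        ..base.clone()
    }
}
```

Deriving both variants from the same base filter keeps the live and historic subscriptions aligned on which events they cover.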

### Layer Strategy

| Layer   | Content                                 | When Subscribed       | Managed By              |
| ------- | --------------------------------------- | --------------------- | ----------------------- |
| Layer 1 | 30617 Announcements, 30618 Maintainers  | On connect (any type) | Connection lifecycle    |
| Layer 2 | Events tagging our repos (a/A/q tags)   | Via AddFilters        | handle_new_sync_filters |
| Layer 3 | Events tagging root events (e/E/q tags) | Via AddFilters        | handle_new_sync_filters |

**Key insight**: Layer 1 is connection-level (handled at connect time), Layer 2+3 are item-level (flow through AddFilters → handle_new_sync_filters via two paths).
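
A sketch of how Layer 2 and Layer 3 tag filters might be shaped follows. It is an assumption-laden illustration, not the behaviour of `build_layer2_and_layer3_filters()`: `TagFilter` is a hypothetical simplified type, and the function names are invented. It emits one filter per tag name because conditions inside a single NIP-01 filter are combined with AND, while separate filters in one REQ are combined with OR.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified filter: a tag key such as "#a" mapped to the values to match.
pub type TagFilter = BTreeMap<String, Vec<String>>;

/// Build one filter per tag name so that a match on any of the tags is enough.
fn tag_filters(tag_names: &[&str], values: &[String]) -> Vec<TagFilter> {
    if values.is_empty() {
        return Vec::new(); // nothing to subscribe to
    }
    tag_names
        .iter()
        .map(|name| {
            let mut filter = TagFilter::new();
            filter.insert(name.to_string(), values.to_vec());
            filter
        })
        .collect()
}

/// Layer 2: events tagging our repos via a/A/q tags.
pub fn layer2_filters(repo_refs: &[String]) -> Vec<TagFilter> {
    tag_filters(&["#a", "#A", "#q"], repo_refs)
}

/// Layer 3: events tagging root events via e/E/q tags.
pub fn layer3_filters(root_event_ids: &[String]) -> Vec<TagFilter> {
    tag_filters(&["#e", "#E", "#q"], root_event_ids)
}
```

With this shape, each new repo or root event adds values to existing filters rather than adding filters, which is one way to keep the filter count under the consolidation threshold.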

---

## Triggers and Flow

### Two Paths to AddFilters

The system has **two independent paths** that create and process AddFilters actions:

| Source                     | When                                | Flow                                                                             |
| -------------------------- | ----------------------------------- | -------------------------------------------------------------------------------- |
| Self-subscriber batch      | New events discovered on own relay  | Build AddFilters directly → send via channel → handle_new_sync_filters           |
| Connect/reconnect triggers | fresh_start, quick_reconnect, daily | recompute_new_sync_filters_for_relay → compute_actions → handle_new_sync_filters |

**Path 1: Self-Subscriber (direct AddFilters construction)**

The [`SelfSubscriber::process_batch()`](src/sync/self_subscriber.rs:448) method:

1. Updates `RepoSyncIndex` with discovered repos
2. Calls `derive_relay_targets()` to get per-relay targets
3. Builds `AddFilters` directly using `build_layer2_and_layer3_filters()`
4. Sends via `action_tx` channel to SyncManager
5. SyncManager receives via `action_rx` and calls `handle_new_sync_filters()`

**Path 2: Connect/Reconnect (via compute_actions)**

The `SyncManager::recompute_new_sync_filters_for_relay()` method:

1. Calls `derive_relay_targets()` from `RepoSyncIndex`
2. Calls `compute_actions(targets, pending, confirmed)` - three-way diff
3. Calls `handle_new_sync_filters()` for each resulting AddFilters action

### When Each Path is Used

| Trigger                     | Path Used                   | Why                                          |
| --------------------------- | --------------------------- | -------------------------------------------- |
| Self-subscriber batch fires | Direct (no compute_actions) | Building from scratch, no diff needed        |
| fresh_start()               | compute_actions             | Diff against pending/confirmed state         |
| quick_reconnect()           | compute_actions             | Check for NEW items discovered while offline |
| consolidate()               | compute_actions             | Check for new items during filter rebuild    |

### The Core Flow (Path 2: Connect/Reconnect)

```mermaid
flowchart TB
    TRIGGER[Connect/Reconnect trigger] --> RECOMPUTE[recompute_new_sync_filters_for_relay]
    RECOMPUTE --> DRT[derive_relay_targets]
    DRT --> |derives from| RSI[RepoSyncIndex]
    DRT --> CA[compute_actions]
    CA --> |subtracts| PSI[PendingSyncIndex]
    CA --> |subtracts| RLI[RelaySyncIndex]
    CA --> |produces| AF[AddFilters actions]
    AF --> HNSF[handle_new_sync_filters]
    HNSF --> LIVE[sync_live - L2+L3]
    HNSF --> HIST[historic_sync - L2+L3]
    HIST --> PSI_UPDATE[Update PendingSyncIndex]
    PSI_UPDATE --> |EOSE received| CONFIRM[Move to RelaySyncIndex]
```

### The Self-Subscriber Flow (Path 1: Direct)

```mermaid
flowchart TB
    EVENTS[Events from own relay] --> QUEUE[Queue to PendingUpdates]
    QUEUE --> TIMER[Batch timer fires - 5 seconds]
    TIMER --> PB[process_batch]
    PB --> UPDATE[Update RepoSyncIndex]
    UPDATE --> DRT[derive_relay_targets]
    DRT --> BUILD[build_layer2_and_layer3_filters]
    BUILD --> AF[Create AddFilters]
    AF --> CHAN[Send via action_tx channel]
    CHAN --> RX[SyncManager receives via action_rx]
    RX --> HNSF[handle_new_sync_filters]
    HNSF --> LIVE[sync_live - L2+L3]
    HNSF --> HIST[historic_sync - L2+L3]
```

---

## Flow Scenarios

### Scenario 1: Fresh Start (Initial Connect / Long Reconnect / Daily Sync)

```mermaid
flowchart TB
    DISC[Relay discovered via RepoSyncIndex] --> REG[register_relay]
    REG --> CREATE[Create RelayConnection, store in HashMap]
    CREATE --> RET[Returns immediately]
    RET --> LOOP[retry_disconnected_relays - every 2s]
    LOOP --> CHECK[health_tracker.should_attempt_connection?]
    CHECK --> |ready| TRY[try_connect_relay]
    TRY --> CONN[connection.connect_and_subscribe]
    CONN --> |success| NOTIFY[Send ConnectNotification]
    NOTIFY --> HANDLE[handle_connect_or_reconnect called]
    HANDLE --> UPD[Update state to Syncing]
    UPD --> SPAWN[Spawn event loop + processor]
    SPAWN --> STRAT[Decide strategy: fresh_start]
    STRAT --> CLEAR_PSI[Clear PendingSyncIndex]
    CLEAR_PSI --> CLEAR_RSI[Clear RelaySyncIndex]
    CLEAR_RSI --> L1_LIVE[L1: sync_live - announcements]
    L1_LIVE --> L1_HIST[L1: historic_sync - no since]
    L1_HIST --> NEG{NIP-77 supported?}
    NEG --> |yes| NEGENTROPY[negentropy sync]
    NEG --> |no| REQ[REQ+EOSE]
    NEGENTROPY --> RECOMPUTE[recompute_new_sync_filters_for_relay]
    REQ --> RECOMPUTE
    RECOMPUTE --> CA[compute_actions]
    CA --> |empty RelaySyncIndex| AF[AddFilters for ALL repos]
    AF --> HNSF[handle_new_sync_filters]
    HNSF --> L23_LIVE[L2+L3: sync_live]
    HNSF --> L23_HIST[L2+L3: historic_sync]
    L23_HIST --> PB[Create PendingBatch]
    PB --> EOSE[Wait for EOSE]
    EOSE --> CONFIRM[Move items to RelaySyncIndex]
```

**Key points:**

- Always clear PendingSyncIndex first, then RelaySyncIndex
- L1 live + L1 historic (uses negentropy if available)
- Empty RelaySyncIndex means diff produces AddFilters for everything
- L2+L3 flow through `recompute_new_sync_filters_for_relay` → `handle_new_sync_filters` with proper pending tracking

### Scenario 2: Quick Reconnect (< 15 minutes)

```mermaid
flowchart TB
    DISC[Connection lost detected] --> LOOP_EXIT[Event loop breaks]
    LOOP_EXIT --> TASK_EXIT[Event processor task exits]
    TASK_EXIT --> NOTIFY_DISC[Send DisconnectNotification]
    NOTIFY_DISC --> HANDLE_DISC[handle_disconnect called]
    HANDLE_DISC --> UPD_STATE[Update state to Disconnected]
    UPD_STATE --> MARK[Set disconnected_at = now]
    MARK --> CLEAR[Clear pending batches]
    CLEAR --> KEEP[Keep RelayConnection in HashMap]
    KEEP --> WAIT[Wait < 15min]
    WAIT --> RETRY[retry_disconnected_relays - every 2s]
    RETRY --> CHECK[health_tracker checks backoff]
    CHECK --> |ready| TRY[try_connect_relay]
    TRY --> CONN[connection.connect_and_subscribe]
    CONN --> |success| NOTIFY[Send ConnectNotification]
    NOTIFY --> RECONN[handle_connect_or_reconnect]
    RECONN --> UPD_CONN[Update state to Syncing]
    UPD_CONN --> SPAWN[Spawn NEW event loop + processor]
    SPAWN --> STRAT[Decide strategy: quick_reconnect]
    STRAT --> CLEAR_PSI[Clear PendingSyncIndex]
    CLEAR_PSI --> L1_LIVE[L1: sync_live - announcements]
    L1_LIVE --> L1_HIST[L1: historic_sync WITH since]
    L1_HIST --> RECON[reconstruct_filters from RelaySyncIndex]
    RECON --> L23_LIVE[L2+L3: sync_live]
    RECON --> L23_HIST[L2+L3: historic_sync WITH since]
    L23_HIST --> RECOMPUTE[recompute_new_sync_filters_for_relay]
    RECOMPUTE --> CA[compute_actions]
    CA --> |check for new items| AF{New items?}
    AF --> |yes| HNSF[handle_new_sync_filters]
    AF --> |no| DONE[Done]
    HNSF --> PB[Create PendingBatch]
```

**Key points:**

- Clear PendingSyncIndex first (old subscriptions are dead)
- L1 live (always on any connection)
- L1 historic WITH since (catches up missed announcements)
- L2+L3 rebuilt from RelaySyncIndex (confirmed state preserved)
- `recompute_new_sync_filters_for_relay` → `compute_actions` checks for any NEW items discovered during catchup

### Scenario 3: Long Reconnect (> 15 minutes)

```mermaid
flowchart TB
    RECONN[Connection restored > 15min] --> METRIC[Record disconnect/reconnect metric]
    METRIC --> FRESH[fresh_start]
    FRESH --> |same as initial connect| DONE[Full sync initiated]
```

**Key points:**

- Records disconnect/reconnect as a metric
- Delegates to fresh_start() - same as initial connect
- State too stale to trust, start fresh

### Scenario 4: Consolidation (Filter Count > Threshold)

```mermaid
flowchart TB
    CHECK[Filter count check] --> THRESHOLD{count > 70?}
    THRESHOLD --> |yes| CLEAR_PSI[Clear PendingSyncIndex]
    CLEAR_PSI --> UNSUB[unsubscribe_all]
    UNSUB --> RECON[reconstruct_filters from RelaySyncIndex]
    RECON --> L1_LIVE[L1: sync_live]
    RECON --> L23_LIVE[L2+L3: sync_live]
    L23_LIVE --> RECOMPUTE[recompute_new_sync_filters_for_relay]
    RECOMPUTE --> CA[compute_actions]
    CA --> |check for new items| AF{New items?}
    AF --> |yes| HNSF[handle_new_sync_filters]
    AF --> |no| DONE[Done]
    THRESHOLD --> |no| SKIP[Continue normally]
```

**Key points:**

- Clear PendingSyncIndex first
- NO historic sync needed - items already synced/syncing
- Only rebuilds live subscriptions from confirmed state
- `recompute_new_sync_filters_for_relay` → `compute_actions` catches any new items that need syncing

### Scenario 5: Daily Sync (23-25h Random Timer)

```mermaid
flowchart TB
    TIMER[Daily timer fires] --> FRESH[fresh_start]
    FRESH --> |NO disconnect metric| DONE[Full sync initiated]
```

**Key points:**

- Same as fresh_start() but WITHOUT recording disconnect/reconnect metric
- Ensures consistency, detects any drift accumulated over 24 hours

### Scenario 6: Self-Subscriber Batch

```mermaid
flowchart TB
    EVENTS[Events from own relay] --> QUEUE[Queue to PendingUpdates]
    QUEUE --> TIMER[Batch timer fires - 5 seconds]
    TIMER --> PB[process_batch]
    PB --> UPDATE[Update RepoSyncIndex]
    UPDATE --> DRT[derive_relay_targets]
    DRT --> BUILD[build_layer2_and_layer3_filters]
    BUILD --> AF[Create AddFilters directly]
    AF --> CHAN[Send via action_tx channel]
    CHAN --> RX[SyncManager receives]
    RX --> HNSF[handle_new_sync_filters]
    HNSF --> LIVE[sync_live - L2+L3]
    HNSF --> HIST[historic_sync - L2+L3]
```

**Key points:**

- Self-subscriber monitors own relay for 30617, 1617, 1618, 1621 (NOT 1619 or 30618)
- Batches events in `PendingUpdates` (5 second window via interval timer)
- `process_batch()` updates RepoSyncIndex, then builds AddFilters **directly** (no compute_actions)
- AddFilters sent via channel to SyncManager, which calls `handle_new_sync_filters()`
- This path does NOT use compute_actions because it's building fresh filters from the updated index

---

## Core Algorithms

### derive_relay_targets

Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay.

```rust
// Conceptual: inverts repo → relays to relay → repos
fn derive_relay_targets(repo_index: &HashMap<String, RepoSyncNeeds>)
    -> HashMap<String, RelaySyncNeeds>
```
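
A minimal sketch of the inversion follows. The field layout of `RepoSyncNeeds` and `RelaySyncNeeds` is assumed for illustration (a set of relay URLs and root event IDs per repo); the real structs may carry more data.

```rust
use std::collections::{BTreeSet, HashMap};

/// Assumed shape of one repo's needs: where to sync from, and which root
/// events belong to it.
#[derive(Default)]
struct RepoSyncNeeds {
    relays: BTreeSet<String>,
    root_events: BTreeSet<String>,
}

/// Assumed shape of one relay's needs: everything to sync from it.
#[derive(Default, Debug, PartialEq)]
struct RelaySyncNeeds {
    repos: BTreeSet<String>,
    root_events: BTreeSet<String>,
}

fn derive_relay_targets(
    repo_index: &HashMap<String, RepoSyncNeeds>,
) -> HashMap<String, RelaySyncNeeds> {
    let mut targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
    for (repo_ref, needs) in repo_index {
        for relay_url in &needs.relays {
            // Each relay accumulates every repo that lists it, plus that
            // repo's root events.
            let entry = targets.entry(relay_url.clone()).or_default();
            entry.repos.insert(repo_ref.clone());
            entry.root_events.extend(needs.root_events.iter().cloned());
        }
    }
    targets
}
```

A relay listed by two repos ends up with both repos and the union of their root events, which is the relay-centric view the diff step consumes.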

### compute_actions (Three-Way Diff)

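**This is the ONLY decision point for what NEW subscriptions to create on the recompute path.** The self-subscriber batch (Scenario 6) bypasses it and builds `AddFilters` directly.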

Performs a three-way diff: `target - pending - confirmed = new`

- **targets**: What we want (from derive_relay_targets)
- **pending**: What's already in-flight awaiting EOSE
- **confirmed**: What's already confirmed syncing

Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect).

```rust
fn compute_actions(
    targets: &HashMap<String, RelaySyncNeeds>,
    pending: &PendingSyncIndex,
    confirmed: &RelaySyncIndex,
) -> Vec<AddFilters>
```
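
The diff itself can be sketched with plain sets. This version is deliberately simplified: `PendingSyncIndex` and `RelaySyncIndex` are replaced by a per-relay set of item IDs (`ItemsByRelay`, a hypothetical alias), the `AddFilters` struct is a stand-in, and connection state is passed in as an explicit `connected` set, which the real signature does not have.

```rust
use std::collections::{BTreeSet, HashMap, HashSet};

/// Simplified stand-in for the real `AddFilters` action.
#[derive(Debug, PartialEq)]
struct AddFilters {
    relay_url: String,
    items: BTreeSet<String>,
}

/// Hypothetical alias: item IDs (repo refs or root event IDs) keyed by relay.
type ItemsByRelay = HashMap<String, BTreeSet<String>>;

fn compute_actions(
    targets: &ItemsByRelay,
    pending: &ItemsByRelay,
    confirmed: &ItemsByRelay,
    connected: &HashSet<String>,
) -> Vec<AddFilters> {
    let empty = BTreeSet::new();
    let mut actions = Vec::new();
    for (relay_url, wanted) in targets {
        // Disconnected relays are skipped; they get AddFilters on reconnect.
        if !connected.contains(relay_url) {
            continue;
        }
        let in_flight = pending.get(relay_url).unwrap_or(&empty);
        let done = confirmed.get(relay_url).unwrap_or(&empty);
        // new = target - pending - confirmed
        let items: BTreeSet<String> = wanted
            .iter()
            .filter(|item| !in_flight.contains(*item) && !done.contains(*item))
            .cloned()
            .collect();
        if !items.is_empty() {
            actions.push(AddFilters { relay_url: relay_url.clone(), items });
        }
    }
    actions
}
```

With targets `{a, b, c}`, pending `{b}`, and confirmed `{a}` for a connected relay, the only new item is `c`, so exactly one action is produced for that relay.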

---

## Key Implementation Methods

### Connection Lifecycle

- **`register_relay()`**: Creates RelayConnection object, stores in HashMap, returns immediately
- **`try_connect_relay()`**: Attempts connection using `connection.connect()` with timeout
- **`handle_connect_or_reconnect()`**: Spawns event loop, updates state, decides sync strategy (fresh_start/quick_reconnect)
- **`handle_disconnect()`**: Updates state to Disconnected, clears pending batches, keeps RelayConnection object
- **`retry_disconnected_relays()`**: Called every 2s, retries relays that pass health tracker checks

### Sync Entry Points

- **`fresh_start()`**: Full sync - clears all state, L1 historic (with negentropy if available), then L2+L3 via recompute
- **`quick_reconnect()`**: Incremental sync - preserves confirmed state, L1 historic with `since`, L2+L3 rebuild with `since`, then recompute for new items
- **`daily_sync()`**: Wrapper around `fresh_start()` without disconnect metrics
- **`consolidate()`**: Reduces filter count - clears pending, unsubscribes all, rebuilds live subscriptions only, then recompute for new items

### Sync Primitives

- **`sync_live()`**: Creates subscriptions with `limit: 0` for ongoing event stream (not tracked in PendingSyncIndex)
- **`historic_sync()`**: Dispatches to negentropy or REQ+EOSE based on relay capability, creates PendingBatch, returns batch_id

### Filter Processing

- **`handle_new_sync_filters()`**: Single entry point for AddFilters from both paths (self-subscriber OR recompute), orchestrates live+historic sync
- **`recompute_new_sync_filters_for_relay()`**: Calls derive_relay_targets → compute_actions → handle_new_sync_filters for each resulting action

---

## Method Relationships Summary

## Filter Building (Three-Layer Strategy)

### Layer 1: Announcements

- **Kinds**: 30617 (Repository Announcements), 30618 (Repository State Announcements)
- **When subscribed**: On connect (any type) - handled by connection lifecycle
- **Function**: `build_announcement_filter(since: Option<Timestamp>)`
- 30618 is ONLY synced from remote relays, not self-subscribed

### Layer 2: Events Tagging Our Repos

- **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage
- **Batching**: Per 100 repo refs
- **Function**: `build_repo_tag_filters(repos, since)`

### Layer 3: Events Tagging Our Root Events

- **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage
- **Batching**: Per 100 event IDs
- **Function**: `build_root_event_tag_filters(root_events, since)`

### Combined Layer 2+3

The `build_layer2_and_layer3_filters()` function combines both layers. Used by:

- `recompute_new_sync_filters_for_relay` for new item subscriptions
- `reconstruct_filters` for rebuilding from confirmed state
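
The batching rule for Layers 2 and 3 can be sketched as follows. `TagFilter` is a simplified stand-in for a Nostr filter, and emitting one filter per tag name per batch is an assumption: conditions inside a single Nostr filter are ANDed, so matching `a` OR `A` OR `q` needs separate filters. The real builders may group them differently.

```rust
/// Simplified stand-in for a Nostr filter restricted to one tag name.
#[derive(Debug, PartialEq)]
struct TagFilter {
    tag: char,
    values: Vec<String>,
    since: Option<u64>,
}

/// Batch size taken from the text: 100 refs or IDs per filter.
const BATCH_SIZE: usize = 100;

/// Hypothetical helper: one filter per (batch, tag name).
fn build_tag_filters(tags: &[char], values: &[String], since: Option<u64>) -> Vec<TagFilter> {
    values
        .chunks(BATCH_SIZE)
        .flat_map(move |chunk| {
            tags.iter()
                .map(move |&tag| TagFilter { tag, values: chunk.to_vec(), since })
        })
        .collect()
}

fn build_layer2_and_layer3_filters(
    repo_refs: &[String],
    root_event_ids: &[String],
    since: Option<u64>,
) -> Vec<TagFilter> {
    // Layer 2: events tagging our repos.
    let mut filters = build_tag_filters(&['a', 'A', 'q'], repo_refs, since);
    // Layer 3: events tagging our root events.
    filters.extend(build_tag_filters(&['e', 'E', 'q'], root_event_ids, since));
    filters
}
```

Under these assumptions, 250 repo refs produce three batches (100, 100, 50) and therefore nine Layer 2 filters, while five root event IDs produce three Layer 3 filters.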

---

## NIP-77 Negentropy Sync

### What is Negentropy?

NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences.

### When Negentropy is Used

Negentropy sync is attempted for:

- **fresh_start()** - Full sync without `since`
- **daily_sync()** - Periodic full refresh (via fresh_start)

Negentropy is NOT used for:

- **quick_reconnect()** - Uses REQ with `since` (more efficient for small gaps)
- **Live subscriptions** - Always use REQ with `limit: 0`

### Fallback Behavior

If negentropy fails (relay doesn't support NIP-77, network error, etc.):

1. A warning is logged (once per relay to avoid spam)
2. The sync falls back to traditional REQ+EOSE
3. No error is raised - fallback is automatic
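
The fallback steps above can be sketched as a wrapper around two callbacks. Everything here is illustrative: `SyncMethod`, `FallbackLog`, and `historic_sync_with_fallback` are hypothetical names, and the closures stand in for the real negentropy and REQ+EOSE code paths.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum SyncMethod {
    Negentropy,
    ReqEose,
}

/// Tracks which relays have already produced a fallback warning.
#[derive(Default)]
struct FallbackLog {
    warned: HashSet<String>,
}

impl FallbackLog {
    /// Returns true only the first time a relay URL is seen.
    fn should_warn(&mut self, relay_url: &str) -> bool {
        self.warned.insert(relay_url.to_string())
    }
}

/// Try negentropy first; on any failure, warn once per relay and fall back
/// to REQ+EOSE without surfacing an error to the caller.
fn historic_sync_with_fallback(
    relay_url: &str,
    log: &mut FallbackLog,
    try_negentropy: impl FnOnce() -> Result<(), String>,
    req_eose: impl FnOnce(),
) -> SyncMethod {
    match try_negentropy() {
        Ok(()) => SyncMethod::Negentropy,
        Err(reason) => {
            if log.should_warn(relay_url) {
                eprintln!("negentropy failed for {}: {}; using REQ+EOSE", relay_url, reason);
            }
            req_eose();
            SyncMethod::ReqEose
        }
    }
}
```

The function returns which method ran rather than a `Result`, mirroring the point that fallback is automatic and not treated as an error.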

### Integration with Rejected Events Index

The rejected events index prevents wasteful re-fetching during negentropy sync by excluding rejected event IDs from the reconciliation process:

**During Negentropy Reconciliation:**

1. **Build "already have" set**: Combine event IDs from:
   - Events in database
   - Events in purgatory
   - **Events in rejected index (hot cache + cold index)**

2. **Send to negentropy**: This combined set represents "events we already have or don't want"

3. **Receive differences**: Relay only sends events we don't have and haven't rejected

4. **Process received events**: New events go through normal validation:
   - If accepted → saved to database
   - If rejected → added to rejected index
   - If waiting for dependencies → added to purgatory
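
The effect of steps 1-3 can be sketched with ordinary set operations. This models only the outcome: the real protocol exchanges fingerprints rather than full ID sets, and `build_known_ids` and `ids_to_fetch` are hypothetical helper names.

```rust
use std::collections::HashSet;

/// Step 1: union of everything we already have or have decided not to keep.
fn build_known_ids(
    database: &HashSet<String>,
    purgatory: &HashSet<String>,
    rejected_hot: &HashSet<String>,
    rejected_cold: &HashSet<String>,
) -> HashSet<String> {
    database
        .iter()
        .chain(purgatory)
        .chain(rejected_hot)
        .chain(rejected_cold)
        .cloned()
        .collect()
}

/// Steps 2-3: IDs the remote relay holds that are not in our known set.
/// Sorted so the result is stable.
fn ids_to_fetch(remote: &HashSet<String>, known: &HashSet<String>) -> Vec<String> {
    let mut need: Vec<String> = remote.difference(known).cloned().collect();
    need.sort();
    need
}
```

Because rejected IDs are part of the known set, an event rejected on one sync is not requested again on the next, unless its entry has been invalidated in the meantime.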

**Why This Matters:**

Without the rejected events index, negentropy would repeatedly download events that don't list this service or are from unauthorized maintainers, wasting bandwidth on every sync cycle.

**Re-Processing on Dependency Arrival:**

When a dependency is satisfied (e.g., owner announcement accepted):

1. Related entries are **invalidated** (removed) from cold index
2. If event still in hot cache → immediate re-processing
3. If event expired from hot cache → will be re-fetched on next sync (now that dependency exists)

This prevents permanently excluding events that could become valid after dependencies arrive.

See [work/rejected-events-index-summary.md](../../work/rejected-events-index-summary.md) for complete implementation details.

---

## REQ+EOSE Pagination

When a relay doesn't support NIP-77 Negentropy, historic sync uses traditional REQ+EOSE with automatic pagination to handle large result sets efficiently.

### How Pagination Works

1. **Initial Request**: Send REQ with filters (may include `since` parameter)
2. **Track Events**: As events arrive, [`PaginationState`](src/sync/mod.rs:165) tracks:
   - `event_count`: Number of events received
   - `min_created_at`: Smallest timestamp seen (oldest event)
   - `original_filter`: Base filter for reconstruction
3. **EOSE Detection**: When EOSE arrives, check if pagination is needed
4. **Next Page**: If enough events were received (suggesting more exist):
   - Create new filter with `until: min_created_at`
   - Issue another REQ for events older than the oldest seen
   - Reuse same subscription ID
5. **Completion**: Repeat until EOSE arrives with fewer events, indicating end of results
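
A minimal sketch of this loop's state handling follows. The three tracked fields come from the list above; the `Filter` struct is a stand-in that keeps only the time bounds, and `page_threshold` is a hypothetical parameter, since the text does not state how many events count as "enough".

```rust
/// Simplified filter: only the time bounds matter for pagination.
#[derive(Clone, Debug, PartialEq)]
struct Filter {
    since: Option<u64>,
    until: Option<u64>,
}

struct PaginationState {
    event_count: usize,
    min_created_at: Option<u64>,
    original_filter: Filter,
    /// Hypothetical: a page with at least this many events is taken as a
    /// sign that older events may remain.
    page_threshold: usize,
}

impl PaginationState {
    fn new(original_filter: Filter, page_threshold: usize) -> Self {
        Self { event_count: 0, min_created_at: None, original_filter, page_threshold }
    }

    /// Step 2: count the event and remember the oldest timestamp seen.
    fn on_event(&mut self, created_at: u64) {
        self.event_count += 1;
        self.min_created_at = Some(match self.min_created_at {
            Some(min) => min.min(created_at),
            None => created_at,
        });
    }

    /// Steps 3-5: on EOSE, return the next page's filter, or None when done.
    fn on_eose(&mut self) -> Option<Filter> {
        if self.event_count < self.page_threshold {
            return None;
        }
        let until = self.min_created_at?;
        // Reset the per-page counter but keep the oldest timestamp seen.
        self.event_count = 0;
        Some(Filter { until: Some(until), ..self.original_filter.clone() })
    }
}
```

Since `until` is inclusive in NIP-01, events at the boundary timestamp can arrive again on the next page, so this sketch assumes the caller deduplicates by event ID.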

### Pagination State Lifecycle

```mermaid
flowchart TB
    REQ[Send REQ with filters] --> TRACK[Initialize PaginationState]
    TRACK --> EVENT[Receive EVENT]
    EVENT --> UPDATE[Update event_count and min_created_at]
    UPDATE --> MORE{More events?}
    MORE --> |yes| EVENT
    MORE --> |no| EOSE[Receive EOSE]
    EOSE --> CHECK{event_count suggests more pages?}
    CHECK --> |yes| NEXT[Create filter with until=min_created_at]
    NEXT --> REQ2[Send next page REQ]
    REQ2 --> RESET[Reset event_count, keep min_created_at]
    RESET --> EVENT
    CHECK --> |no| DONE[Batch complete, confirm items]
```

### Pagination vs Negentropy

| Aspect              | Negentropy Sync              | REQ+EOSE Pagination                                       |
| ------------------- | ---------------------------- | --------------------------------------------------------- |
| **Efficiency**      | High (set reconciliation)    | Lower (sequential pages)                                  |
| **Bandwidth**       | Minimal (only missing items) | Higher (all matching events transferred)                  |
| **Relay support**   | Requires NIP-77              | Universal (standard Nostr)                                |
| **State tracking**  | None needed                  | [`PaginationState`](src/sync/mod.rs:165) per subscription |
| **Completion time** | Typically faster             | Slower for large sets                                     |
| **Use cases**       | Full sync, large event sets  | Fallback, small gaps with `since`                         |

---

## State Flow Summary

```mermaid
flowchart TB
    subgraph Input
        SS[SelfSubscriber]
        OWN[Own Relay]
    end

    subgraph RepoSyncIndex - What We Want
        RSI[HashMap: Repo → Relays+Events]
    end

    subgraph Triggers
        T1[Self-subscriber batch]
        T2[fresh_start after L1]
        T3[quick_reconnect after catchup]
        T4[consolidate after live rebuild]
    end

    subgraph compute_actions - Decision Point
        CA[Three-way diff: target - pending - confirmed]
    end

    subgraph PendingSyncIndex - In Flight
        PSI[Vec PendingBatch per relay]
    end

    subgraph RelaySyncIndex - Confirmed State
        RLI[RelayState per relay]
    end

    SS -->|subscribe| OWN
    OWN -->|events| SS
    SS -->|batch fires| RSI
    RSI --> T1
    T1 --> CA
    T2 --> CA
    T3 --> CA
    T4 --> CA
    PSI --> CA
    RLI --> CA
    CA -->|new items| AF[AddFilters]
    AF --> HNSF[handle_new_sync_filters]
    HNSF --> LIVE[sync_live L2+L3]
    HNSF --> HIST[historic_sync L2+L3]
    HIST --> PSI
    PSI -->|EOSE| RLI
```

---

## Module Structure

```
src/sync/
├── mod.rs              # SyncManager, main loop, data structures
├── algorithms.rs       # derive_relay_targets(), compute_actions()
├── filters.rs          # build_announcement_filter(), build_layer2_and_layer3_filters()
├── health.rs           # RelayHealthTracker with exponential backoff
├── relay_connection.rs # RelayConnection, RelayEvent handling
├── self_subscriber.rs  # SelfSubscriber with batching
└── metrics.rs          # SyncMetrics for Prometheus
```

---

## Health Tracking

The [`RelayHealthTracker`](src/sync/health.rs:209) manages connection health with exponential backoff and state transitions:

### Health States

1. **Healthy**: Working connection, no recent failures, proven stable (past 5-minute stability period)
2. **Disconnected**: Not currently connected, but no recent failures or issues
3. **Degraded**: Connection problems (actively failing to connect) OR recently recovered but not yet stable
4. **Dead**: 24+ hours of continuous failures, minimal retry (once per 24 hours)
5. **RateLimited**: Rate limited by relay, 65-second cooldown active

### State Transitions

```
Healthy <-> Disconnected: Normal connection/disconnection
Disconnected -> Degraded: Connection failure
Degraded -> Dead: 24h+ of continuous failures
Degraded -> Disconnected: Recovery (enters 5min stability period)
Disconnected -> Healthy: Stable for 5 minutes after recovery
Any -> RateLimited: NOTICE message from relay indicating rate limiting
RateLimited -> previous state: After 65-second cooldown expires
```

### Backoff Configuration

- **Formula**: `base_backoff * 2^(failures-1)`, capped at `max_backoff`
- **Default base**: 5 seconds (configurable via `sync_base_backoff_secs`)
- **Default max**: 1 hour (configurable via `sync_max_backoff_secs`)
- **Dead threshold**: 24 hours of continuous failures
- **Dead retry interval**: Once per 24 hours
- **Rate limit cooldown**: Fixed 65 seconds (60s typical limit + 5s buffer)
- **Stability period**: 5 minutes after recovery before marking as Healthy
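
The backoff formula can be written out as a small function. This is a sketch of the stated formula only: `backoff_delay` is a hypothetical name, and returning zero delay for zero failures is an assumption the text does not cover.

```rust
use std::time::Duration;

/// `base * 2^(failures - 1)`, capped at `max`.
fn backoff_delay(failures: u32, base: Duration, max: Duration) -> Duration {
    if failures == 0 {
        // Assumption: no failures means no backoff.
        return Duration::ZERO;
    }
    // Saturate the exponent so large failure counts cannot overflow.
    let factor = 2u32.checked_pow(failures - 1).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With the default 5-second base and 1-hour cap, the delays run 5s, 10s, 20s, and so on, reaching 2560s after the tenth failure and staying at the 3600s cap from the eleventh onward.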

### Special Behaviors

- **Bootstrap relays**: Never disconnected by cleanup system, even if empty
- **Rate limiting**: Distinct from connection failures - triggered by relay NOTICE messages
- **Connection timeout**: Set to `base_backoff_secs` to ensure retry timing works correctly

---

## Prometheus Metrics

The [`SyncMetrics`](src/sync/metrics.rs:18) module provides comprehensive monitoring via Prometheus:

### Connection Metrics

- `ngit_sync_relay_connected`: Per-relay connection status (1=connected, 0=disconnected)
- `ngit_sync_connection_attempts_total`: Total connection attempts by relay and result (success/failure)

### Health Metrics

- `ngit_sync_relay_status`: Per-relay health status (1=healthy, 2=disconnected, 3=degraded, 4=dead, 5=rate_limited)
- `ngit_sync_relay_failures`: Consecutive failure count per relay
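
The numeric encoding of `ngit_sync_relay_status` can be sketched as a mapping from health state to gauge value. The `HealthState` enum and `metric_value` method are illustrative names; only the state names and numbers are taken from this document.

```rust
/// Health states as listed in the Health Tracking section.
#[derive(Clone, Copy, Debug, PartialEq)]
enum HealthState {
    Healthy,
    Disconnected,
    Degraded,
    Dead,
    RateLimited,
}

impl HealthState {
    /// Gauge value exported for this state.
    fn metric_value(self) -> i64 {
        match self {
            HealthState::Healthy => 1,
            HealthState::Disconnected => 2,
            HealthState::Degraded => 3,
            HealthState::Dead => 4,
            HealthState::RateLimited => 5,
        }
    }
}
```

Keeping the mapping in one exhaustive `match` means that adding a state without assigning it a value fails to compile.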

### Event Metrics

- `ngit_sync_events_synced_total`: Total events synced (newly saved events only, not duplicates or rejected)

### Summary Metrics

- `ngit_sync_relays_tracked_total`: Total number of relays discovered and tracked
- `ngit_sync_relays_connected_total`: Number of currently connected relays
- `ngit_sync_relays_dead_total`: Number of relays marked as dead

All metrics follow the `ngit_sync_` prefix convention and are updated by the health and metrics checker every 2 seconds.

---