upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/explanation/grasp-02-proactive-sync.md729
-rw-r--r--src/sync/algorithms.rs33
-rw-r--r--src/sync/mod.rs700
-rw-r--r--src/sync/self_subscriber.rs8
4 files changed, 1120 insertions, 350 deletions
diff --git a/docs/explanation/grasp-02-proactive-sync.md b/docs/explanation/grasp-02-proactive-sync.md
index 2a86126..34b7bb6 100644
--- a/docs/explanation/grasp-02-proactive-sync.md
+++ b/docs/explanation/grasp-02-proactive-sync.md
@@ -4,12 +4,11 @@
4 4
5This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles: 5This document explains the proactive sync system that synchronizes repository data from external relays based on relay URLs listed in 30617 repository announcements. Key principles:
6 6
71. **Self-subscription as the only mechanism** - No database initialization at startup 71. **Triggers call compute_actions → sync_computed_filters** - Self-subscriber batches and connect/reconnect events trigger this flow
82. **compute_actions as single decision point** - Determines what NEW subscriptions to create 82. **Clear separation of live vs historic sync** - Two distinct primitives with different purposes
93. **Two subscription paths on reconnect** - Catch-up (retained, with since) vs new items (via compute_actions) 93. **Layer 1 on connect, Layer 2+3 via AddFilters** - L1 handled at connection time, L2+L3 flow through compute_actions
104. **Blank state = fresh sync** - Empty confirmed state triggers full historical fetch 104. **Always clear PendingSyncIndex first** - Before any reconnect/consolidate operation
115. **Clear on disconnect, not reconnect** - PendingSyncIndex cleared at event boundary 115. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported
126. **NIP-77 negentropy for historical sync** - Efficient set reconciliation, fallback to REQ if unsupported
13 12
14--- 13---
15 14
@@ -90,7 +89,6 @@ impl RelayState {
90### PendingSyncIndex (In-Flight Batches) 89### PendingSyncIndex (In-Flight Batches)
91 90
92```rust 91```rust
93
94/// Method used for synchronization 92/// Method used for synchronization
95#[derive(Debug, Clone, Copy, PartialEq, Eq)] 93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96pub enum SyncMethod { 94pub enum SyncMethod {
@@ -100,7 +98,6 @@ pub enum SyncMethod {
100 Negentropy, 98 Negentropy,
101} 99}
102 100
103
104/// Tracks batches of subscriptions that are in-flight, awaiting EOSE. 101/// Tracks batches of subscriptions that are in-flight, awaiting EOSE.
105/// Each batch has its own ID and can confirm independently. 102/// Each batch has its own ID and can confirm independently.
106/// Key: relay URL 103/// Key: relay URL
@@ -118,7 +115,6 @@ pub struct PendingBatch {
118 pub sync_method: SyncMethod, 115 pub sync_method: SyncMethod,
119} 116}
120 117
121
122#[derive(Debug, Clone, Default)] 118#[derive(Debug, Clone, Default)]
123pub struct PendingItems { 119pub struct PendingItems {
124 pub repos: HashSet<String>, 120 pub repos: HashSet<String>,
@@ -145,152 +141,202 @@ stateDiagram-v2
145 141
146--- 142---
147 143
148## Flow Scenarios 144## Core Architecture: Live vs Historic Sync
145
146The sync system is built on two fundamental primitives that are clearly separated:
147
148### Sync Primitives
149
150| Primitive | Purpose | Filter Modifier | Tracking |
151| ----------------- | ----------------------- | ---------------- | ---------------- |
152| `sync_live()` | Ongoing event stream | `limit: 0` | Not tracked |
153| `historic_sync()` | Catch up on past events | Optional `since` | PendingSyncIndex |
154
155### Why `limit: 0` for Live Sync?
156
157| Approach | Pros | Cons |
158| ------------ | --------------------------------------- | --------------------------------- |
159| `since: now` | Intuitive | Time-sensitive, clock skew issues |
160| `limit: 0` | Deterministic, mirrors filter structure | Less intuitive name |
161
162`limit: 0` is better because:
163
1641. **No time dependency**: Doesn't depend on synchronized clocks
1652. **Mirrors historic filters**: Same tag structure, just different limit
1663. **State reconstruction**: Can rebuild from repo/event lists without timestamps
167
168### Layer Strategy
169
170| Layer | Content | When Subscribed | Managed By |
171| ------- | --------------------------------------- | --------------------- | -------------------- |
172| Layer 1 | 30617 Announcements, 30618 Maintainers | On connect (any type) | Connection lifecycle |
173| Layer 2 | Events tagging our repos (a/A/q tags) | Via AddFilters | compute_actions |
174| Layer 3 | Events tagging root events (e/E/q tags) | Via AddFilters | compute_actions |
149 175
150### Scenario 1: Initial Connect 176**Key insight**: Layer 1 is connection-level (handled at connect time), Layer 2+3 are item-level (flow through compute_actions → sync_computed_filters).
177
178---
179
180## Triggers and Flow
181
182### What Triggers compute_actions → sync_computed_filters?
183
184| Trigger | When | What Happens |
185| --------------------------- | -------------------------------------- | ---------------------------------------- |
186| Self-subscriber batch fires | New events discovered on own relay | Update RepoSyncIndex → compute_actions |
187| fresh_start() | Initial connect, long_reconnect, daily | After L1 setup → compute_actions |
188| quick_reconnect() | Reconnect < 15 minutes | After L1+L2+L3 catchup → compute_actions |
189| consolidate() | Filter count > threshold | After live rebuild → compute_actions |
190
191### The Core Flow
151 192
152```mermaid 193```mermaid
153flowchart TB 194flowchart TB
154 START[Startup] --> SS[Self-subscribe to own relay] 195 TRIGGER[Trigger fires] --> CA[compute_actions]
155 SS --> |no since filter| EVENTS[Receive historical events] 196 CA --> |derives from| RSI[RepoSyncIndex]
156 EVENTS --> RSI[Update RepoSyncIndex] 197 CA --> |subtracts| RLI[RelaySyncIndex]
157 RSI --> DT[derive_relay_targets] 198 CA --> |subtracts| PSI[PendingSyncIndex]
158 DT --> CA[compute_actions with targets and empty confirmed] 199 CA --> |produces| AF[AddFilters actions]
159 CA --> AF[AddFilters for each relay] 200 AF --> SFRE[sync_computed_filters]
160 AF --> SPAWN{Relay connected?} 201 SFRE --> LIVE[sync_live - L2+L3]
161 SPAWN --> |no| CONN[spawn_connection] 202 SFRE --> HIST[historic_sync - L2+L3]
162 CONN --> HC[handle_connect_or_reconnect] 203 HIST --> PSI_UPDATE[Update PendingSyncIndex]
163 SPAWN --> |yes| SUB 204 PSI_UPDATE --> |EOSE received| CONFIRM[Move to RelaySyncIndex]
164 205```
165 subgraph handle_connect_or_reconnect - Fresh Sync 206
166 HC --> CHECK_FRESH{is_fresh_sync?} 207---
167 CHECK_FRESH --> |yes - no last_connected| L1[build_announcement_filter - no since] 208
168 L1 --> RCA[recompute_actions_for_relay] 209## Flow Scenarios
169 end
170 210
171 RCA --> SUB[Subscribe Layer 2+3 filters via AddFilters] 211### Scenario 1: Fresh Start (Initial Connect / Long Reconnect / Daily Sync)
172 SUB --> PB[Create PendingBatch] 212
213```mermaid
214flowchart TB
215 START[fresh_start called] --> CLEAR_PSI[Clear PendingSyncIndex]
216 CLEAR_PSI --> CLEAR_RSI[Clear RelaySyncIndex]
217 CLEAR_RSI --> L1_LIVE[L1: sync_live - announcements]
218 L1_LIVE --> L1_HIST[L1: historic_sync - no since]
219 L1_HIST --> NEG{NIP-77 supported?}
220 NEG --> |yes| NEGENTROPY[negentropy sync]
221 NEG --> |no| REQ[REQ+EOSE]
222 NEGENTROPY --> CA[compute_actions]
223 REQ --> CA
224 CA --> |empty RelaySyncIndex| AF[AddFilters for ALL repos]
225 AF --> SFRE[sync_computed_filters]
226 SFRE --> L23_LIVE[L2+L3: sync_live]
227 SFRE --> L23_HIST[L2+L3: historic_sync]
228 L23_HIST --> PB[Create PendingBatch]
173 PB --> EOSE[Wait for EOSE] 229 PB --> EOSE[Wait for EOSE]
174 EOSE --> CONFIRM[Move items to confirmed repos/root_events] 230 EOSE --> CONFIRM[Move items to RelaySyncIndex]
175``` 231```
176 232
177**Key points:** 233**Key points:**
178 234
179- No `since` filter on initial connect - get full history 235- Always clear PendingSyncIndex first, then RelaySyncIndex
180- `handle_connect_or_reconnect` detects `is_fresh_sync` via `last_connected.is_none()` 236- L1 live + L1 historic (uses negentropy if available)
181- Layer 1: `build_announcement_filter(None)` - subscribed immediately without since 237- Empty RelaySyncIndex means diff produces AddFilters for everything
182- Layer 2+3: handled via `recompute_actions_for_relay` → `compute_actions` with PendingBatch tracking 238- L2+L3 flow through sync_computed_filters with proper pending tracking
183 239
184### Scenario 2: Quick Reconnect (less than 15 minutes) 240### Scenario 2: Quick Reconnect (< 15 minutes)
185 241
186```mermaid 242```mermaid
187flowchart TB 243flowchart TB
188 DISC[Connection lost] --> MARK[Set disconnected_at = now] 244 DISC[Connection lost] --> MARK[Set disconnected_at = now]
189 MARK --> CLEAR_PEND[Clear PendingSyncIndex for relay] 245 MARK --> WAIT[Wait for reconnection < 15min]
190 CLEAR_PEND --> WAIT[Wait for reconnection]
191 WAIT --> RECONN[Connection restored] 246 WAIT --> RECONN[Connection restored]
192 RECONN --> HC[handle_connect_or_reconnect] 247 RECONN --> CLEAR_PSI[Clear PendingSyncIndex]
193 248 CLEAR_PSI --> L1_LIVE[L1: sync_live - announcements]
194 subgraph handle_connect_or_reconnect - Quick Reconnect 249 L1_LIVE --> L1_HIST[L1: historic_sync WITH since]
195 HC --> CHECK{is_fresh_sync?} 250 L1_HIST --> RECON[reconstruct_filters from RelaySyncIndex]
196 CHECK --> |no - last_connected exists AND less than 15min| SINCE[since = last_connected - 15min] 251 RECON --> L23_LIVE[L2+L3: sync_live]
197 SINCE --> L1[build_announcement_filter - with since] 252 RECON --> L23_HIST[L2+L3: historic_sync WITH since]
198 L1 --> L23[rebuild_layer2_and_layer3 - with since] 253 L23_HIST --> CA[compute_actions]
199 L23 --> RCA[recompute_actions_for_relay] 254 CA --> |check for new items| AF{New items?}
200 end 255 AF --> |yes| SFRE[sync_computed_filters]
201 256 AF --> |no| DONE[Done]
202 RCA --> AF[AddFilters for new items only] 257 SFRE --> PB[Create PendingBatch]
203 AF --> SUB[Subscribe]
204 SUB --> PB[Create PendingBatch]
205 PB --> EOSE[Wait for EOSE]
206 EOSE --> EXTEND[Extend confirmed state]
207``` 258```
208 259
209**Key points:** 260**Key points:**
210 261
211- PendingSyncIndex cleared on disconnect (not reconnect) 262- Clear PendingSyncIndex first (old subscriptions are dead)
212- `handle_connect_or_reconnect`: 263- L1 live (always on any connection)
213 1. `build_announcement_filter(Some(since))` - Layer 1 with since 264- L1 historic WITH since (catches up missed announcements)
214 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since 265- L2+L3 rebuilt from RelaySyncIndex (confirmed state preserved)
215 3. `recompute_actions_for_relay` - check for new items 266- compute_actions checks for any NEW items discovered during catchup
216- since = last_connected - 15min ensures we catch events during disconnection
217 267
218### Scenario 3: Stale Reconnect (greater than 15 minutes) 268### Scenario 3: Long Reconnect (> 15 minutes)
219 269
220```mermaid 270```mermaid
221flowchart TB 271flowchart TB
222 RECONN[Connection restored] --> HC[handle_connect_or_reconnect] 272 RECONN[Connection restored > 15min] --> METRIC[Record disconnect/reconnect metric]
273 METRIC --> FRESH[fresh_start]
274 FRESH --> |same as initial connect| DONE[Full sync initiated]
275```
223 276
224 subgraph handle_connect_or_reconnect - Stale Reconnect 277**Key points:**
225 HC --> CHECK{is_fresh_sync?}
226 CHECK --> |yes - disconnected greater than 15min| CLEAR[clear_sync_state]
227 CLEAR --> L1[build_announcement_filter - no since]
228 L1 --> RCA[recompute_actions_for_relay]
229 end
230 278
231 RCA --> CA[compute_actions with empty confirmed] 279- Records disconnect/reconnect as a metric
232 CA --> AF[AddFilters for everything] 280- Delegates to fresh_start() - same as initial connect
233 AF --> SUB[Subscribe - no since filter] 281- State too stale to trust, start fresh
234 SUB --> PB[Create PendingBatch] 282
235 PB --> EOSE[Wait for EOSE] 283### Scenario 4: Consolidation (Filter Count > Threshold)
236 EOSE --> CONFIRM[Populate confirmed state fresh] 284
285```mermaid
286flowchart TB
287 CHECK[Filter count check] --> THRESHOLD{count > 70?}
288 THRESHOLD --> |yes| CLEAR_PSI[Clear PendingSyncIndex]
289 CLEAR_PSI --> UNSUB[unsubscribe_all]
290 UNSUB --> RECON[reconstruct_filters from RelaySyncIndex]
291 RECON --> L1_LIVE[L1: sync_live]
292 RECON --> L23_LIVE[L2+L3: sync_live]
293 L23_LIVE --> CA[compute_actions]
294 CA --> |check for new items| AF{New items?}
295 AF --> |yes| SFRE[sync_computed_filters]
296 AF --> |no| DONE[Done]
297 THRESHOLD --> |no| SKIP[Continue normally]
237``` 298```
238 299
239**Key points:** 300**Key points:**
240 301
241- `should_clear_state()` returns true → triggers fresh sync 302- Clear PendingSyncIndex first
242- Same path as initial connect after clearing state 303- NO historic sync needed - items already synced/syncing
243- Layer 1: `build_announcement_filter(None)` - full history 304- Only rebuilds live subscriptions from confirmed state
244- Layer 2+3: handled via empty confirmed state → compute_actions generates AddFilters for everything 305- compute_actions catches any new items that need syncing
245 306
246### Scenario 4: Consolidation (Triggered on Filter Add) 307### Scenario 5: Daily Sync (23-25h Random Timer)
247 308
248```mermaid 309```mermaid
249flowchart TB 310flowchart TB
250 AF[handle_add_filters called] --> COUNT{current + new > 70?} 311 TIMER[Daily timer fires] --> FRESH[fresh_start]
251 COUNT --> |yes| CONSOLIDATE[consolidate] 312 FRESH --> |NO disconnect metric| DONE[Full sync initiated]
252 CONSOLIDATE --> WAIT_PEND[wait_pending_complete]
253 WAIT_PEND --> CLOSE[unsubscribe_all]
254 CLOSE --> SINCE[since = now - 15min]
255 SINCE --> L1[build_announcement_filter - with since]
256 L1 --> L23[rebuild_layer2_and_layer3 - with since]
257 COUNT --> |no| SUB[Subscribe new filters]
258 SUB --> PB[Create PendingBatch]
259``` 313```
260 314
261**Key points:** 315**Key points:**
262 316
263- Consolidation checked in `handle_add_filters` BEFORE adding new filters 317- Same as fresh_start() but WITHOUT recording disconnect/reconnect metric
264- After closing all subscriptions, re-subscribe: 318- Ensures consistency, detects any drift accumulated over 24 hours
265 1. `build_announcement_filter(Some(since))` - Layer 1 stays active with since
266 2. `rebuild_layer2_and_layer3(since)` - Layer 2+3 with since
267- `since = now - 15min` prevents re-fetching old events
268- Keeps confirmed state, just reduces filter count
269 319
270### Scenario 5: Daily Timer (23-25h Random) 320### Scenario 6: Self-Subscriber Batch
271 321
272```mermaid 322```mermaid
273flowchart TB 323flowchart TB
274 DAILY[Daily timer fires] --> CLOSE[unsubscribe_all] 324 EVENTS[Events from own relay] --> QUEUE[Queue to pending batch]
275 CLOSE --> CLEAR_PEND[Clear PendingSyncIndex for relay] 325 QUEUE --> TIMER[Batch timer fires - 5 seconds]
276 CLEAR_PEND --> CLEAR_STATE[clear_sync_state] 326 TIMER --> UPDATE[Update RepoSyncIndex]
277 CLEAR_STATE --> L1[build_announcement_filter - no since] 327 UPDATE --> CA[compute_actions]
278 L1 --> RCA[recompute_actions_for_relay] 328 CA --> |new repos/events discovered| AF[AddFilters]
279 RCA --> CA[compute_actions with empty confirmed] 329 AF --> SFRE[sync_computed_filters]
280 CA --> AF[AddFilters for everything] 330 SFRE --> LIVE[sync_live - L2+L3]
281 AF --> SUB[Subscribe - no since filter] 331 SFRE --> HIST[historic_sync - L2+L3]
282 SUB --> PB[Create PendingBatch]
283 PB --> EOSE[Wait for EOSE]
284 EOSE --> CONFIRM[Repopulate confirmed state]
285``` 332```
286 333
287**Key points:** 334**Key points:**
288 335
289- Daily timer is a full fresh sync, NOT consolidation 336- Self-subscriber monitors own relay for 30617, 1617, 1618, 1619, 1621
290- Clears both PendingSyncIndex and confirmed state 337- Batches events (5 second window)
291- Layer 1: `build_announcement_filter(None)` - full history 338- Updates RepoSyncIndex, then compute_actions finds new work
292- Layer 2+3: via compute_actions with empty confirmed - full history 339- New items flow through sync_computed_filters
293- Detects any state drift accumulated over 24 hours
294 340
295--- 341---
296 342
@@ -300,8 +346,6 @@ flowchart TB
300 346
301Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay. 347Transforms the repo-centric `RepoSyncIndex` into a relay-centric view. For each relay URL mentioned in any repo's announcements, collects all the repos and root events that should be synced from that relay.
302 348
303**Implementation:** [`derive_relay_targets()`](../../src/sync/algorithms.rs:61)
304
305```rust 349```rust
306// Conceptual: inverts repo → relays to relay → repos 350// Conceptual: inverts repo → relays to relay → repos
307fn derive_relay_targets(repo_index: &HashMap<String, RepoSyncNeeds>) 351fn derive_relay_targets(repo_index: &HashMap<String, RepoSyncNeeds>)
@@ -320,104 +364,262 @@ Performs a three-way diff: `target - pending - confirmed = new`
320 364
321Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect). 365Only creates `AddFilters` actions for items not already pending or confirmed. Skips disconnected relays (they will get AddFilters on reconnect).
322 366
323**Implementation:** [`compute_actions()`](../../src/sync/algorithms.rs:96) 367```rust
368fn compute_actions(
369 targets: &HashMap<String, RelaySyncNeeds>,
370 pending: &PendingSyncIndex,
371 confirmed: &RelaySyncIndex,
372) -> Vec<AddFilters>
373```
324 374
325--- 375---
326 376
327## Filter Building (Three-Layer Strategy) 377## Method Specifications
378
379### Primitives
380
381#### `sync_live()` - Live Subscriptions
382
383```rust
384/// Set up live subscription (filters with limit: 0)
385///
386/// - Uses `limit: 0` to receive only new events
387/// - NOT tracked in PendingSyncIndex (state reconstructable)
388async fn sync_live(&self, relay_url: &str, filters: &[Filter])
389```
390
391#### `historic_sync()` - Historical Sync Dispatcher
328 392
329The filter strategy uses three layers to ensure comprehensive event coverage: 393```rust
394/// Dispatch to appropriate historic sync method based on relay capabilities
395///
396/// Both paths update PendingSyncIndex to ensure consistent lifecycle tracking.
397async fn historic_sync(
398 &mut self,
399 relay_url: &str,
400 filters: Vec<Filter>,
401 items: PendingItems,
402 since: Option<Timestamp>,
403) -> Option<u64> // Returns batch_id
404```
405
406Dispatches to:
407
408- `historic_sync_negentropy()` - NIP-77 parallel sync (if supported)
409- `historic_sync_legacy()` - REQ+EOSE fallback
410
411### Building Blocks
412
413#### `reconstruct_filters()` - Rebuild from Confirmed State
414
415```rust
416/// Reconstruct filters from RelaySyncIndex (confirmed state ONLY)
417///
418/// Returns raw Vec<Filter> for L1+L2+L3.
419/// Used by: quick_reconnect, consolidate
420/// Does NOT include pending items - those flow through AddFilters path.
421async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter>
422```
423
424#### `sync_computed_filters()` - Handle New AddFilters
425
426```rust
427/// Process AddFilters action (from compute_actions)
428///
429/// Orchestrates both live and historic sync for NEW items:
430/// 1. sync_live() - set up permanent L2+L3 subscriptions
431/// 2. historic_sync() - catch up on past events
432///
433/// This is specifically for NEW filter discovery.
434async fn sync_computed_filters(
435 &mut self,
436 action: AddFilters,
437 since: Option<Timestamp>,
438) -> Option<u64>
439```
440
441### Top-Level Entry Points
442
443#### `fresh_start()` - Clean Slate Sync
444
445```rust
446/// Fresh start - clears state and does full sync
447///
448/// Called by: initial connect, long_reconnect, daily_sync
449///
450/// Flow:
451/// 1. Clear PendingSyncIndex
452/// 2. Clear RelaySyncIndex
453/// 3. L1 live + L1 historic (negentropy if available)
454/// 4. compute_actions → AddFilters → sync_computed_filters for L2+L3
455async fn fresh_start(&mut self, relay_url: &str)
456```
457
458#### `quick_reconnect()` - Short Disconnection Recovery
459
460```rust
461/// Quick reconnect - for disconnections < 15 minutes
462///
463/// Flow:
464/// 1. Clear PendingSyncIndex
465/// 2. L1 live + L1 historic(since)
466/// 3. reconstruct_filters → L2+L3 live + L2+L3 historic(since)
467/// 4. compute_actions for any new items
468async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp)
469```
470
471#### `long_reconnect()` - Extended Disconnection Recovery
472
473```rust
474/// Long reconnect - for disconnections > 15 minutes
475///
476/// Flow:
477/// 1. Record disconnect/reconnect metric
478/// 2. fresh_start()
479async fn long_reconnect(&mut self, relay_url: &str)
480```
481
482#### `daily_sync()` - Scheduled Full Refresh
483
484```rust
485/// Daily sync - full refresh without disconnect metrics
486///
487/// Flow: fresh_start() (no disconnect metric recorded)
488async fn daily_sync(&mut self, relay_url: &str)
489```
490
491#### `consolidate()` - Filter Count Reduction
492
493```rust
494/// Consolidate subscriptions when filter count exceeds threshold
495///
496/// Flow:
497/// 1. Clear PendingSyncIndex
498/// 2. unsubscribe_all
499/// 3. reconstruct_filters → sync_live only (L1+L2+L3)
500/// 4. compute_actions for any new items
501///
502/// NO historic sync - items already synced, just reducing subscriptions
503async fn consolidate(&mut self, relay_url: &str)
504```
505
506#### `handle_new_sync_filters()` - New Filter Discovery
507
508```rust
509/// Handle AddFilters action from compute_actions
510///
511/// Flow:
512/// 1. Check/spawn connection if needed
513/// 2. maybe_consolidate (check filter threshold)
514/// 3. sync_computed_filters
515async fn handle_new_sync_filters(&mut self, action: AddFilters)
516```
517
518---
519
520## Method Relationships Summary
521
522```
523fresh_start(relay_url) // Initial/long_reconnect/daily
524 ├──> Clear PendingSyncIndex
525 ├──> Clear RelaySyncIndex
526 ├──> L1: sync_live(announcement_filter)
527 ├──> L1: historic_sync(announcement_filter, None)
528 └──> compute_actions → AddFilters → sync_computed_filters (L2+L3)
529
530quick_reconnect(relay_url, since) // Disconnected < 15 min
531 ├──> Clear PendingSyncIndex
532 ├──> L1: sync_live(announcement_filter)
533 ├──> L1: historic_sync(announcement_filter, since)
534 ├──> reconstruct_filters() → L2+L3 filters
535 ├──> L2+L3: sync_live(filters)
536 ├──> L2+L3: historic_sync(filters, since)
537 └──> compute_actions → AddFilters → sync_computed_filters (new items only)
538
539long_reconnect(relay_url) // Disconnected > 15 min
540 ├──> Record disconnect/reconnect metric
541 └──> fresh_start()
542
543daily_sync(relay_url) // Timer fires
544 └──> fresh_start() // No disconnect metric
545
546consolidate(relay_url) // Filter count > threshold
547 ├──> Clear PendingSyncIndex
548 ├──> unsubscribe_all()
549 ├──> reconstruct_filters() → L1+L2+L3 filters
550 ├──> sync_live(filters) // Live only, NO historic
551 └──> compute_actions → AddFilters → sync_computed_filters (new items only)
552
553handle_new_sync_filters(action) // New filter discovery
554 ├──> Check/spawn connection
555 ├──> maybe_consolidate()
556 └──> sync_computed_filters(action, None)
557
558sync_computed_filters(action, since) // Process AddFilters
559 ├──> sync_live(action.filters) // L2+L3 live
560 └──> historic_sync(action.filters, since) // L2+L3 historic
561 ├── historic_sync_negentropy() // Parallel, updates Pending
562 └── historic_sync_legacy() // REQ+EOSE, updates Pending
563```
564
565---
566
567## Filter Building (Three-Layer Strategy)
330 568
331### Layer 1: Announcements 569### Layer 1: Announcements
332 570
333- **Kinds**: 30617 (Repository Announcements), 30618 (Maintainer Lists) 571- **Kinds**: 30617 (Repository Announcements), 30618 (Maintainer Lists)
334- **When subscribed**: ONCE on connect, NOT rebuilt during consolidation 572- **When subscribed**: On connect (any type) - handled by connection lifecycle
335- **Function**: [`build_announcement_filter()`](../../src/sync/filters.rs:20) 573- **Function**: `build_announcement_filter(since: Option<Timestamp>)`
336- 30618 is ONLY synced from remote relays, not self-subscribed 574- 30618 is ONLY synced from remote relays, not self-subscribed
337 575
338### Layer 2: Events Tagging Our Repos 576### Layer 2: Events Tagging Our Repos
339 577
340- **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage 578- **Tags**: lowercase `a`, uppercase `A`, and `q` tags for comprehensive coverage
341- **Batching**: Per 100 repo refs 579- **Batching**: Per 100 repo refs
342- **Function**: [`tagged_one_of_our_repo_event_filters()`](../../src/sync/filters.rs:43) 580- **Function**: `build_repo_tag_filters(repos, since)`
343 581
344### Layer 3: Events Tagging Our Root Events 582### Layer 3: Events Tagging Our Root Events
345 583
346- **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage 584- **Tags**: lowercase `e`, uppercase `E`, and `q` tags for comprehensive coverage
347- **Batching**: Per 100 event IDs 585- **Batching**: Per 100 event IDs
348- **Function**: [`tagged_one_of_our_root_event_filters()`](../../src/sync/filters.rs:98) 586- **Function**: `build_root_event_tag_filters(root_events, since)`
349 587
350### Combined Layer 2+3 588### Combined Layer 2+3
351 589
352The [`build_layer2_and_layer3_filters()`](../../src/sync/filters.rs:152) function combines both layers. Used by: 590The `build_layer2_and_layer3_filters()` function combines both layers. Used by:
353
354- `compute_actions` for incremental subscriptions
355- `rebuild_layer2_and_layer3` during reconnection
356- Consolidation rebuilds (Layer 1 remains active separately)
357 591
358**Key insight**: Layer 1 is connection-level (subscribe once), Layer 2+3 are item-level (managed by compute_actions and PendingBatch). 592- `sync_computed_filters` for new item subscriptions
593- `reconstruct_filters` for rebuilding from confirmed state
359 594
360--- 595---
361 596
362## SyncManager Key Methods 597## NIP-77 Negentropy Sync
363
364The [`SyncManager`](../../src/sync/mod.rs:308) orchestrates all sync operations. Key methods:
365
366### Connection Lifecycle
367
368| Method | Purpose |
369| ------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
370| `handle_connect_or_reconnect()` | Unified handler for initial connect and reconnect. Determines fresh vs quick reconnect based on `last_connected` and 15-minute rule |
371| `handle_disconnect()` | Updates RelayState to Disconnected, sets disconnected_at, clears pending batches, records failure in health tracker |
372| `spawn_relay_connection()` | Creates RelayConnection, subscribes to Layer 1, spawns event loop task |
373
374### Sync Operations
375
376| Method | Purpose |
377| ------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
378| `handle_add_filters()` | Auto-spawns connection if needed, checks consolidation threshold (>70 filters), subscribes and creates PendingBatch |
379| `handle_eose()` | Processes EOSE for subscription, moves items from pending to confirmed when batch completes |
380| `recompute_actions_for_relay()` | Runs derive_relay_targets → compute_actions for a specific relay to find new items |
381| `rebuild_layer2_and_layer3()` | Rebuilds subscriptions from confirmed state with optional since filter |
382
383### Maintenance
384
385| Method | Purpose |
386| --------------------- | -------------------------------------------------------------------------- |
387| `daily_sync()` | Full fresh sync - unsubscribes all, clears state, recomputes actions |
388| `consolidate()` | Reduces filter count by unsubscribing and rebuilding with combined filters |
389| `check_disconnects()` | Periodic check for empty relays (no repos) to disconnect |
390| `check_reconnects()` | Attempts reconnection for disconnected relays with pending work |
391 598
392--- 599### What is Negentropy?
393 600
394## Self-Subscriber 601NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences.
395 602
396The [`SelfSubscriber`](../../src/sync/self_subscriber.rs:86) monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`. 603### When Negentropy is Used
397 604
398### Event Kinds Monitored 605Negentropy sync is attempted for:
399 606
400- **30617** - Repository Announcements (triggers discovery of repos listing our relay) 607- **fresh_start()** - Full sync without `since`
401- **1617** - Patches (root events referencing repos) 608- **daily_sync()** - Periodic full refresh (via fresh_start)
402- **1618** - Issues 609- **long_reconnect()** - Via fresh_start
403- **1619** - Replies/Status
404- **1621** - Pull Requests
405 610
406Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays. 611Negentropy is NOT used for:
407 612
408### Batching Flow 613- **quick_reconnect()** - Uses REQ with `since` (more efficient for small gaps)
614- **Live subscriptions** - Always use REQ with `limit: 0`
409 615
4101. **Receive events** from own relay subscription 616### Fallback Behavior
4112. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID
4123. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events
4134. **Process batch**:
414 - Update `RepoSyncIndex` with discovered repos and root events
415 - Call `derive_relay_targets()` → `compute_actions()`
416 - Send `AddFilters` actions to SyncManager
417 617
418### Reconnection 618If negentropy fails (relay doesn't support NIP-77, network error, etc.):
419 619
420Uses `last_connected` timestamp to apply since filter on reconnect (15-minute buffer), similar to external relay reconnection logic. 6201. A warning is logged (once per relay to avoid spam)
6212. The sync falls back to traditional REQ+EOSE
6223. No error is raised - fallback is automatic
421 623
422--- 624---
423 625
@@ -431,12 +633,14 @@ flowchart TB
431 end 633 end
432 634
433 subgraph RepoSyncIndex - What We Want 635 subgraph RepoSyncIndex - What We Want
434 RSI[HashMap: Repo to Relays+Events] 636 RSI[HashMap: Repo Relays+Events]
435 end 637 end
436 638
437 subgraph Derived Target 639 subgraph Triggers
438 DT[derive_relay_targets fn] 640 T1[Self-subscriber batch]
439 TGT[Per-relay: repos + events we should sync] 641 T2[fresh_start after L1]
642 T3[quick_reconnect after catchup]
643 T4[consolidate after live rebuild]
440 end 644 end
441 645
442 subgraph compute_actions - Decision Point 646 subgraph compute_actions - Decision Point
@@ -449,136 +653,41 @@ flowchart TB
449 653
450 subgraph RelaySyncIndex - Confirmed State 654 subgraph RelaySyncIndex - Confirmed State
451 RLI[RelayState per relay] 655 RLI[RelayState per relay]
452 CONN[connection_status]
453 REPOS[repos + root_events]
454 TIMES[last_connected + disconnected_at]
455 end 656 end
456 657
457 SS -->|subscribe| OWN 658 SS -->|subscribe| OWN
458 OWN -->|events| SS 659 OWN -->|events| SS
459 SS -->|batch fires| RSI 660 SS -->|batch fires| RSI
460 RSI --> DT 661 RSI --> T1
461 DT --> TGT 662 T1 --> CA
462 TGT --> CA 663 T2 --> CA
664 T3 --> CA
665 T4 --> CA
463 PSI --> CA 666 PSI --> CA
464 RLI --> CA 667 RLI --> CA
465 CA -->|Layer 2+3 new items| AF[AddFilters] 668 CA -->|new items| AF[AddFilters]
466 AF -->|check filter count| CONSOL{count + new > 70?} 669 AF --> SFRE[sync_computed_filters]
467 CONSOL -->|yes| CONSOLIDATE[consolidate] 670 SFRE --> LIVE[sync_live L2+L3]
468 CONSOLIDATE --> L1_CONSOL[build_announcement_filter with since] 671 SFRE --> HIST[historic_sync L2+L3]
469 L1_CONSOL --> L23_CONSOL[rebuild_layer2_and_layer3 with since] 672 HIST --> PSI
470 CONSOL -->|no| SUB[subscribe] 673 PSI -->|EOSE| RLI
471 AF -->|spawn if needed| CONN
472 SUB --> PSI
473 PSI -->|EOSE| REPOS
474
475 CONN -->|disconnect| DISC[Clear PSI + set disconnected_at]
476 DISC -->|any reconnect| HC[handle_connect_or_reconnect]
477
478 subgraph handle_connect_or_reconnect
479 HC --> FRESH_CHECK{is_fresh_sync?}
480 FRESH_CHECK -->|yes: no last_connected OR >15min| L1_FRESH[build_announcement_filter - no since]
481 FRESH_CHECK -->|no: quick reconnect| L1_QUICK[build_announcement_filter - with since]
482 L1_FRESH --> RCA1[recompute_actions_for_relay]
483 L1_QUICK --> L23_QUICK[rebuild_layer2_and_layer3 - with since]
484 L23_QUICK --> RCA2[recompute_actions_for_relay]
485 end
486``` 674```
487 675
488--- 676---
489 677
490## Key Design Decisions 678## Key Design Decisions
491 679
492| Decision | Choice | Rationale | 680| Decision | Choice | Rationale |
493| -------------------------- | -------------------------------------- | --------------------------------------------------------------------------- | 681| ----------------------------- | ------------------------------------------- | ------------------------------------------------------------------ |
494| Startup mechanism | Self-subscription only | Single code path, fresh DB behaves same as reconnect | 682| Live vs Historic separation | Two distinct primitives | Clear responsibilities, easier reasoning about state |
495| Connect/reconnect handling | Unified handle_connect_or_reconnect | Single entry point for both initial and reconnect | 683| Live sync method | `limit: 0` not `since: now` | No clock dependency, deterministic, mirrors filter structure |
496| Layer 1 handling | Separate build_announcement_filter | Connection-level: subscribe ONCE on connect, NOT rebuilt in consolidation | 684| Layer 1 handling | On connect, separate from AddFilters | Connection-level concern, not item-level |
497| Layer 2+3 handling | Separate rebuild_layer2_and_layer3 | Item-level: managed by compute_actions, consolidated when filter count > 70 | 685| Layer 2+3 handling | Via compute_actions → sync_computed_filters | Item-level, proper pending tracking |
498| Filter functions | since as Option parameter | Allows same functions for fresh sync and catch-up | 686| Clear PendingSyncIndex | Always first | Old subscriptions are dead, must clear before any operation |
499| Since filter | Only on catch-up paths | Initial/stale gets full history, quick reconnect catches up | 687| fresh_start vs long_reconnect | Same flow, different metrics | Reuse logic, distinguish intentional refresh from failure recovery |
500| compute_actions role | ONLY for new Layer 2+3 items | Does NOT handle Layer 1 or catch-up | 688| Consolidation | Live only, no historic | Items already synced, just reducing subscription count |
501| Catch-up pending tracking | No PendingBatch | Items already confirmed, don't need re-confirmation | 689| compute_actions role | ONLY decision point for new work | Single place to reason about what needs syncing |
502| Consolidation trigger | On filter add, not periodic | Check in handle_add_filters before adding new filters | 690| NIP-77 negentropy | Try first on full sync, fallback | Efficient for large sets, graceful degradation |
503| Clear on disconnect | Clear PSI on disconnect | Cleanup at event boundary, simpler than on reconnect |
504| 15-minute rule | Clear confirmed if disconnected >15min | Matches since filter buffer, prevents stale subscriptions |
505| Daily timer | Fresh sync (clears state) | Ensures consistency, detects drift |
506| NIP-77 negentropy | Try first, fallback to REQ | Efficient set reconciliation when supported |
507
508---
509
510## NIP-77 Negentropy Sync
511
512The sync system supports NIP-77 negentropy for efficient set reconciliation when syncing with external relays.
513
514### What is Negentropy?
515
516NIP-77 defines the negentropy protocol for efficient event set comparison. Instead of requesting all events matching a filter (REQ+EOSE), negentropy allows relays to compare fingerprints of their event sets and only transfer the differences.
517
518### When Negentropy is Used
519
520Negentropy sync is attempted for:
521
522- **Initial connect** - Fresh sync without `last_connected`
523- **Daily sync** - Periodic full refresh (23-25 hour timer)
524- **Stale reconnect** - Disconnected for more than 15 minutes
525
526Negentropy is NOT used for:
527
528- **Quick reconnect** - Less than 15 minutes disconnected (uses REQ with `since`)
529- **Live subscriptions** - Ongoing event streams always use REQ
530
531### Implementation
532
533The [`RelayConnection`](../../src/sync/relay_connection.rs:71) now includes NIP-77 methods:
534
535```rust
536/// Check if negentropy sync should be attempted
537pub async fn supports_negentropy(&self) -> bool {
538 // Always returns true - we try negentropy and handle failure gracefully
539 true
540}
541
542/// Perform negentropy synchronization for a filter
543pub async fn negentropy_sync_filter(&self, filter: Filter)
544 -> Result<NegentropySyncResult, String> {
545 // Uses nostr-sdk's client.sync() method
546}
547```
548
549### Sync Flow with Negentropy
550
551```mermaid
552flowchart TB
553 CONNECT[Connect to relay] --> NEG{Try negentropy}
554 NEG --> |success| L1[Layer 1 synced via negentropy]
555 NEG --> |failure| FALLBACK[Fall back to REQ+EOSE]
556
557 L1 --> SINCE[Record timestamp = now]
558 FALLBACK --> EOSE[Wait for EOSE]
559 EOSE --> SINCE
560
561 SINCE --> LIVE[Open live REQ with since=now]
562```
563
564### Fallback Behavior
565
566If negentropy fails (relay doesn't support NIP-77, network error, etc.):
567
5681. A warning is logged (once per relay to avoid spam)
5692. The sync falls back to traditional REQ+EOSE
5703. No error is raised - fallback is automatic
571
572**Implementation:** [`negentropy_sync_and_process()`](../../src/sync/mod.rs:1549)
573
574### Key Design Decisions for Negentropy
575
576| Decision | Choice | Rationale |
577| ------------------ | --------------------------- | ------------------------------------------------- |
578| Detection approach | Try and fallback | More reliable than NIP-11 document detection |
579| When to use | Fresh/daily/stale sync only | Quick reconnect with `since` is already efficient |
580| Error handling | Log once, fallback silently | Avoid log spam while maintaining visibility |
581| Layer application | Layer 1 first | Announcements are highest priority |
582 691
583--- 692---
584 693
@@ -586,8 +695,8 @@ If negentropy fails (relay doesn't support NIP-77, network error, etc.):
586 695
587``` 696```
588src/sync/ 697src/sync/
589├── mod.rs # SyncManager, main loop, data structures (RepoSyncNeeds, RelayState, etc.) 698├── mod.rs # SyncManager, main loop, data structures
590├── algorithms.rs # derive_relay_targets(), compute_actions(), AddFilters 699├── algorithms.rs # derive_relay_targets(), compute_actions()
591├── filters.rs # build_announcement_filter(), build_layer2_and_layer3_filters() 700├── filters.rs # build_announcement_filter(), build_layer2_and_layer3_filters()
592├── health.rs # RelayHealthTracker with exponential backoff 701├── health.rs # RelayHealthTracker with exponential backoff
593├── relay_connection.rs # RelayConnection, RelayEvent handling 702├── relay_connection.rs # RelayConnection, RelayEvent handling
@@ -599,7 +708,7 @@ src/sync/
599 708
600## Health Tracking 709## Health Tracking
601 710
602The [`RelayHealthTracker`](../../src/sync/health.rs:93) manages connection health with exponential backoff: 711The `RelayHealthTracker` manages connection health with exponential backoff:
603 712
604- **States**: Healthy, Degraded, Dead 713- **States**: Healthy, Degraded, Dead
605- **Backoff**: `base * 2^(failures-1)`, capped at max_backoff 714- **Backoff**: `base * 2^(failures-1)`, capped at max_backoff
@@ -610,6 +719,32 @@ Bootstrap relays are never disconnected by the cleanup system, even if empty.
610 719
611--- 720---
612 721
722## Self-Subscriber
723
724The `SelfSubscriber` monitors our own relay for repository announcements and root events, updating the `RepoSyncIndex`.
725
726### Event Kinds Monitored
727
728- **30617** - Repository Announcements (triggers discovery of repos listing our relay)
729- **1617** - Patches (root events referencing repos)
730- **1618** - Issues
731- **1619** - Replies/Status
732- **1621** - Pull Requests
733
734Note: 30618 (Maintainer Lists) is NOT self-subscribed - only synced from remote relays.
735
736### Batching Flow
737
7381. **Receive events** from own relay subscription
7392. **Queue to pending** - announcements get repo ID + relay URLs; root events get repo ref + event ID
7403. **Timer fires** (configurable window, default 5 seconds) - does NOT reset on new events
7414. **Process batch**:
742 - Update `RepoSyncIndex` with discovered repos and root events
743 - Call `compute_actions()`
744 - Send `AddFilters` actions to SyncManager → `sync_computed_filters()`
745
746---
747
613## Disconnect Handling 748## Disconnect Handling
614 749
615The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays: 750The disconnect checker runs periodically (default: 60 seconds) to clean up empty relays:
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 5b5b520..84248b1 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -11,7 +11,9 @@ use std::collections::{HashMap, HashSet};
11 11
12use nostr_sdk::prelude::*; 12use nostr_sdk::prelude::*;
13 13
14use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; 14use crate::sync::PendingItems;
15
16use super::{ConnectionStatus, PendingBatch, RelayState};
15 17
16// ============================================================================= 18// =============================================================================
17// Data Structures 19// Data Structures
@@ -36,10 +38,8 @@ pub struct RelaySyncNeeds {
36pub struct AddFilters { 38pub struct AddFilters {
37 /// The relay URL to add filters to 39 /// The relay URL to add filters to
38 pub relay_url: String, 40 pub relay_url: String,
39 /// Repos being synced in this action 41 /// pending items - repos and root events
40 pub repos: HashSet<String>, 42 pub items: PendingItems,
41 /// Root events being tracked in this action
42 pub root_events: HashSet<EventId>,
43 /// The actual filters to subscribe with 43 /// The actual filters to subscribe with
44 pub filters: Vec<Filter>, 44 pub filters: Vec<Filter>,
45} 45}
@@ -161,8 +161,10 @@ pub fn compute_actions(
161 161
162 actions.push(AddFilters { 162 actions.push(AddFilters {
163 relay_url: relay_url.clone(), 163 relay_url: relay_url.clone(),
164 repos: new_repos, 164 items: PendingItems {
165 root_events: new_events, 165 repos: new_repos,
166 root_events: new_events,
167 },
166 filters, 168 filters,
167 }); 169 });
168 } 170 }
@@ -175,6 +177,7 @@ pub fn compute_actions(
175mod tests { 177mod tests {
176 use super::*; 178 use super::*;
177 use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; 179 use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds;
180 use crate::sync::SyncMethod;
178 181
179 // ========================================================================= 182 // =========================================================================
180 // derive_relay_targets tests 183 // derive_relay_targets tests
@@ -371,7 +374,7 @@ mod tests {
371 assert_eq!(actions.len(), 1); 374 assert_eq!(actions.len(), 1);
372 let action = &actions[0]; 375 let action = &actions[0];
373 assert_eq!(action.relay_url, "wss://relay1.com"); 376 assert_eq!(action.relay_url, "wss://relay1.com");
374 assert!(action.repos.contains("repo1")); 377 assert!(action.items.repos.contains("repo1"));
375 assert!(!action.filters.is_empty()); 378 assert!(!action.filters.is_empty());
376 } 379 }
377 380
@@ -528,10 +531,10 @@ mod tests {
528 assert_eq!(actions.len(), 1); 531 assert_eq!(actions.len(), 1);
529 let action = &actions[0]; 532 let action = &actions[0];
530 // Only repo3 should be in the action (repo1 pending, repo2 confirmed) 533 // Only repo3 should be in the action (repo1 pending, repo2 confirmed)
531 assert_eq!(action.repos.len(), 1); 534 assert_eq!(action.items.repos.len(), 1);
532 assert!(action.repos.contains("repo3")); 535 assert!(action.items.repos.contains("repo3"));
533 assert!(!action.repos.contains("repo1")); 536 assert!(!action.items.repos.contains("repo1"));
534 assert!(!action.repos.contains("repo2")); 537 assert!(!action.items.repos.contains("repo2"));
535 } 538 }
536 539
537 #[test] 540 #[test]
@@ -554,9 +557,9 @@ mod tests {
554 557
555 assert_eq!(actions.len(), 1); 558 assert_eq!(actions.len(), 1);
556 let action = &actions[0]; 559 let action = &actions[0];
557 assert!(action.repos.is_empty()); 560 assert!(action.items.repos.is_empty());
558 assert_eq!(action.root_events.len(), 1); 561 assert_eq!(action.items.root_events.len(), 1);
559 assert!(action.root_events.contains(&event_id)); 562 assert!(action.items.root_events.contains(&event_id));
560 // Should have 3 filters for the root event (e, E, q tags) 563 // Should have 3 filters for the root event (e, E, q tags)
561 assert_eq!(action.filters.len(), 3); 564 assert_eq!(action.filters.len(), 3);
562 } 565 }
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 41586a4..401cf21 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -591,7 +591,7 @@ impl SyncManager {
591 } 591 }
592 592
593 // Recompute actions for Layer 2+3 based on synced events 593 // Recompute actions for Layer 2+3 based on synced events
594 self.recompute_actions_for_relay(relay_url).await; 594 self.recompute_new_sync_filters_for_relay(relay_url).await;
595 } else { 595 } else {
596 // NIP-77 not supported - fall back to REQ+EOSE 596 // NIP-77 not supported - fall back to REQ+EOSE
597 tracing::info!( 597 tracing::info!(
@@ -612,7 +612,7 @@ impl SyncManager {
612 } 612 }
613 613
614 // Recompute actions for Layer 2+3 - will discover all repos/events again 614 // Recompute actions for Layer 2+3 - will discover all repos/events again
615 self.recompute_actions_for_relay(relay_url).await; 615 self.recompute_new_sync_filters_for_relay(relay_url).await;
616 } 616 }
617 617
618 if let Some(ref metrics) = self.metrics { 618 if let Some(ref metrics) = self.metrics {
@@ -709,7 +709,7 @@ impl SyncManager {
709 Some(add_filters) => { 709 Some(add_filters) => {
710 // Process AddFilters action directly 710 // Process AddFilters action directly
711 let mut manager = sync_manager.lock().await; 711 let mut manager = sync_manager.lock().await;
712 manager.handle_add_filters(add_filters).await; 712 manager.handle_new_sync_filters(add_filters).await;
713 } 713 }
714 None => break, 714 None => break,
715 } 715 }
@@ -763,13 +763,13 @@ impl SyncManager {
763 /// - For new relays: creates entry with Connecting status, spawns connection 763 /// - For new relays: creates entry with Connecting status, spawns connection
764 /// - For existing connected relays: subscribes to filters, creates PendingBatch 764 /// - For existing connected relays: subscribes to filters, creates PendingBatch
765 /// - For disconnected/connecting relays: returns (will be handled on connection) 765 /// - For disconnected/connecting relays: returns (will be handled on connection)
766 async fn handle_add_filters(&mut self, action: AddFilters) { 766 async fn handle_new_sync_filters(&mut self, action: AddFilters) {
767 tracing::info!( 767 tracing::info!(
768 relay = %action.relay_url, 768 relay = %action.relay_url,
769 repo_count = action.repos.len(), 769 repo_count = action.items.repos.len(),
770 root_event_count = action.root_events.len(), 770 root_event_count = action.items.root_events.len(),
771 filter_count = action.filters.len(), 771 filter_count = action.filters.len(),
772 "[DIAG] handle_add_filters called" 772 "[DIAG] handle_new_sync_filters called"
773 ); 773 );
774 774
775 // Step 1: Check if relay exists in relay_sync_index 775 // Step 1: Check if relay exists in relay_sync_index
@@ -801,7 +801,7 @@ impl SyncManager {
801 801
802 tracing::info!( 802 tracing::info!(
803 relay = %action.relay_url, 803 relay = %action.relay_url,
804 repos = action.repos.len(), 804 repos = action.items.repos.len(),
805 "Spawning connection for new relay" 805 "Spawning connection for new relay"
806 ); 806 );
807 807
@@ -827,7 +827,7 @@ impl SyncManager {
827 // Step 2: Check if consolidation is needed BEFORE adding new filters 827 // Step 2: Check if consolidation is needed BEFORE adding new filters
828 self.maybe_consolidate(&action.relay_url, action.filters.len()) 828 self.maybe_consolidate(&action.relay_url, action.filters.len())
829 .await; 829 .await;
830 830 /// DELETE this bit
831 // Step 3: Get connection and subscribe to all filters 831 // Step 3: Get connection and subscribe to all filters
832 let connection = match self.connections.get(&action.relay_url) { 832 let connection = match self.connections.get(&action.relay_url) {
833 Some(conn) => conn, 833 Some(conn) => conn,
@@ -870,8 +870,8 @@ impl SyncManager {
870 let batch = PendingBatch { 870 let batch = PendingBatch {
871 batch_id, 871 batch_id,
872 items: PendingItems { 872 items: PendingItems {
873 repos: action.repos.clone(), 873 repos: action.items.repos.clone(),
874 root_events: action.root_events.clone(), 874 root_events: action.items.root_events.clone(),
875 }, 875 },
876 outstanding_subs: subscription_ids.into_iter().collect(), 876 outstanding_subs: subscription_ids.into_iter().collect(),
877 sync_method: SyncMethod::ReqEose, 877 sync_method: SyncMethod::ReqEose,
@@ -889,33 +889,84 @@ impl SyncManager {
889 tracing::debug!( 889 tracing::debug!(
890 relay = %action.relay_url, 890 relay = %action.relay_url,
891 batch_id = batch_id, 891 batch_id = batch_id,
892 repos = action.repos.len(), 892 repos = action.items.repos.len(),
893 root_events = action.root_events.len(), 893 root_events = action.items.root_events.len(),
894 filters = action.filters.len(), 894 filters = action.filters.len(),
895 "Created pending batch for filter subscriptions" 895 "Created pending batch for filter subscriptions"
896 ); 896 );
897 // REPLACE WITH THIS:
898 // // Subscribe to each filter and collect subscription IDs
899 // self.sync_live(&action.relay_url, &action.filters).await;
900 // // TODO need to do actions.repos
901 // self.historic_sync(&action.relay_url, action.filters, action.items, None)
902 // .await;
897 } 903 }
898 904
899 /// Handle a connection success (called when a relay connects or reconnects) 905 /// Handle a connection success (called when a relay connects or reconnects)
900 /// 906 ///
901 /// This method implements smart reconnection logic: 907 /// This method dispatches to the appropriate reconnection strategy:
902 /// - Fresh sync if never connected or >15 min since last connection 908 /// - `fresh_start()` if never connected before
903 /// - Quick reconnect with since filter if <15 min since last connection 909 /// - `quick_reconnect()` if disconnected < 15 minutes
904 /// 910 /// - `long_reconnect()` if disconnected > 15 minutes
905 /// For fresh sync (with NIP-77 negentropy if supported):
906 /// - Clears any stale state
907 /// - Uses negentropy sync for Layer 1 (if NIP-77 supported)
908 /// - Falls back to REQ+EOSE if NIP-77 not supported
909 /// - Recomputes actions for new items
910 ///
911 /// For quick reconnect:
912 /// - Preserves existing state
913 /// - Subscribes to Layer 1 with since filter
914 /// - Rebuilds Layer 2 and Layer 3 with since filter
915 /// - Recomputes actions for new items
916 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { 911 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
917 let now = Timestamp::now(); 912 let now = Timestamp::now();
918 913
914 // // Get the relay state to determine reconnect type
915 // let (last_connected, disconnected_at) = {
916 // let index = self.relay_sync_index.read().await;
917 // if let Some(state) = index.get(relay_url) {
918 // (state.last_connected, state.disconnected_at)
919 // } else {
920 // (None, None) // No state found
921 // }
922 // };
923
924 // // Determine which reconnection strategy to use
925 // match (last_connected, disconnected_at) {
926 // (None, _) => {
927 // // Never connected before - fresh start
928 // tracing::info!(
929 // relay = %relay_url,
930 // "First connection - initiating fresh_start"
931 // );
932 // self.fresh_start(relay_url).await;
933 // }
934 // (Some(last), Some(disconnected)) => {
935 // // Was connected before, check how long disconnected
936 // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs());
937
938 // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS {
939 // // Disconnected < 15 minutes - quick reconnect
940 // // Use last_connected minus buffer as since timestamp
941 // let since =
942 // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS));
943 // tracing::info!(
944 // relay = %relay_url,
945 // disconnect_secs = disconnect_duration,
946 // since = %since,
947 // "Short disconnection - initiating quick_reconnect"
948 // );
949 // self.quick_reconnect(relay_url, since).await;
950 // } else {
951 // // Disconnected > 15 minutes - long reconnect
952 // tracing::info!(
953 // relay = %relay_url,
954 // disconnect_secs = disconnect_duration,
955 // "Long disconnection - initiating long_reconnect"
956 // );
957 // self.long_reconnect(relay_url).await;
958 // }
959 // }
960 // (Some(_last), None) => {
961 // // Was connected but no disconnected_at - shouldn't happen normally
962 // // Treat as long reconnect to be safe
963 // tracing::warn!(
964 // relay = %relay_url,
965 // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect"
966 // );
967 // self.long_reconnect(relay_url).await;
968 // }
969 // }
919 // Get the relay state to determine reconnect type 970 // Get the relay state to determine reconnect type
920 let (is_fresh_sync, last_connected, is_bootstrap) = { 971 let (is_fresh_sync, last_connected, is_bootstrap) = {
921 let index = self.relay_sync_index.read().await; 972 let index = self.relay_sync_index.read().await;
@@ -998,7 +1049,7 @@ impl SyncManager {
998 1049
999 // After negentropy sync, recompute Layer 2+3 actions 1050 // After negentropy sync, recompute Layer 2+3 actions
1000 // Layer 1 events are now in sync, so we can proceed with Layer 2+3 1051 // Layer 1 events are now in sync, so we can proceed with Layer 2+3
1001 self.recompute_actions_for_relay(relay_url).await; 1052 self.recompute_new_sync_filters_for_relay(relay_url).await;
1002 1053
1003 // Set up live subscription for new events (since=now) 1054 // Set up live subscription for new events (since=now)
1004 let live_filter = filters::build_announcement_filter(Some(now)); 1055 let live_filter = filters::build_announcement_filter(Some(now));
@@ -1021,7 +1072,7 @@ impl SyncManager {
1021 // during connect_and_subscribe() in handle_add_filters(). That call subscribes 1072 // during connect_and_subscribe() in handle_add_filters(). That call subscribes
1022 // to kinds 30617+30618 for the full history. Here we only need to recompute 1073 // to kinds 30617+30618 for the full history. Here we only need to recompute
1023 // Layer 2+3 actions based on the repos we're tracking. 1074 // Layer 2+3 actions based on the repos we're tracking.
1024 self.recompute_actions_for_relay(relay_url).await; 1075 self.recompute_new_sync_filters_for_relay(relay_url).await;
1025 } 1076 }
1026 } else { 1077 } else {
1027 // Quick reconnect: use since filter (no negentropy needed) 1078 // Quick reconnect: use since filter (no negentropy needed)
@@ -1055,7 +1106,7 @@ impl SyncManager {
1055 .await; 1106 .await;
1056 1107
1057 // Recompute actions for any new items discovered while disconnected 1108 // Recompute actions for any new items discovered while disconnected
1058 self.recompute_actions_for_relay(relay_url).await; 1109 self.recompute_new_sync_filters_for_relay(relay_url).await;
1059 1110
1060 if let Some(ref metrics) = self.metrics { 1111 if let Some(ref metrics) = self.metrics {
1061 metrics.record_event(event_source::RECONNECT); 1112 metrics.record_event(event_source::RECONNECT);
@@ -1063,6 +1114,225 @@ impl SyncManager {
1063 } 1114 }
1064 } 1115 }
1065 1116
1117 /// Fresh start - clears state and does full sync
1118 ///
1119 /// Called by: initial connect, long_reconnect, daily_sync
1120 ///
1121 /// Flow:
1122 /// 1. Clear PendingSyncIndex for this relay
1123 /// 2. Clear RelaySyncIndex sync state (repos/root_events)
1124 /// 3. Update connection state to Connected
1125 /// 4. L1 live + L1 historic (negentropy if available)
1126 /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3
1127 async fn fresh_start(&mut self, relay_url: &str) {
1128 let now = Timestamp::now();
1129
1130 tracing::info!(relay = %relay_url, "Starting fresh_start");
1131
1132 // Step 1: Clear PendingSyncIndex for this relay
1133 {
1134 let mut pending = self.pending_sync_index.write().await;
1135 if pending.remove(relay_url).is_some() {
1136 tracing::debug!(
1137 relay = %relay_url,
1138 "Cleared pending batches in fresh_start"
1139 );
1140 }
1141 }
1142
1143 // Step 2: Clear RelaySyncIndex sync state (but preserve connection metadata)
1144 {
1145 let mut index = self.relay_sync_index.write().await;
1146 if let Some(state) = index.get_mut(relay_url) {
1147 let repos_cleared = state.repos.len();
1148 let events_cleared = state.root_events.len();
1149 state.clear_sync_state();
1150 if repos_cleared > 0 || events_cleared > 0 {
1151 tracing::debug!(
1152 relay = %relay_url,
1153 repos_cleared = repos_cleared,
1154 events_cleared = events_cleared,
1155 "Cleared sync state in fresh_start"
1156 );
1157 }
1158 }
1159 }
1160
1161 // Step 3: Update connection state
1162 {
1163 let mut index = self.relay_sync_index.write().await;
1164 let state = index.entry(relay_url.to_string()).or_default();
1165 state.connection_status = ConnectionStatus::Connected;
1166 state.last_connected = Some(now);
1167 state.disconnected_at = None;
1168 }
1169
1170 // Record success in health tracker
1171 self.health_tracker.record_success(relay_url);
1172
1173 // Update metrics
1174 if let Some(ref metrics) = self.metrics {
1175 metrics.set_relay_connected(relay_url, true);
1176 metrics.inc_connected_count();
1177 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1178 }
1179
1180 // Step 4: L1 sync - check negentropy support
1181 let use_negentropy = if self.config.sync_disable_negentropy {
1182 tracing::debug!(relay = %relay_url, "Negentropy disabled via config");
1183 false
1184 } else if let Some(connection) = self.connections.get(relay_url) {
1185 connection.supports_negentropy().await
1186 } else {
1187 false
1188 };
1189
1190 if use_negentropy {
1191 // NIP-77 supported - use negentropy for L1 historical sync
1192 tracing::info!(
1193 relay = %relay_url,
1194 "Using NIP-77 negentropy for L1 historical sync"
1195 );
1196
1197 // L1 historic sync (no since - full sync)
1198 let layer1_filter = filters::build_announcement_filter(None);
1199 self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)")
1200 .await;
1201
1202 // L1 live subscription (since=now for ongoing events)
1203 let live_filter = filters::build_announcement_filter(Some(now));
1204 if let Some(connection) = self.connections.get(relay_url) {
1205 if let Err(e) = connection.subscribe_filter(live_filter).await {
1206 tracing::error!(
1207 relay = %relay_url,
1208 error = %e,
1209 "Failed to set up L1 live subscription in fresh_start"
1210 );
1211 }
1212 }
1213 } else {
1214 // NIP-77 not supported - REQ+EOSE
1215 // Note: Layer 1 subscription (without since) was already established
1216 // during connect_and_subscribe() in spawn_relay_connection
1217 tracing::info!(
1218 relay = %relay_url,
1219 "Using REQ+EOSE for L1 sync (negentropy not available)"
1220 );
1221 }
1222
1223 // Step 5: compute_actions → AddFilters for L2+L3
1224 // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters
1225 // for ALL repos that should be synced from this relay
1226 self.recompute_new_sync_filters_for_relay(relay_url).await;
1227
1228 tracing::info!(relay = %relay_url, "fresh_start complete");
1229 }
1230
1231 /// Quick reconnect - for disconnections < 15 minutes
1232 ///
1233 /// Flow:
1234 /// 1. Clear PendingSyncIndex for this relay
1235 /// 2. Update connection state to Connected
1236 /// 3. L1 live + L1 historic(since)
1237 /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since)
1238 /// 5. compute_actions for any new items discovered during catchup
1239 async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) {
1240 let now = Timestamp::now();
1241
1242 tracing::info!(
1243 relay = %relay_url,
1244 since = %since,
1245 "Starting quick_reconnect"
1246 );
1247
1248 // Step 1: Clear PendingSyncIndex for this relay
1249 // Old subscriptions are dead after disconnect
1250 {
1251 let mut pending = self.pending_sync_index.write().await;
1252 if pending.remove(relay_url).is_some() {
1253 tracing::debug!(
1254 relay = %relay_url,
1255 "Cleared pending batches in quick_reconnect"
1256 );
1257 }
1258 }
1259
1260 // Step 2: Update connection state (preserve repos/root_events - that's the point!)
1261 {
1262 let mut index = self.relay_sync_index.write().await;
1263 let state = index.entry(relay_url.to_string()).or_default();
1264 state.connection_status = ConnectionStatus::Connected;
1265 state.last_connected = Some(now);
1266 state.disconnected_at = None;
1267 }
1268
1269 // Record success in health tracker
1270 self.health_tracker.record_success(relay_url);
1271
1272 // Update metrics
1273 if let Some(ref metrics) = self.metrics {
1274 metrics.set_relay_connected(relay_url, true);
1275 metrics.inc_connected_count();
1276 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1277 metrics.record_event(event_source::RECONNECT);
1278 }
1279
1280 // Step 3: L1 live + L1 historic with since filter
1281 // L1 live subscription (since=now for ongoing events)
1282 let live_filter = filters::build_announcement_filter(Some(now));
1283 if let Some(connection) = self.connections.get(relay_url) {
1284 if let Err(e) = connection.subscribe_filter(live_filter).await {
1285 tracing::error!(
1286 relay = %relay_url,
1287 error = %e,
1288 "Failed to set up L1 live subscription in quick_reconnect"
1289 );
1290 }
1291 }
1292
1293 // L1 historic with since filter (catch up on missed announcements)
1294 let layer1_filter = filters::build_announcement_filter(Some(since));
1295 if let Some(connection) = self.connections.get(relay_url) {
1296 if let Err(e) = connection.subscribe_filter(layer1_filter).await {
1297 tracing::error!(
1298 relay = %relay_url,
1299 error = %e,
1300 "Failed to subscribe to L1 historic filter in quick_reconnect"
1301 );
1302 }
1303 }
1304
1305 // Step 4: Rebuild L2+L3 from confirmed state with since filter
1306 // This uses the preserved repos/root_events from RelaySyncIndex
1307 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
1308
1309 // Step 5: compute_actions for any NEW items discovered while disconnected
1310 self.recompute_new_sync_filters_for_relay(relay_url).await;
1311
1312 tracing::info!(relay = %relay_url, "quick_reconnect complete");
1313 }
1314
1315 /// Long reconnect - for disconnections > 15 minutes
1316 ///
1317 /// Flow:
1318 /// 1. Record disconnect/reconnect metric
1319 /// 2. Delegate to fresh_start()
1320 async fn long_reconnect(&mut self, relay_url: &str) {
1321 tracing::info!(relay = %relay_url, "Starting long_reconnect");
1322
1323 // Step 1: Record disconnect/reconnect metric
1324 // This distinguishes intentional daily refresh from failure recovery
1325 if let Some(ref metrics) = self.metrics {
1326 metrics.record_event(event_source::RECONNECT);
1327 }
1328
1329 // Step 2: Delegate to fresh_start
1330 // State is too stale to trust, start fresh
1331 self.fresh_start(relay_url).await;
1332
1333 tracing::info!(relay = %relay_url, "long_reconnect complete");
1334 }
1335
1066 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay 1336 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay
1067 /// 1337 ///
1068 /// Uses the confirmed repos and root_events from RelayState to build filters. 1338 /// Uses the confirmed repos and root_events from RelayState to build filters.
@@ -1129,7 +1399,7 @@ impl SyncManager {
1129 /// 1399 ///
1130 /// Uses derive_relay_targets and compute_actions to find new items 1400 /// Uses derive_relay_targets and compute_actions to find new items
1131 /// that need to be synced. Processes AddFilters actions for new items. 1401 /// that need to be synced. Processes AddFilters actions for new items.
1132 async fn recompute_actions_for_relay(&mut self, relay_url: &str) { 1402 async fn recompute_new_sync_filters_for_relay(&mut self, relay_url: &str) {
1133 use crate::sync::algorithms::{compute_actions, derive_relay_targets}; 1403 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
1134 1404
1135 // Get current state from indexes (need to collect to avoid holding locks) 1405 // Get current state from indexes (need to collect to avoid holding locks)
@@ -1173,12 +1443,12 @@ impl SyncManager {
1173 for action in actions { 1443 for action in actions {
1174 tracing::info!( 1444 tracing::info!(
1175 relay = %action.relay_url, 1445 relay = %action.relay_url,
1176 new_repos = action.repos.len(), 1446 new_repos = action.items.repos.len(),
1177 new_root_events = action.root_events.len(), 1447 new_root_events = action.items.root_events.len(),
1178 filters = action.filters.len(), 1448 filters = action.filters.len(),
1179 "Processing AddFilters for new items" 1449 "Processing AddFilters for new items"
1180 ); 1450 );
1181 self.handle_add_filters(action).await; 1451 self.handle_new_sync_filters(action).await;
1182 } 1452 }
1183 } 1453 }
1184 1454
@@ -2095,6 +2365,366 @@ impl SyncManager {
2095 } 2365 }
2096 } 2366 }
2097 2367
2368 // =========================================================================
2369 // Sync Primitives (Phase 1 of GRASP-02 refactoring)
2370 // These methods are new primitives that will be used in subsequent phases.
2371 // =========================================================================
2372
2373 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex
2374 #[allow(dead_code)] // Will be used in Phase 2+
2375 ///
2376 /// This method subscribes to filters with `limit: 0` for receiving ongoing events.
2377 /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have
2378 /// a definite "completion" - they stay open indefinitely.
2379 ///
2380 /// Used for:
2381 /// - Layer 1 live subscription (new announcements after initial sync)
2382 /// - Layer 2+3 live subscriptions (new events after initial sync)
2383 ///
2384 /// # Arguments
2385 /// * `relay_url` - The relay URL to subscribe on
2386 /// * `filters` - Filters to subscribe to (will have `limit: 0` applied)
2387 ///
2388 /// # Returns
2389 /// Vec of subscription IDs for the live subscriptions, or empty if connection not found
2390 async fn sync_live(&self, relay_url: &str, filters: &[Filter]) -> Vec<SubscriptionId> {
2391 if filters.is_empty() {
2392 return vec![];
2393 }
2394
2395 let connection = match self.connections.get(relay_url) {
2396 Some(conn) => conn,
2397 None => {
2398 tracing::warn!(
2399 relay = %relay_url,
2400 "No connection found for sync_live"
2401 );
2402 return vec![];
2403 }
2404 };
2405
2406 let mut sub_ids = Vec::new();
2407
2408 for filter in filters {
2409 // Apply limit: 0 to make this a live subscription
2410 // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit",
2411 // but omitting limit means "no limit" which is what we want for live.
2412 // The filter passed in should already NOT have a limit set.
2413 match connection.subscribe_filter(filter.clone()).await {
2414 Ok(sub_id) => {
2415 tracing::trace!(
2416 relay = %relay_url,
2417 sub_id = %sub_id,
2418 "Live subscription created"
2419 );
2420 sub_ids.push(sub_id);
2421 }
2422 Err(e) => {
2423 tracing::error!(
2424 relay = %relay_url,
2425 error = %e,
2426 "Failed to create live subscription"
2427 );
2428 }
2429 }
2430 }
2431
2432 tracing::debug!(
2433 relay = %relay_url,
2434 filter_count = filters.len(),
2435 sub_count = sub_ids.len(),
2436 "sync_live completed"
2437 );
2438
2439 sub_ids
2440 }
2441
2442 /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY)
2443 ///
2444 /// Returns raw Vec<Filter> for L1+L2+L3.
2445 /// Used by: quick_reconnect, consolidate
2446 /// Does NOT include pending items - those flow through AddFilters path.
2447 ///
2448 /// # Arguments
2449 /// * `relay_url` - The relay URL to reconstruct filters for
2450 ///
2451 /// # Returns
2452 /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags)
2453 #[allow(dead_code)] // Will be used in Phase 3+
2454 async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> {
2455 // Get confirmed state from relay_sync_index
2456 let (repos, root_events) = {
2457 let index = self.relay_sync_index.read().await;
2458 match index.get(relay_url) {
2459 Some(state) => (state.repos.clone(), state.root_events.clone()),
2460 None => {
2461 tracing::warn!(
2462 relay = %relay_url,
2463 "No RelayState found for reconstruct_filters"
2464 );
2465 return vec![];
2466 }
2467 }
2468 };
2469
2470 let mut all_filters = Vec::new();
2471
2472 // Layer 1: Announcements (always included)
2473 // Note: No `since` filter - this returns raw filters for live subscriptions
2474 all_filters.push(filters::build_announcement_filter(None));
2475
2476 // Layer 2 + Layer 3: Repo and root event tag filters
2477 if !repos.is_empty() || !root_events.is_empty() {
2478 let l2_l3_filters =
2479 filters::build_layer2_and_layer3_filters(&repos, &root_events, None);
2480 all_filters.extend(l2_l3_filters);
2481 }
2482
2483 tracing::debug!(
2484 relay = %relay_url,
2485 total_filters = all_filters.len(),
2486 repos_count = repos.len(),
2487 root_events_count = root_events.len(),
2488 "Reconstructed filters from confirmed state"
2489 );
2490
2491 all_filters
2492 }
2493
2494 /// Sync historical events and track in PendingSyncIndex
2495 #[allow(dead_code)] // Will be used in Phase 3+
2496 ///
2497 /// This method handles historical synchronization for a set of filters,
2498 /// creating a PendingBatch to track completion. It dispatches to either
2499 /// negentropy sync or traditional REQ+EOSE based on relay capability and config.
2500 ///
2501 /// Used for:
2502 /// - Initial sync (no since filter)
2503 /// - Reconnect sync (with since filter)
2504 /// - Daily sync (no since filter, full re-sync)
2505 ///
2506 /// # Arguments
2507 /// * `relay_url` - The relay URL to sync from
2508 /// * `filters` - Filters to sync (will have `since` applied if provided)
2509 /// * `items` - Items being synced (for tracking in PendingBatch)
2510 /// * `since` - Optional timestamp for incremental sync
2511 ///
2512 /// # Returns
2513 /// * `Some(batch_id)` - Batch was created and sync initiated
2514 /// * `None` - No connection or sync failed to start
2515 async fn historic_sync(
2516 &mut self,
2517 relay_url: &str,
2518 filters: Vec<Filter>,
2519 items: PendingItems,
2520 since: Option<Timestamp>,
2521 ) -> Option<u64> {
2522 if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() {
2523 tracing::debug!(
2524 relay = %relay_url,
2525 "historic_sync called with empty filters and items, skipping"
2526 );
2527 return None;
2528 }
2529
2530 // Check connection exists
2531 let connection = match self.connections.get(relay_url) {
2532 Some(conn) => conn,
2533 None => {
2534 tracing::warn!(
2535 relay = %relay_url,
2536 "No connection found for historic_sync"
2537 );
2538 return None;
2539 }
2540 };
2541
2542 // Apply since filter if provided
2543 let filters_with_since: Vec<Filter> = if let Some(ts) = since {
2544 filters.into_iter().map(|f| f.since(ts)).collect()
2545 } else {
2546 filters
2547 };
2548
2549 // Check if we should use negentropy
2550 let use_negentropy =
2551 !self.config.sync_disable_negentropy && connection.supports_negentropy().await;
2552
2553 // Generate batch ID
2554 let batch_id = self.next_batch_id();
2555
2556 if use_negentropy && !filters_with_since.is_empty() {
2557 // NIP-77 negentropy path
2558 tracing::debug!(
2559 relay = %relay_url,
2560 batch_id = batch_id,
2561 filter_count = filters_with_since.len(),
2562 repos = items.repos.len(),
2563 root_events = items.root_events.len(),
2564 "Starting historic_sync with negentropy"
2565 );
2566
2567 // Create PendingBatch for negentropy (empty outstanding_subs)
2568 let batch = PendingBatch {
2569 batch_id,
2570 items: items.clone(),
2571 outstanding_subs: HashSet::new(),
2572 sync_method: SyncMethod::Negentropy,
2573 };
2574
2575 // Add to pending_sync_index
2576 {
2577 let mut pending = self.pending_sync_index.write().await;
2578 pending
2579 .entry(relay_url.to_string())
2580 .or_insert_with(Vec::new)
2581 .push(batch);
2582 }
2583
2584 // Perform negentropy sync for each filter
2585 // Note: We sync each filter separately because negentropy works on a single filter
2586 let mut total_received = 0;
2587 let mut any_success = false;
2588
2589 for filter in &filters_with_since {
2590 if let Some(conn) = self.connections.get(relay_url) {
2591 match conn.negentropy_sync_filter(filter.clone()).await {
2592 Ok(result) => {
2593 total_received += result.received.len();
2594 any_success = true;
2595
2596 // Record metrics for received events
2597 if let Some(ref metrics) = self.metrics {
2598 for _ in 0..result.received.len() {
2599 metrics.record_event(event_source::STARTUP);
2600 }
2601 }
2602 }
2603 Err(e) => {
2604 tracing::warn!(
2605 relay = %relay_url,
2606 error = %e,
2607 "Negentropy sync failed for filter in historic_sync"
2608 );
2609 }
2610 }
2611 }
2612 }
2613
2614 if any_success {
2615 // Remove batch from pending and confirm it
2616 let completed_batch = {
2617 let mut pending = self.pending_sync_index.write().await;
2618 if let Some(batches) = pending.get_mut(relay_url) {
2619 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id);
2620 if let Some(idx) = batch_idx {
2621 let batch = batches.remove(idx);
2622 if batches.is_empty() {
2623 pending.remove(relay_url);
2624 }
2625 Some(batch)
2626 } else {
2627 None
2628 }
2629 } else {
2630 None
2631 }
2632 };
2633
2634 if let Some(batch) = completed_batch {
2635 self.confirm_batch(relay_url, batch).await;
2636 }
2637
2638 tracing::info!(
2639 relay = %relay_url,
2640 batch_id = batch_id,
2641 total_received = total_received,
2642 "historic_sync (negentropy) completed"
2643 );
2644 } else {
2645 // All negentropy syncs failed - remove the pending batch
2646 let mut pending = self.pending_sync_index.write().await;
2647 if let Some(batches) = pending.get_mut(relay_url) {
2648 batches.retain(|b| b.batch_id != batch_id);
2649 if batches.is_empty() {
2650 pending.remove(relay_url);
2651 }
2652 }
2653
2654 tracing::warn!(
2655 relay = %relay_url,
2656 batch_id = batch_id,
2657 "historic_sync (negentropy) failed for all filters"
2658 );
2659 return None;
2660 }
2661 } else {
2662 // Traditional REQ+EOSE path
2663 tracing::debug!(
2664 relay = %relay_url,
2665 batch_id = batch_id,
2666 filter_count = filters_with_since.len(),
2667 repos = items.repos.len(),
2668 root_events = items.root_events.len(),
2669 use_negentropy = use_negentropy,
2670 "Starting historic_sync with REQ+EOSE"
2671 );
2672
2673 // Subscribe to each filter and collect subscription IDs
2674 let mut subscription_ids = HashSet::new();
2675
2676 for filter in &filters_with_since {
2677 if let Some(conn) = self.connections.get(relay_url) {
2678 match conn.subscribe_filter(filter.clone()).await {
2679 Ok(sub_id) => {
2680 subscription_ids.insert(sub_id);
2681 }
2682 Err(e) => {
2683 tracing::error!(
2684 relay = %relay_url,
2685 error = %e,
2686 "Failed to subscribe to filter in historic_sync"
2687 );
2688 }
2689 }
2690 }
2691 }
2692
2693 if subscription_ids.is_empty() && !filters_with_since.is_empty() {
2694 tracing::warn!(
2695 relay = %relay_url,
2696 "All filter subscriptions failed in historic_sync"
2697 );
2698 return None;
2699 }
2700
2701 // Create PendingBatch for REQ+EOSE
2702 let batch = PendingBatch {
2703 batch_id,
2704 items,
2705 outstanding_subs: subscription_ids,
2706 sync_method: SyncMethod::ReqEose,
2707 };
2708
2709 // Add to pending_sync_index
2710 {
2711 let mut pending = self.pending_sync_index.write().await;
2712 pending
2713 .entry(relay_url.to_string())
2714 .or_insert_with(Vec::new)
2715 .push(batch);
2716 }
2717
2718 tracing::debug!(
2719 relay = %relay_url,
2720 batch_id = batch_id,
2721 "historic_sync (REQ+EOSE) batch created, awaiting EOSE"
2722 );
2723 }
2724
2725 Some(batch_id)
2726 }
2727
2098 /// Gracefully shutdown the SyncManager 2728 /// Gracefully shutdown the SyncManager
2099 /// 2729 ///
2100 /// This method: 2730 /// This method:
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 0379fe4..9643fc0 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -499,7 +499,7 @@ impl SelfSubscriber {
499 drop(index); // Release lock before async operations 499 drop(index); // Release lock before async operations
500 500
501 // For each relay, send AddFilters action directly 501 // For each relay, send AddFilters action directly
502 // SyncManager's handle_add_filters auto-spawns connection for unknown relays 502 // SyncManager's handle_new_sync_filters auto-spawns connection for unknown relays
503 for (relay_url, needs) in targets { 503 for (relay_url, needs) in targets {
504 // Skip our own relay URL (we're subscribed to ourselves via self-subscription) 504 // Skip our own relay URL (we're subscribed to ourselves via self-subscription)
505 if relay_url.contains(&self.relay_domain) { 505 if relay_url.contains(&self.relay_domain) {
@@ -519,8 +519,10 @@ impl SelfSubscriber {
519 519
520 let action = AddFilters { 520 let action = AddFilters {
521 relay_url: relay_url.clone(), 521 relay_url: relay_url.clone(),
522 repos: needs.repos, 522 items: crate::sync::PendingItems {
523 root_events: needs.root_events, 523 repos: needs.repos,
524 root_events: needs.root_events,
525 },
524 filters, 526 filters,
525 }; 527 };
526 528