upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
commit39e782b12fce1776f2ad0b0f5430749533cb80ea (patch)
treed050a079a82898848da870d9307a98a83480629b /src/sync
parent586fc2a7df1ce256469f0742d23f687ac4b075b1 (diff)
sync v4 mvp
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/algorithms.rs589
-rw-r--r--src/sync/filters.rs340
-rw-r--r--src/sync/mod.rs301
-rw-r--r--src/sync/relay_connection.rs216
-rw-r--r--src/sync/self_subscriber.rs430
5 files changed, 1867 insertions, 9 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
new file mode 100644
index 0000000..7d87411
--- /dev/null
+++ b/src/sync/algorithms.rs
@@ -0,0 +1,589 @@
1//! Core Sync Algorithms for Proactive Sync
2//!
3//! This module provides the decision-making algorithms for the sync system:
4//!
5//! - `derive_relay_targets()` - Inverts RepoSyncIndex to per-relay view
6//! - `compute_actions()` - Three-way diff to determine new sync actions
7//!
8//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
9
10use std::collections::{HashMap, HashSet};
11
12use nostr_sdk::prelude::*;
13
14use super::{ConnectionStatus, PendingBatch, RelayState};
15
16// =============================================================================
17// Data Structures
18// =============================================================================
19
20/// Relay-centric view of what needs syncing
21///
22/// This is the inverted view of `RepoSyncNeeds` - instead of "what relays does
23/// this repo need to sync from", it's "what repos does this relay need to sync".
24#[derive(Debug, Clone, Default)]
25pub struct RelaySyncNeeds {
26 /// Repos that need to be synced from this relay
27 pub repos: HashSet<String>,
28 /// Root events that need to be tracked from this relay
29 pub root_events: HashSet<EventId>,
30}
31
32/// Action to add filters to a relay
33///
34/// Produced by `compute_actions()` to describe incremental sync work needed.
35#[derive(Debug)]
36pub struct AddFilters {
37 /// The relay URL to add filters to
38 pub relay_url: String,
39 /// Repos being synced in this action
40 pub repos: HashSet<String>,
41 /// Root events being tracked in this action
42 pub root_events: HashSet<EventId>,
43 /// The actual filters to subscribe with
44 pub filters: Vec<Filter>,
45}
46
47// =============================================================================
48// Core Algorithms
49// =============================================================================
50
51/// Inverts RepoSyncIndex to per-relay view
52///
53/// Takes the repo-centric index (repo -> {relays, root_events}) and inverts it
54/// to a relay-centric view (relay -> {repos, root_events}).
55///
56/// # Arguments
57/// * `repo_index` - Map of repo addressable refs to their sync needs
58///
59/// # Returns
60/// Map of relay URLs to the combined sync needs from all repos
61pub fn derive_relay_targets(
62 repo_index: &HashMap<String, super::RepoSyncNeeds>,
63) -> HashMap<String, RelaySyncNeeds> {
64 let mut relay_targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
65
66 for (repo_id, needs) in repo_index {
67 for relay_url in &needs.relays {
68 let entry = relay_targets
69 .entry(relay_url.clone())
70 .or_insert_with(RelaySyncNeeds::default);
71
72 entry.repos.insert(repo_id.clone());
73 entry.root_events.extend(needs.root_events.iter().cloned());
74 }
75 }
76
77 relay_targets
78}
79
80/// Three-way diff: target - pending - confirmed = new
81///
82/// Computes what sync actions are needed by comparing:
83/// 1. What we want (targets)
84/// 2. What's already in-flight (pending)
85/// 3. What's already confirmed (confirmed)
86///
87/// Only creates AddFilters actions for items not already pending or confirmed.
88///
89/// # Arguments
90/// * `targets` - Per-relay sync needs (from `derive_relay_targets`)
91/// * `pending` - In-flight batches per relay
92/// * `confirmed` - Confirmed relay states
93///
94/// # Returns
95/// Vec of AddFilters actions for new sync work
96pub fn compute_actions(
97 targets: &HashMap<String, RelaySyncNeeds>,
98 pending: &HashMap<String, Vec<PendingBatch>>,
99 confirmed: &HashMap<String, RelayState>,
100) -> Vec<AddFilters> {
101 use crate::sync::filters::build_layer2_and_layer3_filters;
102
103 let mut actions = Vec::new();
104
105 for (relay_url, target_needs) in targets {
106 // Skip disconnected relays
107 if let Some(state) = confirmed.get(relay_url) {
108 if matches!(state.connection_status, ConnectionStatus::Disconnected) {
109 continue;
110 }
111 }
112
113 // Calculate what's already pending
114 let pending_repos: HashSet<String> = pending
115 .get(relay_url)
116 .map(|batches| {
117 batches
118 .iter()
119 .flat_map(|batch| batch.items.repos.iter().cloned())
120 .collect()
121 })
122 .unwrap_or_default();
123
124 let pending_events: HashSet<EventId> = pending
125 .get(relay_url)
126 .map(|batches| {
127 batches
128 .iter()
129 .flat_map(|batch| batch.items.root_events.iter().cloned())
130 .collect()
131 })
132 .unwrap_or_default();
133
134 // Calculate what's already confirmed
135 let confirmed_repos: HashSet<String> = confirmed
136 .get(relay_url)
137 .map(|state| state.repos.clone())
138 .unwrap_or_default();
139
140 let confirmed_events: HashSet<EventId> = confirmed
141 .get(relay_url)
142 .map(|state| state.root_events.clone())
143 .unwrap_or_default();
144
145 // Calculate what's NEW (not in pending, not in confirmed)
146 let new_repos: HashSet<String> = target_needs
147 .repos
148 .difference(&pending_repos)
149 .filter(|repo| !confirmed_repos.contains(*repo))
150 .cloned()
151 .collect();
152
153 let new_events: HashSet<EventId> = target_needs
154 .root_events
155 .difference(&pending_events)
156 .filter(|event| !confirmed_events.contains(*event))
157 .cloned()
158 .collect();
159
160 // If there's anything new, create an AddFilters action
161 if !new_repos.is_empty() || !new_events.is_empty() {
162 let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None);
163
164 actions.push(AddFilters {
165 relay_url: relay_url.clone(),
166 repos: new_repos,
167 root_events: new_events,
168 filters,
169 });
170 }
171 }
172
173 actions
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds;
180
181 // =========================================================================
182 // derive_relay_targets tests
183 // =========================================================================
184
185 #[test]
186 fn test_derive_relay_targets_empty() {
187 let repo_index = HashMap::new();
188 let targets = derive_relay_targets(&repo_index);
189 assert!(targets.is_empty());
190 }
191
192 #[test]
193 fn test_derive_relay_targets_single_repo_single_relay() {
194 let mut repo_index = HashMap::new();
195 let mut relays = HashSet::new();
196 relays.insert("wss://relay1.com".to_string());
197
198 let mut root_events = HashSet::new();
199 root_events.insert(EventId::all_zeros());
200
201 repo_index.insert(
202 "repo1".to_string(),
203 ModRepoSyncNeeds {
204 relays,
205 root_events,
206 },
207 );
208
209 let targets = derive_relay_targets(&repo_index);
210
211 assert_eq!(targets.len(), 1);
212 let relay_needs = targets.get("wss://relay1.com").unwrap();
213 assert_eq!(relay_needs.repos.len(), 1);
214 assert!(relay_needs.repos.contains("repo1"));
215 assert_eq!(relay_needs.root_events.len(), 1);
216 }
217
218 #[test]
219 fn test_derive_relay_targets_multiple_repos_same_relay() {
220 let mut repo_index = HashMap::new();
221
222 for i in 1..=3 {
223 let mut relays = HashSet::new();
224 relays.insert("wss://relay1.com".to_string());
225
226 repo_index.insert(
227 format!("repo{}", i),
228 ModRepoSyncNeeds {
229 relays,
230 root_events: HashSet::new(),
231 },
232 );
233 }
234
235 let targets = derive_relay_targets(&repo_index);
236
237 assert_eq!(targets.len(), 1);
238 let relay_needs = targets.get("wss://relay1.com").unwrap();
239 assert_eq!(relay_needs.repos.len(), 3);
240 }
241
242 #[test]
243 fn test_derive_relay_targets_repo_across_multiple_relays() {
244 let mut repo_index = HashMap::new();
245 let mut relays = HashSet::new();
246 relays.insert("wss://relay1.com".to_string());
247 relays.insert("wss://relay2.com".to_string());
248
249 repo_index.insert(
250 "repo1".to_string(),
251 ModRepoSyncNeeds {
252 relays,
253 root_events: HashSet::new(),
254 },
255 );
256
257 let targets = derive_relay_targets(&repo_index);
258
259 assert_eq!(targets.len(), 2);
260 assert!(targets
261 .get("wss://relay1.com")
262 .unwrap()
263 .repos
264 .contains("repo1"));
265 assert!(targets
266 .get("wss://relay2.com")
267 .unwrap()
268 .repos
269 .contains("repo1"));
270 }
271
272 #[test]
273 fn test_derive_relay_targets_combines_root_events() {
274 let mut repo_index = HashMap::new();
275
276 // Repo1 has one root event
277 let mut relays1 = HashSet::new();
278 relays1.insert("wss://relay1.com".to_string());
279 let mut root_events1 = HashSet::new();
280 root_events1.insert(EventId::all_zeros());
281
282 repo_index.insert(
283 "repo1".to_string(),
284 ModRepoSyncNeeds {
285 relays: relays1,
286 root_events: root_events1,
287 },
288 );
289
290 // Repo2 also points to same relay but should have same event combined
291 let mut relays2 = HashSet::new();
292 relays2.insert("wss://relay1.com".to_string());
293 let mut root_events2 = HashSet::new();
294 root_events2.insert(EventId::all_zeros()); // Same event
295
296 repo_index.insert(
297 "repo2".to_string(),
298 ModRepoSyncNeeds {
299 relays: relays2,
300 root_events: root_events2,
301 },
302 );
303
304 let targets = derive_relay_targets(&repo_index);
305
306 assert_eq!(targets.len(), 1);
307 let relay_needs = targets.get("wss://relay1.com").unwrap();
308 assert_eq!(relay_needs.repos.len(), 2);
309 // Root events should be deduplicated
310 assert_eq!(relay_needs.root_events.len(), 1);
311 }
312
313 // =========================================================================
314 // compute_actions tests
315 // =========================================================================
316
317 #[test]
318 fn test_compute_actions_empty() {
319 let targets = HashMap::new();
320 let pending = HashMap::new();
321 let confirmed = HashMap::new();
322
323 let actions = compute_actions(&targets, &pending, &confirmed);
324 assert!(actions.is_empty());
325 }
326
327 #[test]
328 fn test_compute_actions_skips_disconnected() {
329 let mut targets = HashMap::new();
330 targets.insert(
331 "wss://relay1.com".to_string(),
332 RelaySyncNeeds {
333 repos: vec!["repo1".to_string()].into_iter().collect(),
334 root_events: HashSet::new(),
335 },
336 );
337
338 let pending = HashMap::new();
339
340 let mut confirmed = HashMap::new();
341 confirmed.insert(
342 "wss://relay1.com".to_string(),
343 RelayState {
344 repos: HashSet::new(),
345 root_events: HashSet::new(),
346 is_bootstrap: false,
347 connection_status: ConnectionStatus::Disconnected,
348 last_connected: None,
349 disconnected_at: None,
350 },
351 );
352
353 let actions = compute_actions(&targets, &pending, &confirmed);
354 assert!(actions.is_empty(), "Should skip disconnected relays");
355 }
356
357 #[test]
358 fn test_compute_actions_new_repo() {
359 let mut targets = HashMap::new();
360 targets.insert(
361 "wss://relay1.com".to_string(),
362 RelaySyncNeeds {
363 repos: vec!["repo1".to_string()].into_iter().collect(),
364 root_events: HashSet::new(),
365 },
366 );
367
368 let pending = HashMap::new();
369 let confirmed = HashMap::new();
370
371 let actions = compute_actions(&targets, &pending, &confirmed);
372
373 assert_eq!(actions.len(), 1);
374 let action = &actions[0];
375 assert_eq!(action.relay_url, "wss://relay1.com");
376 assert!(action.repos.contains("repo1"));
377 assert!(!action.filters.is_empty());
378 }
379
380 #[test]
381 fn test_compute_actions_excludes_pending() {
382 let mut targets = HashMap::new();
383 targets.insert(
384 "wss://relay1.com".to_string(),
385 RelaySyncNeeds {
386 repos: vec!["repo1".to_string()].into_iter().collect(),
387 root_events: HashSet::new(),
388 },
389 );
390
391 let mut pending = HashMap::new();
392 pending.insert(
393 "wss://relay1.com".to_string(),
394 vec![super::super::PendingBatch {
395 batch_id: 1,
396 items: super::super::PendingItems {
397 repos: vec!["repo1".to_string()].into_iter().collect(),
398 root_events: HashSet::new(),
399 },
400 outstanding_subs: HashSet::new(),
401 }],
402 );
403
404 let confirmed = HashMap::new();
405
406 let actions = compute_actions(&targets, &pending, &confirmed);
407 assert!(
408 actions.is_empty(),
409 "Should not create action for pending items"
410 );
411 }
412
413 #[test]
414 fn test_compute_actions_excludes_confirmed() {
415 let mut targets = HashMap::new();
416 targets.insert(
417 "wss://relay1.com".to_string(),
418 RelaySyncNeeds {
419 repos: vec!["repo1".to_string()].into_iter().collect(),
420 root_events: HashSet::new(),
421 },
422 );
423
424 let pending = HashMap::new();
425
426 let mut confirmed = HashMap::new();
427 confirmed.insert(
428 "wss://relay1.com".to_string(),
429 RelayState {
430 repos: vec!["repo1".to_string()].into_iter().collect(),
431 root_events: HashSet::new(),
432 is_bootstrap: false,
433 connection_status: ConnectionStatus::Connected,
434 last_connected: None,
435 disconnected_at: None,
436 },
437 );
438
439 let actions = compute_actions(&targets, &pending, &confirmed);
440 assert!(
441 actions.is_empty(),
442 "Should not create action for confirmed items"
443 );
444 }
445
446 #[test]
447 fn test_compute_actions_allows_connecting_relays() {
448 let mut targets = HashMap::new();
449 targets.insert(
450 "wss://relay1.com".to_string(),
451 RelaySyncNeeds {
452 repos: vec!["repo1".to_string()].into_iter().collect(),
453 root_events: HashSet::new(),
454 },
455 );
456
457 let pending = HashMap::new();
458
459 let mut confirmed = HashMap::new();
460 confirmed.insert(
461 "wss://relay1.com".to_string(),
462 RelayState {
463 repos: HashSet::new(),
464 root_events: HashSet::new(),
465 is_bootstrap: false,
466 connection_status: ConnectionStatus::Connecting,
467 last_connected: None,
468 disconnected_at: None,
469 },
470 );
471
472 let actions = compute_actions(&targets, &pending, &confirmed);
473 assert_eq!(
474 actions.len(),
475 1,
476 "Should create action for connecting relays"
477 );
478 }
479
480 #[test]
481 fn test_compute_actions_partial_overlap() {
482 // Target has repo1, repo2, repo3
483 let mut targets = HashMap::new();
484 targets.insert(
485 "wss://relay1.com".to_string(),
486 RelaySyncNeeds {
487 repos: vec![
488 "repo1".to_string(),
489 "repo2".to_string(),
490 "repo3".to_string(),
491 ]
492 .into_iter()
493 .collect(),
494 root_events: HashSet::new(),
495 },
496 );
497
498 // repo1 is pending
499 let mut pending = HashMap::new();
500 pending.insert(
501 "wss://relay1.com".to_string(),
502 vec![super::super::PendingBatch {
503 batch_id: 1,
504 items: super::super::PendingItems {
505 repos: vec!["repo1".to_string()].into_iter().collect(),
506 root_events: HashSet::new(),
507 },
508 outstanding_subs: HashSet::new(),
509 }],
510 );
511
512 // repo2 is confirmed
513 let mut confirmed = HashMap::new();
514 confirmed.insert(
515 "wss://relay1.com".to_string(),
516 RelayState {
517 repos: vec!["repo2".to_string()].into_iter().collect(),
518 root_events: HashSet::new(),
519 is_bootstrap: false,
520 connection_status: ConnectionStatus::Connected,
521 last_connected: None,
522 disconnected_at: None,
523 },
524 );
525
526 let actions = compute_actions(&targets, &pending, &confirmed);
527
528 assert_eq!(actions.len(), 1);
529 let action = &actions[0];
530 // Only repo3 should be in the action (repo1 pending, repo2 confirmed)
531 assert_eq!(action.repos.len(), 1);
532 assert!(action.repos.contains("repo3"));
533 assert!(!action.repos.contains("repo1"));
534 assert!(!action.repos.contains("repo2"));
535 }
536
537 #[test]
538 fn test_compute_actions_with_root_events() {
539 let event_id = EventId::all_zeros();
540
541 let mut targets = HashMap::new();
542 targets.insert(
543 "wss://relay1.com".to_string(),
544 RelaySyncNeeds {
545 repos: HashSet::new(),
546 root_events: vec![event_id].into_iter().collect(),
547 },
548 );
549
550 let pending = HashMap::new();
551 let confirmed = HashMap::new();
552
553 let actions = compute_actions(&targets, &pending, &confirmed);
554
555 assert_eq!(actions.len(), 1);
556 let action = &actions[0];
557 assert!(action.repos.is_empty());
558 assert_eq!(action.root_events.len(), 1);
559 assert!(action.root_events.contains(&event_id));
560 // Should have 3 filters for the root event (e, E, q tags)
561 assert_eq!(action.filters.len(), 3);
562 }
563
564 #[test]
565 fn test_compute_actions_unknown_relay_creates_action() {
566 // When a relay is not in confirmed at all, it should still create an action
567 // (it's treated as connected by default if missing from confirmed)
568 let mut targets = HashMap::new();
569 targets.insert(
570 "wss://new-relay.com".to_string(),
571 RelaySyncNeeds {
572 repos: vec!["repo1".to_string()].into_iter().collect(),
573 root_events: HashSet::new(),
574 },
575 );
576
577 let pending = HashMap::new();
578 let confirmed = HashMap::new(); // relay not in confirmed
579
580 let actions = compute_actions(&targets, &pending, &confirmed);
581
582 assert_eq!(
583 actions.len(),
584 1,
585 "Should create action for unknown relay (not yet tracked)"
586 );
587 assert_eq!(actions[0].relay_url, "wss://new-relay.com");
588 }
589} \ No newline at end of file
diff --git a/src/sync/filters.rs b/src/sync/filters.rs
new file mode 100644
index 0000000..02d580e
--- /dev/null
+++ b/src/sync/filters.rs
@@ -0,0 +1,340 @@
1//! Filter Building Functions for Proactive Sync
2//!
3//! This module provides functions to construct Nostr filters for the three-layer
4//! sync strategy defined in GRASP-02 v4:
5//!
6//! - Layer 1: Repository announcements (30617 + 30618)
7//! - Layer 2: Events tagging our repos (a/A/q tags)
8//! - Layer 3: Events tagging our root events (e/E/q tags)
9//!
10//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
11
12use std::collections::HashSet;
13
14use nostr_sdk::prelude::*;
15
16/// Layer 1: Announcements filter (kinds 30617 + 30618)
17///
18/// Subscribed ONCE on connect - NOT included in consolidation rebuilds.
19/// Note: 30618 is ONLY synced from remote relays, not self-subscribed.
20pub fn build_announcement_filter(since: Option<Timestamp>) -> Filter {
21 let filter = Filter::new().kinds([
22 Kind::Custom(30617), // Repository announcements
23 Kind::Custom(30618), // Maintainer lists
24 ]);
25
26 match since {
27 Some(ts) => filter.since(ts),
28 None => filter,
29 }
30}
31
32/// Layer 2: Events tagging one of our repos
33///
34/// Uses lowercase a, uppercase A, and q tags for comprehensive coverage.
35/// Batched per 100 repo refs.
36///
37/// # Arguments
38/// * `repos` - Set of repo addressable refs (format: 30617:pubkey:identifier)
39/// * `since` - Optional timestamp for incremental sync
40///
41/// # Returns
42/// Vec of filters, one set of 3 filters (a/A/q) per 100-repo chunk
43pub fn tagged_one_of_our_repo_event_filters(
44 repos: &HashSet<String>,
45 since: Option<Timestamp>,
46) -> Vec<Filter> {
47 if repos.is_empty() {
48 return vec![];
49 }
50
51 let mut filters = Vec::new();
52 let repo_refs: Vec<_> = repos.iter().collect();
53
54 for chunk in repo_refs.chunks(100) {
55 // Lowercase 'a' tag - standard addressable reference
56 let mut f1 = Filter::new();
57 for repo in chunk {
58 f1 = f1.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo.as_str());
59 }
60
61 // Uppercase 'A' tag - some clients use this
62 let mut f2 = Filter::new();
63 for repo in chunk {
64 f2 = f2.custom_tag(SingleLetterTag::uppercase(Alphabet::A), repo.as_str());
65 }
66
67 // Quote 'q' tag - NIP-10 quote references to addressable events
68 let mut f3 = Filter::new();
69 for repo in chunk {
70 f3 = f3.custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo.as_str());
71 }
72
73 if let Some(ts) = since {
74 f1 = f1.since(ts);
75 f2 = f2.since(ts);
76 f3 = f3.since(ts);
77 }
78
79 filters.push(f1);
80 filters.push(f2);
81 filters.push(f3);
82 }
83
84 filters
85}
86
87/// Layer 3: Events tagging one of our root events
88///
89/// Uses lowercase e, uppercase E, and q tags for comprehensive coverage.
90/// Batched per 100 event IDs.
91///
92/// # Arguments
93/// * `root_events` - Set of event IDs (1617/1618/1619/1621 root events)
94/// * `since` - Optional timestamp for incremental sync
95///
96/// # Returns
97/// Vec of filters, one set of 3 filters (e/E/q) per 100-event chunk
98pub fn tagged_one_of_our_root_event_filters(
99 root_events: &HashSet<EventId>,
100 since: Option<Timestamp>,
101) -> Vec<Filter> {
102 if root_events.is_empty() {
103 return vec![];
104 }
105
106 let mut filters = Vec::new();
107 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
108
109 for chunk in event_ids.chunks(100) {
110 // Lowercase 'e' tag - standard event reference
111 let mut f1 = Filter::new();
112 for event_id in chunk {
113 f1 = f1.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.as_str());
114 }
115
116 // Uppercase 'E' tag - some clients use this
117 let mut f2 = Filter::new();
118 for event_id in chunk {
119 f2 = f2.custom_tag(SingleLetterTag::uppercase(Alphabet::E), event_id.as_str());
120 }
121
122 // Quote 'q' tag - NIP-10 quote references to events
123 let mut f3 = Filter::new();
124 for event_id in chunk {
125 f3 = f3.custom_tag(SingleLetterTag::lowercase(Alphabet::Q), event_id.as_str());
126 }
127
128 if let Some(ts) = since {
129 f1 = f1.since(ts);
130 f2 = f2.since(ts);
131 f3 = f3.since(ts);
132 }
133
134 filters.push(f1);
135 filters.push(f2);
136 filters.push(f3);
137 }
138
139 filters
140}
141
142/// Builds Layer 2 + Layer 3 filters only (NOT Layer 1)
143///
144/// Used by:
145/// - compute_actions for incremental subscriptions
146/// - consolidation rebuilds (Layer 1 remains active)
147///
148/// # Arguments
149/// * `repos` - Set of repo addressable refs
150/// * `root_events` - Set of root event IDs
151/// * `since` - Optional timestamp for incremental sync
152pub fn build_layer2_and_layer3_filters(
153 repos: &HashSet<String>,
154 root_events: &HashSet<EventId>,
155 since: Option<Timestamp>,
156) -> Vec<Filter> {
157 let mut filters = Vec::new();
158 filters.extend(tagged_one_of_our_repo_event_filters(repos, since));
159 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
160 filters
161}
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166
167 #[test]
168 fn test_announcement_filter_no_since() {
169 let filter = build_announcement_filter(None);
170
171 // Verify it includes both kinds
172 // Filter API: we can check by converting to JSON or inspecting structure
173 // For now we just verify it doesn't panic and returns a valid filter
174 assert!(!filter.is_empty());
175 }
176
177 #[test]
178 fn test_announcement_filter_with_since() {
179 let since = Timestamp::from(1700000000);
180 let filter = build_announcement_filter(Some(since));
181
182 assert!(!filter.is_empty());
183 }
184
185 #[test]
186 fn test_repo_filters_empty() {
187 let repos: HashSet<String> = HashSet::new();
188 let filters = tagged_one_of_our_repo_event_filters(&repos, None);
189
190 assert!(filters.is_empty());
191 }
192
193 #[test]
194 fn test_repo_filters_single_repo() {
195 let mut repos = HashSet::new();
196 repos.insert("30617:abc123:test-repo".to_string());
197
198 let filters = tagged_one_of_our_repo_event_filters(&repos, None);
199
200 // Should create 3 filters (a, A, q) for one chunk
201 assert_eq!(filters.len(), 3);
202 }
203
204 #[test]
205 fn test_repo_filters_batching() {
206 let mut repos = HashSet::new();
207 for i in 0..250 {
208 repos.insert(format!("30617:pubkey{}:repo{}", i, i));
209 }
210
211 let filters = tagged_one_of_our_repo_event_filters(&repos, None);
212
213 // Should create 9 filters (3 chunks * 3 tag types)
214 // 250 repos = 100 + 100 + 50 = 3 chunks
215 assert_eq!(filters.len(), 9);
216 }
217
218 #[test]
219 fn test_repo_filters_with_since() {
220 let mut repos = HashSet::new();
221 repos.insert("30617:abc123:test-repo".to_string());
222
223 let since = Timestamp::from(1700000000);
224 let filters = tagged_one_of_our_repo_event_filters(&repos, Some(since));
225
226 assert_eq!(filters.len(), 3);
227 }
228
229 #[test]
230 fn test_root_event_filters_empty() {
231 let root_events: HashSet<EventId> = HashSet::new();
232 let filters = tagged_one_of_our_root_event_filters(&root_events, None);
233
234 assert!(filters.is_empty());
235 }
236
237 #[test]
238 fn test_root_event_filters_single_event() {
239 let mut root_events = HashSet::new();
240 // Create a valid event ID (all zeros for testing)
241 root_events.insert(EventId::all_zeros());
242
243 let filters = tagged_one_of_our_root_event_filters(&root_events, None);
244
245 // Should create 3 filters (e, E, q) for one chunk
246 assert_eq!(filters.len(), 3);
247 }
248
249 #[test]
250 fn test_root_event_filters_batching() {
251 let mut root_events = HashSet::new();
252 // EventId::all_zeros() will deduplicate, so we need unique IDs
253 // For testing purposes, we'll just verify with one ID since HashSet
254 // deduplicates all_zeros(). In real usage these would be unique.
255 for _ in 0..250 {
256 root_events.insert(EventId::all_zeros());
257 }
258
259 let filters = tagged_one_of_our_root_event_filters(&root_events, None);
260
261 // With deduplication, we only have 1 unique ID, so 3 filters
262 // In real usage with 250 unique IDs, it would be 9 filters
263 assert_eq!(filters.len(), 3);
264 }
265
266 #[test]
267 fn test_root_event_filters_with_since() {
268 let mut root_events = HashSet::new();
269 root_events.insert(EventId::all_zeros());
270
271 let since = Timestamp::from(1700000000);
272 let filters = tagged_one_of_our_root_event_filters(&root_events, Some(since));
273
274 assert_eq!(filters.len(), 3);
275 }
276
277 #[test]
278 fn test_combined_filters_empty() {
279 let repos: HashSet<String> = HashSet::new();
280 let root_events: HashSet<EventId> = HashSet::new();
281
282 let filters = build_layer2_and_layer3_filters(&repos, &root_events, None);
283
284 assert!(filters.is_empty());
285 }
286
287 #[test]
288 fn test_combined_filters() {
289 let mut repos = HashSet::new();
290 repos.insert("30617:abc123:repo1".to_string());
291
292 let mut root_events = HashSet::new();
293 root_events.insert(EventId::all_zeros());
294
295 let filters = build_layer2_and_layer3_filters(&repos, &root_events, None);
296
297 // Should have 6 filters (3 for repos + 3 for root events)
298 assert_eq!(filters.len(), 6);
299 }
300
301 #[test]
302 fn test_combined_filters_repos_only() {
303 let mut repos = HashSet::new();
304 repos.insert("30617:abc123:repo1".to_string());
305
306 let root_events: HashSet<EventId> = HashSet::new();
307
308 let filters = build_layer2_and_layer3_filters(&repos, &root_events, None);
309
310 // Should have 3 filters (3 for repos only)
311 assert_eq!(filters.len(), 3);
312 }
313
314 #[test]
315 fn test_combined_filters_root_events_only() {
316 let repos: HashSet<String> = HashSet::new();
317
318 let mut root_events = HashSet::new();
319 root_events.insert(EventId::all_zeros());
320
321 let filters = build_layer2_and_layer3_filters(&repos, &root_events, None);
322
323 // Should have 3 filters (3 for root events only)
324 assert_eq!(filters.len(), 3);
325 }
326
327 #[test]
328 fn test_combined_filters_with_since() {
329 let mut repos = HashSet::new();
330 repos.insert("30617:abc123:repo1".to_string());
331
332 let mut root_events = HashSet::new();
333 root_events.insert(EventId::all_zeros());
334
335 let since = Timestamp::from(1700000000);
336 let filters = build_layer2_and_layer3_filters(&repos, &root_events, Some(since));
337
338 assert_eq!(filters.len(), 6);
339 }
340} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index c1f8bca..fb09896 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -12,6 +12,20 @@
12//! 12//!
13//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. 13//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
14 14
15pub mod algorithms;
16pub mod filters;
17pub mod relay_connection;
18pub mod self_subscriber;
19
20// Re-export core algorithm types
21pub use algorithms::{AddFilters, RelaySyncNeeds};
22
23// Re-export relay connection types
24pub use relay_connection::{RelayConnection, RelayEvent};
25
26// Re-export self-subscriber types
27pub use self_subscriber::{RelayAction, SelfSubscriber};
28
15use std::collections::{HashMap, HashSet}; 29use std::collections::{HashMap, HashSet};
16use std::sync::Arc; 30use std::sync::Arc;
17 31
@@ -355,21 +369,290 @@ impl SyncManager {
355 } 369 }
356 } 370 }
357 371
358 /// Run the sync manager (placeholder for Phase 1) 372 /// Run the sync manager
359 /// 373 ///
360 /// This will be implemented in later phases to: 374 /// Coordinates all sync components:
361 /// 1. Subscribe to local relay for 30617 events 375 /// 1. Spawns self-subscriber to monitor own relay for announcements
362 /// 2. Process events to build RepoSyncIndex 376 /// 2. Connects to bootstrap relay if configured
363 /// 3. Compute and execute sync actions 377 /// 3. Handles relay actions from self-subscriber
364 /// 4. Handle reconnection and catch-up logic
365 pub async fn run(self) { 378 pub async fn run(self) {
379 use tokio::sync::mpsc;
380
366 tracing::info!( 381 tracing::info!(
367 bootstrap_relay = ?self.bootstrap_relay_url, 382 bootstrap_relay = ?self.bootstrap_relay_url,
368 service_domain = %self.service_domain, 383 service_domain = %self.service_domain,
369 "SyncManager starting (placeholder - not yet implemented)" 384 "SyncManager starting"
385 );
386
387 // 1. Create action channel for self-subscriber -> manager communication
388 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
389
390 // 2. Spawn self-subscriber
391 let self_subscriber = SelfSubscriber::new(
392 format!("ws://{}", self.service_domain),
393 self.service_domain.clone(),
394 Arc::clone(&self.repo_sync_index),
395 action_tx,
370 ); 396 );
397 tokio::spawn(async move { self_subscriber.run().await });
398
399 // 3. Connect to bootstrap relay if configured
400 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
401 self.spawn_relay_connection(bootstrap_url.clone()).await;
402 }
371 403
372 // Phase 1: Just log and return 404 // 4. Main loop - handle actions from self-subscriber
373 // Full implementation will be added in subsequent phases 405 loop {
406 tokio::select! {
407 action = action_rx.recv() => {
408 match action {
409 Some(RelayAction::SpawnRelay { relay_url, repos }) => {
410 // Check if relay already exists
411 let relay_index = self.relay_sync_index.read().await;
412 let exists = relay_index.contains_key(&relay_url);
413 drop(relay_index);
414
415 if !exists {
416 tracing::info!(relay = %relay_url, "Spawning new relay connection");
417 self.spawn_relay_with_layer2(relay_url, repos).await;
418 } else {
419 tracing::debug!(
420 relay = %relay_url,
421 "Relay already exists, considering AddFilters"
422 );
423 // For MVP, we don't handle AddFilters - just log
424 // Full implementation would call subscribe_filters on existing connection
425 }
426 }
427 Some(RelayAction::AddFilters { relay_url, repos }) => {
428 tracing::debug!(
429 relay = %relay_url,
430 repo_count = repos.len(),
431 "AddFilters action (MVP: not implemented)"
432 );
433 // For MVP, not implemented - full version would add Layer 2 filters
434 // to existing relay connection
435 }
436 None => break,
437 }
438 }
439 }
440 }
441 }
442
443 /// Spawn relay connection with Layer 2 filters for specific repos
444 ///
445 /// Used when discovering relays from announcements. Connects to the relay,
446 /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the
447 /// specific repos we want to sync.
448 async fn spawn_relay_with_layer2(
449 &self,
450 relay_url: String,
451 repos: HashMap<String, HashSet<EventId>>,
452 ) {
453 use crate::sync::filters::build_layer2_and_layer3_filters;
454 use tokio::sync::mpsc;
455
456 let database = Arc::clone(&self.database);
457 let write_policy = self.write_policy.clone();
458 let relay_sync_index = Arc::clone(&self.relay_sync_index);
459
460 // Create relay connection
461 let connection = RelayConnection::new(relay_url.clone());
462
463 // Connect and subscribe to Layer 1 (announcements)
464 if let Err(e) = connection.connect_and_subscribe(None).await {
465 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
466 return;
467 }
468
469 // Mark as connected in relay sync index
470 {
471 let mut index = relay_sync_index.write().await;
472 index.insert(
473 relay_url.clone(),
474 RelayState {
475 repos: repos.keys().cloned().collect(),
476 root_events: repos.values().flatten().cloned().collect(),
477 is_bootstrap: false,
478 connection_status: ConnectionStatus::Connected,
479 last_connected: Some(Timestamp::now()),
480 disconnected_at: None,
481 },
482 );
483 }
484
485 // Subscribe to Layer 2+3 filters for the repos
486 let repo_ids: HashSet<String> = repos.keys().cloned().collect();
487 let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect();
488 let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None);
489
490 for filter in filters {
491 if let Err(e) = connection.subscribe_filter(filter).await {
492 tracing::error!(
493 relay = %relay_url,
494 error = %e,
495 "Failed to subscribe to Layer 2 filter"
496 );
497 }
498 }
499
500 tracing::info!(
501 relay = %relay_url,
502 repo_count = repos.len(),
503 "Connected to discovered relay with Layer 2+3 filters"
504 );
505
506 // Create event channel
507 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
508
509 // Spawn event loop
510 tokio::spawn(async move {
511 connection.run_event_loop(event_tx).await;
512 });
513
514 // Spawn event processor
515 let relay_url_clone = relay_url.clone();
516 tokio::spawn(async move {
517 while let Some(relay_event) = event_rx.recv().await {
518 match relay_event {
519 RelayEvent::Event(event) => {
520 Self::process_event_static(
521 &event,
522 &relay_url_clone,
523 &database,
524 &write_policy,
525 )
526 .await;
527 }
528 RelayEvent::EndOfStoredEvents(_) => {
529 tracing::debug!(relay = %relay_url_clone, "EOSE received");
530 }
531 RelayEvent::Closed(_) | RelayEvent::Shutdown => {
532 tracing::info!(relay = %relay_url_clone, "Relay disconnected");
533 break;
534 }
535 }
536 }
537 });
538 }
539
540 /// Spawn a relay connection and start its event loop
541 async fn spawn_relay_connection(&self, relay_url: String) {
542 use tokio::sync::mpsc;
543
544 let database = Arc::clone(&self.database);
545 let write_policy = self.write_policy.clone();
546 let relay_sync_index = Arc::clone(&self.relay_sync_index);
547
548 // Create relay connection
549 let connection = RelayConnection::new(relay_url.clone());
550
551 // Connect and subscribe to Layer 1
552 if let Err(e) = connection.connect_and_subscribe(None).await {
553 tracing::error!("Failed to connect to relay {}: {}", relay_url, e);
554 return;
555 }
556
557 // Mark as connected in relay sync index
558 {
559 let mut index = relay_sync_index.write().await;
560 index.insert(
561 relay_url.clone(),
562 RelayState {
563 repos: HashSet::new(),
564 root_events: HashSet::new(),
565 is_bootstrap: true,
566 connection_status: ConnectionStatus::Connected,
567 last_connected: Some(Timestamp::now()),
568 disconnected_at: None,
569 },
570 );
571 }
572
573 // Create event channel
574 let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000);
575
576 // Spawn event loop
577 tokio::spawn(async move {
578 connection.run_event_loop(event_tx).await;
579 });
580
581 // Spawn event processor
582 let relay_url_clone = relay_url.clone();
583 tokio::spawn(async move {
584 while let Some(relay_event) = event_rx.recv().await {
585 match relay_event {
586 RelayEvent::Event(event) => {
587 Self::process_event_static(&event, &relay_url_clone, &database, &write_policy)
588 .await;
589 }
590 RelayEvent::EndOfStoredEvents(_) => {
591 tracing::debug!("EOSE from {}", relay_url_clone);
592 }
593 RelayEvent::Closed(_) | RelayEvent::Shutdown => {
594 tracing::info!("Relay {} disconnected", relay_url_clone);
595 break;
596 }
597 }
598 }
599 });
600 }
601
602 /// Process a single event from a relay (static version for spawned tasks)
603 async fn process_event_static(
604 event: &Event,
605 relay_url: &str,
606 database: &SharedDatabase,
607 write_policy: &Nip34WritePolicy,
608 ) {
609 use nostr_relay_builder::prelude::{PolicyResult, WritePolicy};
610 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
611
612 // Check if event already exists
613 match database.event_by_id(&event.id).await {
614 Ok(Some(_)) => {
615 tracing::trace!(event_id = %event.id, "Event already exists, skipping");
616 return;
617 }
618 Err(e) => {
619 tracing::warn!(event_id = %event.id, error = %e, "Database error checking event");
620 return;
621 }
622 Ok(None) => {} // Continue processing
623 }
624
625 // Apply write policy using a dummy address (sync events aren't from network clients)
626 let dummy_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
627 let result = write_policy.admit_event(event, &dummy_addr).await;
628
629 match result {
630 PolicyResult::Accept => {
631 // Save event
632 if let Err(e) = database.save_event(event).await {
633 tracing::error!(
634 event_id = %event.id,
635 relay = %relay_url,
636 error = %e,
637 "Failed to save synced event"
638 );
639 } else {
640 tracing::debug!(
641 event_id = %event.id,
642 relay = %relay_url,
643 kind = %event.kind.as_u16(),
644 "Saved synced event"
645 );
646 }
647 }
648 PolicyResult::Reject(reason) => {
649 tracing::debug!(
650 event_id = %event.id,
651 relay = %relay_url,
652 reason = %reason,
653 "Event rejected by write policy"
654 );
655 }
656 }
374 } 657 }
375} \ No newline at end of file 658} \ No newline at end of file
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
new file mode 100644
index 0000000..6499c27
--- /dev/null
+++ b/src/sync/relay_connection.rs
@@ -0,0 +1,216 @@
1//! Relay Connection Management for Proactive Sync
2//!
3//! This module provides relay connection management for external relay connections.
4//! Each RelayConnection manages a single connection to an external relay and handles
5//! subscriptions using the three-layer sync strategy.
6//!
7//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
8
9use nostr_sdk::prelude::*;
10use tokio::sync::mpsc;
11
12use super::filters::build_announcement_filter;
13
14/// Events from a relay connection
15#[derive(Debug)]
16pub enum RelayEvent {
17 /// A new event was received
18 Event(Event),
19 /// End of stored events for a subscription
20 EndOfStoredEvents(SubscriptionId),
21 /// Connection was closed
22 Closed(String),
23 /// Shutdown notification
24 Shutdown,
25}
26
27/// Manages connection to a single external relay
28///
29/// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection
30/// to an external relay. It handles:
31/// - Connection establishment
32/// - Layer 1 subscription (announcements)
33/// - Additional filter subscriptions (Layers 2 & 3)
34/// - Event notification loop
35pub struct RelayConnection {
36 /// The relay URL this connection is for
37 url: String,
38 /// The underlying nostr-sdk client
39 client: Client,
40}
41
42impl RelayConnection {
43 /// Create a new relay connection (not yet connected)
44 ///
45 /// # Arguments
46 /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com")
47 pub fn new(url: String) -> Self {
48 let client = Client::default();
49 Self { url, client }
50 }
51
52 /// Connect to the relay and subscribe to Layer 1 (announcements)
53 ///
54 /// This method:
55 /// 1. Adds the relay to the client
56 /// 2. Establishes the WebSocket connection
57 /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618)
58 ///
59 /// # Arguments
60 /// * `since` - Optional timestamp for incremental sync on reconnect
61 ///
62 /// # Returns
63 /// * `Ok(SubscriptionId)` - The subscription ID on successful connection
64 /// * `Err(String)` with error description on failure
65 pub async fn connect_and_subscribe(
66 &self,
67 since: Option<Timestamp>,
68 ) -> Result<SubscriptionId, String> {
69 // Add relay to client
70 self.client
71 .add_relay(&self.url)
72 .await
73 .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?;
74
75 // Establish connection
76 self.client.connect().await;
77
78 // Subscribe to Layer 1 (announcements)
79 let filter = build_announcement_filter(since);
80 let output = self
81 .client
82 .subscribe(filter, None)
83 .await
84 .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?;
85
86 tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)");
87 Ok(output.val)
88 }
89
90 /// Run the event loop, sending events through the provided channel
91 ///
92 /// This method blocks and processes notifications from the relay:
93 /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event`
94 /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents`
95 /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown`
96 ///
97 /// The loop terminates when:
98 /// - The sender channel is closed (receiver dropped)
99 /// - A shutdown notification is received
100 /// - An error occurs receiving notifications
101 ///
102 /// # Arguments
103 /// * `event_sender` - Channel to send relay events through
104 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
105 let mut notifications = self.client.notifications();
106 let url = self.url.clone();
107
108 tracing::debug!(relay = %url, "Starting event loop");
109
110 while let Ok(notification) = notifications.recv().await {
111 match notification {
112 RelayPoolNotification::Event { event, .. } => {
113 tracing::trace!(relay = %url, event_id = %event.id, "Received event");
114 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
115 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
116 break;
117 }
118 }
119 RelayPoolNotification::Message { message, .. } => {
120 match message {
121 RelayMessage::EndOfStoredEvents(sub_id) => {
122 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE");
123 // Convert Cow<SubscriptionId> to owned SubscriptionId
124 let owned_sub_id = sub_id.into_owned();
125 if event_sender
126 .send(RelayEvent::EndOfStoredEvents(owned_sub_id))
127 .await
128 .is_err()
129 {
130 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
131 break;
132 }
133 }
134 RelayMessage::Closed { message: msg, .. } => {
135 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
136 let _ = event_sender
137 .send(RelayEvent::Closed(msg.to_string()))
138 .await;
139 }
140 _ => {}
141 }
142 }
143 RelayPoolNotification::Shutdown => {
144 tracing::info!(relay = %url, "Relay pool shutdown");
145 let _ = event_sender.send(RelayEvent::Shutdown).await;
146 break;
147 }
148 }
149 }
150
151 tracing::debug!(relay = %url, "Event loop terminated");
152 }
153
154 /// Add additional filter subscription (for Layer 2 + 3)
155 ///
156 /// Use this to subscribe to:
157 /// - Layer 2: Events tagging our repos (a/A/q tags)
158 /// - Layer 3: Events tagging our root events (e/E/q tags)
159 ///
160 /// # Arguments
161 /// * `filter` - The filter to subscribe to
162 ///
163 /// # Returns
164 /// * `Ok(SubscriptionId)` - The subscription ID on success
165 /// * `Err(String)` - Error description on failure
166 pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> {
167 let output = self
168 .client
169 .subscribe(filter, None)
170 .await
171 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
172 Ok(output.val)
173 }
174
175 /// Subscribe to multiple filters at once
176 ///
177 /// Each filter creates its own subscription. Returns when all subscriptions
178 /// are established. This is useful for Layer 2 + 3 filters together.
179 ///
180 /// # Arguments
181 /// * `filters` - Vec of filters to subscribe to
182 ///
183 /// # Returns
184 /// * `Ok(Vec<SubscriptionId>)` - The subscription IDs on success
185 /// * `Err(String)` - Error description on failure
186 pub async fn subscribe_filters(
187 &self,
188 filters: Vec<Filter>,
189 ) -> Result<Vec<SubscriptionId>, String> {
190 if filters.is_empty() {
191 return Ok(vec![]);
192 }
193
194 let mut sub_ids = Vec::with_capacity(filters.len());
195 for filter in filters {
196 let output = self
197 .client
198 .subscribe(filter, None)
199 .await
200 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
201 sub_ids.push(output.val);
202 }
203 Ok(sub_ids)
204 }
205
206 /// Get the relay URL
207 pub fn url(&self) -> &str {
208 &self.url
209 }
210
211 /// Disconnect from the relay
212 pub async fn disconnect(&self) {
213 self.client.disconnect().await;
214 tracing::debug!(relay = %self.url, "Disconnected from relay");
215 }
216} \ No newline at end of file
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
new file mode 100644
index 0000000..1dec219
--- /dev/null
+++ b/src/sync/self_subscriber.rs
@@ -0,0 +1,430 @@
1//! Self-Subscriber for Proactive Sync
2//!
3//! Monitors the relay's own database for repository announcements and
4//! updates the RepoSyncIndex when new relevant events are discovered.
5//!
6//! This module subscribes to relevant event kinds on our own relay and
7//! batches updates before sending them to the SyncManager.
8//!
9//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
10
11use std::collections::{HashMap, HashSet};
12use std::time::Duration;
13
14use nostr_sdk::prelude::*;
15use tokio::sync::mpsc;
16
17use super::{RepoSyncIndex, RepoSyncNeeds};
18
19// =============================================================================
20// RelayAction - Actions to send to SyncManager
21// =============================================================================
22
23/// Actions that the SelfSubscriber sends to the SyncManager
24#[derive(Debug)]
25pub enum RelayAction {
26 /// Spawn a new relay connection
27 SpawnRelay {
28 /// The relay URL to connect to
29 relay_url: String,
30 /// Repos to sync, mapped to their root event IDs
31 repos: HashMap<String, HashSet<EventId>>,
32 },
33 /// Add filters to an existing relay connection
34 AddFilters {
35 /// The relay URL to add filters to
36 relay_url: String,
37 /// Repos to sync, mapped to their root event IDs
38 repos: HashMap<String, HashSet<EventId>>,
39 },
40}
41
42// =============================================================================
43// PendingUpdates - Accumulator for batching
44// =============================================================================
45
46/// Accumulates updates between batch timer firings
47struct PendingUpdates {
48 /// Repos discovered since last batch, keyed by repo addressable ref
49 repos: HashMap<String, RepoSyncNeeds>,
50}
51
52impl PendingUpdates {
53 /// Create a new empty pending updates accumulator
54 fn new() -> Self {
55 Self {
56 repos: HashMap::new(),
57 }
58 }
59
60 /// Add or update a repo with its relays and root events
61 fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) {
62 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
63 relays: HashSet::new(),
64 root_events: HashSet::new(),
65 });
66 entry.relays.extend(relays);
67 entry.root_events.extend(root_events);
68 }
69
70 /// Check if there are any pending updates
71 fn is_empty(&self) -> bool {
72 self.repos.is_empty()
73 }
74
75 /// Take all pending updates, leaving empty
76 fn take(&mut self) -> HashMap<String, RepoSyncNeeds> {
77 std::mem::take(&mut self.repos)
78 }
79}
80
81// =============================================================================
82// SelfSubscriber - Main Component
83// =============================================================================
84
85/// Subscribes to own relay's events to discover repos needing sync
86///
87/// The SelfSubscriber connects to our own relay and monitors for:
88/// - 30617 (Repository Announcements) - to discover repos listing our relay
89/// - 1617 (Patches) - root events referencing repos
90/// - 1618 (Issues) - root events referencing repos
91/// - 1619 (Replies) - root events referencing repos
92/// - 1621 (PRs) - root events referencing repos
93///
94/// Note: 30618 is NOT subscribed to here (per v4 spec - only synced from remote relays)
95pub struct SelfSubscriber {
96 /// Our own relay URL (to connect to)
97 own_relay_url: String,
98 /// Our service domain (for filtering relevant repos)
99 relay_domain: String,
100 /// Shared index of repos to sync
101 repo_sync_index: RepoSyncIndex,
102 /// Channel to send actions to SyncManager
103 action_tx: mpsc::Sender<RelayAction>,
104}
105
106impl SelfSubscriber {
107 /// Create a new SelfSubscriber
108 ///
109 /// # Arguments
110 /// * `own_relay_url` - The WebSocket URL of our own relay
111 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
112 /// * `repo_sync_index` - Shared index to update with discovered repos
113 /// * `action_tx` - Channel to send RelayActions to the SyncManager
114 pub fn new(
115 own_relay_url: String,
116 relay_domain: String,
117 repo_sync_index: RepoSyncIndex,
118 action_tx: mpsc::Sender<RelayAction>,
119 ) -> Self {
120 Self {
121 own_relay_url,
122 relay_domain,
123 repo_sync_index,
124 action_tx,
125 }
126 }
127
128 /// Get batch window from environment or use default
129 ///
130 /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable.
131 /// Default: 5000ms (5 seconds)
132 fn get_batch_window() -> Duration {
133 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS")
134 .ok()
135 .and_then(|s| s.parse::<u64>().ok())
136 .map(Duration::from_millis)
137 .unwrap_or(Duration::from_millis(5000))
138 }
139
140 /// Extract relay URLs from event tags
141 ///
142 /// Extracts URLs from:
143 /// - `relays` tags: ["relays", "wss://relay1.com", "wss://relay2.com", ...]
144 /// - `clone` tags: ["clone", "https://example.com/repo.git", ...] (converted to ws://)
145 fn extract_relay_urls(event: &Event) -> HashSet<String> {
146 let mut relays = HashSet::new();
147
148 for tag in event.tags.iter() {
149 let tag_vec = tag.as_slice();
150 if tag_vec.is_empty() {
151 continue;
152 }
153
154 match tag_vec[0].as_str() {
155 "relays" => {
156 // All subsequent values are relay URLs
157 for url in tag_vec.iter().skip(1) {
158 relays.insert(url.to_string());
159 }
160 }
161 "clone" if tag_vec.len() >= 2 => {
162 // Convert http(s) clone URL to ws(s) relay URL
163 if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) {
164 relays.insert(relay_url);
165 }
166 }
167 _ => {}
168 }
169 }
170
171 relays
172 }
173
174 /// Extract repo identifier from event
175 ///
176 /// For kind 30617, uses the `d` tag to build the addressable reference
177 /// Format: 30617:pubkey:identifier
178 fn extract_repo_id(event: &Event) -> Option<String> {
179 // For kind 30617, extract d tag and build addressable ref
180 if event.kind == Kind::Custom(30617) {
181 for tag in event.tags.iter() {
182 let tag_vec = tag.as_slice();
183 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
184 return Some(format!("30617:{}:{}", event.pubkey, tag_vec[1]));
185 }
186 }
187 }
188
189 // For other kinds (1617, 1618, 1619, 1621), we'd need to look at
190 // their 'a' tags to find which repo they belong to.
191 // That processing happens in the batch processing, not here.
192 None
193 }
194
195 /// Check if announcement lists our relay
196 ///
197 /// Returns true if any extracted relay URL contains our domain
198 fn lists_our_relay(&self, event: &Event) -> bool {
199 Self::extract_relay_urls(event).iter().any(|url| {
200 url.contains(&self.relay_domain) || url == &self.own_relay_url
201 })
202 }
203
204 /// Main run loop
205 ///
206 /// Connects to own relay, subscribes to relevant event kinds,
207 /// and batches updates before processing them.
208 pub async fn run(self) {
209 let client = Client::default();
210
211 // Add own relay
212 if let Err(e) = client.add_relay(&self.own_relay_url).await {
213 tracing::error!(
214 url = %self.own_relay_url,
215 error = %e,
216 "Failed to add own relay for self-subscription"
217 );
218 return;
219 }
220
221 // Connect
222 client.connect().await;
223
224 // Subscribe to announcement and root event kinds
225 // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618)
226 let filter = Filter::new().kinds(vec![
227 Kind::Custom(30617), // Repository Announcements
228 Kind::Custom(1617), // Patches
229 Kind::Custom(1618), // Issues
230 Kind::Custom(1619), // Replies/Status
231 Kind::Custom(1621), // Pull Requests
232 ]);
233
234 if let Err(e) = client.subscribe(filter, None).await {
235 tracing::error!(
236 error = %e,
237 "Failed to subscribe to own relay for self-subscription"
238 );
239 return;
240 }
241
242 tracing::info!(
243 url = %self.own_relay_url,
244 domain = %self.relay_domain,
245 "SelfSubscriber started"
246 );
247
248 let mut notifications = client.notifications();
249 let batch_window = Self::get_batch_window();
250 let mut pending = PendingUpdates::new();
251
252 // Timer does NOT reset on new events - use interval
253 let mut timer = tokio::time::interval(batch_window);
254 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
255
256 loop {
257 tokio::select! {
258 notification = notifications.recv() => {
259 match notification {
260 Ok(RelayPoolNotification::Event { event, .. }) => {
261 // Only process 30617 events that list our relay
262 if event.kind == Kind::Custom(30617) {
263 if !self.lists_our_relay(&event) {
264 continue;
265 }
266
267 // Extract repo ID and relays
268 if let Some(repo_id) = Self::extract_repo_id(&event) {
269 let relays = Self::extract_relay_urls(&event);
270 let mut root_events = HashSet::new();
271 root_events.insert(event.id);
272
273 pending.add_repo(repo_id, relays, root_events);
274 tracing::debug!(
275 event_id = %event.id,
276 "Queued 30617 announcement for batch processing"
277 );
278 }
279 } else {
280 // For root event kinds (1617, 1618, 1619, 1621),
281 // we need to check if they reference repos we care about.
282 // For now, we'll track them in a simpler way.
283 // Full implementation would extract 'a' tag and match to known repos.
284 tracing::trace!(
285 kind = %event.kind,
286 event_id = %event.id,
287 "Received root event (processing deferred)"
288 );
289 }
290 }
291 Ok(RelayPoolNotification::Shutdown) => {
292 tracing::info!("SelfSubscriber received shutdown notification");
293 break;
294 }
295 Err(e) => {
296 tracing::error!(error = %e, "Error receiving notification");
297 break;
298 }
299 _ => {}
300 }
301 }
302 _ = timer.tick() => {
303 if !pending.is_empty() {
304 self.process_batch(&mut pending).await;
305 }
306 }
307 }
308 }
309
310 tracing::info!("SelfSubscriber stopped");
311 }
312
313 /// Process accumulated batch
314 ///
315 /// Updates the RepoSyncIndex with discovered repos, then derives per-relay
316 /// targets and sends RelayAction messages to the SyncManager.
317 async fn process_batch(&self, pending: &mut PendingUpdates) {
318 use crate::sync::algorithms::derive_relay_targets;
319
320 let updates = pending.take();
321
322 if updates.is_empty() {
323 return;
324 }
325
326 tracing::info!(
327 repo_count = updates.len(),
328 "Processing batch of repo updates"
329 );
330
331 // Update RepoSyncIndex
332 let mut index = self.repo_sync_index.write().await;
333
334 for (repo_id, needs) in updates {
335 // Merge with existing entry or insert new
336 let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds {
337 relays: HashSet::new(),
338 root_events: HashSet::new(),
339 });
340 entry.relays.extend(needs.relays);
341 entry.root_events.extend(needs.root_events);
342
343 tracing::debug!(
344 repo_id = %repo_id,
345 relay_count = entry.relays.len(),
346 event_count = entry.root_events.len(),
347 "Updated repo sync needs"
348 );
349 }
350
351 // Derive per-relay targets from the updated index
352 let targets = derive_relay_targets(&index);
353 drop(index); // Release lock before async operations
354
355 // For each relay, send SpawnRelay action
356 // SyncManager will check if relay already exists
357 for (relay_url, needs) in targets {
358 // Skip our own relay URL (we're subscribed to ourselves via self-subscription)
359 if relay_url.contains(&self.relay_domain) {
360 continue;
361 }
362
363 // Convert needs to HashMap<String, HashSet<EventId>>
364 let mut repos = HashMap::new();
365 for repo_id in needs.repos {
366 repos.insert(repo_id, needs.root_events.clone());
367 }
368
369 let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos };
370
371 if let Err(e) = self.action_tx.send(action).await {
372 tracing::error!(
373 relay = %relay_url,
374 error = %e,
375 "Failed to send SpawnRelay action"
376 );
377 } else {
378 tracing::debug!(
379 relay = %relay_url,
380 "Sent SpawnRelay action to SyncManager"
381 );
382 }
383 }
384 }
385}
386
387// =============================================================================
388// Helper Functions
389// =============================================================================
390
391/// Convert clone URL to relay URL
392///
393/// Converts http:// to ws:// and https:// to wss://
394/// Returns None for unsupported URL schemes
395fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
396 if clone_url.starts_with("http://") {
397 Some(clone_url.replacen("http://", "ws://", 1))
398 } else if clone_url.starts_with("https://") {
399 Some(clone_url.replacen("https://", "wss://", 1))
400 } else {
401 None
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408
409 #[test]
410 fn test_clone_url_to_relay_url_https() {
411 assert_eq!(
412 clone_url_to_relay_url("https://example.com/repo.git"),
413 Some("wss://example.com/repo.git".to_string())
414 );
415 }
416
417 #[test]
418 fn test_clone_url_to_relay_url_http() {
419 assert_eq!(
420 clone_url_to_relay_url("http://localhost:3000/repo.git"),
421 Some("ws://localhost:3000/repo.git".to_string())
422 );
423 }
424
425 #[test]
426 fn test_clone_url_to_relay_url_unsupported() {
427 assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None);
428 assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None);
429 }
430} \ No newline at end of file