upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/algorithms.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/algorithms.rs')
-rw-r--r--src/sync/algorithms.rs589
1 files changed, 589 insertions, 0 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
new file mode 100644
index 0000000..7d87411
--- /dev/null
+++ b/src/sync/algorithms.rs
@@ -0,0 +1,589 @@
1//! Core Sync Algorithms for Proactive Sync
2//!
3//! This module provides the decision-making algorithms for the sync system:
4//!
5//! - `derive_relay_targets()` - Inverts RepoSyncIndex to per-relay view
6//! - `compute_actions()` - Three-way diff to determine new sync actions
7//!
8//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
9
10use std::collections::{HashMap, HashSet};
11
12use nostr_sdk::prelude::*;
13
14use super::{ConnectionStatus, PendingBatch, RelayState};
15
16// =============================================================================
17// Data Structures
18// =============================================================================
19
20/// Relay-centric view of what needs syncing
21///
22/// This is the inverted view of `RepoSyncNeeds` - instead of "what relays does
23/// this repo need to sync from", it's "what repos does this relay need to sync".
24#[derive(Debug, Clone, Default)]
25pub struct RelaySyncNeeds {
26 /// Repos that need to be synced from this relay
27 pub repos: HashSet<String>,
28 /// Root events that need to be tracked from this relay
29 pub root_events: HashSet<EventId>,
30}
31
32/// Action to add filters to a relay
33///
34/// Produced by `compute_actions()` to describe incremental sync work needed.
35#[derive(Debug)]
36pub struct AddFilters {
37 /// The relay URL to add filters to
38 pub relay_url: String,
39 /// Repos being synced in this action
40 pub repos: HashSet<String>,
41 /// Root events being tracked in this action
42 pub root_events: HashSet<EventId>,
43 /// The actual filters to subscribe with
44 pub filters: Vec<Filter>,
45}
46
47// =============================================================================
48// Core Algorithms
49// =============================================================================
50
51/// Inverts RepoSyncIndex to per-relay view
52///
53/// Takes the repo-centric index (repo -> {relays, root_events}) and inverts it
54/// to a relay-centric view (relay -> {repos, root_events}).
55///
56/// # Arguments
57/// * `repo_index` - Map of repo addressable refs to their sync needs
58///
59/// # Returns
60/// Map of relay URLs to the combined sync needs from all repos
61pub fn derive_relay_targets(
62 repo_index: &HashMap<String, super::RepoSyncNeeds>,
63) -> HashMap<String, RelaySyncNeeds> {
64 let mut relay_targets: HashMap<String, RelaySyncNeeds> = HashMap::new();
65
66 for (repo_id, needs) in repo_index {
67 for relay_url in &needs.relays {
68 let entry = relay_targets
69 .entry(relay_url.clone())
70 .or_insert_with(RelaySyncNeeds::default);
71
72 entry.repos.insert(repo_id.clone());
73 entry.root_events.extend(needs.root_events.iter().cloned());
74 }
75 }
76
77 relay_targets
78}
79
80/// Three-way diff: target - pending - confirmed = new
81///
82/// Computes what sync actions are needed by comparing:
83/// 1. What we want (targets)
84/// 2. What's already in-flight (pending)
85/// 3. What's already confirmed (confirmed)
86///
87/// Only creates AddFilters actions for items not already pending or confirmed.
88///
89/// # Arguments
90/// * `targets` - Per-relay sync needs (from `derive_relay_targets`)
91/// * `pending` - In-flight batches per relay
92/// * `confirmed` - Confirmed relay states
93///
94/// # Returns
95/// Vec of AddFilters actions for new sync work
96pub fn compute_actions(
97 targets: &HashMap<String, RelaySyncNeeds>,
98 pending: &HashMap<String, Vec<PendingBatch>>,
99 confirmed: &HashMap<String, RelayState>,
100) -> Vec<AddFilters> {
101 use crate::sync::filters::build_layer2_and_layer3_filters;
102
103 let mut actions = Vec::new();
104
105 for (relay_url, target_needs) in targets {
106 // Skip disconnected relays
107 if let Some(state) = confirmed.get(relay_url) {
108 if matches!(state.connection_status, ConnectionStatus::Disconnected) {
109 continue;
110 }
111 }
112
113 // Calculate what's already pending
114 let pending_repos: HashSet<String> = pending
115 .get(relay_url)
116 .map(|batches| {
117 batches
118 .iter()
119 .flat_map(|batch| batch.items.repos.iter().cloned())
120 .collect()
121 })
122 .unwrap_or_default();
123
124 let pending_events: HashSet<EventId> = pending
125 .get(relay_url)
126 .map(|batches| {
127 batches
128 .iter()
129 .flat_map(|batch| batch.items.root_events.iter().cloned())
130 .collect()
131 })
132 .unwrap_or_default();
133
134 // Calculate what's already confirmed
135 let confirmed_repos: HashSet<String> = confirmed
136 .get(relay_url)
137 .map(|state| state.repos.clone())
138 .unwrap_or_default();
139
140 let confirmed_events: HashSet<EventId> = confirmed
141 .get(relay_url)
142 .map(|state| state.root_events.clone())
143 .unwrap_or_default();
144
145 // Calculate what's NEW (not in pending, not in confirmed)
146 let new_repos: HashSet<String> = target_needs
147 .repos
148 .difference(&pending_repos)
149 .filter(|repo| !confirmed_repos.contains(*repo))
150 .cloned()
151 .collect();
152
153 let new_events: HashSet<EventId> = target_needs
154 .root_events
155 .difference(&pending_events)
156 .filter(|event| !confirmed_events.contains(*event))
157 .cloned()
158 .collect();
159
160 // If there's anything new, create an AddFilters action
161 if !new_repos.is_empty() || !new_events.is_empty() {
162 let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None);
163
164 actions.push(AddFilters {
165 relay_url: relay_url.clone(),
166 repos: new_repos,
167 root_events: new_events,
168 filters,
169 });
170 }
171 }
172
173 actions
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds;
180
181 // =========================================================================
182 // derive_relay_targets tests
183 // =========================================================================
184
185 #[test]
186 fn test_derive_relay_targets_empty() {
187 let repo_index = HashMap::new();
188 let targets = derive_relay_targets(&repo_index);
189 assert!(targets.is_empty());
190 }
191
192 #[test]
193 fn test_derive_relay_targets_single_repo_single_relay() {
194 let mut repo_index = HashMap::new();
195 let mut relays = HashSet::new();
196 relays.insert("wss://relay1.com".to_string());
197
198 let mut root_events = HashSet::new();
199 root_events.insert(EventId::all_zeros());
200
201 repo_index.insert(
202 "repo1".to_string(),
203 ModRepoSyncNeeds {
204 relays,
205 root_events,
206 },
207 );
208
209 let targets = derive_relay_targets(&repo_index);
210
211 assert_eq!(targets.len(), 1);
212 let relay_needs = targets.get("wss://relay1.com").unwrap();
213 assert_eq!(relay_needs.repos.len(), 1);
214 assert!(relay_needs.repos.contains("repo1"));
215 assert_eq!(relay_needs.root_events.len(), 1);
216 }
217
218 #[test]
219 fn test_derive_relay_targets_multiple_repos_same_relay() {
220 let mut repo_index = HashMap::new();
221
222 for i in 1..=3 {
223 let mut relays = HashSet::new();
224 relays.insert("wss://relay1.com".to_string());
225
226 repo_index.insert(
227 format!("repo{}", i),
228 ModRepoSyncNeeds {
229 relays,
230 root_events: HashSet::new(),
231 },
232 );
233 }
234
235 let targets = derive_relay_targets(&repo_index);
236
237 assert_eq!(targets.len(), 1);
238 let relay_needs = targets.get("wss://relay1.com").unwrap();
239 assert_eq!(relay_needs.repos.len(), 3);
240 }
241
242 #[test]
243 fn test_derive_relay_targets_repo_across_multiple_relays() {
244 let mut repo_index = HashMap::new();
245 let mut relays = HashSet::new();
246 relays.insert("wss://relay1.com".to_string());
247 relays.insert("wss://relay2.com".to_string());
248
249 repo_index.insert(
250 "repo1".to_string(),
251 ModRepoSyncNeeds {
252 relays,
253 root_events: HashSet::new(),
254 },
255 );
256
257 let targets = derive_relay_targets(&repo_index);
258
259 assert_eq!(targets.len(), 2);
260 assert!(targets
261 .get("wss://relay1.com")
262 .unwrap()
263 .repos
264 .contains("repo1"));
265 assert!(targets
266 .get("wss://relay2.com")
267 .unwrap()
268 .repos
269 .contains("repo1"));
270 }
271
272 #[test]
273 fn test_derive_relay_targets_combines_root_events() {
274 let mut repo_index = HashMap::new();
275
276 // Repo1 has one root event
277 let mut relays1 = HashSet::new();
278 relays1.insert("wss://relay1.com".to_string());
279 let mut root_events1 = HashSet::new();
280 root_events1.insert(EventId::all_zeros());
281
282 repo_index.insert(
283 "repo1".to_string(),
284 ModRepoSyncNeeds {
285 relays: relays1,
286 root_events: root_events1,
287 },
288 );
289
290 // Repo2 also points to same relay but should have same event combined
291 let mut relays2 = HashSet::new();
292 relays2.insert("wss://relay1.com".to_string());
293 let mut root_events2 = HashSet::new();
294 root_events2.insert(EventId::all_zeros()); // Same event
295
296 repo_index.insert(
297 "repo2".to_string(),
298 ModRepoSyncNeeds {
299 relays: relays2,
300 root_events: root_events2,
301 },
302 );
303
304 let targets = derive_relay_targets(&repo_index);
305
306 assert_eq!(targets.len(), 1);
307 let relay_needs = targets.get("wss://relay1.com").unwrap();
308 assert_eq!(relay_needs.repos.len(), 2);
309 // Root events should be deduplicated
310 assert_eq!(relay_needs.root_events.len(), 1);
311 }
312
313 // =========================================================================
314 // compute_actions tests
315 // =========================================================================
316
317 #[test]
318 fn test_compute_actions_empty() {
319 let targets = HashMap::new();
320 let pending = HashMap::new();
321 let confirmed = HashMap::new();
322
323 let actions = compute_actions(&targets, &pending, &confirmed);
324 assert!(actions.is_empty());
325 }
326
327 #[test]
328 fn test_compute_actions_skips_disconnected() {
329 let mut targets = HashMap::new();
330 targets.insert(
331 "wss://relay1.com".to_string(),
332 RelaySyncNeeds {
333 repos: vec!["repo1".to_string()].into_iter().collect(),
334 root_events: HashSet::new(),
335 },
336 );
337
338 let pending = HashMap::new();
339
340 let mut confirmed = HashMap::new();
341 confirmed.insert(
342 "wss://relay1.com".to_string(),
343 RelayState {
344 repos: HashSet::new(),
345 root_events: HashSet::new(),
346 is_bootstrap: false,
347 connection_status: ConnectionStatus::Disconnected,
348 last_connected: None,
349 disconnected_at: None,
350 },
351 );
352
353 let actions = compute_actions(&targets, &pending, &confirmed);
354 assert!(actions.is_empty(), "Should skip disconnected relays");
355 }
356
357 #[test]
358 fn test_compute_actions_new_repo() {
359 let mut targets = HashMap::new();
360 targets.insert(
361 "wss://relay1.com".to_string(),
362 RelaySyncNeeds {
363 repos: vec!["repo1".to_string()].into_iter().collect(),
364 root_events: HashSet::new(),
365 },
366 );
367
368 let pending = HashMap::new();
369 let confirmed = HashMap::new();
370
371 let actions = compute_actions(&targets, &pending, &confirmed);
372
373 assert_eq!(actions.len(), 1);
374 let action = &actions[0];
375 assert_eq!(action.relay_url, "wss://relay1.com");
376 assert!(action.repos.contains("repo1"));
377 assert!(!action.filters.is_empty());
378 }
379
380 #[test]
381 fn test_compute_actions_excludes_pending() {
382 let mut targets = HashMap::new();
383 targets.insert(
384 "wss://relay1.com".to_string(),
385 RelaySyncNeeds {
386 repos: vec!["repo1".to_string()].into_iter().collect(),
387 root_events: HashSet::new(),
388 },
389 );
390
391 let mut pending = HashMap::new();
392 pending.insert(
393 "wss://relay1.com".to_string(),
394 vec![super::super::PendingBatch {
395 batch_id: 1,
396 items: super::super::PendingItems {
397 repos: vec!["repo1".to_string()].into_iter().collect(),
398 root_events: HashSet::new(),
399 },
400 outstanding_subs: HashSet::new(),
401 }],
402 );
403
404 let confirmed = HashMap::new();
405
406 let actions = compute_actions(&targets, &pending, &confirmed);
407 assert!(
408 actions.is_empty(),
409 "Should not create action for pending items"
410 );
411 }
412
413 #[test]
414 fn test_compute_actions_excludes_confirmed() {
415 let mut targets = HashMap::new();
416 targets.insert(
417 "wss://relay1.com".to_string(),
418 RelaySyncNeeds {
419 repos: vec!["repo1".to_string()].into_iter().collect(),
420 root_events: HashSet::new(),
421 },
422 );
423
424 let pending = HashMap::new();
425
426 let mut confirmed = HashMap::new();
427 confirmed.insert(
428 "wss://relay1.com".to_string(),
429 RelayState {
430 repos: vec!["repo1".to_string()].into_iter().collect(),
431 root_events: HashSet::new(),
432 is_bootstrap: false,
433 connection_status: ConnectionStatus::Connected,
434 last_connected: None,
435 disconnected_at: None,
436 },
437 );
438
439 let actions = compute_actions(&targets, &pending, &confirmed);
440 assert!(
441 actions.is_empty(),
442 "Should not create action for confirmed items"
443 );
444 }
445
446 #[test]
447 fn test_compute_actions_allows_connecting_relays() {
448 let mut targets = HashMap::new();
449 targets.insert(
450 "wss://relay1.com".to_string(),
451 RelaySyncNeeds {
452 repos: vec!["repo1".to_string()].into_iter().collect(),
453 root_events: HashSet::new(),
454 },
455 );
456
457 let pending = HashMap::new();
458
459 let mut confirmed = HashMap::new();
460 confirmed.insert(
461 "wss://relay1.com".to_string(),
462 RelayState {
463 repos: HashSet::new(),
464 root_events: HashSet::new(),
465 is_bootstrap: false,
466 connection_status: ConnectionStatus::Connecting,
467 last_connected: None,
468 disconnected_at: None,
469 },
470 );
471
472 let actions = compute_actions(&targets, &pending, &confirmed);
473 assert_eq!(
474 actions.len(),
475 1,
476 "Should create action for connecting relays"
477 );
478 }
479
480 #[test]
481 fn test_compute_actions_partial_overlap() {
482 // Target has repo1, repo2, repo3
483 let mut targets = HashMap::new();
484 targets.insert(
485 "wss://relay1.com".to_string(),
486 RelaySyncNeeds {
487 repos: vec![
488 "repo1".to_string(),
489 "repo2".to_string(),
490 "repo3".to_string(),
491 ]
492 .into_iter()
493 .collect(),
494 root_events: HashSet::new(),
495 },
496 );
497
498 // repo1 is pending
499 let mut pending = HashMap::new();
500 pending.insert(
501 "wss://relay1.com".to_string(),
502 vec![super::super::PendingBatch {
503 batch_id: 1,
504 items: super::super::PendingItems {
505 repos: vec!["repo1".to_string()].into_iter().collect(),
506 root_events: HashSet::new(),
507 },
508 outstanding_subs: HashSet::new(),
509 }],
510 );
511
512 // repo2 is confirmed
513 let mut confirmed = HashMap::new();
514 confirmed.insert(
515 "wss://relay1.com".to_string(),
516 RelayState {
517 repos: vec!["repo2".to_string()].into_iter().collect(),
518 root_events: HashSet::new(),
519 is_bootstrap: false,
520 connection_status: ConnectionStatus::Connected,
521 last_connected: None,
522 disconnected_at: None,
523 },
524 );
525
526 let actions = compute_actions(&targets, &pending, &confirmed);
527
528 assert_eq!(actions.len(), 1);
529 let action = &actions[0];
530 // Only repo3 should be in the action (repo1 pending, repo2 confirmed)
531 assert_eq!(action.repos.len(), 1);
532 assert!(action.repos.contains("repo3"));
533 assert!(!action.repos.contains("repo1"));
534 assert!(!action.repos.contains("repo2"));
535 }
536
537 #[test]
538 fn test_compute_actions_with_root_events() {
539 let event_id = EventId::all_zeros();
540
541 let mut targets = HashMap::new();
542 targets.insert(
543 "wss://relay1.com".to_string(),
544 RelaySyncNeeds {
545 repos: HashSet::new(),
546 root_events: vec![event_id].into_iter().collect(),
547 },
548 );
549
550 let pending = HashMap::new();
551 let confirmed = HashMap::new();
552
553 let actions = compute_actions(&targets, &pending, &confirmed);
554
555 assert_eq!(actions.len(), 1);
556 let action = &actions[0];
557 assert!(action.repos.is_empty());
558 assert_eq!(action.root_events.len(), 1);
559 assert!(action.root_events.contains(&event_id));
560 // Should have 3 filters for the root event (e, E, q tags)
561 assert_eq!(action.filters.len(), 3);
562 }
563
564 #[test]
565 fn test_compute_actions_unknown_relay_creates_action() {
566 // When a relay is not in confirmed at all, it should still create an action
567 // (it's treated as connected by default if missing from confirmed)
568 let mut targets = HashMap::new();
569 targets.insert(
570 "wss://new-relay.com".to_string(),
571 RelaySyncNeeds {
572 repos: vec!["repo1".to_string()].into_iter().collect(),
573 root_events: HashSet::new(),
574 },
575 );
576
577 let pending = HashMap::new();
578 let confirmed = HashMap::new(); // relay not in confirmed
579
580 let actions = compute_actions(&targets, &pending, &confirmed);
581
582 assert_eq!(
583 actions.len(),
584 1,
585 "Should create action for unknown relay (not yet tracked)"
586 );
587 assert_eq!(actions[0].relay_url, "wss://new-relay.com");
588 }
589} \ No newline at end of file