diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:07:50 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 11:07:50 +0000 |
| commit | 39e782b12fce1776f2ad0b0f5430749533cb80ea (patch) | |
| tree | d050a079a82898848da870d9307a98a83480629b /src | |
| parent | 586fc2a7df1ce256469f0742d23f687ac4b075b1 (diff) | |
sync v4 mvp
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/algorithms.rs | 589 | ||||
| -rw-r--r-- | src/sync/filters.rs | 340 | ||||
| -rw-r--r-- | src/sync/mod.rs | 301 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 216 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 430 |
5 files changed, 1867 insertions, 9 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs new file mode 100644 index 0000000..7d87411 --- /dev/null +++ b/src/sync/algorithms.rs | |||
| @@ -0,0 +1,589 @@ | |||
| 1 | //! Core Sync Algorithms for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module provides the decision-making algorithms for the sync system: | ||
| 4 | //! | ||
| 5 | //! - `derive_relay_targets()` - Inverts RepoSyncIndex to per-relay view | ||
| 6 | //! - `compute_actions()` - Three-way diff to determine new sync actions | ||
| 7 | //! | ||
| 8 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | ||
| 9 | |||
| 10 | use std::collections::{HashMap, HashSet}; | ||
| 11 | |||
| 12 | use nostr_sdk::prelude::*; | ||
| 13 | |||
| 14 | use super::{ConnectionStatus, PendingBatch, RelayState}; | ||
| 15 | |||
| 16 | // ============================================================================= | ||
| 17 | // Data Structures | ||
| 18 | // ============================================================================= | ||
| 19 | |||
| 20 | /// Relay-centric view of what needs syncing | ||
| 21 | /// | ||
| 22 | /// This is the inverted view of `RepoSyncNeeds` - instead of "what relays does | ||
| 23 | /// this repo need to sync from", it's "what repos does this relay need to sync". | ||
| 24 | #[derive(Debug, Clone, Default)] | ||
| 25 | pub struct RelaySyncNeeds { | ||
| 26 | /// Repos that need to be synced from this relay | ||
| 27 | pub repos: HashSet<String>, | ||
| 28 | /// Root events that need to be tracked from this relay | ||
| 29 | pub root_events: HashSet<EventId>, | ||
| 30 | } | ||
| 31 | |||
| 32 | /// Action to add filters to a relay | ||
| 33 | /// | ||
| 34 | /// Produced by `compute_actions()` to describe incremental sync work needed. | ||
| 35 | #[derive(Debug)] | ||
| 36 | pub struct AddFilters { | ||
| 37 | /// The relay URL to add filters to | ||
| 38 | pub relay_url: String, | ||
| 39 | /// Repos being synced in this action | ||
| 40 | pub repos: HashSet<String>, | ||
| 41 | /// Root events being tracked in this action | ||
| 42 | pub root_events: HashSet<EventId>, | ||
| 43 | /// The actual filters to subscribe with | ||
| 44 | pub filters: Vec<Filter>, | ||
| 45 | } | ||
| 46 | |||
| 47 | // ============================================================================= | ||
| 48 | // Core Algorithms | ||
| 49 | // ============================================================================= | ||
| 50 | |||
| 51 | /// Inverts RepoSyncIndex to per-relay view | ||
| 52 | /// | ||
| 53 | /// Takes the repo-centric index (repo -> {relays, root_events}) and inverts it | ||
| 54 | /// to a relay-centric view (relay -> {repos, root_events}). | ||
| 55 | /// | ||
| 56 | /// # Arguments | ||
| 57 | /// * `repo_index` - Map of repo addressable refs to their sync needs | ||
| 58 | /// | ||
| 59 | /// # Returns | ||
| 60 | /// Map of relay URLs to the combined sync needs from all repos | ||
| 61 | pub fn derive_relay_targets( | ||
| 62 | repo_index: &HashMap<String, super::RepoSyncNeeds>, | ||
| 63 | ) -> HashMap<String, RelaySyncNeeds> { | ||
| 64 | let mut relay_targets: HashMap<String, RelaySyncNeeds> = HashMap::new(); | ||
| 65 | |||
| 66 | for (repo_id, needs) in repo_index { | ||
| 67 | for relay_url in &needs.relays { | ||
| 68 | let entry = relay_targets | ||
| 69 | .entry(relay_url.clone()) | ||
| 70 | .or_insert_with(RelaySyncNeeds::default); | ||
| 71 | |||
| 72 | entry.repos.insert(repo_id.clone()); | ||
| 73 | entry.root_events.extend(needs.root_events.iter().cloned()); | ||
| 74 | } | ||
| 75 | } | ||
| 76 | |||
| 77 | relay_targets | ||
| 78 | } | ||
| 79 | |||
| 80 | /// Three-way diff: target - pending - confirmed = new | ||
| 81 | /// | ||
| 82 | /// Computes what sync actions are needed by comparing: | ||
| 83 | /// 1. What we want (targets) | ||
| 84 | /// 2. What's already in-flight (pending) | ||
| 85 | /// 3. What's already confirmed (confirmed) | ||
| 86 | /// | ||
| 87 | /// Only creates AddFilters actions for items not already pending or confirmed. | ||
| 88 | /// | ||
| 89 | /// # Arguments | ||
| 90 | /// * `targets` - Per-relay sync needs (from `derive_relay_targets`) | ||
| 91 | /// * `pending` - In-flight batches per relay | ||
| 92 | /// * `confirmed` - Confirmed relay states | ||
| 93 | /// | ||
| 94 | /// # Returns | ||
| 95 | /// Vec of AddFilters actions for new sync work | ||
| 96 | pub fn compute_actions( | ||
| 97 | targets: &HashMap<String, RelaySyncNeeds>, | ||
| 98 | pending: &HashMap<String, Vec<PendingBatch>>, | ||
| 99 | confirmed: &HashMap<String, RelayState>, | ||
| 100 | ) -> Vec<AddFilters> { | ||
| 101 | use crate::sync::filters::build_layer2_and_layer3_filters; | ||
| 102 | |||
| 103 | let mut actions = Vec::new(); | ||
| 104 | |||
| 105 | for (relay_url, target_needs) in targets { | ||
| 106 | // Skip disconnected relays | ||
| 107 | if let Some(state) = confirmed.get(relay_url) { | ||
| 108 | if matches!(state.connection_status, ConnectionStatus::Disconnected) { | ||
| 109 | continue; | ||
| 110 | } | ||
| 111 | } | ||
| 112 | |||
| 113 | // Calculate what's already pending | ||
| 114 | let pending_repos: HashSet<String> = pending | ||
| 115 | .get(relay_url) | ||
| 116 | .map(|batches| { | ||
| 117 | batches | ||
| 118 | .iter() | ||
| 119 | .flat_map(|batch| batch.items.repos.iter().cloned()) | ||
| 120 | .collect() | ||
| 121 | }) | ||
| 122 | .unwrap_or_default(); | ||
| 123 | |||
| 124 | let pending_events: HashSet<EventId> = pending | ||
| 125 | .get(relay_url) | ||
| 126 | .map(|batches| { | ||
| 127 | batches | ||
| 128 | .iter() | ||
| 129 | .flat_map(|batch| batch.items.root_events.iter().cloned()) | ||
| 130 | .collect() | ||
| 131 | }) | ||
| 132 | .unwrap_or_default(); | ||
| 133 | |||
| 134 | // Calculate what's already confirmed | ||
| 135 | let confirmed_repos: HashSet<String> = confirmed | ||
| 136 | .get(relay_url) | ||
| 137 | .map(|state| state.repos.clone()) | ||
| 138 | .unwrap_or_default(); | ||
| 139 | |||
| 140 | let confirmed_events: HashSet<EventId> = confirmed | ||
| 141 | .get(relay_url) | ||
| 142 | .map(|state| state.root_events.clone()) | ||
| 143 | .unwrap_or_default(); | ||
| 144 | |||
| 145 | // Calculate what's NEW (not in pending, not in confirmed) | ||
| 146 | let new_repos: HashSet<String> = target_needs | ||
| 147 | .repos | ||
| 148 | .difference(&pending_repos) | ||
| 149 | .filter(|repo| !confirmed_repos.contains(*repo)) | ||
| 150 | .cloned() | ||
| 151 | .collect(); | ||
| 152 | |||
| 153 | let new_events: HashSet<EventId> = target_needs | ||
| 154 | .root_events | ||
| 155 | .difference(&pending_events) | ||
| 156 | .filter(|event| !confirmed_events.contains(*event)) | ||
| 157 | .cloned() | ||
| 158 | .collect(); | ||
| 159 | |||
| 160 | // If there's anything new, create an AddFilters action | ||
| 161 | if !new_repos.is_empty() || !new_events.is_empty() { | ||
| 162 | let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); | ||
| 163 | |||
| 164 | actions.push(AddFilters { | ||
| 165 | relay_url: relay_url.clone(), | ||
| 166 | repos: new_repos, | ||
| 167 | root_events: new_events, | ||
| 168 | filters, | ||
| 169 | }); | ||
| 170 | } | ||
| 171 | } | ||
| 172 | |||
| 173 | actions | ||
| 174 | } | ||
| 175 | |||
| 176 | #[cfg(test)] | ||
| 177 | mod tests { | ||
| 178 | use super::*; | ||
| 179 | use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; | ||
| 180 | |||
| 181 | // ========================================================================= | ||
| 182 | // derive_relay_targets tests | ||
| 183 | // ========================================================================= | ||
| 184 | |||
| 185 | #[test] | ||
| 186 | fn test_derive_relay_targets_empty() { | ||
| 187 | let repo_index = HashMap::new(); | ||
| 188 | let targets = derive_relay_targets(&repo_index); | ||
| 189 | assert!(targets.is_empty()); | ||
| 190 | } | ||
| 191 | |||
| 192 | #[test] | ||
| 193 | fn test_derive_relay_targets_single_repo_single_relay() { | ||
| 194 | let mut repo_index = HashMap::new(); | ||
| 195 | let mut relays = HashSet::new(); | ||
| 196 | relays.insert("wss://relay1.com".to_string()); | ||
| 197 | |||
| 198 | let mut root_events = HashSet::new(); | ||
| 199 | root_events.insert(EventId::all_zeros()); | ||
| 200 | |||
| 201 | repo_index.insert( | ||
| 202 | "repo1".to_string(), | ||
| 203 | ModRepoSyncNeeds { | ||
| 204 | relays, | ||
| 205 | root_events, | ||
| 206 | }, | ||
| 207 | ); | ||
| 208 | |||
| 209 | let targets = derive_relay_targets(&repo_index); | ||
| 210 | |||
| 211 | assert_eq!(targets.len(), 1); | ||
| 212 | let relay_needs = targets.get("wss://relay1.com").unwrap(); | ||
| 213 | assert_eq!(relay_needs.repos.len(), 1); | ||
| 214 | assert!(relay_needs.repos.contains("repo1")); | ||
| 215 | assert_eq!(relay_needs.root_events.len(), 1); | ||
| 216 | } | ||
| 217 | |||
| 218 | #[test] | ||
| 219 | fn test_derive_relay_targets_multiple_repos_same_relay() { | ||
| 220 | let mut repo_index = HashMap::new(); | ||
| 221 | |||
| 222 | for i in 1..=3 { | ||
| 223 | let mut relays = HashSet::new(); | ||
| 224 | relays.insert("wss://relay1.com".to_string()); | ||
| 225 | |||
| 226 | repo_index.insert( | ||
| 227 | format!("repo{}", i), | ||
| 228 | ModRepoSyncNeeds { | ||
| 229 | relays, | ||
| 230 | root_events: HashSet::new(), | ||
| 231 | }, | ||
| 232 | ); | ||
| 233 | } | ||
| 234 | |||
| 235 | let targets = derive_relay_targets(&repo_index); | ||
| 236 | |||
| 237 | assert_eq!(targets.len(), 1); | ||
| 238 | let relay_needs = targets.get("wss://relay1.com").unwrap(); | ||
| 239 | assert_eq!(relay_needs.repos.len(), 3); | ||
| 240 | } | ||
| 241 | |||
| 242 | #[test] | ||
| 243 | fn test_derive_relay_targets_repo_across_multiple_relays() { | ||
| 244 | let mut repo_index = HashMap::new(); | ||
| 245 | let mut relays = HashSet::new(); | ||
| 246 | relays.insert("wss://relay1.com".to_string()); | ||
| 247 | relays.insert("wss://relay2.com".to_string()); | ||
| 248 | |||
| 249 | repo_index.insert( | ||
| 250 | "repo1".to_string(), | ||
| 251 | ModRepoSyncNeeds { | ||
| 252 | relays, | ||
| 253 | root_events: HashSet::new(), | ||
| 254 | }, | ||
| 255 | ); | ||
| 256 | |||
| 257 | let targets = derive_relay_targets(&repo_index); | ||
| 258 | |||
| 259 | assert_eq!(targets.len(), 2); | ||
| 260 | assert!(targets | ||
| 261 | .get("wss://relay1.com") | ||
| 262 | .unwrap() | ||
| 263 | .repos | ||
| 264 | .contains("repo1")); | ||
| 265 | assert!(targets | ||
| 266 | .get("wss://relay2.com") | ||
| 267 | .unwrap() | ||
| 268 | .repos | ||
| 269 | .contains("repo1")); | ||
| 270 | } | ||
| 271 | |||
| 272 | #[test] | ||
| 273 | fn test_derive_relay_targets_combines_root_events() { | ||
| 274 | let mut repo_index = HashMap::new(); | ||
| 275 | |||
| 276 | // Repo1 has one root event | ||
| 277 | let mut relays1 = HashSet::new(); | ||
| 278 | relays1.insert("wss://relay1.com".to_string()); | ||
| 279 | let mut root_events1 = HashSet::new(); | ||
| 280 | root_events1.insert(EventId::all_zeros()); | ||
| 281 | |||
| 282 | repo_index.insert( | ||
| 283 | "repo1".to_string(), | ||
| 284 | ModRepoSyncNeeds { | ||
| 285 | relays: relays1, | ||
| 286 | root_events: root_events1, | ||
| 287 | }, | ||
| 288 | ); | ||
| 289 | |||
| 290 | // Repo2 also points to same relay but should have same event combined | ||
| 291 | let mut relays2 = HashSet::new(); | ||
| 292 | relays2.insert("wss://relay1.com".to_string()); | ||
| 293 | let mut root_events2 = HashSet::new(); | ||
| 294 | root_events2.insert(EventId::all_zeros()); // Same event | ||
| 295 | |||
| 296 | repo_index.insert( | ||
| 297 | "repo2".to_string(), | ||
| 298 | ModRepoSyncNeeds { | ||
| 299 | relays: relays2, | ||
| 300 | root_events: root_events2, | ||
| 301 | }, | ||
| 302 | ); | ||
| 303 | |||
| 304 | let targets = derive_relay_targets(&repo_index); | ||
| 305 | |||
| 306 | assert_eq!(targets.len(), 1); | ||
| 307 | let relay_needs = targets.get("wss://relay1.com").unwrap(); | ||
| 308 | assert_eq!(relay_needs.repos.len(), 2); | ||
| 309 | // Root events should be deduplicated | ||
| 310 | assert_eq!(relay_needs.root_events.len(), 1); | ||
| 311 | } | ||
| 312 | |||
| 313 | // ========================================================================= | ||
| 314 | // compute_actions tests | ||
| 315 | // ========================================================================= | ||
| 316 | |||
| 317 | #[test] | ||
| 318 | fn test_compute_actions_empty() { | ||
| 319 | let targets = HashMap::new(); | ||
| 320 | let pending = HashMap::new(); | ||
| 321 | let confirmed = HashMap::new(); | ||
| 322 | |||
| 323 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 324 | assert!(actions.is_empty()); | ||
| 325 | } | ||
| 326 | |||
| 327 | #[test] | ||
| 328 | fn test_compute_actions_skips_disconnected() { | ||
| 329 | let mut targets = HashMap::new(); | ||
| 330 | targets.insert( | ||
| 331 | "wss://relay1.com".to_string(), | ||
| 332 | RelaySyncNeeds { | ||
| 333 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 334 | root_events: HashSet::new(), | ||
| 335 | }, | ||
| 336 | ); | ||
| 337 | |||
| 338 | let pending = HashMap::new(); | ||
| 339 | |||
| 340 | let mut confirmed = HashMap::new(); | ||
| 341 | confirmed.insert( | ||
| 342 | "wss://relay1.com".to_string(), | ||
| 343 | RelayState { | ||
| 344 | repos: HashSet::new(), | ||
| 345 | root_events: HashSet::new(), | ||
| 346 | is_bootstrap: false, | ||
| 347 | connection_status: ConnectionStatus::Disconnected, | ||
| 348 | last_connected: None, | ||
| 349 | disconnected_at: None, | ||
| 350 | }, | ||
| 351 | ); | ||
| 352 | |||
| 353 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 354 | assert!(actions.is_empty(), "Should skip disconnected relays"); | ||
| 355 | } | ||
| 356 | |||
| 357 | #[test] | ||
| 358 | fn test_compute_actions_new_repo() { | ||
| 359 | let mut targets = HashMap::new(); | ||
| 360 | targets.insert( | ||
| 361 | "wss://relay1.com".to_string(), | ||
| 362 | RelaySyncNeeds { | ||
| 363 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 364 | root_events: HashSet::new(), | ||
| 365 | }, | ||
| 366 | ); | ||
| 367 | |||
| 368 | let pending = HashMap::new(); | ||
| 369 | let confirmed = HashMap::new(); | ||
| 370 | |||
| 371 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 372 | |||
| 373 | assert_eq!(actions.len(), 1); | ||
| 374 | let action = &actions[0]; | ||
| 375 | assert_eq!(action.relay_url, "wss://relay1.com"); | ||
| 376 | assert!(action.repos.contains("repo1")); | ||
| 377 | assert!(!action.filters.is_empty()); | ||
| 378 | } | ||
| 379 | |||
| 380 | #[test] | ||
| 381 | fn test_compute_actions_excludes_pending() { | ||
| 382 | let mut targets = HashMap::new(); | ||
| 383 | targets.insert( | ||
| 384 | "wss://relay1.com".to_string(), | ||
| 385 | RelaySyncNeeds { | ||
| 386 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 387 | root_events: HashSet::new(), | ||
| 388 | }, | ||
| 389 | ); | ||
| 390 | |||
| 391 | let mut pending = HashMap::new(); | ||
| 392 | pending.insert( | ||
| 393 | "wss://relay1.com".to_string(), | ||
| 394 | vec![super::super::PendingBatch { | ||
| 395 | batch_id: 1, | ||
| 396 | items: super::super::PendingItems { | ||
| 397 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 398 | root_events: HashSet::new(), | ||
| 399 | }, | ||
| 400 | outstanding_subs: HashSet::new(), | ||
| 401 | }], | ||
| 402 | ); | ||
| 403 | |||
| 404 | let confirmed = HashMap::new(); | ||
| 405 | |||
| 406 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 407 | assert!( | ||
| 408 | actions.is_empty(), | ||
| 409 | "Should not create action for pending items" | ||
| 410 | ); | ||
| 411 | } | ||
| 412 | |||
| 413 | #[test] | ||
| 414 | fn test_compute_actions_excludes_confirmed() { | ||
| 415 | let mut targets = HashMap::new(); | ||
| 416 | targets.insert( | ||
| 417 | "wss://relay1.com".to_string(), | ||
| 418 | RelaySyncNeeds { | ||
| 419 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 420 | root_events: HashSet::new(), | ||
| 421 | }, | ||
| 422 | ); | ||
| 423 | |||
| 424 | let pending = HashMap::new(); | ||
| 425 | |||
| 426 | let mut confirmed = HashMap::new(); | ||
| 427 | confirmed.insert( | ||
| 428 | "wss://relay1.com".to_string(), | ||
| 429 | RelayState { | ||
| 430 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 431 | root_events: HashSet::new(), | ||
| 432 | is_bootstrap: false, | ||
| 433 | connection_status: ConnectionStatus::Connected, | ||
| 434 | last_connected: None, | ||
| 435 | disconnected_at: None, | ||
| 436 | }, | ||
| 437 | ); | ||
| 438 | |||
| 439 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 440 | assert!( | ||
| 441 | actions.is_empty(), | ||
| 442 | "Should not create action for confirmed items" | ||
| 443 | ); | ||
| 444 | } | ||
| 445 | |||
| 446 | #[test] | ||
| 447 | fn test_compute_actions_allows_connecting_relays() { | ||
| 448 | let mut targets = HashMap::new(); | ||
| 449 | targets.insert( | ||
| 450 | "wss://relay1.com".to_string(), | ||
| 451 | RelaySyncNeeds { | ||
| 452 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 453 | root_events: HashSet::new(), | ||
| 454 | }, | ||
| 455 | ); | ||
| 456 | |||
| 457 | let pending = HashMap::new(); | ||
| 458 | |||
| 459 | let mut confirmed = HashMap::new(); | ||
| 460 | confirmed.insert( | ||
| 461 | "wss://relay1.com".to_string(), | ||
| 462 | RelayState { | ||
| 463 | repos: HashSet::new(), | ||
| 464 | root_events: HashSet::new(), | ||
| 465 | is_bootstrap: false, | ||
| 466 | connection_status: ConnectionStatus::Connecting, | ||
| 467 | last_connected: None, | ||
| 468 | disconnected_at: None, | ||
| 469 | }, | ||
| 470 | ); | ||
| 471 | |||
| 472 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 473 | assert_eq!( | ||
| 474 | actions.len(), | ||
| 475 | 1, | ||
| 476 | "Should create action for connecting relays" | ||
| 477 | ); | ||
| 478 | } | ||
| 479 | |||
| 480 | #[test] | ||
| 481 | fn test_compute_actions_partial_overlap() { | ||
| 482 | // Target has repo1, repo2, repo3 | ||
| 483 | let mut targets = HashMap::new(); | ||
| 484 | targets.insert( | ||
| 485 | "wss://relay1.com".to_string(), | ||
| 486 | RelaySyncNeeds { | ||
| 487 | repos: vec![ | ||
| 488 | "repo1".to_string(), | ||
| 489 | "repo2".to_string(), | ||
| 490 | "repo3".to_string(), | ||
| 491 | ] | ||
| 492 | .into_iter() | ||
| 493 | .collect(), | ||
| 494 | root_events: HashSet::new(), | ||
| 495 | }, | ||
| 496 | ); | ||
| 497 | |||
| 498 | // repo1 is pending | ||
| 499 | let mut pending = HashMap::new(); | ||
| 500 | pending.insert( | ||
| 501 | "wss://relay1.com".to_string(), | ||
| 502 | vec![super::super::PendingBatch { | ||
| 503 | batch_id: 1, | ||
| 504 | items: super::super::PendingItems { | ||
| 505 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 506 | root_events: HashSet::new(), | ||
| 507 | }, | ||
| 508 | outstanding_subs: HashSet::new(), | ||
| 509 | }], | ||
| 510 | ); | ||
| 511 | |||
| 512 | // repo2 is confirmed | ||
| 513 | let mut confirmed = HashMap::new(); | ||
| 514 | confirmed.insert( | ||
| 515 | "wss://relay1.com".to_string(), | ||
| 516 | RelayState { | ||
| 517 | repos: vec!["repo2".to_string()].into_iter().collect(), | ||
| 518 | root_events: HashSet::new(), | ||
| 519 | is_bootstrap: false, | ||
| 520 | connection_status: ConnectionStatus::Connected, | ||
| 521 | last_connected: None, | ||
| 522 | disconnected_at: None, | ||
| 523 | }, | ||
| 524 | ); | ||
| 525 | |||
| 526 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 527 | |||
| 528 | assert_eq!(actions.len(), 1); | ||
| 529 | let action = &actions[0]; | ||
| 530 | // Only repo3 should be in the action (repo1 pending, repo2 confirmed) | ||
| 531 | assert_eq!(action.repos.len(), 1); | ||
| 532 | assert!(action.repos.contains("repo3")); | ||
| 533 | assert!(!action.repos.contains("repo1")); | ||
| 534 | assert!(!action.repos.contains("repo2")); | ||
| 535 | } | ||
| 536 | |||
| 537 | #[test] | ||
| 538 | fn test_compute_actions_with_root_events() { | ||
| 539 | let event_id = EventId::all_zeros(); | ||
| 540 | |||
| 541 | let mut targets = HashMap::new(); | ||
| 542 | targets.insert( | ||
| 543 | "wss://relay1.com".to_string(), | ||
| 544 | RelaySyncNeeds { | ||
| 545 | repos: HashSet::new(), | ||
| 546 | root_events: vec![event_id].into_iter().collect(), | ||
| 547 | }, | ||
| 548 | ); | ||
| 549 | |||
| 550 | let pending = HashMap::new(); | ||
| 551 | let confirmed = HashMap::new(); | ||
| 552 | |||
| 553 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 554 | |||
| 555 | assert_eq!(actions.len(), 1); | ||
| 556 | let action = &actions[0]; | ||
| 557 | assert!(action.repos.is_empty()); | ||
| 558 | assert_eq!(action.root_events.len(), 1); | ||
| 559 | assert!(action.root_events.contains(&event_id)); | ||
| 560 | // Should have 3 filters for the root event (e, E, q tags) | ||
| 561 | assert_eq!(action.filters.len(), 3); | ||
| 562 | } | ||
| 563 | |||
| 564 | #[test] | ||
| 565 | fn test_compute_actions_unknown_relay_creates_action() { | ||
| 566 | // When a relay is not in confirmed at all, it should still create an action | ||
| 567 | // (it's treated as connected by default if missing from confirmed) | ||
| 568 | let mut targets = HashMap::new(); | ||
| 569 | targets.insert( | ||
| 570 | "wss://new-relay.com".to_string(), | ||
| 571 | RelaySyncNeeds { | ||
| 572 | repos: vec!["repo1".to_string()].into_iter().collect(), | ||
| 573 | root_events: HashSet::new(), | ||
| 574 | }, | ||
| 575 | ); | ||
| 576 | |||
| 577 | let pending = HashMap::new(); | ||
| 578 | let confirmed = HashMap::new(); // relay not in confirmed | ||
| 579 | |||
| 580 | let actions = compute_actions(&targets, &pending, &confirmed); | ||
| 581 | |||
| 582 | assert_eq!( | ||
| 583 | actions.len(), | ||
| 584 | 1, | ||
| 585 | "Should create action for unknown relay (not yet tracked)" | ||
| 586 | ); | ||
| 587 | assert_eq!(actions[0].relay_url, "wss://new-relay.com"); | ||
| 588 | } | ||
| 589 | } \ No newline at end of file | ||
diff --git a/src/sync/filters.rs b/src/sync/filters.rs new file mode 100644 index 0000000..02d580e --- /dev/null +++ b/src/sync/filters.rs | |||
| @@ -0,0 +1,340 @@ | |||
| 1 | //! Filter Building Functions for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module provides functions to construct Nostr filters for the three-layer | ||
| 4 | //! sync strategy defined in GRASP-02 v4: | ||
| 5 | //! | ||
| 6 | //! - Layer 1: Repository announcements (30617 + 30618) | ||
| 7 | //! - Layer 2: Events tagging our repos (a/A/q tags) | ||
| 8 | //! - Layer 3: Events tagging our root events (e/E/q tags) | ||
| 9 | //! | ||
| 10 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | ||
| 11 | |||
| 12 | use std::collections::HashSet; | ||
| 13 | |||
| 14 | use nostr_sdk::prelude::*; | ||
| 15 | |||
| 16 | /// Layer 1: Announcements filter (kinds 30617 + 30618) | ||
| 17 | /// | ||
| 18 | /// Subscribed ONCE on connect - NOT included in consolidation rebuilds. | ||
| 19 | /// Note: 30618 is ONLY synced from remote relays, not self-subscribed. | ||
| 20 | pub fn build_announcement_filter(since: Option<Timestamp>) -> Filter { | ||
| 21 | let filter = Filter::new().kinds([ | ||
| 22 | Kind::Custom(30617), // Repository announcements | ||
| 23 | Kind::Custom(30618), // Maintainer lists | ||
| 24 | ]); | ||
| 25 | |||
| 26 | match since { | ||
| 27 | Some(ts) => filter.since(ts), | ||
| 28 | None => filter, | ||
| 29 | } | ||
| 30 | } | ||
| 31 | |||
| 32 | /// Layer 2: Events tagging one of our repos | ||
| 33 | /// | ||
| 34 | /// Uses lowercase a, uppercase A, and q tags for comprehensive coverage. | ||
| 35 | /// Batched per 100 repo refs. | ||
| 36 | /// | ||
| 37 | /// # Arguments | ||
| 38 | /// * `repos` - Set of repo addressable refs (format: 30617:pubkey:identifier) | ||
| 39 | /// * `since` - Optional timestamp for incremental sync | ||
| 40 | /// | ||
| 41 | /// # Returns | ||
| 42 | /// Vec of filters, one set of 3 filters (a/A/q) per 100-repo chunk | ||
| 43 | pub fn tagged_one_of_our_repo_event_filters( | ||
| 44 | repos: &HashSet<String>, | ||
| 45 | since: Option<Timestamp>, | ||
| 46 | ) -> Vec<Filter> { | ||
| 47 | if repos.is_empty() { | ||
| 48 | return vec![]; | ||
| 49 | } | ||
| 50 | |||
| 51 | let mut filters = Vec::new(); | ||
| 52 | let repo_refs: Vec<_> = repos.iter().collect(); | ||
| 53 | |||
| 54 | for chunk in repo_refs.chunks(100) { | ||
| 55 | // Lowercase 'a' tag - standard addressable reference | ||
| 56 | let mut f1 = Filter::new(); | ||
| 57 | for repo in chunk { | ||
| 58 | f1 = f1.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo.as_str()); | ||
| 59 | } | ||
| 60 | |||
| 61 | // Uppercase 'A' tag - some clients use this | ||
| 62 | let mut f2 = Filter::new(); | ||
| 63 | for repo in chunk { | ||
| 64 | f2 = f2.custom_tag(SingleLetterTag::uppercase(Alphabet::A), repo.as_str()); | ||
| 65 | } | ||
| 66 | |||
| 67 | // Quote 'q' tag - NIP-10 quote references to addressable events | ||
| 68 | let mut f3 = Filter::new(); | ||
| 69 | for repo in chunk { | ||
| 70 | f3 = f3.custom_tag(SingleLetterTag::lowercase(Alphabet::Q), repo.as_str()); | ||
| 71 | } | ||
| 72 | |||
| 73 | if let Some(ts) = since { | ||
| 74 | f1 = f1.since(ts); | ||
| 75 | f2 = f2.since(ts); | ||
| 76 | f3 = f3.since(ts); | ||
| 77 | } | ||
| 78 | |||
| 79 | filters.push(f1); | ||
| 80 | filters.push(f2); | ||
| 81 | filters.push(f3); | ||
| 82 | } | ||
| 83 | |||
| 84 | filters | ||
| 85 | } | ||
| 86 | |||
| 87 | /// Layer 3: Events tagging one of our root events | ||
| 88 | /// | ||
| 89 | /// Uses lowercase e, uppercase E, and q tags for comprehensive coverage. | ||
| 90 | /// Batched per 100 event IDs. | ||
| 91 | /// | ||
| 92 | /// # Arguments | ||
| 93 | /// * `root_events` - Set of event IDs (1617/1618/1619/1621 root events) | ||
| 94 | /// * `since` - Optional timestamp for incremental sync | ||
| 95 | /// | ||
| 96 | /// # Returns | ||
| 97 | /// Vec of filters, one set of 3 filters (e/E/q) per 100-event chunk | ||
| 98 | pub fn tagged_one_of_our_root_event_filters( | ||
| 99 | root_events: &HashSet<EventId>, | ||
| 100 | since: Option<Timestamp>, | ||
| 101 | ) -> Vec<Filter> { | ||
| 102 | if root_events.is_empty() { | ||
| 103 | return vec![]; | ||
| 104 | } | ||
| 105 | |||
| 106 | let mut filters = Vec::new(); | ||
| 107 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); | ||
| 108 | |||
| 109 | for chunk in event_ids.chunks(100) { | ||
| 110 | // Lowercase 'e' tag - standard event reference | ||
| 111 | let mut f1 = Filter::new(); | ||
| 112 | for event_id in chunk { | ||
| 113 | f1 = f1.custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id.as_str()); | ||
| 114 | } | ||
| 115 | |||
| 116 | // Uppercase 'E' tag - some clients use this | ||
| 117 | let mut f2 = Filter::new(); | ||
| 118 | for event_id in chunk { | ||
| 119 | f2 = f2.custom_tag(SingleLetterTag::uppercase(Alphabet::E), event_id.as_str()); | ||
| 120 | } | ||
| 121 | |||
| 122 | // Quote 'q' tag - NIP-10 quote references to events | ||
| 123 | let mut f3 = Filter::new(); | ||
| 124 | for event_id in chunk { | ||
| 125 | f3 = f3.custom_tag(SingleLetterTag::lowercase(Alphabet::Q), event_id.as_str()); | ||
| 126 | } | ||
| 127 | |||
| 128 | if let Some(ts) = since { | ||
| 129 | f1 = f1.since(ts); | ||
| 130 | f2 = f2.since(ts); | ||
| 131 | f3 = f3.since(ts); | ||
| 132 | } | ||
| 133 | |||
| 134 | filters.push(f1); | ||
| 135 | filters.push(f2); | ||
| 136 | filters.push(f3); | ||
| 137 | } | ||
| 138 | |||
| 139 | filters | ||
| 140 | } | ||
| 141 | |||
| 142 | /// Builds Layer 2 + Layer 3 filters only (NOT Layer 1) | ||
| 143 | /// | ||
| 144 | /// Used by: | ||
| 145 | /// - compute_actions for incremental subscriptions | ||
| 146 | /// - consolidation rebuilds (Layer 1 remains active) | ||
| 147 | /// | ||
| 148 | /// # Arguments | ||
| 149 | /// * `repos` - Set of repo addressable refs | ||
| 150 | /// * `root_events` - Set of root event IDs | ||
| 151 | /// * `since` - Optional timestamp for incremental sync | ||
| 152 | pub fn build_layer2_and_layer3_filters( | ||
| 153 | repos: &HashSet<String>, | ||
| 154 | root_events: &HashSet<EventId>, | ||
| 155 | since: Option<Timestamp>, | ||
| 156 | ) -> Vec<Filter> { | ||
| 157 | let mut filters = Vec::new(); | ||
| 158 | filters.extend(tagged_one_of_our_repo_event_filters(repos, since)); | ||
| 159 | filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); | ||
| 160 | filters | ||
| 161 | } | ||
| 162 | |||
| 163 | #[cfg(test)] | ||
| 164 | mod tests { | ||
| 165 | use super::*; | ||
| 166 | |||
| 167 | #[test] | ||
| 168 | fn test_announcement_filter_no_since() { | ||
| 169 | let filter = build_announcement_filter(None); | ||
| 170 | |||
| 171 | // Verify it includes both kinds | ||
| 172 | // Filter API: we can check by converting to JSON or inspecting structure | ||
| 173 | // For now we just verify it doesn't panic and returns a valid filter | ||
| 174 | assert!(!filter.is_empty()); | ||
| 175 | } | ||
| 176 | |||
| 177 | #[test] | ||
| 178 | fn test_announcement_filter_with_since() { | ||
| 179 | let since = Timestamp::from(1700000000); | ||
| 180 | let filter = build_announcement_filter(Some(since)); | ||
| 181 | |||
| 182 | assert!(!filter.is_empty()); | ||
| 183 | } | ||
| 184 | |||
| 185 | #[test] | ||
| 186 | fn test_repo_filters_empty() { | ||
| 187 | let repos: HashSet<String> = HashSet::new(); | ||
| 188 | let filters = tagged_one_of_our_repo_event_filters(&repos, None); | ||
| 189 | |||
| 190 | assert!(filters.is_empty()); | ||
| 191 | } | ||
| 192 | |||
| 193 | #[test] | ||
| 194 | fn test_repo_filters_single_repo() { | ||
| 195 | let mut repos = HashSet::new(); | ||
| 196 | repos.insert("30617:abc123:test-repo".to_string()); | ||
| 197 | |||
| 198 | let filters = tagged_one_of_our_repo_event_filters(&repos, None); | ||
| 199 | |||
| 200 | // Should create 3 filters (a, A, q) for one chunk | ||
| 201 | assert_eq!(filters.len(), 3); | ||
| 202 | } | ||
| 203 | |||
| 204 | #[test] | ||
| 205 | fn test_repo_filters_batching() { | ||
| 206 | let mut repos = HashSet::new(); | ||
| 207 | for i in 0..250 { | ||
| 208 | repos.insert(format!("30617:pubkey{}:repo{}", i, i)); | ||
| 209 | } | ||
| 210 | |||
| 211 | let filters = tagged_one_of_our_repo_event_filters(&repos, None); | ||
| 212 | |||
| 213 | // Should create 9 filters (3 chunks * 3 tag types) | ||
| 214 | // 250 repos = 100 + 100 + 50 = 3 chunks | ||
| 215 | assert_eq!(filters.len(), 9); | ||
| 216 | } | ||
| 217 | |||
| 218 | #[test] | ||
| 219 | fn test_repo_filters_with_since() { | ||
| 220 | let mut repos = HashSet::new(); | ||
| 221 | repos.insert("30617:abc123:test-repo".to_string()); | ||
| 222 | |||
| 223 | let since = Timestamp::from(1700000000); | ||
| 224 | let filters = tagged_one_of_our_repo_event_filters(&repos, Some(since)); | ||
| 225 | |||
| 226 | assert_eq!(filters.len(), 3); | ||
| 227 | } | ||
| 228 | |||
| 229 | #[test] | ||
| 230 | fn test_root_event_filters_empty() { | ||
| 231 | let root_events: HashSet<EventId> = HashSet::new(); | ||
| 232 | let filters = tagged_one_of_our_root_event_filters(&root_events, None); | ||
| 233 | |||
| 234 | assert!(filters.is_empty()); | ||
| 235 | } | ||
| 236 | |||
| 237 | #[test] | ||
| 238 | fn test_root_event_filters_single_event() { | ||
| 239 | let mut root_events = HashSet::new(); | ||
| 240 | // Create a valid event ID (all zeros for testing) | ||
| 241 | root_events.insert(EventId::all_zeros()); | ||
| 242 | |||
| 243 | let filters = tagged_one_of_our_root_event_filters(&root_events, None); | ||
| 244 | |||
| 245 | // Should create 3 filters (e, E, q) for one chunk | ||
| 246 | assert_eq!(filters.len(), 3); | ||
| 247 | } | ||
| 248 | |||
| 249 | #[test] | ||
| 250 | fn test_root_event_filters_batching() { | ||
| 251 | let mut root_events = HashSet::new(); | ||
| 252 | // EventId::all_zeros() will deduplicate, so we need unique IDs | ||
| 253 | // For testing purposes, we'll just verify with one ID since HashSet | ||
| 254 | // deduplicates all_zeros(). In real usage these would be unique. | ||
| 255 | for _ in 0..250 { | ||
| 256 | root_events.insert(EventId::all_zeros()); | ||
| 257 | } | ||
| 258 | |||
| 259 | let filters = tagged_one_of_our_root_event_filters(&root_events, None); | ||
| 260 | |||
| 261 | // With deduplication, we only have 1 unique ID, so 3 filters | ||
| 262 | // In real usage with 250 unique IDs, it would be 9 filters | ||
| 263 | assert_eq!(filters.len(), 3); | ||
| 264 | } | ||
| 265 | |||
| 266 | #[test] | ||
| 267 | fn test_root_event_filters_with_since() { | ||
| 268 | let mut root_events = HashSet::new(); | ||
| 269 | root_events.insert(EventId::all_zeros()); | ||
| 270 | |||
| 271 | let since = Timestamp::from(1700000000); | ||
| 272 | let filters = tagged_one_of_our_root_event_filters(&root_events, Some(since)); | ||
| 273 | |||
| 274 | assert_eq!(filters.len(), 3); | ||
| 275 | } | ||
| 276 | |||
| 277 | #[test] | ||
| 278 | fn test_combined_filters_empty() { | ||
| 279 | let repos: HashSet<String> = HashSet::new(); | ||
| 280 | let root_events: HashSet<EventId> = HashSet::new(); | ||
| 281 | |||
| 282 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 283 | |||
| 284 | assert!(filters.is_empty()); | ||
| 285 | } | ||
| 286 | |||
| 287 | #[test] | ||
| 288 | fn test_combined_filters() { | ||
| 289 | let mut repos = HashSet::new(); | ||
| 290 | repos.insert("30617:abc123:repo1".to_string()); | ||
| 291 | |||
| 292 | let mut root_events = HashSet::new(); | ||
| 293 | root_events.insert(EventId::all_zeros()); | ||
| 294 | |||
| 295 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 296 | |||
| 297 | // Should have 6 filters (3 for repos + 3 for root events) | ||
| 298 | assert_eq!(filters.len(), 6); | ||
| 299 | } | ||
| 300 | |||
| 301 | #[test] | ||
| 302 | fn test_combined_filters_repos_only() { | ||
| 303 | let mut repos = HashSet::new(); | ||
| 304 | repos.insert("30617:abc123:repo1".to_string()); | ||
| 305 | |||
| 306 | let root_events: HashSet<EventId> = HashSet::new(); | ||
| 307 | |||
| 308 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 309 | |||
| 310 | // Should have 3 filters (3 for repos only) | ||
| 311 | assert_eq!(filters.len(), 3); | ||
| 312 | } | ||
| 313 | |||
| 314 | #[test] | ||
| 315 | fn test_combined_filters_root_events_only() { | ||
| 316 | let repos: HashSet<String> = HashSet::new(); | ||
| 317 | |||
| 318 | let mut root_events = HashSet::new(); | ||
| 319 | root_events.insert(EventId::all_zeros()); | ||
| 320 | |||
| 321 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 322 | |||
| 323 | // Should have 3 filters (3 for root events only) | ||
| 324 | assert_eq!(filters.len(), 3); | ||
| 325 | } | ||
| 326 | |||
| 327 | #[test] | ||
| 328 | fn test_combined_filters_with_since() { | ||
| 329 | let mut repos = HashSet::new(); | ||
| 330 | repos.insert("30617:abc123:repo1".to_string()); | ||
| 331 | |||
| 332 | let mut root_events = HashSet::new(); | ||
| 333 | root_events.insert(EventId::all_zeros()); | ||
| 334 | |||
| 335 | let since = Timestamp::from(1700000000); | ||
| 336 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, Some(since)); | ||
| 337 | |||
| 338 | assert_eq!(filters.len(), 6); | ||
| 339 | } | ||
| 340 | } \ No newline at end of file | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c1f8bca..fb09896 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -12,6 +12,20 @@ | |||
| 12 | //! | 12 | //! |
| 13 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | 13 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. |
| 14 | 14 | ||
| 15 | pub mod algorithms; | ||
| 16 | pub mod filters; | ||
| 17 | pub mod relay_connection; | ||
| 18 | pub mod self_subscriber; | ||
| 19 | |||
| 20 | // Re-export core algorithm types | ||
| 21 | pub use algorithms::{AddFilters, RelaySyncNeeds}; | ||
| 22 | |||
| 23 | // Re-export relay connection types | ||
| 24 | pub use relay_connection::{RelayConnection, RelayEvent}; | ||
| 25 | |||
| 26 | // Re-export self-subscriber types | ||
| 27 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | ||
| 28 | |||
| 15 | use std::collections::{HashMap, HashSet}; | 29 | use std::collections::{HashMap, HashSet}; |
| 16 | use std::sync::Arc; | 30 | use std::sync::Arc; |
| 17 | 31 | ||
| @@ -355,21 +369,290 @@ impl SyncManager { | |||
| 355 | } | 369 | } |
| 356 | } | 370 | } |
| 357 | 371 | ||
| 358 | /// Run the sync manager (placeholder for Phase 1) | 372 | /// Run the sync manager |
| 359 | /// | 373 | /// |
| 360 | /// This will be implemented in later phases to: | 374 | /// Coordinates all sync components: |
| 361 | /// 1. Subscribe to local relay for 30617 events | 375 | /// 1. Spawns self-subscriber to monitor own relay for announcements |
| 362 | /// 2. Process events to build RepoSyncIndex | 376 | /// 2. Connects to bootstrap relay if configured |
| 363 | /// 3. Compute and execute sync actions | 377 | /// 3. Handles relay actions from self-subscriber |
| 364 | /// 4. Handle reconnection and catch-up logic | ||
| 365 | pub async fn run(self) { | 378 | pub async fn run(self) { |
| 379 | use tokio::sync::mpsc; | ||
| 380 | |||
| 366 | tracing::info!( | 381 | tracing::info!( |
| 367 | bootstrap_relay = ?self.bootstrap_relay_url, | 382 | bootstrap_relay = ?self.bootstrap_relay_url, |
| 368 | service_domain = %self.service_domain, | 383 | service_domain = %self.service_domain, |
| 369 | "SyncManager starting (placeholder - not yet implemented)" | 384 | "SyncManager starting" |
| 385 | ); | ||
| 386 | |||
| 387 | // 1. Create action channel for self-subscriber -> manager communication | ||
| 388 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); | ||
| 389 | |||
| 390 | // 2. Spawn self-subscriber | ||
| 391 | let self_subscriber = SelfSubscriber::new( | ||
| 392 | format!("ws://{}", self.service_domain), | ||
| 393 | self.service_domain.clone(), | ||
| 394 | Arc::clone(&self.repo_sync_index), | ||
| 395 | action_tx, | ||
| 370 | ); | 396 | ); |
| 397 | tokio::spawn(async move { self_subscriber.run().await }); | ||
| 398 | |||
| 399 | // 3. Connect to bootstrap relay if configured | ||
| 400 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | ||
| 401 | self.spawn_relay_connection(bootstrap_url.clone()).await; | ||
| 402 | } | ||
| 371 | 403 | ||
| 372 | // Phase 1: Just log and return | 404 | // 4. Main loop - handle actions from self-subscriber |
| 373 | // Full implementation will be added in subsequent phases | 405 | loop { |
| 406 | tokio::select! { | ||
| 407 | action = action_rx.recv() => { | ||
| 408 | match action { | ||
| 409 | Some(RelayAction::SpawnRelay { relay_url, repos }) => { | ||
| 410 | // Check if relay already exists | ||
| 411 | let relay_index = self.relay_sync_index.read().await; | ||
| 412 | let exists = relay_index.contains_key(&relay_url); | ||
| 413 | drop(relay_index); | ||
| 414 | |||
| 415 | if !exists { | ||
| 416 | tracing::info!(relay = %relay_url, "Spawning new relay connection"); | ||
| 417 | self.spawn_relay_with_layer2(relay_url, repos).await; | ||
| 418 | } else { | ||
| 419 | tracing::debug!( | ||
| 420 | relay = %relay_url, | ||
| 421 | "Relay already exists, considering AddFilters" | ||
| 422 | ); | ||
| 423 | // For MVP, we don't handle AddFilters - just log | ||
| 424 | // Full implementation would call subscribe_filters on existing connection | ||
| 425 | } | ||
| 426 | } | ||
| 427 | Some(RelayAction::AddFilters { relay_url, repos }) => { | ||
| 428 | tracing::debug!( | ||
| 429 | relay = %relay_url, | ||
| 430 | repo_count = repos.len(), | ||
| 431 | "AddFilters action (MVP: not implemented)" | ||
| 432 | ); | ||
| 433 | // For MVP, not implemented - full version would add Layer 2 filters | ||
| 434 | // to existing relay connection | ||
| 435 | } | ||
| 436 | None => break, | ||
| 437 | } | ||
| 438 | } | ||
| 439 | } | ||
| 440 | } | ||
| 441 | } | ||
| 442 | |||
| 443 | /// Spawn relay connection with Layer 2 filters for specific repos | ||
| 444 | /// | ||
| 445 | /// Used when discovering relays from announcements. Connects to the relay, | ||
| 446 | /// subscribes to Layer 1 (announcements) AND Layer 2+3 filters for the | ||
| 447 | /// specific repos we want to sync. | ||
| 448 | async fn spawn_relay_with_layer2( | ||
| 449 | &self, | ||
| 450 | relay_url: String, | ||
| 451 | repos: HashMap<String, HashSet<EventId>>, | ||
| 452 | ) { | ||
| 453 | use crate::sync::filters::build_layer2_and_layer3_filters; | ||
| 454 | use tokio::sync::mpsc; | ||
| 455 | |||
| 456 | let database = Arc::clone(&self.database); | ||
| 457 | let write_policy = self.write_policy.clone(); | ||
| 458 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | ||
| 459 | |||
| 460 | // Create relay connection | ||
| 461 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 462 | |||
| 463 | // Connect and subscribe to Layer 1 (announcements) | ||
| 464 | if let Err(e) = connection.connect_and_subscribe(None).await { | ||
| 465 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | ||
| 466 | return; | ||
| 467 | } | ||
| 468 | |||
| 469 | // Mark as connected in relay sync index | ||
| 470 | { | ||
| 471 | let mut index = relay_sync_index.write().await; | ||
| 472 | index.insert( | ||
| 473 | relay_url.clone(), | ||
| 474 | RelayState { | ||
| 475 | repos: repos.keys().cloned().collect(), | ||
| 476 | root_events: repos.values().flatten().cloned().collect(), | ||
| 477 | is_bootstrap: false, | ||
| 478 | connection_status: ConnectionStatus::Connected, | ||
| 479 | last_connected: Some(Timestamp::now()), | ||
| 480 | disconnected_at: None, | ||
| 481 | }, | ||
| 482 | ); | ||
| 483 | } | ||
| 484 | |||
| 485 | // Subscribe to Layer 2+3 filters for the repos | ||
| 486 | let repo_ids: HashSet<String> = repos.keys().cloned().collect(); | ||
| 487 | let root_events: HashSet<EventId> = repos.values().flatten().cloned().collect(); | ||
| 488 | let filters = build_layer2_and_layer3_filters(&repo_ids, &root_events, None); | ||
| 489 | |||
| 490 | for filter in filters { | ||
| 491 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 492 | tracing::error!( | ||
| 493 | relay = %relay_url, | ||
| 494 | error = %e, | ||
| 495 | "Failed to subscribe to Layer 2 filter" | ||
| 496 | ); | ||
| 497 | } | ||
| 498 | } | ||
| 499 | |||
| 500 | tracing::info!( | ||
| 501 | relay = %relay_url, | ||
| 502 | repo_count = repos.len(), | ||
| 503 | "Connected to discovered relay with Layer 2+3 filters" | ||
| 504 | ); | ||
| 505 | |||
| 506 | // Create event channel | ||
| 507 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 508 | |||
| 509 | // Spawn event loop | ||
| 510 | tokio::spawn(async move { | ||
| 511 | connection.run_event_loop(event_tx).await; | ||
| 512 | }); | ||
| 513 | |||
| 514 | // Spawn event processor | ||
| 515 | let relay_url_clone = relay_url.clone(); | ||
| 516 | tokio::spawn(async move { | ||
| 517 | while let Some(relay_event) = event_rx.recv().await { | ||
| 518 | match relay_event { | ||
| 519 | RelayEvent::Event(event) => { | ||
| 520 | Self::process_event_static( | ||
| 521 | &event, | ||
| 522 | &relay_url_clone, | ||
| 523 | &database, | ||
| 524 | &write_policy, | ||
| 525 | ) | ||
| 526 | .await; | ||
| 527 | } | ||
| 528 | RelayEvent::EndOfStoredEvents(_) => { | ||
| 529 | tracing::debug!(relay = %relay_url_clone, "EOSE received"); | ||
| 530 | } | ||
| 531 | RelayEvent::Closed(_) | RelayEvent::Shutdown => { | ||
| 532 | tracing::info!(relay = %relay_url_clone, "Relay disconnected"); | ||
| 533 | break; | ||
| 534 | } | ||
| 535 | } | ||
| 536 | } | ||
| 537 | }); | ||
| 538 | } | ||
| 539 | |||
| 540 | /// Spawn a relay connection and start its event loop | ||
| 541 | async fn spawn_relay_connection(&self, relay_url: String) { | ||
| 542 | use tokio::sync::mpsc; | ||
| 543 | |||
| 544 | let database = Arc::clone(&self.database); | ||
| 545 | let write_policy = self.write_policy.clone(); | ||
| 546 | let relay_sync_index = Arc::clone(&self.relay_sync_index); | ||
| 547 | |||
| 548 | // Create relay connection | ||
| 549 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 550 | |||
| 551 | // Connect and subscribe to Layer 1 | ||
| 552 | if let Err(e) = connection.connect_and_subscribe(None).await { | ||
| 553 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | ||
| 554 | return; | ||
| 555 | } | ||
| 556 | |||
| 557 | // Mark as connected in relay sync index | ||
| 558 | { | ||
| 559 | let mut index = relay_sync_index.write().await; | ||
| 560 | index.insert( | ||
| 561 | relay_url.clone(), | ||
| 562 | RelayState { | ||
| 563 | repos: HashSet::new(), | ||
| 564 | root_events: HashSet::new(), | ||
| 565 | is_bootstrap: true, | ||
| 566 | connection_status: ConnectionStatus::Connected, | ||
| 567 | last_connected: Some(Timestamp::now()), | ||
| 568 | disconnected_at: None, | ||
| 569 | }, | ||
| 570 | ); | ||
| 571 | } | ||
| 572 | |||
| 573 | // Create event channel | ||
| 574 | let (event_tx, mut event_rx) = mpsc::channel::<RelayEvent>(1000); | ||
| 575 | |||
| 576 | // Spawn event loop | ||
| 577 | tokio::spawn(async move { | ||
| 578 | connection.run_event_loop(event_tx).await; | ||
| 579 | }); | ||
| 580 | |||
| 581 | // Spawn event processor | ||
| 582 | let relay_url_clone = relay_url.clone(); | ||
| 583 | tokio::spawn(async move { | ||
| 584 | while let Some(relay_event) = event_rx.recv().await { | ||
| 585 | match relay_event { | ||
| 586 | RelayEvent::Event(event) => { | ||
| 587 | Self::process_event_static(&event, &relay_url_clone, &database, &write_policy) | ||
| 588 | .await; | ||
| 589 | } | ||
| 590 | RelayEvent::EndOfStoredEvents(_) => { | ||
| 591 | tracing::debug!("EOSE from {}", relay_url_clone); | ||
| 592 | } | ||
| 593 | RelayEvent::Closed(_) | RelayEvent::Shutdown => { | ||
| 594 | tracing::info!("Relay {} disconnected", relay_url_clone); | ||
| 595 | break; | ||
| 596 | } | ||
| 597 | } | ||
| 598 | } | ||
| 599 | }); | ||
| 600 | } | ||
| 601 | |||
| 602 | /// Process a single event from a relay (static version for spawned tasks) | ||
| 603 | async fn process_event_static( | ||
| 604 | event: &Event, | ||
| 605 | relay_url: &str, | ||
| 606 | database: &SharedDatabase, | ||
| 607 | write_policy: &Nip34WritePolicy, | ||
| 608 | ) { | ||
| 609 | use nostr_relay_builder::prelude::{PolicyResult, WritePolicy}; | ||
| 610 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
| 611 | |||
| 612 | // Check if event already exists | ||
| 613 | match database.event_by_id(&event.id).await { | ||
| 614 | Ok(Some(_)) => { | ||
| 615 | tracing::trace!(event_id = %event.id, "Event already exists, skipping"); | ||
| 616 | return; | ||
| 617 | } | ||
| 618 | Err(e) => { | ||
| 619 | tracing::warn!(event_id = %event.id, error = %e, "Database error checking event"); | ||
| 620 | return; | ||
| 621 | } | ||
| 622 | Ok(None) => {} // Continue processing | ||
| 623 | } | ||
| 624 | |||
| 625 | // Apply write policy using a dummy address (sync events aren't from network clients) | ||
| 626 | let dummy_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); | ||
| 627 | let result = write_policy.admit_event(event, &dummy_addr).await; | ||
| 628 | |||
| 629 | match result { | ||
| 630 | PolicyResult::Accept => { | ||
| 631 | // Save event | ||
| 632 | if let Err(e) = database.save_event(event).await { | ||
| 633 | tracing::error!( | ||
| 634 | event_id = %event.id, | ||
| 635 | relay = %relay_url, | ||
| 636 | error = %e, | ||
| 637 | "Failed to save synced event" | ||
| 638 | ); | ||
| 639 | } else { | ||
| 640 | tracing::debug!( | ||
| 641 | event_id = %event.id, | ||
| 642 | relay = %relay_url, | ||
| 643 | kind = %event.kind.as_u16(), | ||
| 644 | "Saved synced event" | ||
| 645 | ); | ||
| 646 | } | ||
| 647 | } | ||
| 648 | PolicyResult::Reject(reason) => { | ||
| 649 | tracing::debug!( | ||
| 650 | event_id = %event.id, | ||
| 651 | relay = %relay_url, | ||
| 652 | reason = %reason, | ||
| 653 | "Event rejected by write policy" | ||
| 654 | ); | ||
| 655 | } | ||
| 656 | } | ||
| 374 | } | 657 | } |
| 375 | } \ No newline at end of file | 658 | } \ No newline at end of file |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs new file mode 100644 index 0000000..6499c27 --- /dev/null +++ b/src/sync/relay_connection.rs | |||
| @@ -0,0 +1,216 @@ | |||
| 1 | //! Relay Connection Management for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! This module provides relay connection management for external relay connections. | ||
| 4 | //! Each RelayConnection manages a single connection to an external relay and handles | ||
| 5 | //! subscriptions using the three-layer sync strategy. | ||
| 6 | //! | ||
| 7 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | ||
| 8 | |||
| 9 | use nostr_sdk::prelude::*; | ||
| 10 | use tokio::sync::mpsc; | ||
| 11 | |||
| 12 | use super::filters::build_announcement_filter; | ||
| 13 | |||
| 14 | /// Events from a relay connection | ||
| 15 | #[derive(Debug)] | ||
| 16 | pub enum RelayEvent { | ||
| 17 | /// A new event was received | ||
| 18 | Event(Event), | ||
| 19 | /// End of stored events for a subscription | ||
| 20 | EndOfStoredEvents(SubscriptionId), | ||
| 21 | /// Connection was closed | ||
| 22 | Closed(String), | ||
| 23 | /// Shutdown notification | ||
| 24 | Shutdown, | ||
| 25 | } | ||
| 26 | |||
| 27 | /// Manages connection to a single external relay | ||
| 28 | /// | ||
| 29 | /// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection | ||
| 30 | /// to an external relay. It handles: | ||
| 31 | /// - Connection establishment | ||
| 32 | /// - Layer 1 subscription (announcements) | ||
| 33 | /// - Additional filter subscriptions (Layers 2 & 3) | ||
| 34 | /// - Event notification loop | ||
| 35 | pub struct RelayConnection { | ||
| 36 | /// The relay URL this connection is for | ||
| 37 | url: String, | ||
| 38 | /// The underlying nostr-sdk client | ||
| 39 | client: Client, | ||
| 40 | } | ||
| 41 | |||
| 42 | impl RelayConnection { | ||
| 43 | /// Create a new relay connection (not yet connected) | ||
| 44 | /// | ||
| 45 | /// # Arguments | ||
| 46 | /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com") | ||
| 47 | pub fn new(url: String) -> Self { | ||
| 48 | let client = Client::default(); | ||
| 49 | Self { url, client } | ||
| 50 | } | ||
| 51 | |||
| 52 | /// Connect to the relay and subscribe to Layer 1 (announcements) | ||
| 53 | /// | ||
| 54 | /// This method: | ||
| 55 | /// 1. Adds the relay to the client | ||
| 56 | /// 2. Establishes the WebSocket connection | ||
| 57 | /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618) | ||
| 58 | /// | ||
| 59 | /// # Arguments | ||
| 60 | /// * `since` - Optional timestamp for incremental sync on reconnect | ||
| 61 | /// | ||
| 62 | /// # Returns | ||
| 63 | /// * `Ok(SubscriptionId)` - The subscription ID on successful connection | ||
| 64 | /// * `Err(String)` with error description on failure | ||
| 65 | pub async fn connect_and_subscribe( | ||
| 66 | &self, | ||
| 67 | since: Option<Timestamp>, | ||
| 68 | ) -> Result<SubscriptionId, String> { | ||
| 69 | // Add relay to client | ||
| 70 | self.client | ||
| 71 | .add_relay(&self.url) | ||
| 72 | .await | ||
| 73 | .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?; | ||
| 74 | |||
| 75 | // Establish connection | ||
| 76 | self.client.connect().await; | ||
| 77 | |||
| 78 | // Subscribe to Layer 1 (announcements) | ||
| 79 | let filter = build_announcement_filter(since); | ||
| 80 | let output = self | ||
| 81 | .client | ||
| 82 | .subscribe(filter, None) | ||
| 83 | .await | ||
| 84 | .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?; | ||
| 85 | |||
| 86 | tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); | ||
| 87 | Ok(output.val) | ||
| 88 | } | ||
| 89 | |||
| 90 | /// Run the event loop, sending events through the provided channel | ||
| 91 | /// | ||
| 92 | /// This method blocks and processes notifications from the relay: | ||
| 93 | /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event` | ||
| 94 | /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents` | ||
| 95 | /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown` | ||
| 96 | /// | ||
| 97 | /// The loop terminates when: | ||
| 98 | /// - The sender channel is closed (receiver dropped) | ||
| 99 | /// - A shutdown notification is received | ||
| 100 | /// - An error occurs receiving notifications | ||
| 101 | /// | ||
| 102 | /// # Arguments | ||
| 103 | /// * `event_sender` - Channel to send relay events through | ||
| 104 | pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) { | ||
| 105 | let mut notifications = self.client.notifications(); | ||
| 106 | let url = self.url.clone(); | ||
| 107 | |||
| 108 | tracing::debug!(relay = %url, "Starting event loop"); | ||
| 109 | |||
| 110 | while let Ok(notification) = notifications.recv().await { | ||
| 111 | match notification { | ||
| 112 | RelayPoolNotification::Event { event, .. } => { | ||
| 113 | tracing::trace!(relay = %url, event_id = %event.id, "Received event"); | ||
| 114 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | ||
| 115 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | ||
| 116 | break; | ||
| 117 | } | ||
| 118 | } | ||
| 119 | RelayPoolNotification::Message { message, .. } => { | ||
| 120 | match message { | ||
| 121 | RelayMessage::EndOfStoredEvents(sub_id) => { | ||
| 122 | tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE"); | ||
| 123 | // Convert Cow<SubscriptionId> to owned SubscriptionId | ||
| 124 | let owned_sub_id = sub_id.into_owned(); | ||
| 125 | if event_sender | ||
| 126 | .send(RelayEvent::EndOfStoredEvents(owned_sub_id)) | ||
| 127 | .await | ||
| 128 | .is_err() | ||
| 129 | { | ||
| 130 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | ||
| 131 | break; | ||
| 132 | } | ||
| 133 | } | ||
| 134 | RelayMessage::Closed { message: msg, .. } => { | ||
| 135 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | ||
| 136 | let _ = event_sender | ||
| 137 | .send(RelayEvent::Closed(msg.to_string())) | ||
| 138 | .await; | ||
| 139 | } | ||
| 140 | _ => {} | ||
| 141 | } | ||
| 142 | } | ||
| 143 | RelayPoolNotification::Shutdown => { | ||
| 144 | tracing::info!(relay = %url, "Relay pool shutdown"); | ||
| 145 | let _ = event_sender.send(RelayEvent::Shutdown).await; | ||
| 146 | break; | ||
| 147 | } | ||
| 148 | } | ||
| 149 | } | ||
| 150 | |||
| 151 | tracing::debug!(relay = %url, "Event loop terminated"); | ||
| 152 | } | ||
| 153 | |||
| 154 | /// Add additional filter subscription (for Layer 2 + 3) | ||
| 155 | /// | ||
| 156 | /// Use this to subscribe to: | ||
| 157 | /// - Layer 2: Events tagging our repos (a/A/q tags) | ||
| 158 | /// - Layer 3: Events tagging our root events (e/E/q tags) | ||
| 159 | /// | ||
| 160 | /// # Arguments | ||
| 161 | /// * `filter` - The filter to subscribe to | ||
| 162 | /// | ||
| 163 | /// # Returns | ||
| 164 | /// * `Ok(SubscriptionId)` - The subscription ID on success | ||
| 165 | /// * `Err(String)` - Error description on failure | ||
| 166 | pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> { | ||
| 167 | let output = self | ||
| 168 | .client | ||
| 169 | .subscribe(filter, None) | ||
| 170 | .await | ||
| 171 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; | ||
| 172 | Ok(output.val) | ||
| 173 | } | ||
| 174 | |||
| 175 | /// Subscribe to multiple filters at once | ||
| 176 | /// | ||
| 177 | /// Each filter creates its own subscription. Returns when all subscriptions | ||
| 178 | /// are established. This is useful for Layer 2 + 3 filters together. | ||
| 179 | /// | ||
| 180 | /// # Arguments | ||
| 181 | /// * `filters` - Vec of filters to subscribe to | ||
| 182 | /// | ||
| 183 | /// # Returns | ||
| 184 | /// * `Ok(Vec<SubscriptionId>)` - The subscription IDs on success | ||
| 185 | /// * `Err(String)` - Error description on failure | ||
| 186 | pub async fn subscribe_filters( | ||
| 187 | &self, | ||
| 188 | filters: Vec<Filter>, | ||
| 189 | ) -> Result<Vec<SubscriptionId>, String> { | ||
| 190 | if filters.is_empty() { | ||
| 191 | return Ok(vec![]); | ||
| 192 | } | ||
| 193 | |||
| 194 | let mut sub_ids = Vec::with_capacity(filters.len()); | ||
| 195 | for filter in filters { | ||
| 196 | let output = self | ||
| 197 | .client | ||
| 198 | .subscribe(filter, None) | ||
| 199 | .await | ||
| 200 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; | ||
| 201 | sub_ids.push(output.val); | ||
| 202 | } | ||
| 203 | Ok(sub_ids) | ||
| 204 | } | ||
| 205 | |||
| 206 | /// Get the relay URL | ||
| 207 | pub fn url(&self) -> &str { | ||
| 208 | &self.url | ||
| 209 | } | ||
| 210 | |||
| 211 | /// Disconnect from the relay | ||
| 212 | pub async fn disconnect(&self) { | ||
| 213 | self.client.disconnect().await; | ||
| 214 | tracing::debug!(relay = %self.url, "Disconnected from relay"); | ||
| 215 | } | ||
| 216 | } \ No newline at end of file | ||
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs new file mode 100644 index 0000000..1dec219 --- /dev/null +++ b/src/sync/self_subscriber.rs | |||
| @@ -0,0 +1,430 @@ | |||
| 1 | //! Self-Subscriber for Proactive Sync | ||
| 2 | //! | ||
| 3 | //! Monitors the relay's own database for repository announcements and | ||
| 4 | //! updates the RepoSyncIndex when new relevant events are discovered. | ||
| 5 | //! | ||
| 6 | //! This module subscribes to relevant event kinds on our own relay and | ||
| 7 | //! batches updates before sending them to the SyncManager. | ||
| 8 | //! | ||
| 9 | //! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details. | ||
| 10 | |||
| 11 | use std::collections::{HashMap, HashSet}; | ||
| 12 | use std::time::Duration; | ||
| 13 | |||
| 14 | use nostr_sdk::prelude::*; | ||
| 15 | use tokio::sync::mpsc; | ||
| 16 | |||
| 17 | use super::{RepoSyncIndex, RepoSyncNeeds}; | ||
| 18 | |||
| 19 | // ============================================================================= | ||
| 20 | // RelayAction - Actions to send to SyncManager | ||
| 21 | // ============================================================================= | ||
| 22 | |||
| 23 | /// Actions that the SelfSubscriber sends to the SyncManager | ||
| 24 | #[derive(Debug)] | ||
| 25 | pub enum RelayAction { | ||
| 26 | /// Spawn a new relay connection | ||
| 27 | SpawnRelay { | ||
| 28 | /// The relay URL to connect to | ||
| 29 | relay_url: String, | ||
| 30 | /// Repos to sync, mapped to their root event IDs | ||
| 31 | repos: HashMap<String, HashSet<EventId>>, | ||
| 32 | }, | ||
| 33 | /// Add filters to an existing relay connection | ||
| 34 | AddFilters { | ||
| 35 | /// The relay URL to add filters to | ||
| 36 | relay_url: String, | ||
| 37 | /// Repos to sync, mapped to their root event IDs | ||
| 38 | repos: HashMap<String, HashSet<EventId>>, | ||
| 39 | }, | ||
| 40 | } | ||
| 41 | |||
| 42 | // ============================================================================= | ||
| 43 | // PendingUpdates - Accumulator for batching | ||
| 44 | // ============================================================================= | ||
| 45 | |||
| 46 | /// Accumulates updates between batch timer firings | ||
| 47 | struct PendingUpdates { | ||
| 48 | /// Repos discovered since last batch, keyed by repo addressable ref | ||
| 49 | repos: HashMap<String, RepoSyncNeeds>, | ||
| 50 | } | ||
| 51 | |||
| 52 | impl PendingUpdates { | ||
| 53 | /// Create a new empty pending updates accumulator | ||
| 54 | fn new() -> Self { | ||
| 55 | Self { | ||
| 56 | repos: HashMap::new(), | ||
| 57 | } | ||
| 58 | } | ||
| 59 | |||
| 60 | /// Add or update a repo with its relays and root events | ||
| 61 | fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) { | ||
| 62 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | ||
| 63 | relays: HashSet::new(), | ||
| 64 | root_events: HashSet::new(), | ||
| 65 | }); | ||
| 66 | entry.relays.extend(relays); | ||
| 67 | entry.root_events.extend(root_events); | ||
| 68 | } | ||
| 69 | |||
| 70 | /// Check if there are any pending updates | ||
| 71 | fn is_empty(&self) -> bool { | ||
| 72 | self.repos.is_empty() | ||
| 73 | } | ||
| 74 | |||
| 75 | /// Take all pending updates, leaving empty | ||
| 76 | fn take(&mut self) -> HashMap<String, RepoSyncNeeds> { | ||
| 77 | std::mem::take(&mut self.repos) | ||
| 78 | } | ||
| 79 | } | ||
| 80 | |||
| 81 | // ============================================================================= | ||
| 82 | // SelfSubscriber - Main Component | ||
| 83 | // ============================================================================= | ||
| 84 | |||
| 85 | /// Subscribes to own relay's events to discover repos needing sync | ||
| 86 | /// | ||
| 87 | /// The SelfSubscriber connects to our own relay and monitors for: | ||
| 88 | /// - 30617 (Repository Announcements) - to discover repos listing our relay | ||
| 89 | /// - 1617 (Patches) - root events referencing repos | ||
| 90 | /// - 1618 (Issues) - root events referencing repos | ||
| 91 | /// - 1619 (Replies) - root events referencing repos | ||
| 92 | /// - 1621 (PRs) - root events referencing repos | ||
| 93 | /// | ||
| 94 | /// Note: 30618 is NOT subscribed to here (per v4 spec - only synced from remote relays) | ||
| 95 | pub struct SelfSubscriber { | ||
| 96 | /// Our own relay URL (to connect to) | ||
| 97 | own_relay_url: String, | ||
| 98 | /// Our service domain (for filtering relevant repos) | ||
| 99 | relay_domain: String, | ||
| 100 | /// Shared index of repos to sync | ||
| 101 | repo_sync_index: RepoSyncIndex, | ||
| 102 | /// Channel to send actions to SyncManager | ||
| 103 | action_tx: mpsc::Sender<RelayAction>, | ||
| 104 | } | ||
| 105 | |||
| 106 | impl SelfSubscriber { | ||
| 107 | /// Create a new SelfSubscriber | ||
| 108 | /// | ||
| 109 | /// # Arguments | ||
| 110 | /// * `own_relay_url` - The WebSocket URL of our own relay | ||
| 111 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | ||
| 112 | /// * `repo_sync_index` - Shared index to update with discovered repos | ||
| 113 | /// * `action_tx` - Channel to send RelayActions to the SyncManager | ||
| 114 | pub fn new( | ||
| 115 | own_relay_url: String, | ||
| 116 | relay_domain: String, | ||
| 117 | repo_sync_index: RepoSyncIndex, | ||
| 118 | action_tx: mpsc::Sender<RelayAction>, | ||
| 119 | ) -> Self { | ||
| 120 | Self { | ||
| 121 | own_relay_url, | ||
| 122 | relay_domain, | ||
| 123 | repo_sync_index, | ||
| 124 | action_tx, | ||
| 125 | } | ||
| 126 | } | ||
| 127 | |||
| 128 | /// Get batch window from environment or use default | ||
| 129 | /// | ||
| 130 | /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. | ||
| 131 | /// Default: 5000ms (5 seconds) | ||
| 132 | fn get_batch_window() -> Duration { | ||
| 133 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | ||
| 134 | .ok() | ||
| 135 | .and_then(|s| s.parse::<u64>().ok()) | ||
| 136 | .map(Duration::from_millis) | ||
| 137 | .unwrap_or(Duration::from_millis(5000)) | ||
| 138 | } | ||
| 139 | |||
| 140 | /// Extract relay URLs from event tags | ||
| 141 | /// | ||
| 142 | /// Extracts URLs from: | ||
| 143 | /// - `relays` tags: ["relays", "wss://relay1.com", "wss://relay2.com", ...] | ||
| 144 | /// - `clone` tags: ["clone", "https://example.com/repo.git", ...] (converted to ws://) | ||
| 145 | fn extract_relay_urls(event: &Event) -> HashSet<String> { | ||
| 146 | let mut relays = HashSet::new(); | ||
| 147 | |||
| 148 | for tag in event.tags.iter() { | ||
| 149 | let tag_vec = tag.as_slice(); | ||
| 150 | if tag_vec.is_empty() { | ||
| 151 | continue; | ||
| 152 | } | ||
| 153 | |||
| 154 | match tag_vec[0].as_str() { | ||
| 155 | "relays" => { | ||
| 156 | // All subsequent values are relay URLs | ||
| 157 | for url in tag_vec.iter().skip(1) { | ||
| 158 | relays.insert(url.to_string()); | ||
| 159 | } | ||
| 160 | } | ||
| 161 | "clone" if tag_vec.len() >= 2 => { | ||
| 162 | // Convert http(s) clone URL to ws(s) relay URL | ||
| 163 | if let Some(relay_url) = clone_url_to_relay_url(&tag_vec[1]) { | ||
| 164 | relays.insert(relay_url); | ||
| 165 | } | ||
| 166 | } | ||
| 167 | _ => {} | ||
| 168 | } | ||
| 169 | } | ||
| 170 | |||
| 171 | relays | ||
| 172 | } | ||
| 173 | |||
| 174 | /// Extract repo identifier from event | ||
| 175 | /// | ||
| 176 | /// For kind 30617, uses the `d` tag to build the addressable reference | ||
| 177 | /// Format: 30617:pubkey:identifier | ||
| 178 | fn extract_repo_id(event: &Event) -> Option<String> { | ||
| 179 | // For kind 30617, extract d tag and build addressable ref | ||
| 180 | if event.kind == Kind::Custom(30617) { | ||
| 181 | for tag in event.tags.iter() { | ||
| 182 | let tag_vec = tag.as_slice(); | ||
| 183 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { | ||
| 184 | return Some(format!("30617:{}:{}", event.pubkey, tag_vec[1])); | ||
| 185 | } | ||
| 186 | } | ||
| 187 | } | ||
| 188 | |||
| 189 | // For other kinds (1617, 1618, 1619, 1621), we'd need to look at | ||
| 190 | // their 'a' tags to find which repo they belong to. | ||
| 191 | // That processing happens in the batch processing, not here. | ||
| 192 | None | ||
| 193 | } | ||
| 194 | |||
| 195 | /// Check if announcement lists our relay | ||
| 196 | /// | ||
| 197 | /// Returns true if any extracted relay URL contains our domain | ||
| 198 | fn lists_our_relay(&self, event: &Event) -> bool { | ||
| 199 | Self::extract_relay_urls(event).iter().any(|url| { | ||
| 200 | url.contains(&self.relay_domain) || url == &self.own_relay_url | ||
| 201 | }) | ||
| 202 | } | ||
| 203 | |||
| 204 | /// Main run loop | ||
| 205 | /// | ||
| 206 | /// Connects to own relay, subscribes to relevant event kinds, | ||
| 207 | /// and batches updates before processing them. | ||
| 208 | pub async fn run(self) { | ||
| 209 | let client = Client::default(); | ||
| 210 | |||
| 211 | // Add own relay | ||
| 212 | if let Err(e) = client.add_relay(&self.own_relay_url).await { | ||
| 213 | tracing::error!( | ||
| 214 | url = %self.own_relay_url, | ||
| 215 | error = %e, | ||
| 216 | "Failed to add own relay for self-subscription" | ||
| 217 | ); | ||
| 218 | return; | ||
| 219 | } | ||
| 220 | |||
| 221 | // Connect | ||
| 222 | client.connect().await; | ||
| 223 | |||
| 224 | // Subscribe to announcement and root event kinds | ||
| 225 | // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618) | ||
| 226 | let filter = Filter::new().kinds(vec![ | ||
| 227 | Kind::Custom(30617), // Repository Announcements | ||
| 228 | Kind::Custom(1617), // Patches | ||
| 229 | Kind::Custom(1618), // Issues | ||
| 230 | Kind::Custom(1619), // Replies/Status | ||
| 231 | Kind::Custom(1621), // Pull Requests | ||
| 232 | ]); | ||
| 233 | |||
| 234 | if let Err(e) = client.subscribe(filter, None).await { | ||
| 235 | tracing::error!( | ||
| 236 | error = %e, | ||
| 237 | "Failed to subscribe to own relay for self-subscription" | ||
| 238 | ); | ||
| 239 | return; | ||
| 240 | } | ||
| 241 | |||
| 242 | tracing::info!( | ||
| 243 | url = %self.own_relay_url, | ||
| 244 | domain = %self.relay_domain, | ||
| 245 | "SelfSubscriber started" | ||
| 246 | ); | ||
| 247 | |||
| 248 | let mut notifications = client.notifications(); | ||
| 249 | let batch_window = Self::get_batch_window(); | ||
| 250 | let mut pending = PendingUpdates::new(); | ||
| 251 | |||
| 252 | // Timer does NOT reset on new events - use interval | ||
| 253 | let mut timer = tokio::time::interval(batch_window); | ||
| 254 | timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
| 255 | |||
| 256 | loop { | ||
| 257 | tokio::select! { | ||
| 258 | notification = notifications.recv() => { | ||
| 259 | match notification { | ||
| 260 | Ok(RelayPoolNotification::Event { event, .. }) => { | ||
| 261 | // Only process 30617 events that list our relay | ||
| 262 | if event.kind == Kind::Custom(30617) { | ||
| 263 | if !self.lists_our_relay(&event) { | ||
| 264 | continue; | ||
| 265 | } | ||
| 266 | |||
| 267 | // Extract repo ID and relays | ||
| 268 | if let Some(repo_id) = Self::extract_repo_id(&event) { | ||
| 269 | let relays = Self::extract_relay_urls(&event); | ||
| 270 | let mut root_events = HashSet::new(); | ||
| 271 | root_events.insert(event.id); | ||
| 272 | |||
| 273 | pending.add_repo(repo_id, relays, root_events); | ||
| 274 | tracing::debug!( | ||
| 275 | event_id = %event.id, | ||
| 276 | "Queued 30617 announcement for batch processing" | ||
| 277 | ); | ||
| 278 | } | ||
| 279 | } else { | ||
| 280 | // For root event kinds (1617, 1618, 1619, 1621), | ||
| 281 | // we need to check if they reference repos we care about. | ||
| 282 | // For now, we'll track them in a simpler way. | ||
| 283 | // Full implementation would extract 'a' tag and match to known repos. | ||
| 284 | tracing::trace!( | ||
| 285 | kind = %event.kind, | ||
| 286 | event_id = %event.id, | ||
| 287 | "Received root event (processing deferred)" | ||
| 288 | ); | ||
| 289 | } | ||
| 290 | } | ||
| 291 | Ok(RelayPoolNotification::Shutdown) => { | ||
| 292 | tracing::info!("SelfSubscriber received shutdown notification"); | ||
| 293 | break; | ||
| 294 | } | ||
| 295 | Err(e) => { | ||
| 296 | tracing::error!(error = %e, "Error receiving notification"); | ||
| 297 | break; | ||
| 298 | } | ||
| 299 | _ => {} | ||
| 300 | } | ||
| 301 | } | ||
| 302 | _ = timer.tick() => { | ||
| 303 | if !pending.is_empty() { | ||
| 304 | self.process_batch(&mut pending).await; | ||
| 305 | } | ||
| 306 | } | ||
| 307 | } | ||
| 308 | } | ||
| 309 | |||
| 310 | tracing::info!("SelfSubscriber stopped"); | ||
| 311 | } | ||
| 312 | |||
| 313 | /// Process accumulated batch | ||
| 314 | /// | ||
| 315 | /// Updates the RepoSyncIndex with discovered repos, then derives per-relay | ||
| 316 | /// targets and sends RelayAction messages to the SyncManager. | ||
| 317 | async fn process_batch(&self, pending: &mut PendingUpdates) { | ||
| 318 | use crate::sync::algorithms::derive_relay_targets; | ||
| 319 | |||
| 320 | let updates = pending.take(); | ||
| 321 | |||
| 322 | if updates.is_empty() { | ||
| 323 | return; | ||
| 324 | } | ||
| 325 | |||
| 326 | tracing::info!( | ||
| 327 | repo_count = updates.len(), | ||
| 328 | "Processing batch of repo updates" | ||
| 329 | ); | ||
| 330 | |||
| 331 | // Update RepoSyncIndex | ||
| 332 | let mut index = self.repo_sync_index.write().await; | ||
| 333 | |||
| 334 | for (repo_id, needs) in updates { | ||
| 335 | // Merge with existing entry or insert new | ||
| 336 | let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { | ||
| 337 | relays: HashSet::new(), | ||
| 338 | root_events: HashSet::new(), | ||
| 339 | }); | ||
| 340 | entry.relays.extend(needs.relays); | ||
| 341 | entry.root_events.extend(needs.root_events); | ||
| 342 | |||
| 343 | tracing::debug!( | ||
| 344 | repo_id = %repo_id, | ||
| 345 | relay_count = entry.relays.len(), | ||
| 346 | event_count = entry.root_events.len(), | ||
| 347 | "Updated repo sync needs" | ||
| 348 | ); | ||
| 349 | } | ||
| 350 | |||
| 351 | // Derive per-relay targets from the updated index | ||
| 352 | let targets = derive_relay_targets(&index); | ||
| 353 | drop(index); // Release lock before async operations | ||
| 354 | |||
| 355 | // For each relay, send SpawnRelay action | ||
| 356 | // SyncManager will check if relay already exists | ||
| 357 | for (relay_url, needs) in targets { | ||
| 358 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) | ||
| 359 | if relay_url.contains(&self.relay_domain) { | ||
| 360 | continue; | ||
| 361 | } | ||
| 362 | |||
| 363 | // Convert needs to HashMap<String, HashSet<EventId>> | ||
| 364 | let mut repos = HashMap::new(); | ||
| 365 | for repo_id in needs.repos { | ||
| 366 | repos.insert(repo_id, needs.root_events.clone()); | ||
| 367 | } | ||
| 368 | |||
| 369 | let action = RelayAction::SpawnRelay { relay_url: relay_url.clone(), repos }; | ||
| 370 | |||
| 371 | if let Err(e) = self.action_tx.send(action).await { | ||
| 372 | tracing::error!( | ||
| 373 | relay = %relay_url, | ||
| 374 | error = %e, | ||
| 375 | "Failed to send SpawnRelay action" | ||
| 376 | ); | ||
| 377 | } else { | ||
| 378 | tracing::debug!( | ||
| 379 | relay = %relay_url, | ||
| 380 | "Sent SpawnRelay action to SyncManager" | ||
| 381 | ); | ||
| 382 | } | ||
| 383 | } | ||
| 384 | } | ||
| 385 | } | ||
| 386 | |||
| 387 | // ============================================================================= | ||
| 388 | // Helper Functions | ||
| 389 | // ============================================================================= | ||
| 390 | |||
| 391 | /// Convert clone URL to relay URL | ||
| 392 | /// | ||
| 393 | /// Converts http:// to ws:// and https:// to wss:// | ||
| 394 | /// Returns None for unsupported URL schemes | ||
| 395 | fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | ||
| 396 | if clone_url.starts_with("http://") { | ||
| 397 | Some(clone_url.replacen("http://", "ws://", 1)) | ||
| 398 | } else if clone_url.starts_with("https://") { | ||
| 399 | Some(clone_url.replacen("https://", "wss://", 1)) | ||
| 400 | } else { | ||
| 401 | None | ||
| 402 | } | ||
| 403 | } | ||
| 404 | |||
| 405 | #[cfg(test)] | ||
| 406 | mod tests { | ||
| 407 | use super::*; | ||
| 408 | |||
| 409 | #[test] | ||
| 410 | fn test_clone_url_to_relay_url_https() { | ||
| 411 | assert_eq!( | ||
| 412 | clone_url_to_relay_url("https://example.com/repo.git"), | ||
| 413 | Some("wss://example.com/repo.git".to_string()) | ||
| 414 | ); | ||
| 415 | } | ||
| 416 | |||
| 417 | #[test] | ||
| 418 | fn test_clone_url_to_relay_url_http() { | ||
| 419 | assert_eq!( | ||
| 420 | clone_url_to_relay_url("http://localhost:3000/repo.git"), | ||
| 421 | Some("ws://localhost:3000/repo.git".to_string()) | ||
| 422 | ); | ||
| 423 | } | ||
| 424 | |||
| 425 | #[test] | ||
| 426 | fn test_clone_url_to_relay_url_unsupported() { | ||
| 427 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); | ||
| 428 | assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); | ||
| 429 | } | ||
| 430 | } \ No newline at end of file | ||