diff options
| -rw-r--r-- | src/sync/mod.rs | 386 |
1 files changed, 379 insertions, 7 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index aa34490..71f91e2 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -39,11 +39,13 @@ | |||
| 39 | use std::collections::{HashMap, HashSet}; | 39 | use std::collections::{HashMap, HashSet}; |
| 40 | use std::sync::Arc; | 40 | use std::sync::Arc; |
| 41 | 41 | ||
| 42 | use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; | ||
| 42 | use nostr_sdk::EventId; | 43 | use nostr_sdk::EventId; |
| 43 | use tokio::sync::RwLock; | 44 | use tokio::sync::RwLock; |
| 44 | 45 | ||
| 45 | use crate::config::Config; | 46 | use crate::config::Config; |
| 46 | use crate::nostr::builder::Nip34WritePolicy; | 47 | use crate::nostr::builder::Nip34WritePolicy; |
| 48 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | ||
| 47 | use crate::nostr::SharedDatabase; | 49 | use crate::nostr::SharedDatabase; |
| 48 | 50 | ||
| 49 | // ============================================================================= | 51 | // ============================================================================= |
| @@ -263,6 +265,191 @@ impl SyncManager { | |||
| 263 | &self.sync_relays | 265 | &self.sync_relays |
| 264 | } | 266 | } |
| 265 | 267 | ||
| 268 | // ========================================================================= | ||
| 269 | // Phase 2: Database Initialization | ||
| 270 | // ========================================================================= | ||
| 271 | |||
| 272 | /// Initialize sync state from database queries at startup. | ||
| 273 | /// | ||
| 274 | /// This method performs two database queries: | ||
| 275 | /// 1. Query kinds 1617/1618/1619/1621 to build `following_repo_root_events` | ||
| 276 | /// 2. Query kind 30617 to build `sync_relays` | ||
| 277 | /// | ||
| 278 | /// The bootstrap relay (if configured) is always added to `sync_relays`. | ||
| 279 | /// | ||
| 280 | /// # Errors | ||
| 281 | /// | ||
| 282 | /// Returns an error if database queries fail. | ||
| 283 | pub async fn initialize_from_database(&self) -> Result<(), String> { | ||
| 284 | // Initialize bootstrap relay if configured (never removed) | ||
| 285 | if let Some(bootstrap_url) = &self.bootstrap_relay_url { | ||
| 286 | self.sync_relays.write().await.insert( | ||
| 287 | bootstrap_url.clone(), | ||
| 288 | HashMap::new(), // Repos potentially populated below but may stay empty (Layer 1 only) | ||
| 289 | ); | ||
| 290 | tracing::info!("Added bootstrap relay to sync_relays: {}", bootstrap_url); | ||
| 291 | } | ||
| 292 | |||
| 293 | // Query 1: Build following_repo_root_events | ||
| 294 | // Find all 1617/1618/1619/1621 events and extract their repo references | ||
| 295 | let root_event_kinds = vec![ | ||
| 296 | Kind::GitPatch, // 1617 | ||
| 297 | Kind::from(KIND_PR), // 1618 | ||
| 298 | Kind::from(KIND_PR_UPDATE), // 1619 | ||
| 299 | Kind::GitIssue, // 1621 | ||
| 300 | ]; | ||
| 301 | |||
| 302 | let filter = Filter::new().kinds(root_event_kinds); | ||
| 303 | let root_events = self | ||
| 304 | .database | ||
| 305 | .query(filter) | ||
| 306 | .await | ||
| 307 | .map_err(|e| format!("Failed to query root events: {}", e))?; | ||
| 308 | |||
| 309 | let mut root_events_count = 0; | ||
| 310 | for event in root_events { | ||
| 311 | // An event may have multiple 'a' tags pointing to different repos | ||
| 312 | let repo_refs = Self::extract_all_repo_refs(&event); | ||
| 313 | for repo_ref in repo_refs { | ||
| 314 | self.following_repo_root_events | ||
| 315 | .write() | ||
| 316 | .await | ||
| 317 | .entry(repo_ref) | ||
| 318 | .or_default() | ||
| 319 | .insert(event.id); | ||
| 320 | root_events_count += 1; | ||
| 321 | } | ||
| 322 | } | ||
| 323 | tracing::info!( | ||
| 324 | "Populated following_repo_root_events with {} repo-event mappings", | ||
| 325 | root_events_count | ||
| 326 | ); | ||
| 327 | |||
| 328 | // Query 2: Build sync_relays from kind 30617 announcements | ||
| 329 | let announcement_filter = Filter::new().kind(Kind::from(KIND_REPOSITORY_ANNOUNCEMENT)); | ||
| 330 | let announcements = self | ||
| 331 | .database | ||
| 332 | .query(announcement_filter) | ||
| 333 | .await | ||
| 334 | .map_err(|e| format!("Failed to query announcements: {}", e))?; | ||
| 335 | |||
| 336 | let mut sync_relays_count = 0; | ||
| 337 | for event in announcements { | ||
| 338 | let repo_ref = Self::build_repo_ref(&event); | ||
| 339 | let relay_urls = Self::extract_relay_urls(&event); | ||
| 340 | |||
| 341 | // Only track repos that list BOTH a remote relay AND our service | ||
| 342 | if self.lists_our_service(&event) { | ||
| 343 | for relay_url in relay_urls { | ||
| 344 | if !self.is_own_relay(&relay_url) { | ||
| 345 | // Get events for this repo from following_repo_root_events | ||
| 346 | let events = self | ||
| 347 | .following_repo_root_events | ||
| 348 | .read() | ||
| 349 | .await | ||
| 350 | .get(&repo_ref) | ||
| 351 | .cloned() | ||
| 352 | .unwrap_or_default(); | ||
| 353 | |||
| 354 | self.sync_relays | ||
| 355 | .write() | ||
| 356 | .await | ||
| 357 | .entry(relay_url) | ||
| 358 | .or_default() | ||
| 359 | .insert(repo_ref.clone(), events); | ||
| 360 | sync_relays_count += 1; | ||
| 361 | } | ||
| 362 | } | ||
| 363 | } | ||
| 364 | } | ||
| 365 | tracing::info!( | ||
| 366 | "Populated sync_relays with {} relay-repo mappings", | ||
| 367 | sync_relays_count | ||
| 368 | ); | ||
| 369 | |||
| 370 | Ok(()) | ||
| 371 | } | ||
| 372 | |||
| 373 | // ========================================================================= | ||
| 374 | // Helper Methods for Event Extraction | ||
| 375 | // ========================================================================= | ||
| 376 | |||
| 377 | /// Extract ALL repo refs from an event (it may tag multiple repos). | ||
| 378 | /// | ||
| 379 | /// Looks for 'a' tags that reference kind 30617 (repository announcements). | ||
| 380 | /// Returns refs in format "30617:pubkey:identifier". | ||
| 381 | pub fn extract_all_repo_refs(event: &Event) -> Vec<String> { | ||
| 382 | event | ||
| 383 | .tags | ||
| 384 | .iter() | ||
| 385 | .filter_map(|tag| { | ||
| 386 | let tag_vec = tag.clone().to_vec(); | ||
| 387 | if tag_vec.len() >= 2 && tag_vec[0] == "a" { | ||
| 388 | // Validate it's a 30617 reference | ||
| 389 | if tag_vec[1].starts_with("30617:") { | ||
| 390 | Some(tag_vec[1].clone()) | ||
| 391 | } else { | ||
| 392 | None | ||
| 393 | } | ||
| 394 | } else { | ||
| 395 | None | ||
| 396 | } | ||
| 397 | }) | ||
| 398 | .collect() | ||
| 399 | } | ||
| 400 | |||
| 401 | /// Build a repo ref string from a 30617 announcement event. | ||
| 402 | /// | ||
| 403 | /// Returns format "30617:pubkey:identifier". | ||
| 404 | pub fn build_repo_ref(event: &Event) -> String { | ||
| 405 | // Extract 'd' tag for identifier | ||
| 406 | let identifier = event | ||
| 407 | .tags | ||
| 408 | .iter() | ||
| 409 | .find(|tag| tag.kind() == TagKind::d()) | ||
| 410 | .and_then(|tag| tag.content()) | ||
| 411 | .map(|s| s.to_string()) | ||
| 412 | .unwrap_or_default(); | ||
| 413 | |||
| 414 | format!("30617:{}:{}", event.pubkey.to_hex(), identifier) | ||
| 415 | } | ||
| 416 | |||
| 417 | /// Extract relay URLs from a repository announcement event. | ||
| 418 | /// | ||
| 419 | /// Looks for the 'relays' tag and returns all relay URLs. | ||
| 420 | pub fn extract_relay_urls(event: &Event) -> Vec<String> { | ||
| 421 | event | ||
| 422 | .tags | ||
| 423 | .iter() | ||
| 424 | .filter(|tag| matches!(tag.kind(), TagKind::Relays)) | ||
| 425 | .flat_map(|tag| { | ||
| 426 | let vec = tag.clone().to_vec(); | ||
| 427 | // Skip first element (tag name), rest are values | ||
| 428 | vec.into_iter().skip(1) | ||
| 429 | }) | ||
| 430 | .collect() | ||
| 431 | } | ||
| 432 | |||
| 433 | /// Check if event lists our service in the relays tag. | ||
| 434 | /// | ||
| 435 | /// Compares relay URLs against our service domain. | ||
| 436 | fn lists_our_service(&self, event: &Event) -> bool { | ||
| 437 | let relay_urls = Self::extract_relay_urls(event); | ||
| 438 | relay_urls.iter().any(|url| self.is_own_relay(url)) | ||
| 439 | } | ||
| 440 | |||
| 441 | /// Check if a relay URL matches our relay. | ||
| 442 | /// | ||
| 443 | /// Compares the URL against our service domain. | ||
| 444 | fn is_own_relay(&self, relay_url: &str) -> bool { | ||
| 445 | // Normalize comparison: check if URL contains our domain | ||
| 446 | relay_url.contains(&self.service_domain) | ||
| 447 | } | ||
| 448 | |||
| 449 | // ========================================================================= | ||
| 450 | // Main Run Loop | ||
| 451 | // ========================================================================= | ||
| 452 | |||
| 266 | /// Runs the sync manager main loop. | 453 | /// Runs the sync manager main loop. |
| 267 | /// | 454 | /// |
| 268 | /// This method should be called in a spawned task: | 455 | /// This method should be called in a spawned task: |
| @@ -277,22 +464,35 @@ impl SyncManager { | |||
| 277 | /// | 464 | /// |
| 278 | /// This is a stub that logs and then waits indefinitely. | 465 | /// This is a stub that logs and then waits indefinitely. |
| 279 | /// Full implementation includes: | 466 | /// Full implementation includes: |
| 280 | /// - Phase 2: Database initialization queries | 467 | /// - Phase 2: Database initialization queries ✓ |
| 281 | /// - Phase 3: Self-subscription for incremental updates | 468 | /// - Phase 3: Self-subscription for incremental updates |
| 282 | /// - Phase 4-6: Filter building, connection management | 469 | /// - Phase 4-6: Filter building, connection management |
| 283 | /// - Phase 7: Full sync loop | 470 | /// - Phase 7: Full sync loop |
| 284 | pub async fn run(self) { | 471 | pub async fn run(self) { |
| 285 | tracing::info!( | 472 | tracing::info!( |
| 286 | "SyncManager stub started (bootstrap_relay={:?}, domain={})", | 473 | "SyncManager starting (bootstrap_relay={:?}, domain={})", |
| 287 | self.bootstrap_relay_url, | 474 | self.bootstrap_relay_url, |
| 288 | self.service_domain | 475 | self.service_domain |
| 289 | ); | 476 | ); |
| 290 | 477 | ||
| 291 | tracing::info!( | 478 | // Phase 2: Initialize from database |
| 292 | "Phase 1 data structures initialized: following_repo_root_events, sync_relays" | 479 | if let Err(e) = self.initialize_from_database().await { |
| 293 | ); | 480 | tracing::error!("Failed to initialize sync state from database: {}", e); |
| 481 | // Continue anyway - we can still receive events via self-subscription | ||
| 482 | } | ||
| 483 | |||
| 484 | // Log initialization results | ||
| 485 | { | ||
| 486 | let following_count = self.following_repo_root_events.read().await.len(); | ||
| 487 | let sync_relays_count = self.sync_relays.read().await.len(); | ||
| 488 | tracing::info!( | ||
| 489 | "Sync state initialized: {} repos tracked, {} sync relays", | ||
| 490 | following_count, | ||
| 491 | sync_relays_count | ||
| 492 | ); | ||
| 493 | } | ||
| 294 | 494 | ||
| 295 | // Stub: just wait indefinitely until full implementation | 495 | // Stub: wait indefinitely until full implementation (Phases 3-7) |
| 296 | // This prevents the spawned task from immediately completing | 496 | // This prevents the spawned task from immediately completing |
| 297 | loop { | 497 | loop { |
| 298 | tokio::time::sleep(std::time::Duration::from_secs(3600)).await; | 498 | tokio::time::sleep(std::time::Duration::from_secs(3600)).await; |
| @@ -318,6 +518,178 @@ pub use metrics::{event_source, SyncMetrics}; | |||
| 318 | #[cfg(test)] | 518 | #[cfg(test)] |
| 319 | mod tests { | 519 | mod tests { |
| 320 | use super::*; | 520 | use super::*; |
| 521 | use nostr_relay_builder::prelude::{EventBuilder, Keys, Tag}; | ||
| 522 | |||
| 523 | /// Helper to create a test event with specific tags | ||
| 524 | fn create_test_event(kind: Kind, tags: Vec<Tag>) -> Event { | ||
| 525 | let keys = Keys::generate(); | ||
| 526 | EventBuilder::new(kind, "test content") | ||
| 527 | .tags(tags) | ||
| 528 | .sign_with_keys(&keys) | ||
| 529 | .expect("Failed to sign test event") | ||
| 530 | } | ||
| 531 | |||
| 532 | // ========================================================================= | ||
| 533 | // Tests for extract_all_repo_refs | ||
| 534 | // ========================================================================= | ||
| 535 | |||
| 536 | #[test] | ||
| 537 | fn test_extract_all_repo_refs_single_ref() { | ||
| 538 | let event = create_test_event( | ||
| 539 | Kind::GitPatch, | ||
| 540 | vec![Tag::custom( | ||
| 541 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 542 | vec!["30617:abc123def456:my-project"], | ||
| 543 | )], | ||
| 544 | ); | ||
| 545 | |||
| 546 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 547 | assert_eq!(refs.len(), 1); | ||
| 548 | assert_eq!(refs[0], "30617:abc123def456:my-project"); | ||
| 549 | } | ||
| 550 | |||
| 551 | #[test] | ||
| 552 | fn test_extract_all_repo_refs_multiple_refs() { | ||
| 553 | let event = create_test_event( | ||
| 554 | Kind::GitPatch, | ||
| 555 | vec![ | ||
| 556 | Tag::custom( | ||
| 557 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 558 | vec!["30617:abc123:project1"], | ||
| 559 | ), | ||
| 560 | Tag::custom( | ||
| 561 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 562 | vec!["30617:def456:project2"], | ||
| 563 | ), | ||
| 564 | ], | ||
| 565 | ); | ||
| 566 | |||
| 567 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 568 | assert_eq!(refs.len(), 2); | ||
| 569 | assert!(refs.contains(&"30617:abc123:project1".to_string())); | ||
| 570 | assert!(refs.contains(&"30617:def456:project2".to_string())); | ||
| 571 | } | ||
| 572 | |||
| 573 | #[test] | ||
| 574 | fn test_extract_all_repo_refs_ignores_non_30617() { | ||
| 575 | let event = create_test_event( | ||
| 576 | Kind::GitPatch, | ||
| 577 | vec![ | ||
| 578 | Tag::custom( | ||
| 579 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 580 | vec!["30617:abc123:valid-repo"], | ||
| 581 | ), | ||
| 582 | Tag::custom( | ||
| 583 | nostr_relay_builder::prelude::TagKind::custom("a"), | ||
| 584 | vec!["30618:def456:state-event"], // Not a repo ref | ||
| 585 | ), | ||
| 586 | ], | ||
| 587 | ); | ||
| 588 | |||
| 589 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 590 | assert_eq!(refs.len(), 1); | ||
| 591 | assert_eq!(refs[0], "30617:abc123:valid-repo"); | ||
| 592 | } | ||
| 593 | |||
| 594 | #[test] | ||
| 595 | fn test_extract_all_repo_refs_empty_when_no_a_tags() { | ||
| 596 | let event = create_test_event( | ||
| 597 | Kind::GitPatch, | ||
| 598 | vec![Tag::custom( | ||
| 599 | nostr_relay_builder::prelude::TagKind::custom("e"), | ||
| 600 | vec!["some-event-id"], | ||
| 601 | )], | ||
| 602 | ); | ||
| 603 | |||
| 604 | let refs = SyncManager::extract_all_repo_refs(&event); | ||
| 605 | assert!(refs.is_empty()); | ||
| 606 | } | ||
| 607 | |||
| 608 | // ========================================================================= | ||
| 609 | // Tests for build_repo_ref | ||
| 610 | // ========================================================================= | ||
| 611 | |||
| 612 | #[test] | ||
| 613 | fn test_build_repo_ref() { | ||
| 614 | let keys = Keys::generate(); | ||
| 615 | let event = EventBuilder::new(Kind::from(30617_u16), "announcement") | ||
| 616 | .tags(vec![Tag::custom( | ||
| 617 | nostr_relay_builder::prelude::TagKind::d(), | ||
| 618 | vec!["my-identifier"], | ||
| 619 | )]) | ||
| 620 | .sign_with_keys(&keys) | ||
| 621 | .expect("Failed to sign test event"); | ||
| 622 | |||
| 623 | let repo_ref = SyncManager::build_repo_ref(&event); | ||
| 624 | assert!(repo_ref.starts_with("30617:")); | ||
| 625 | assert!(repo_ref.ends_with(":my-identifier")); | ||
| 626 | assert!(repo_ref.contains(&event.pubkey.to_hex())); | ||
| 627 | } | ||
| 628 | |||
| 629 | #[test] | ||
| 630 | fn test_build_repo_ref_empty_identifier() { | ||
| 631 | let keys = Keys::generate(); | ||
| 632 | let event = EventBuilder::new(Kind::from(30617_u16), "announcement") | ||
| 633 | .sign_with_keys(&keys) | ||
| 634 | .expect("Failed to sign test event"); | ||
| 635 | |||
| 636 | let repo_ref = SyncManager::build_repo_ref(&event); | ||
| 637 | assert!(repo_ref.starts_with("30617:")); | ||
| 638 | assert!(repo_ref.ends_with(":")); // Empty identifier | ||
| 639 | } | ||
| 640 | |||
| 641 | // ========================================================================= | ||
| 642 | // Tests for extract_relay_urls | ||
| 643 | // ========================================================================= | ||
| 644 | |||
| 645 | #[test] | ||
| 646 | fn test_extract_relay_urls_single() { | ||
| 647 | let event = create_test_event( | ||
| 648 | Kind::from(30617_u16), | ||
| 649 | vec![Tag::custom( | ||
| 650 | nostr_relay_builder::prelude::TagKind::Relays, | ||
| 651 | vec!["wss://relay.example.com"], | ||
| 652 | )], | ||
| 653 | ); | ||
| 654 | |||
| 655 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 656 | assert_eq!(urls.len(), 1); | ||
| 657 | assert_eq!(urls[0], "wss://relay.example.com"); | ||
| 658 | } | ||
| 659 | |||
| 660 | #[test] | ||
| 661 | fn test_extract_relay_urls_multiple() { | ||
| 662 | let event = create_test_event( | ||
| 663 | Kind::from(30617_u16), | ||
| 664 | vec![Tag::custom( | ||
| 665 | nostr_relay_builder::prelude::TagKind::Relays, | ||
| 666 | vec!["wss://relay1.example.com", "wss://relay2.example.com"], | ||
| 667 | )], | ||
| 668 | ); | ||
| 669 | |||
| 670 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 671 | assert_eq!(urls.len(), 2); | ||
| 672 | assert!(urls.contains(&"wss://relay1.example.com".to_string())); | ||
| 673 | assert!(urls.contains(&"wss://relay2.example.com".to_string())); | ||
| 674 | } | ||
| 675 | |||
| 676 | #[test] | ||
| 677 | fn test_extract_relay_urls_empty_when_no_relays_tag() { | ||
| 678 | let event = create_test_event( | ||
| 679 | Kind::from(30617_u16), | ||
| 680 | vec![Tag::custom( | ||
| 681 | nostr_relay_builder::prelude::TagKind::custom("d"), | ||
| 682 | vec!["my-project"], | ||
| 683 | )], | ||
| 684 | ); | ||
| 685 | |||
| 686 | let urls = SyncManager::extract_relay_urls(&event); | ||
| 687 | assert!(urls.is_empty()); | ||
| 688 | } | ||
| 689 | |||
| 690 | // ========================================================================= | ||
| 691 | // Original data structure tests | ||
| 692 | // ========================================================================= | ||
| 321 | 693 | ||
| 322 | #[tokio::test] | 694 | #[tokio::test] |
| 323 | async fn test_following_repo_root_events_basic_operations() { | 695 | async fn test_following_repo_root_events_basic_operations() { |
| @@ -391,4 +763,4 @@ mod tests { | |||
| 391 | let guard = state.read().await; | 763 | let guard = state.read().await; |
| 392 | assert!(guard.contains_key("30617:writer:repo")); | 764 | assert!(guard.contains_key("30617:writer:repo")); |
| 393 | } | 765 | } |
| 394 | } \ No newline at end of file | 766 | } |