diff options
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 335 |
1 files changed, 304 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 71f91e2..9dec982 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -37,17 +37,26 @@ | |||
| 37 | //! for the complete design context. | 37 | //! for the complete design context. |
| 38 | 38 | ||
| 39 | use std::collections::{HashMap, HashSet}; | 39 | use std::collections::{HashMap, HashSet}; |
| 40 | use std::net::SocketAddr; | ||
| 40 | use std::sync::Arc; | 41 | use std::sync::Arc; |
| 41 | 42 | ||
| 42 | use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; | 43 | use nostr_relay_builder::prelude::{ |
| 44 | DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy, | ||
| 45 | }; | ||
| 46 | use nostr_sdk::prelude::*; | ||
| 43 | use nostr_sdk::EventId; | 47 | use nostr_sdk::EventId; |
| 44 | use tokio::sync::RwLock; | 48 | use tokio::sync::{mpsc, RwLock}; |
| 45 | 49 | ||
| 46 | use crate::config::Config; | 50 | use crate::config::Config; |
| 47 | use crate::nostr::builder::Nip34WritePolicy; | 51 | use crate::nostr::builder::Nip34WritePolicy; |
| 48 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; | 52 | use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; |
| 49 | use crate::nostr::SharedDatabase; | 53 | use crate::nostr::SharedDatabase; |
| 50 | 54 | ||
| 55 | mod relay_connection; | ||
| 56 | mod self_subscriber; | ||
| 57 | pub use relay_connection::{RelayConnection, RelayEvent}; | ||
| 58 | pub use self_subscriber::{RelayAction, SelfSubscriber}; | ||
| 59 | |||
| 51 | // ============================================================================= | 60 | // ============================================================================= |
| 52 | // Type Aliases for Sync State | 61 | // Type Aliases for Sync State |
| 53 | // ============================================================================= | 62 | // ============================================================================= |
| @@ -176,7 +185,7 @@ pub fn new_sync_relays() -> SyncRelays { | |||
| 176 | /// The SyncManager is responsible for: | 185 | /// The SyncManager is responsible for: |
| 177 | /// - Discovering relays from stored repository announcements | 186 | /// - Discovering relays from stored repository announcements |
| 178 | /// - Maintaining connections to sync relays | 187 | /// - Maintaining connections to sync relays |
| 179 | /// - Subscribing to events at external relays | 188 | /// - Subscribing to events at external relays |
| 180 | /// - Applying the acceptance policy to synced events | 189 | /// - Applying the acceptance policy to synced events |
| 181 | /// | 190 | /// |
| 182 | /// ## Lifecycle | 191 | /// ## Lifecycle |
| @@ -186,14 +195,16 @@ pub fn new_sync_relays() -> SyncRelays { | |||
| 186 | /// | 195 | /// |
| 187 | /// ## Current Status | 196 | /// ## Current Status |
| 188 | /// | 197 | /// |
| 189 | /// This is a stub implementation. The core data structures are: | 198 | /// Phase 2 implementation supports: |
| 199 | /// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter | ||
| 200 | /// - Event processing through write policy | ||
| 201 | /// - Storage of accepted events | ||
| 202 | /// | ||
| 203 | /// Core data structures: | ||
| 190 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following | 204 | /// - [`FollowingRepoRootEvents`]: Repository root events we're following |
| 191 | /// - [`SyncRelays`]: Relays we sync from with their repos and events | 205 | /// - [`SyncRelays`]: Relays we sync from with their repos and events |
| 192 | /// | ||
| 193 | /// Full implementation will come in later phases. | ||
| 194 | pub struct SyncManager { | 206 | pub struct SyncManager { |
| 195 | /// Bootstrap relay URL if configured | 207 | /// Bootstrap relay URL if configured |
| 196 | #[allow(dead_code)] | ||
| 197 | bootstrap_relay_url: Option<String>, | 208 | bootstrap_relay_url: Option<String>, |
| 198 | 209 | ||
| 199 | /// Our service domain for filtering repo announcements | 210 | /// Our service domain for filtering repo announcements |
| @@ -201,11 +212,9 @@ pub struct SyncManager { | |||
| 201 | service_domain: String, | 212 | service_domain: String, |
| 202 | 213 | ||
| 203 | /// Database for querying/storing events | 214 | /// Database for querying/storing events |
| 204 | #[allow(dead_code)] | ||
| 205 | database: SharedDatabase, | 215 | database: SharedDatabase, |
| 206 | 216 | ||
| 207 | /// Write policy for applying acceptance rules | 217 | /// Write policy for applying acceptance rules |
| 208 | #[allow(dead_code)] | ||
| 209 | write_policy: Nip34WritePolicy, | 218 | write_policy: Nip34WritePolicy, |
| 210 | 219 | ||
| 211 | /// Repository root events we're following (Phase 1 data structure) | 220 | /// Repository root events we're following (Phase 1 data structure) |
| @@ -219,6 +228,9 @@ pub struct SyncManager { | |||
| 219 | /// Max backoff duration for relay reconnection | 228 | /// Max backoff duration for relay reconnection |
| 220 | #[allow(dead_code)] | 229 | #[allow(dead_code)] |
| 221 | max_backoff_secs: u64, | 230 | max_backoff_secs: u64, |
| 231 | |||
| 232 | /// Socket address used for sync source (for write policy) | ||
| 233 | sync_source_addr: SocketAddr, | ||
| 222 | } | 234 | } |
| 223 | 235 | ||
| 224 | impl SyncManager { | 236 | impl SyncManager { |
| @@ -238,6 +250,10 @@ impl SyncManager { | |||
| 238 | write_policy: Nip34WritePolicy, | 250 | write_policy: Nip34WritePolicy, |
| 239 | config: &Config, | 251 | config: &Config, |
| 240 | ) -> Self { | 252 | ) -> Self { |
| 253 | // Create a synthetic SocketAddr for sync source identification | ||
| 254 | // This is used when calling write_policy.admit_event() for synced events | ||
| 255 | let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); | ||
| 256 | |||
| 241 | Self { | 257 | Self { |
| 242 | bootstrap_relay_url, | 258 | bootstrap_relay_url, |
| 243 | service_domain, | 259 | service_domain, |
| @@ -246,6 +262,7 @@ impl SyncManager { | |||
| 246 | following_repo_root_events: new_following_repo_root_events(), | 262 | following_repo_root_events: new_following_repo_root_events(), |
| 247 | sync_relays: new_sync_relays(), | 263 | sync_relays: new_sync_relays(), |
| 248 | max_backoff_secs: config.sync_max_backoff_secs, | 264 | max_backoff_secs: config.sync_max_backoff_secs, |
| 265 | sync_source_addr, | ||
| 249 | } | 266 | } |
| 250 | } | 267 | } |
| 251 | 268 | ||
| @@ -460,14 +477,12 @@ impl SyncManager { | |||
| 460 | /// }); | 477 | /// }); |
| 461 | /// ``` | 478 | /// ``` |
| 462 | /// | 479 | /// |
| 463 | /// ## Current Status | 480 | /// ## Implementation Status |
| 464 | /// | 481 | /// |
| 465 | /// This is a stub that logs and then waits indefinitely. | 482 | /// - Phase 2: Layer 1 sync from bootstrap relay ✓ |
| 466 | /// Full implementation includes: | 483 | /// - Phase 3: Self-subscription and relay discovery ✓ |
| 467 | /// - Phase 2: Database initialization queries ✓ | 484 | /// - Phase 4-6: Filter building, connection management (TODO) |
| 468 | /// - Phase 3: Self-subscription for incremental updates | 485 | /// - Phase 7: Full sync loop (TODO) |
| 469 | /// - Phase 4-6: Filter building, connection management | ||
| 470 | /// - Phase 7: Full sync loop | ||
| 471 | pub async fn run(self) { | 486 | pub async fn run(self) { |
| 472 | tracing::info!( | 487 | tracing::info!( |
| 473 | "SyncManager starting (bootstrap_relay={:?}, domain={})", | 488 | "SyncManager starting (bootstrap_relay={:?}, domain={})", |
| @@ -475,27 +490,285 @@ impl SyncManager { | |||
| 475 | self.service_domain | 490 | self.service_domain |
| 476 | ); | 491 | ); |
| 477 | 492 | ||
| 478 | // Phase 2: Initialize from database | 493 | // Phase 3: Initialize state from database BEFORE spawning connections |
| 479 | if let Err(e) = self.initialize_from_database().await { | 494 | if let Err(e) = self.initialize_from_database().await { |
| 480 | tracing::error!("Failed to initialize sync state from database: {}", e); | 495 | tracing::error!("Failed to initialize from database: {}", e); |
| 481 | // Continue anyway - we can still receive events via self-subscription | 496 | // Continue anyway - we can still sync from bootstrap |
| 482 | } | 497 | } |
| 483 | 498 | ||
| 484 | // Log initialization results | 499 | // Create channel for relay actions from self-subscriber |
| 485 | { | 500 | let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100); |
| 486 | let following_count = self.following_repo_root_events.read().await.len(); | 501 | |
| 487 | let sync_relays_count = self.sync_relays.read().await.len(); | 502 | // Construct our own relay URL for self-subscription |
| 488 | tracing::info!( | 503 | let own_relay_url = format!("ws://{}", self.service_domain); |
| 489 | "Sync state initialized: {} repos tracked, {} sync relays", | 504 | |
| 490 | following_count, | 505 | // Spawn self-subscriber task |
| 491 | sync_relays_count | 506 | let self_subscriber = SelfSubscriber::new( |
| 492 | ); | 507 | own_relay_url.clone(), |
| 508 | self.service_domain.clone(), | ||
| 509 | Arc::clone(&self.following_repo_root_events), | ||
| 510 | Arc::clone(&self.sync_relays), | ||
| 511 | action_tx, | ||
| 512 | ); | ||
| 513 | |||
| 514 | tokio::spawn(async move { | ||
| 515 | self_subscriber.run().await; | ||
| 516 | }); | ||
| 517 | |||
| 518 | tracing::info!("SelfSubscriber spawned for {}", own_relay_url); | ||
| 519 | |||
| 520 | // Track active relay connections (relay_url -> event_sender) | ||
| 521 | let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new(); | ||
| 522 | |||
| 523 | // Phase 2: Connect to bootstrap relay if configured | ||
| 524 | if let Some(ref bootstrap_url) = self.bootstrap_relay_url { | ||
| 525 | if let Some(event_tx) = self | ||
| 526 | .spawn_relay_connection(bootstrap_url.clone(), None) | ||
| 527 | .await | ||
| 528 | { | ||
| 529 | active_relays.insert(bootstrap_url.clone(), event_tx); | ||
| 530 | } | ||
| 493 | } | 531 | } |
| 494 | 532 | ||
| 495 | // Stub: wait indefinitely until full implementation (Phases 3-7) | 533 | // Main coordination loop |
| 496 | // This prevents the spawned task from immediately completing | ||
| 497 | loop { | 534 | loop { |
| 498 | tokio::time::sleep(std::time::Duration::from_secs(3600)).await; | 535 | tokio::select! { |
| 536 | // Handle relay actions from self-subscriber | ||
| 537 | action = action_rx.recv() => { | ||
| 538 | match action { | ||
| 539 | Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => { | ||
| 540 | tracing::info!("Spawning new relay connection to {}", relay_url); | ||
| 541 | if !active_relays.contains_key(&relay_url) { | ||
| 542 | if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await { | ||
| 543 | active_relays.insert(relay_url, event_tx); | ||
| 544 | } | ||
| 545 | } | ||
| 546 | } | ||
| 547 | Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => { | ||
| 548 | tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len()); | ||
| 549 | // TODO: Implement filter updates for existing connections | ||
| 550 | } | ||
| 551 | None => { | ||
| 552 | tracing::info!("Action channel closed, continuing without self-subscriber"); | ||
| 553 | } | ||
| 554 | } | ||
| 555 | } | ||
| 556 | // Sleep to prevent busy loop when no events | ||
| 557 | _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { | ||
| 558 | // Periodic maintenance could go here | ||
| 559 | } | ||
| 560 | } | ||
| 561 | } | ||
| 562 | } | ||
| 563 | |||
| 564 | /// Spawn a relay connection with optional Layer 2 filters. | ||
| 565 | /// | ||
| 566 | /// Returns the event sender channel if successfully spawned. | ||
| 567 | async fn spawn_relay_connection( | ||
| 568 | &self, | ||
| 569 | relay_url: String, | ||
| 570 | repos: Option<HashMap<String, HashSet<EventId>>>, | ||
| 571 | ) -> Option<mpsc::Sender<RelayEvent>> { | ||
| 572 | // Create channel for receiving events | ||
| 573 | let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100); | ||
| 574 | |||
| 575 | // Create connection | ||
| 576 | let connection = RelayConnection::new(relay_url.clone()); | ||
| 577 | |||
| 578 | // Determine if this is bootstrap (no repos) or discovered relay (with repos) | ||
| 579 | let is_bootstrap = repos.is_none(); | ||
| 580 | |||
| 581 | match connection.connect_and_subscribe().await { | ||
| 582 | Ok(()) => { | ||
| 583 | if is_bootstrap { | ||
| 584 | tracing::info!("Bootstrap relay connection established: {}", relay_url); | ||
| 585 | } else { | ||
| 586 | tracing::info!( | ||
| 587 | "Discovered relay connection established: {} (with Layer 2 filters)", | ||
| 588 | relay_url | ||
| 589 | ); | ||
| 590 | |||
| 591 | // Add Layer 2 subscription for repo events | ||
| 592 | if let Some(ref repos) = repos { | ||
| 593 | if let Err(e) = self.add_layer2_subscription(&connection, repos).await { | ||
| 594 | tracing::warn!("Failed to add Layer 2 subscription: {}", e); | ||
| 595 | } | ||
| 596 | } | ||
| 597 | } | ||
| 598 | |||
| 599 | // Clone refs needed for event processing task | ||
| 600 | let database = Arc::clone(&self.database); | ||
| 601 | let write_policy = self.write_policy.clone(); | ||
| 602 | let sync_source_addr = self.sync_source_addr; | ||
| 603 | |||
| 604 | // Clone event_tx for the spawned task | ||
| 605 | let event_tx_clone = event_tx.clone(); | ||
| 606 | |||
| 607 | // Spawn event loop task | ||
| 608 | let conn_url = relay_url.clone(); | ||
| 609 | tokio::spawn(async move { | ||
| 610 | connection.run_event_loop(event_tx_clone).await; | ||
| 611 | }); | ||
| 612 | |||
| 613 | // Spawn event processing task | ||
| 614 | tokio::spawn(async move { | ||
| 615 | Self::process_relay_events( | ||
| 616 | event_rx, | ||
| 617 | database, | ||
| 618 | write_policy, | ||
| 619 | sync_source_addr, | ||
| 620 | conn_url, | ||
| 621 | ) | ||
| 622 | .await; | ||
| 623 | }); | ||
| 624 | |||
| 625 | Some(event_tx) | ||
| 626 | } | ||
| 627 | Err(e) => { | ||
| 628 | tracing::error!("Failed to connect to relay {}: {}", relay_url, e); | ||
| 629 | None | ||
| 630 | } | ||
| 631 | } | ||
| 632 | } | ||
| 633 | |||
| 634 | /// Add Layer 2 subscription for repo-related events. | ||
| 635 | /// | ||
| 636 | /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track. | ||
| 637 | async fn add_layer2_subscription( | ||
| 638 | &self, | ||
| 639 | connection: &RelayConnection, | ||
| 640 | repos: &HashMap<String, HashSet<EventId>>, | ||
| 641 | ) -> Result<(), String> { | ||
| 642 | if repos.is_empty() { | ||
| 643 | return Ok(()); | ||
| 644 | } | ||
| 645 | |||
| 646 | // Build repo refs list for filter | ||
| 647 | let repo_refs: Vec<String> = repos.keys().cloned().collect(); | ||
| 648 | |||
| 649 | tracing::debug!( | ||
| 650 | "Adding Layer 2 subscription for {} repos to {}", | ||
| 651 | repo_refs.len(), | ||
| 652 | connection.url() | ||
| 653 | ); | ||
| 654 | |||
| 655 | // Chunk repo_refs into groups of 100 (per plan) | ||
| 656 | for chunk in repo_refs.chunks(100) { | ||
| 657 | // Build filter with lowercase 'a' tag for each repo ref | ||
| 658 | let mut filter = Filter::new().kinds([ | ||
| 659 | Kind::GitPatch, // 1617 | ||
| 660 | Kind::Custom(1618), // PR | ||
| 661 | Kind::Custom(1619), // PR update | ||
| 662 | Kind::GitIssue, // 1621 | ||
| 663 | ]); | ||
| 664 | |||
| 665 | // Add each repo ref as a custom tag filter | ||
| 666 | for repo_ref in chunk { | ||
| 667 | filter = | ||
| 668 | filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone()); | ||
| 669 | } | ||
| 670 | |||
| 671 | // Subscribe to this filter | ||
| 672 | if let Err(e) = connection.subscribe_filter(filter).await { | ||
| 673 | return Err(format!("Failed to subscribe with Layer 2 filter: {}", e)); | ||
| 674 | } | ||
| 675 | } | ||
| 676 | |||
| 677 | Ok(()) | ||
| 678 | } | ||
| 679 | |||
| 680 | /// Process events from a single relay connection. | ||
| 681 | /// | ||
| 682 | /// This is a static method that runs in its own task. | ||
| 683 | async fn process_relay_events( | ||
| 684 | mut event_rx: mpsc::Receiver<RelayEvent>, | ||
| 685 | database: SharedDatabase, | ||
| 686 | write_policy: Nip34WritePolicy, | ||
| 687 | sync_source_addr: SocketAddr, | ||
| 688 | relay_url: String, | ||
| 689 | ) { | ||
| 690 | tracing::debug!("Starting event processing for relay: {}", relay_url); | ||
| 691 | |||
| 692 | while let Some(relay_event) = event_rx.recv().await { | ||
| 693 | match relay_event { | ||
| 694 | RelayEvent::Event(event) => { | ||
| 695 | Self::process_single_event_static( | ||
| 696 | &event, | ||
| 697 | &database, | ||
| 698 | &write_policy, | ||
| 699 | &sync_source_addr, | ||
| 700 | &relay_url, | ||
| 701 | ) | ||
| 702 | .await; | ||
| 703 | } | ||
| 704 | RelayEvent::EndOfStoredEvents => { | ||
| 705 | tracing::debug!("EOSE received from {}", relay_url); | ||
| 706 | } | ||
| 707 | RelayEvent::Closed(reason) => { | ||
| 708 | tracing::warn!("Connection to {} closed: {}", relay_url, reason); | ||
| 709 | break; | ||
| 710 | } | ||
| 711 | } | ||
| 712 | } | ||
| 713 | |||
| 714 | tracing::info!("Event processing ended for relay: {}", relay_url); | ||
| 715 | } | ||
| 716 | |||
| 717 | /// Process a single event (static version for use in spawned tasks). | ||
| 718 | async fn process_single_event_static( | ||
| 719 | event: &Event, | ||
| 720 | database: &SharedDatabase, | ||
| 721 | write_policy: &Nip34WritePolicy, | ||
| 722 | sync_source_addr: &SocketAddr, | ||
| 723 | relay_url: &str, | ||
| 724 | ) { | ||
| 725 | let event_id = event.id; | ||
| 726 | let kind = event.kind.as_u16(); | ||
| 727 | |||
| 728 | // Check if event already exists in database | ||
| 729 | match database.check_id(&event_id).await { | ||
| 730 | Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => { | ||
| 731 | tracing::trace!("Event {} already exists, skipping", event_id); | ||
| 732 | return; | ||
| 733 | } | ||
| 734 | Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing | ||
| 735 | Err(e) => { | ||
| 736 | tracing::warn!("Failed to check if event {} exists: {}", event_id, e); | ||
| 737 | } | ||
| 738 | } | ||
| 739 | |||
| 740 | // Pass through write policy | ||
| 741 | let policy_result = write_policy.admit_event(event, sync_source_addr).await; | ||
| 742 | |||
| 743 | match policy_result { | ||
| 744 | PolicyResult::Accept => match database.save_event(event).await { | ||
| 745 | Ok(SaveEventStatus::Success) => { | ||
| 746 | tracing::info!( | ||
| 747 | "Synced event {} (kind {}) from {}", | ||
| 748 | event_id, | ||
| 749 | kind, | ||
| 750 | relay_url | ||
| 751 | ); | ||
| 752 | } | ||
| 753 | Ok(_) => { | ||
| 754 | tracing::debug!( | ||
| 755 | "Event {} (kind {}) already stored or rejected by database", | ||
| 756 | event_id, | ||
| 757 | kind | ||
| 758 | ); | ||
| 759 | } | ||
| 760 | Err(e) => { | ||
| 761 | tracing::error!("Failed to save synced event {}: {}", event_id, e); | ||
| 762 | } | ||
| 763 | }, | ||
| 764 | PolicyResult::Reject(reason) => { | ||
| 765 | tracing::debug!( | ||
| 766 | "Rejected synced event {} (kind {}): {}", | ||
| 767 | event_id, | ||
| 768 | kind, | ||
| 769 | reason | ||
| 770 | ); | ||
| 771 | } | ||
| 499 | } | 772 | } |
| 500 | } | 773 | } |
| 501 | } | 774 | } |