upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs335
1 files changed, 304 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 71f91e2..9dec982 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -37,17 +37,26 @@
37//! for the complete design context. 37//! for the complete design context.
38 38
39use std::collections::{HashMap, HashSet}; 39use std::collections::{HashMap, HashSet};
40use std::net::SocketAddr;
40use std::sync::Arc; 41use std::sync::Arc;
41 42
42use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; 43use nostr_relay_builder::prelude::{
44 DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy,
45};
46use nostr_sdk::prelude::*;
43use nostr_sdk::EventId; 47use nostr_sdk::EventId;
44use tokio::sync::RwLock; 48use tokio::sync::{mpsc, RwLock};
45 49
46use crate::config::Config; 50use crate::config::Config;
47use crate::nostr::builder::Nip34WritePolicy; 51use crate::nostr::builder::Nip34WritePolicy;
48use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; 52use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
49use crate::nostr::SharedDatabase; 53use crate::nostr::SharedDatabase;
50 54
55mod relay_connection;
56mod self_subscriber;
57pub use relay_connection::{RelayConnection, RelayEvent};
58pub use self_subscriber::{RelayAction, SelfSubscriber};
59
51// ============================================================================= 60// =============================================================================
52// Type Aliases for Sync State 61// Type Aliases for Sync State
53// ============================================================================= 62// =============================================================================
@@ -176,7 +185,7 @@ pub fn new_sync_relays() -> SyncRelays {
176/// The SyncManager is responsible for: 185/// The SyncManager is responsible for:
177/// - Discovering relays from stored repository announcements 186/// - Discovering relays from stored repository announcements
178/// - Maintaining connections to sync relays 187/// - Maintaining connections to sync relays
179/// - Subscribing to events at external relays 188/// - Subscribing to events at external relays
180/// - Applying the acceptance policy to synced events 189/// - Applying the acceptance policy to synced events
181/// 190///
182/// ## Lifecycle 191/// ## Lifecycle
@@ -186,14 +195,16 @@ pub fn new_sync_relays() -> SyncRelays {
186/// 195///
187/// ## Current Status 196/// ## Current Status
188/// 197///
189/// This is a stub implementation. The core data structures are: 198/// Phase 2 implementation supports:
199/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter
200/// - Event processing through write policy
201/// - Storage of accepted events
202///
203/// Core data structures:
190/// - [`FollowingRepoRootEvents`]: Repository root events we're following 204/// - [`FollowingRepoRootEvents`]: Repository root events we're following
191/// - [`SyncRelays`]: Relays we sync from with their repos and events 205/// - [`SyncRelays`]: Relays we sync from with their repos and events
192///
193/// Full implementation will come in later phases.
194pub struct SyncManager { 206pub struct SyncManager {
195 /// Bootstrap relay URL if configured 207 /// Bootstrap relay URL if configured
196 #[allow(dead_code)]
197 bootstrap_relay_url: Option<String>, 208 bootstrap_relay_url: Option<String>,
198 209
199 /// Our service domain for filtering repo announcements 210 /// Our service domain for filtering repo announcements
@@ -201,11 +212,9 @@ pub struct SyncManager {
201 service_domain: String, 212 service_domain: String,
202 213
203 /// Database for querying/storing events 214 /// Database for querying/storing events
204 #[allow(dead_code)]
205 database: SharedDatabase, 215 database: SharedDatabase,
206 216
207 /// Write policy for applying acceptance rules 217 /// Write policy for applying acceptance rules
208 #[allow(dead_code)]
209 write_policy: Nip34WritePolicy, 218 write_policy: Nip34WritePolicy,
210 219
211 /// Repository root events we're following (Phase 1 data structure) 220 /// Repository root events we're following (Phase 1 data structure)
@@ -219,6 +228,9 @@ pub struct SyncManager {
219 /// Max backoff duration for relay reconnection 228 /// Max backoff duration for relay reconnection
220 #[allow(dead_code)] 229 #[allow(dead_code)]
221 max_backoff_secs: u64, 230 max_backoff_secs: u64,
231
232 /// Socket address used for sync source (for write policy)
233 sync_source_addr: SocketAddr,
222} 234}
223 235
224impl SyncManager { 236impl SyncManager {
@@ -238,6 +250,10 @@ impl SyncManager {
238 write_policy: Nip34WritePolicy, 250 write_policy: Nip34WritePolicy,
239 config: &Config, 251 config: &Config,
240 ) -> Self { 252 ) -> Self {
253 // Create a synthetic SocketAddr for sync source identification
254 // This is used when calling write_policy.admit_event() for synced events
255 let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
256
241 Self { 257 Self {
242 bootstrap_relay_url, 258 bootstrap_relay_url,
243 service_domain, 259 service_domain,
@@ -246,6 +262,7 @@ impl SyncManager {
246 following_repo_root_events: new_following_repo_root_events(), 262 following_repo_root_events: new_following_repo_root_events(),
247 sync_relays: new_sync_relays(), 263 sync_relays: new_sync_relays(),
248 max_backoff_secs: config.sync_max_backoff_secs, 264 max_backoff_secs: config.sync_max_backoff_secs,
265 sync_source_addr,
249 } 266 }
250 } 267 }
251 268
@@ -460,14 +477,12 @@ impl SyncManager {
460 /// }); 477 /// });
461 /// ``` 478 /// ```
462 /// 479 ///
463 /// ## Current Status 480 /// ## Implementation Status
464 /// 481 ///
465 /// This is a stub that logs and then waits indefinitely. 482 /// - Phase 2: Layer 1 sync from bootstrap relay ✓
466 /// Full implementation includes: 483 /// - Phase 3: Self-subscription and relay discovery ✓
467 /// - Phase 2: Database initialization queries ✓ 484 /// - Phase 4-6: Filter building, connection management (TODO)
468 /// - Phase 3: Self-subscription for incremental updates 485 /// - Phase 7: Full sync loop (TODO)
469 /// - Phase 4-6: Filter building, connection management
470 /// - Phase 7: Full sync loop
471 pub async fn run(self) { 486 pub async fn run(self) {
472 tracing::info!( 487 tracing::info!(
473 "SyncManager starting (bootstrap_relay={:?}, domain={})", 488 "SyncManager starting (bootstrap_relay={:?}, domain={})",
@@ -475,27 +490,285 @@ impl SyncManager {
475 self.service_domain 490 self.service_domain
476 ); 491 );
477 492
478 // Phase 2: Initialize from database 493 // Phase 3: Initialize state from database BEFORE spawning connections
479 if let Err(e) = self.initialize_from_database().await { 494 if let Err(e) = self.initialize_from_database().await {
480 tracing::error!("Failed to initialize sync state from database: {}", e); 495 tracing::error!("Failed to initialize from database: {}", e);
481 // Continue anyway - we can still receive events via self-subscription 496 // Continue anyway - we can still sync from bootstrap
482 } 497 }
483 498
484 // Log initialization results 499 // Create channel for relay actions from self-subscriber
485 { 500 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
486 let following_count = self.following_repo_root_events.read().await.len(); 501
487 let sync_relays_count = self.sync_relays.read().await.len(); 502 // Construct our own relay URL for self-subscription
488 tracing::info!( 503 let own_relay_url = format!("ws://{}", self.service_domain);
489 "Sync state initialized: {} repos tracked, {} sync relays", 504
490 following_count, 505 // Spawn self-subscriber task
491 sync_relays_count 506 let self_subscriber = SelfSubscriber::new(
492 ); 507 own_relay_url.clone(),
508 self.service_domain.clone(),
509 Arc::clone(&self.following_repo_root_events),
510 Arc::clone(&self.sync_relays),
511 action_tx,
512 );
513
514 tokio::spawn(async move {
515 self_subscriber.run().await;
516 });
517
518 tracing::info!("SelfSubscriber spawned for {}", own_relay_url);
519
520 // Track active relay connections (relay_url -> event_sender)
521 let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new();
522
523 // Phase 2: Connect to bootstrap relay if configured
524 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
525 if let Some(event_tx) = self
526 .spawn_relay_connection(bootstrap_url.clone(), None)
527 .await
528 {
529 active_relays.insert(bootstrap_url.clone(), event_tx);
530 }
493 } 531 }
494 532
495 // Stub: wait indefinitely until full implementation (Phases 3-7) 533 // Main coordination loop
496 // This prevents the spawned task from immediately completing
497 loop { 534 loop {
498 tokio::time::sleep(std::time::Duration::from_secs(3600)).await; 535 tokio::select! {
536 // Handle relay actions from self-subscriber
537 action = action_rx.recv() => {
538 match action {
539 Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => {
540 tracing::info!("Spawning new relay connection to {}", relay_url);
541 if !active_relays.contains_key(&relay_url) {
542 if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await {
543 active_relays.insert(relay_url, event_tx);
544 }
545 }
546 }
547 Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => {
548 tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len());
549 // TODO: Implement filter updates for existing connections
550 }
551 None => {
552 tracing::info!("Action channel closed, continuing without self-subscriber");
553 }
554 }
555 }
556 // Sleep to prevent busy loop when no events
557 _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
558 // Periodic maintenance could go here
559 }
560 }
561 }
562 }
563
564 /// Spawn a relay connection with optional Layer 2 filters.
565 ///
566 /// Returns the event sender channel if successfully spawned.
567 async fn spawn_relay_connection(
568 &self,
569 relay_url: String,
570 repos: Option<HashMap<String, HashSet<EventId>>>,
571 ) -> Option<mpsc::Sender<RelayEvent>> {
572 // Create channel for receiving events
573 let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100);
574
575 // Create connection
576 let connection = RelayConnection::new(relay_url.clone());
577
578 // Determine if this is bootstrap (no repos) or discovered relay (with repos)
579 let is_bootstrap = repos.is_none();
580
581 match connection.connect_and_subscribe().await {
582 Ok(()) => {
583 if is_bootstrap {
584 tracing::info!("Bootstrap relay connection established: {}", relay_url);
585 } else {
586 tracing::info!(
587 "Discovered relay connection established: {} (with Layer 2 filters)",
588 relay_url
589 );
590
591 // Add Layer 2 subscription for repo events
592 if let Some(ref repos) = repos {
593 if let Err(e) = self.add_layer2_subscription(&connection, repos).await {
594 tracing::warn!("Failed to add Layer 2 subscription: {}", e);
595 }
596 }
597 }
598
599 // Clone refs needed for event processing task
600 let database = Arc::clone(&self.database);
601 let write_policy = self.write_policy.clone();
602 let sync_source_addr = self.sync_source_addr;
603
604 // Clone event_tx for the spawned task
605 let event_tx_clone = event_tx.clone();
606
607 // Spawn event loop task
608 let conn_url = relay_url.clone();
609 tokio::spawn(async move {
610 connection.run_event_loop(event_tx_clone).await;
611 });
612
613 // Spawn event processing task
614 tokio::spawn(async move {
615 Self::process_relay_events(
616 event_rx,
617 database,
618 write_policy,
619 sync_source_addr,
620 conn_url,
621 )
622 .await;
623 });
624
625 Some(event_tx)
626 }
627 Err(e) => {
628 tracing::error!("Failed to connect to relay {}: {}", relay_url, e);
629 None
630 }
631 }
632 }
633
634 /// Add Layer 2 subscription for repo-related events.
635 ///
636 /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track.
637 async fn add_layer2_subscription(
638 &self,
639 connection: &RelayConnection,
640 repos: &HashMap<String, HashSet<EventId>>,
641 ) -> Result<(), String> {
642 if repos.is_empty() {
643 return Ok(());
644 }
645
646 // Build repo refs list for filter
647 let repo_refs: Vec<String> = repos.keys().cloned().collect();
648
649 tracing::debug!(
650 "Adding Layer 2 subscription for {} repos to {}",
651 repo_refs.len(),
652 connection.url()
653 );
654
655 // Chunk repo_refs into groups of 100 (per plan)
656 for chunk in repo_refs.chunks(100) {
657 // Build filter with lowercase 'a' tag for each repo ref
658 let mut filter = Filter::new().kinds([
659 Kind::GitPatch, // 1617
660 Kind::Custom(1618), // PR
661 Kind::Custom(1619), // PR update
662 Kind::GitIssue, // 1621
663 ]);
664
665 // Add each repo ref as a custom tag filter
666 for repo_ref in chunk {
667 filter =
668 filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone());
669 }
670
671 // Subscribe to this filter
672 if let Err(e) = connection.subscribe_filter(filter).await {
673 return Err(format!("Failed to subscribe with Layer 2 filter: {}", e));
674 }
675 }
676
677 Ok(())
678 }
679
680 /// Process events from a single relay connection.
681 ///
682 /// This is a static method that runs in its own task.
683 async fn process_relay_events(
684 mut event_rx: mpsc::Receiver<RelayEvent>,
685 database: SharedDatabase,
686 write_policy: Nip34WritePolicy,
687 sync_source_addr: SocketAddr,
688 relay_url: String,
689 ) {
690 tracing::debug!("Starting event processing for relay: {}", relay_url);
691
692 while let Some(relay_event) = event_rx.recv().await {
693 match relay_event {
694 RelayEvent::Event(event) => {
695 Self::process_single_event_static(
696 &event,
697 &database,
698 &write_policy,
699 &sync_source_addr,
700 &relay_url,
701 )
702 .await;
703 }
704 RelayEvent::EndOfStoredEvents => {
705 tracing::debug!("EOSE received from {}", relay_url);
706 }
707 RelayEvent::Closed(reason) => {
708 tracing::warn!("Connection to {} closed: {}", relay_url, reason);
709 break;
710 }
711 }
712 }
713
714 tracing::info!("Event processing ended for relay: {}", relay_url);
715 }
716
717 /// Process a single event (static version for use in spawned tasks).
718 async fn process_single_event_static(
719 event: &Event,
720 database: &SharedDatabase,
721 write_policy: &Nip34WritePolicy,
722 sync_source_addr: &SocketAddr,
723 relay_url: &str,
724 ) {
725 let event_id = event.id;
726 let kind = event.kind.as_u16();
727
728 // Check if event already exists in database
729 match database.check_id(&event_id).await {
730 Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => {
731 tracing::trace!("Event {} already exists, skipping", event_id);
732 return;
733 }
734 Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing
735 Err(e) => {
736 tracing::warn!("Failed to check if event {} exists: {}", event_id, e);
737 }
738 }
739
740 // Pass through write policy
741 let policy_result = write_policy.admit_event(event, sync_source_addr).await;
742
743 match policy_result {
744 PolicyResult::Accept => match database.save_event(event).await {
745 Ok(SaveEventStatus::Success) => {
746 tracing::info!(
747 "Synced event {} (kind {}) from {}",
748 event_id,
749 kind,
750 relay_url
751 );
752 }
753 Ok(_) => {
754 tracing::debug!(
755 "Event {} (kind {}) already stored or rejected by database",
756 event_id,
757 kind
758 );
759 }
760 Err(e) => {
761 tracing::error!("Failed to save synced event {}: {}", event_id, e);
762 }
763 },
764 PolicyResult::Reject(reason) => {
765 tracing::debug!(
766 "Rejected synced event {} (kind {}): {}",
767 event_id,
768 kind,
769 reason
770 );
771 }
499 } 772 }
500 } 773 }
501} 774}