upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs335
-rw-r--r--src/sync/relay_connection.rs185
-rw-r--r--src/sync/self_subscriber.rs497
3 files changed, 986 insertions, 31 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 71f91e2..9dec982 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -37,17 +37,26 @@
37//! for the complete design context. 37//! for the complete design context.
38 38
39use std::collections::{HashMap, HashSet}; 39use std::collections::{HashMap, HashSet};
40use std::net::SocketAddr;
40use std::sync::Arc; 41use std::sync::Arc;
41 42
42use nostr_relay_builder::prelude::{Event, Filter, Kind, TagKind}; 43use nostr_relay_builder::prelude::{
44 DatabaseEventStatus, Event, Filter, Kind, PolicyResult, SaveEventStatus, TagKind, WritePolicy,
45};
46use nostr_sdk::prelude::*;
43use nostr_sdk::EventId; 47use nostr_sdk::EventId;
44use tokio::sync::RwLock; 48use tokio::sync::{mpsc, RwLock};
45 49
46use crate::config::Config; 50use crate::config::Config;
47use crate::nostr::builder::Nip34WritePolicy; 51use crate::nostr::builder::Nip34WritePolicy;
48use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT}; 52use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
49use crate::nostr::SharedDatabase; 53use crate::nostr::SharedDatabase;
50 54
55mod relay_connection;
56mod self_subscriber;
57pub use relay_connection::{RelayConnection, RelayEvent};
58pub use self_subscriber::{RelayAction, SelfSubscriber};
59
51// ============================================================================= 60// =============================================================================
52// Type Aliases for Sync State 61// Type Aliases for Sync State
53// ============================================================================= 62// =============================================================================
@@ -176,7 +185,7 @@ pub fn new_sync_relays() -> SyncRelays {
176/// The SyncManager is responsible for: 185/// The SyncManager is responsible for:
177/// - Discovering relays from stored repository announcements 186/// - Discovering relays from stored repository announcements
178/// - Maintaining connections to sync relays 187/// - Maintaining connections to sync relays
179/// - Subscribing to events at external relays 188/// - Subscribing to events at external relays
180/// - Applying the acceptance policy to synced events 189/// - Applying the acceptance policy to synced events
181/// 190///
182/// ## Lifecycle 191/// ## Lifecycle
@@ -186,14 +195,16 @@ pub fn new_sync_relays() -> SyncRelays {
186/// 195///
187/// ## Current Status 196/// ## Current Status
188/// 197///
189/// This is a stub implementation. The core data structures are: 198/// Phase 2 implementation supports:
199/// - Layer 1 sync: Bootstrap relay connection with 30617/30618 filter
200/// - Event processing through write policy
201/// - Storage of accepted events
202///
203/// Core data structures:
190/// - [`FollowingRepoRootEvents`]: Repository root events we're following 204/// - [`FollowingRepoRootEvents`]: Repository root events we're following
191/// - [`SyncRelays`]: Relays we sync from with their repos and events 205/// - [`SyncRelays`]: Relays we sync from with their repos and events
192///
193/// Full implementation will come in later phases.
194pub struct SyncManager { 206pub struct SyncManager {
195 /// Bootstrap relay URL if configured 207 /// Bootstrap relay URL if configured
196 #[allow(dead_code)]
197 bootstrap_relay_url: Option<String>, 208 bootstrap_relay_url: Option<String>,
198 209
199 /// Our service domain for filtering repo announcements 210 /// Our service domain for filtering repo announcements
@@ -201,11 +212,9 @@ pub struct SyncManager {
201 service_domain: String, 212 service_domain: String,
202 213
203 /// Database for querying/storing events 214 /// Database for querying/storing events
204 #[allow(dead_code)]
205 database: SharedDatabase, 215 database: SharedDatabase,
206 216
207 /// Write policy for applying acceptance rules 217 /// Write policy for applying acceptance rules
208 #[allow(dead_code)]
209 write_policy: Nip34WritePolicy, 218 write_policy: Nip34WritePolicy,
210 219
211 /// Repository root events we're following (Phase 1 data structure) 220 /// Repository root events we're following (Phase 1 data structure)
@@ -219,6 +228,9 @@ pub struct SyncManager {
219 /// Max backoff duration for relay reconnection 228 /// Max backoff duration for relay reconnection
220 #[allow(dead_code)] 229 #[allow(dead_code)]
221 max_backoff_secs: u64, 230 max_backoff_secs: u64,
231
232 /// Socket address used for sync source (for write policy)
233 sync_source_addr: SocketAddr,
222} 234}
223 235
224impl SyncManager { 236impl SyncManager {
@@ -238,6 +250,10 @@ impl SyncManager {
238 write_policy: Nip34WritePolicy, 250 write_policy: Nip34WritePolicy,
239 config: &Config, 251 config: &Config,
240 ) -> Self { 252 ) -> Self {
253 // Create a synthetic SocketAddr for sync source identification
254 // This is used when calling write_policy.admit_event() for synced events
255 let sync_source_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
256
241 Self { 257 Self {
242 bootstrap_relay_url, 258 bootstrap_relay_url,
243 service_domain, 259 service_domain,
@@ -246,6 +262,7 @@ impl SyncManager {
246 following_repo_root_events: new_following_repo_root_events(), 262 following_repo_root_events: new_following_repo_root_events(),
247 sync_relays: new_sync_relays(), 263 sync_relays: new_sync_relays(),
248 max_backoff_secs: config.sync_max_backoff_secs, 264 max_backoff_secs: config.sync_max_backoff_secs,
265 sync_source_addr,
249 } 266 }
250 } 267 }
251 268
@@ -460,14 +477,12 @@ impl SyncManager {
460 /// }); 477 /// });
461 /// ``` 478 /// ```
462 /// 479 ///
463 /// ## Current Status 480 /// ## Implementation Status
464 /// 481 ///
465 /// This is a stub that logs and then waits indefinitely. 482 /// - Phase 2: Layer 1 sync from bootstrap relay ✓
466 /// Full implementation includes: 483 /// - Phase 3: Self-subscription and relay discovery ✓
467 /// - Phase 2: Database initialization queries ✓ 484 /// - Phase 4-6: Filter building, connection management (TODO)
468 /// - Phase 3: Self-subscription for incremental updates 485 /// - Phase 7: Full sync loop (TODO)
469 /// - Phase 4-6: Filter building, connection management
470 /// - Phase 7: Full sync loop
471 pub async fn run(self) { 486 pub async fn run(self) {
472 tracing::info!( 487 tracing::info!(
473 "SyncManager starting (bootstrap_relay={:?}, domain={})", 488 "SyncManager starting (bootstrap_relay={:?}, domain={})",
@@ -475,27 +490,285 @@ impl SyncManager {
475 self.service_domain 490 self.service_domain
476 ); 491 );
477 492
478 // Phase 2: Initialize from database 493 // Phase 3: Initialize state from database BEFORE spawning connections
479 if let Err(e) = self.initialize_from_database().await { 494 if let Err(e) = self.initialize_from_database().await {
480 tracing::error!("Failed to initialize sync state from database: {}", e); 495 tracing::error!("Failed to initialize from database: {}", e);
481 // Continue anyway - we can still receive events via self-subscription 496 // Continue anyway - we can still sync from bootstrap
482 } 497 }
483 498
484 // Log initialization results 499 // Create channel for relay actions from self-subscriber
485 { 500 let (action_tx, mut action_rx) = mpsc::channel::<RelayAction>(100);
486 let following_count = self.following_repo_root_events.read().await.len(); 501
487 let sync_relays_count = self.sync_relays.read().await.len(); 502 // Construct our own relay URL for self-subscription
488 tracing::info!( 503 let own_relay_url = format!("ws://{}", self.service_domain);
489 "Sync state initialized: {} repos tracked, {} sync relays", 504
490 following_count, 505 // Spawn self-subscriber task
491 sync_relays_count 506 let self_subscriber = SelfSubscriber::new(
492 ); 507 own_relay_url.clone(),
508 self.service_domain.clone(),
509 Arc::clone(&self.following_repo_root_events),
510 Arc::clone(&self.sync_relays),
511 action_tx,
512 );
513
514 tokio::spawn(async move {
515 self_subscriber.run().await;
516 });
517
518 tracing::info!("SelfSubscriber spawned for {}", own_relay_url);
519
520 // Track active relay connections (relay_url -> event_sender)
521 let mut active_relays: HashMap<String, mpsc::Sender<RelayEvent>> = HashMap::new();
522
523 // Phase 2: Connect to bootstrap relay if configured
524 if let Some(ref bootstrap_url) = self.bootstrap_relay_url {
525 if let Some(event_tx) = self
526 .spawn_relay_connection(bootstrap_url.clone(), None)
527 .await
528 {
529 active_relays.insert(bootstrap_url.clone(), event_tx);
530 }
493 } 531 }
494 532
495 // Stub: wait indefinitely until full implementation (Phases 3-7) 533 // Main coordination loop
496 // This prevents the spawned task from immediately completing
497 loop { 534 loop {
498 tokio::time::sleep(std::time::Duration::from_secs(3600)).await; 535 tokio::select! {
536 // Handle relay actions from self-subscriber
537 action = action_rx.recv() => {
538 match action {
539 Some(RelayAction::SpawnRelay { relay_url, repos_and_root_events }) => {
540 tracing::info!("Spawning new relay connection to {}", relay_url);
541 if !active_relays.contains_key(&relay_url) {
542 if let Some(event_tx) = self.spawn_relay_connection(relay_url.clone(), Some(repos)).await {
543 active_relays.insert(relay_url, event_tx);
544 }
545 }
546 }
547 Some(RelayAction::AddFilters { relay_url, repos_and_new_root_event }) => {
548 tracing::debug!("AddFilters for {} - {} repos (not yet implemented)", relay_url, repos.len());
549 // TODO: Implement filter updates for existing connections
550 }
551 None => {
552 tracing::info!("Action channel closed, continuing without self-subscriber");
553 }
554 }
555 }
556 // Sleep to prevent busy loop when no events
557 _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
558 // Periodic maintenance could go here
559 }
560 }
561 }
562 }
563
564 /// Spawn a relay connection with optional Layer 2 filters.
565 ///
566 /// Returns the event sender channel if successfully spawned.
567 async fn spawn_relay_connection(
568 &self,
569 relay_url: String,
570 repos: Option<HashMap<String, HashSet<EventId>>>,
571 ) -> Option<mpsc::Sender<RelayEvent>> {
572 // Create channel for receiving events
573 let (event_tx, event_rx) = mpsc::channel::<RelayEvent>(100);
574
575 // Create connection
576 let connection = RelayConnection::new(relay_url.clone());
577
578 // Determine if this is bootstrap (no repos) or discovered relay (with repos)
579 let is_bootstrap = repos.is_none();
580
581 match connection.connect_and_subscribe().await {
582 Ok(()) => {
583 if is_bootstrap {
584 tracing::info!("Bootstrap relay connection established: {}", relay_url);
585 } else {
586 tracing::info!(
587 "Discovered relay connection established: {} (with Layer 2 filters)",
588 relay_url
589 );
590
591 // Add Layer 2 subscription for repo events
592 if let Some(ref repos) = repos {
593 if let Err(e) = self.add_layer2_subscription(&connection, repos).await {
594 tracing::warn!("Failed to add Layer 2 subscription: {}", e);
595 }
596 }
597 }
598
599 // Clone refs needed for event processing task
600 let database = Arc::clone(&self.database);
601 let write_policy = self.write_policy.clone();
602 let sync_source_addr = self.sync_source_addr;
603
604 // Clone event_tx for the spawned task
605 let event_tx_clone = event_tx.clone();
606
607 // Spawn event loop task
608 let conn_url = relay_url.clone();
609 tokio::spawn(async move {
610 connection.run_event_loop(event_tx_clone).await;
611 });
612
613 // Spawn event processing task
614 tokio::spawn(async move {
615 Self::process_relay_events(
616 event_rx,
617 database,
618 write_policy,
619 sync_source_addr,
620 conn_url,
621 )
622 .await;
623 });
624
625 Some(event_tx)
626 }
627 Err(e) => {
628 tracing::error!("Failed to connect to relay {}: {}", relay_url, e);
629 None
630 }
631 }
632 }
633
634 /// Add Layer 2 subscription for repo-related events.
635 ///
636 /// Layer 2 filters subscribe to events with 'a' tags referencing repos we track.
637 async fn add_layer2_subscription(
638 &self,
639 connection: &RelayConnection,
640 repos: &HashMap<String, HashSet<EventId>>,
641 ) -> Result<(), String> {
642 if repos.is_empty() {
643 return Ok(());
644 }
645
646 // Build repo refs list for filter
647 let repo_refs: Vec<String> = repos.keys().cloned().collect();
648
649 tracing::debug!(
650 "Adding Layer 2 subscription for {} repos to {}",
651 repo_refs.len(),
652 connection.url()
653 );
654
655 // Chunk repo_refs into groups of 100 (per plan)
656 for chunk in repo_refs.chunks(100) {
657 // Build filter with lowercase 'a' tag for each repo ref
658 let mut filter = Filter::new().kinds([
659 Kind::GitPatch, // 1617
660 Kind::Custom(1618), // PR
661 Kind::Custom(1619), // PR update
662 Kind::GitIssue, // 1621
663 ]);
664
665 // Add each repo ref as a custom tag filter
666 for repo_ref in chunk {
667 filter =
668 filter.custom_tag(SingleLetterTag::lowercase(Alphabet::A), repo_ref.clone());
669 }
670
671 // Subscribe to this filter
672 if let Err(e) = connection.subscribe_filter(filter).await {
673 return Err(format!("Failed to subscribe with Layer 2 filter: {}", e));
674 }
675 }
676
677 Ok(())
678 }
679
680 /// Process events from a single relay connection.
681 ///
682 /// This is a static method that runs in its own task.
683 async fn process_relay_events(
684 mut event_rx: mpsc::Receiver<RelayEvent>,
685 database: SharedDatabase,
686 write_policy: Nip34WritePolicy,
687 sync_source_addr: SocketAddr,
688 relay_url: String,
689 ) {
690 tracing::debug!("Starting event processing for relay: {}", relay_url);
691
692 while let Some(relay_event) = event_rx.recv().await {
693 match relay_event {
694 RelayEvent::Event(event) => {
695 Self::process_single_event_static(
696 &event,
697 &database,
698 &write_policy,
699 &sync_source_addr,
700 &relay_url,
701 )
702 .await;
703 }
704 RelayEvent::EndOfStoredEvents => {
705 tracing::debug!("EOSE received from {}", relay_url);
706 }
707 RelayEvent::Closed(reason) => {
708 tracing::warn!("Connection to {} closed: {}", relay_url, reason);
709 break;
710 }
711 }
712 }
713
714 tracing::info!("Event processing ended for relay: {}", relay_url);
715 }
716
717 /// Process a single event (static version for use in spawned tasks).
718 async fn process_single_event_static(
719 event: &Event,
720 database: &SharedDatabase,
721 write_policy: &Nip34WritePolicy,
722 sync_source_addr: &SocketAddr,
723 relay_url: &str,
724 ) {
725 let event_id = event.id;
726 let kind = event.kind.as_u16();
727
728 // Check if event already exists in database
729 match database.check_id(&event_id).await {
730 Ok(DatabaseEventStatus::Saved) | Ok(DatabaseEventStatus::Deleted) => {
731 tracing::trace!("Event {} already exists, skipping", event_id);
732 return;
733 }
734 Ok(DatabaseEventStatus::NotExistent) => {} // Continue processing
735 Err(e) => {
736 tracing::warn!("Failed to check if event {} exists: {}", event_id, e);
737 }
738 }
739
740 // Pass through write policy
741 let policy_result = write_policy.admit_event(event, sync_source_addr).await;
742
743 match policy_result {
744 PolicyResult::Accept => match database.save_event(event).await {
745 Ok(SaveEventStatus::Success) => {
746 tracing::info!(
747 "Synced event {} (kind {}) from {}",
748 event_id,
749 kind,
750 relay_url
751 );
752 }
753 Ok(_) => {
754 tracing::debug!(
755 "Event {} (kind {}) already stored or rejected by database",
756 event_id,
757 kind
758 );
759 }
760 Err(e) => {
761 tracing::error!("Failed to save synced event {}: {}", event_id, e);
762 }
763 },
764 PolicyResult::Reject(reason) => {
765 tracing::debug!(
766 "Rejected synced event {} (kind {}): {}",
767 event_id,
768 kind,
769 reason
770 );
771 }
499 } 772 }
500 } 773 }
501} 774}
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
new file mode 100644
index 0000000..71b5d51
--- /dev/null
+++ b/src/sync/relay_connection.rs
@@ -0,0 +1,185 @@
1//! Relay Connection for Proactive Sync
2//!
3//! This module handles connecting to external relays and receiving events
4//! for the proactive sync system.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE};
12
13/// Events received from a relay connection
14#[derive(Debug)]
15pub enum RelayEvent {
16 /// A nostr event was received
17 Event(Event),
18 /// End of stored events (EOSE) received
19 EndOfStoredEvents,
20 /// Connection was closed
21 Closed(String),
22}
23
24/// Connection to an external relay for syncing events.
25///
26/// RelayConnection handles:
27/// - Connecting to the relay
28/// - Subscribing with appropriate filters (Layer 1 for bootstrap)
29/// - Receiving events and sending them through a channel
30pub struct RelayConnection {
31 /// The relay URL
32 url: String,
33 /// The nostr-sdk client
34 client: Client,
35}
36
37impl RelayConnection {
38 /// Create a new relay connection.
39 ///
40 /// # Arguments
41 ///
42 /// * `url` - The WebSocket URL of the relay to connect to
43 pub fn new(url: String) -> Self {
44 // Create a client with generated keys (we're just subscribing, not publishing)
45 let keys = Keys::generate();
46 let client = Client::new(keys);
47
48 Self { url, client }
49 }
50
51 /// Connect to the relay and subscribe with Layer 1 filter.
52 ///
53 /// Layer 1 filter syncs announcement events (30617, 30618) which are
54 /// the foundation for discovering repository relationships.
55 ///
56 /// Returns the notification stream for receiving events.
57 pub async fn connect_and_subscribe(&self) -> Result<(), String> {
58 // Add the relay
59 self.client
60 .add_relay(&self.url)
61 .await
62 .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?;
63
64 // Connect to relay
65 self.client.connect().await;
66
67 // Wait for connection to establish
68 let mut connected = false;
69 for _ in 0..30 {
70 tokio::time::sleep(Duration::from_millis(100)).await;
71 let relays = self.client.relays().await;
72 if relays.values().any(|r| r.is_connected()) {
73 connected = true;
74 break;
75 }
76 }
77
78 if !connected {
79 return Err(format!(
80 "Failed to connect to relay {} after 3 seconds",
81 self.url
82 ));
83 }
84
85 tracing::info!("Connected to bootstrap relay: {}", self.url);
86
87 // Layer 1 filter: Repository announcements and state events
88 // These are addressable events that define repositories
89 let filter = Filter::new().kinds([
90 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617
91 Kind::Custom(KIND_REPOSITORY_STATE), // 30618
92 ]);
93
94 // Subscribe to the filter
95 self.client
96 .subscribe(filter, None)
97 .await
98 .map_err(|e| format!("Failed to subscribe: {}", e))?;
99
100 tracing::debug!(
101 "Subscribed to Layer 1 events (kinds 30617, 30618) from {}",
102 self.url
103 );
104
105 Ok(())
106 }
107
108 /// Run the event loop, sending received events through the channel.
109 ///
110 /// This method runs until the connection is closed or an error occurs.
111 ///
112 /// # Arguments
113 ///
114 /// * `event_sender` - Channel to send received events
115 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
116 tracing::debug!("Starting event loop for relay: {}", self.url);
117
118 // Handle notifications
119 self.client
120 .handle_notifications(|notification| async {
121 match notification {
122 RelayPoolNotification::Event { event, .. } => {
123 tracing::debug!(
124 "Received event {} (kind {}) from {}",
125 event.id,
126 event.kind.as_u16(),
127 self.url
128 );
129 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
130 tracing::warn!("Event channel closed, stopping relay connection");
131 return Ok(true); // Stop handling
132 }
133 }
134 RelayPoolNotification::Message { message, .. } => {
135 if let RelayMessage::EndOfStoredEvents(_) = message {
136 tracing::debug!("EOSE received from {}", self.url);
137 if event_sender
138 .send(RelayEvent::EndOfStoredEvents)
139 .await
140 .is_err()
141 {
142 return Ok(true); // Stop handling
143 }
144 }
145 }
146 RelayPoolNotification::Shutdown => {
147 tracing::info!("Relay {} shutting down", self.url);
148 let _ = event_sender
149 .send(RelayEvent::Closed("Shutdown".to_string()))
150 .await;
151 return Ok(true); // Stop handling
152 }
153 }
154 Ok(false) // Continue handling
155 })
156 .await
157 .ok(); // Ignore errors on shutdown
158
159 // Disconnect when done
160 self.client.disconnect().await;
161 tracing::info!("Disconnected from relay: {}", self.url);
162 }
163
164 /// Get the relay URL
165 pub fn url(&self) -> &str {
166 &self.url
167 }
168
169 /// Subscribe to an additional filter.
170 ///
171 /// This is used to add Layer 2 filters for repo-related events after
172 /// the initial connection is established.
173 pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> {
174 self.client
175 .subscribe(filter, None)
176 .await
177 .map_err(|e| format!("Failed to subscribe with filter: {}", e))?;
178 Ok(())
179 }
180
181 /// Get a reference to the client for additional operations.
182 pub fn client(&self) -> &Client {
183 &self.client
184 }
185}
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
new file mode 100644
index 0000000..0512088
--- /dev/null
+++ b/src/sync/self_subscriber.rs
@@ -0,0 +1,497 @@
1//! Self-Subscriber for Proactive Sync
2//!
3//! This module handles subscribing to our own relay to detect new events
4//! and trigger relay discovery from announcements.
5
6use std::collections::{HashMap, HashSet};
7use std::time::Duration;
8
9use nostr_sdk::prelude::*;
10use tokio::sync::mpsc;
11use tokio::time::Instant;
12
13use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_ANNOUNCEMENT};
14
15use super::{FollowingRepoRootEvents, SyncManager, SyncRelays};
16
17// =============================================================================
18// Types
19// =============================================================================
20
21/// Actions to be taken by the SyncManager based on self-subscription events.
22#[derive(Debug, Clone)]
23pub enum RelayAction {
24 /// Spawn a new relay connection to sync from.
25 /// Contains: relay_url, map of repo_refs to their event IDs for Layer 2 filtering.
26 SpawnRelay {
27 relay_url: String,
28 repos_and_root_events: HashMap<String, HashSet<EventId>>,
29 },
30 /// Add filters to an existing relay connection.
31 /// Contains: relay_url, additional repos to add.
32 AddFilters {
33 relay_url: String,
34 repos_and_new_root_event: HashMap<String, HashSet<EventId>>,
35 },
36}
37
38/// Pending updates collected during batch window.
39#[derive(Debug, Default)]
40struct PendingUpdates {
41 /// New announcements (kind 30617) - triggers relay discovery
42 announcements: Vec<Event>,
43 /// New root events (kinds 1617, 1618, 1619, 1621) - updates following set
44 root_events: Vec<Event>,
45}
46
47// =============================================================================
48// SelfSubscriber
49// =============================================================================
50
51/// Subscribes to our own relay to detect new events.
52///
53/// The self-subscriber:
54/// 1. Connects to our own relay
55/// 2. Subscribes to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618)
56/// 3. When events arrive, batches them
57/// 4. On batch timer fire, processes updates and sends relay actions
58pub struct SelfSubscriber {
59 /// URL of our own relay to subscribe to
60 own_relay_url: String,
61 /// Our relay domain for checking if announcements list us
62 relay_domain: String,
63 /// Reference to following repo root events (shared with SyncManager)
64 following_repo_root_events: FollowingRepoRootEvents,
65 /// Reference to sync relays (shared with SyncManager)
66 sync_relays: SyncRelays,
67 /// Channel to send relay actions back to manager
68 action_tx: mpsc::Sender<RelayAction>,
69}
70
71impl SelfSubscriber {
72 /// Create a new self-subscriber.
73 pub fn new(
74 own_relay_url: String,
75 relay_domain: String,
76 following_repo_root_events: FollowingRepoRootEvents,
77 sync_relays: SyncRelays,
78 action_tx: mpsc::Sender<RelayAction>,
79 ) -> Self {
80 Self {
81 own_relay_url,
82 relay_domain,
83 following_repo_root_events,
84 sync_relays,
85 action_tx,
86 }
87 }
88
89 /// Get the batch window duration from environment variable.
90 ///
91 /// Default is 5 seconds, but can be overridden via NGIT_SYNC_BATCH_WINDOW_MS
92 /// for faster tests (typically 200ms).
93 fn get_batch_window() -> Duration {
94 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS")
95 .ok()
96 .and_then(|s| s.parse().ok())
97 .map(Duration::from_millis)
98 .unwrap_or(Duration::from_secs(5))
99 }
100
101 /// Run the self-subscriber event loop.
102 ///
103 /// This method:
104 /// 1. Connects to our own relay
105 /// 2. Subscribes to relevant event kinds
106 /// 3. Receives events and batches them
107 /// 4. On batch timer fire, processes and sends relay actions
108 pub async fn run(self) {
109 tracing::info!("SelfSubscriber starting for {}", self.own_relay_url);
110
111 // Create nostr-sdk client
112 let keys = Keys::generate();
113 let client = Client::new(keys);
114
115 // Connect to our own relay
116 if let Err(e) = client.add_relay(&self.own_relay_url).await {
117 tracing::error!("Failed to add own relay {}: {}", self.own_relay_url, e);
118 return;
119 }
120
121 client.connect().await;
122
123 // Wait for connection
124 let mut connected = false;
125 for _ in 0..30 {
126 tokio::time::sleep(Duration::from_millis(100)).await;
127 let relays = client.relays().await;
128 if relays.values().any(|r| r.is_connected()) {
129 connected = true;
130 break;
131 }
132 }
133
134 if !connected {
135 tracing::error!(
136 "Failed to connect to own relay {} after 3 seconds",
137 self.own_relay_url
138 );
139 return;
140 }
141
142 tracing::info!("SelfSubscriber connected to {}", self.own_relay_url);
143
144 // Subscribe to kinds 30617, 1617, 1618, 1619, 1621 (NOT 30618 per v2 design)
145 let filter = Filter::new()
146 .kinds([
147 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617
148 Kind::GitPatch, // 1617
149 Kind::Custom(KIND_PR), // 1618
150 Kind::Custom(KIND_PR_UPDATE), // 1619
151 Kind::GitIssue, // 1621
152 ])
153 .since(Timestamp::now());
154
155 if let Err(e) = client.subscribe(filter, None).await {
156 tracing::error!("Failed to subscribe to own relay: {}", e);
157 return;
158 }
159
160 tracing::info!("SelfSubscriber subscribed to event kinds on own relay");
161
162 // Batch state
163 let mut pending = PendingUpdates::default();
164 let mut batch_timer_started: Option<Instant> = None;
165 let batch_window = Self::get_batch_window();
166
167 // Main event loop using notifications stream
168 loop {
169 // Calculate timeout for batch processing
170 let timeout = if let Some(started) = batch_timer_started {
171 let elapsed = started.elapsed();
172 if elapsed >= batch_window {
173 Duration::ZERO
174 } else {
175 batch_window - elapsed
176 }
177 } else {
178 Duration::from_secs(60) // Long timeout when no batch pending
179 };
180
181 // Wait for notification with timeout
182 let notification = tokio::time::timeout(timeout, client.notifications().recv()).await;
183
184 match notification {
185 Ok(Ok(notification)) => {
186 match notification {
187 RelayPoolNotification::Event { event, .. } => {
188 let kind = event.kind.as_u16();
189
190 // Start batch timer on first event (does NOT reset)
191 if batch_timer_started.is_none() {
192 batch_timer_started = Some(Instant::now());
193 tracing::debug!("Batch timer started");
194 }
195
196 // Classify and add to pending
197 if kind == KIND_REPOSITORY_ANNOUNCEMENT {
198 tracing::debug!(
199 "SelfSubscriber received announcement {}",
200 event.id
201 );
202 pending.announcements.push(*event);
203 } else {
204 tracing::debug!(
205 "SelfSubscriber received root event {} (kind {})",
206 event.id,
207 kind
208 );
209 pending.root_events.push(*event);
210 }
211 }
212 RelayPoolNotification::Message { message, .. } => {
213 if let RelayMessage::EndOfStoredEvents(_) = message {
214 tracing::debug!("SelfSubscriber EOSE received");
215 // Process any pending events after EOSE
216 if !pending.announcements.is_empty()
217 || !pending.root_events.is_empty()
218 {
219 self.process_batch(&mut pending).await;
220 batch_timer_started = None;
221 }
222 }
223 }
224 RelayPoolNotification::Shutdown => {
225 tracing::info!("SelfSubscriber shutting down");
226 break;
227 }
228 }
229 }
230 Ok(Err(_)) => {
231 // Channel closed
232 tracing::warn!("SelfSubscriber notification channel closed");
233 break;
234 }
235 Err(_) => {
236 // Timeout - check if batch should be processed
237 if let Some(started) = batch_timer_started {
238 if started.elapsed() >= batch_window {
239 if !pending.announcements.is_empty() || !pending.root_events.is_empty()
240 {
241 self.process_batch(&mut pending).await;
242 }
243 batch_timer_started = None;
244 }
245 }
246 }
247 }
248 }
249
250 client.disconnect().await;
251 tracing::info!("SelfSubscriber disconnected");
252 }
253
254 /// Process a batch of pending updates.
255 async fn process_batch(&self, pending: &mut PendingUpdates) {
256 tracing::debug!(
257 "Processing batch: {} announcements, {} root events",
258 pending.announcements.len(),
259 pending.root_events.len()
260 );
261
262 // Process root events first (update following_repo_root_events)
263 for event in pending.root_events.drain(..) {
264 let repo_refs = SyncManager::extract_all_repo_refs(&event);
265 if !repo_refs.is_empty() {
266 let mut guard = self.following_repo_root_events.write().await;
267 for repo_ref in repo_refs {
268 guard.entry(repo_ref).or_default().insert(event.id);
269 }
270 }
271 }
272
273 // Process announcements (relay discovery)
274 for event in pending.announcements.drain(..) {
275 self.process_announcement(&event).await;
276 }
277 }
278
279 /// Process an announcement event for relay discovery.
280 async fn process_announcement(&self, event: &Event) {
281 let repo_ref = SyncManager::build_repo_ref(event);
282 let relay_urls = Self::extract_relay_urls_from_announcement(event);
283
284 // Check if this announcement lists our relay
285 if !self.lists_our_service(event) {
286 tracing::debug!(
287 "Announcement {} does not list our service, skipping relay discovery",
288 event.id
289 );
290 return;
291 }
292
293 tracing::info!(
294 "Processing announcement {} for repo {}, found {} relay URLs",
295 event.id,
296 repo_ref,
297 relay_urls.len()
298 );
299
300 // Get current events for this repo from following_repo_root_events
301 let events = self
302 .following_repo_root_events
303 .read()
304 .await
305 .get(&repo_ref)
306 .cloned()
307 .unwrap_or_default();
308
309 // For each relay URL in the announcement, check if we need to spawn or update
310 for relay_url in relay_urls {
311 if self.is_own_relay(&relay_url) {
312 continue; // Skip our own relay
313 }
314
315 let sync_relays_guard = self.sync_relays.read().await;
316 let exists = sync_relays_guard.contains_key(&relay_url);
317 drop(sync_relays_guard);
318
319 if exists {
320 // Relay already known - check if we need to add this repo
321 let mut guard = self.sync_relays.write().await;
322 let relay_repos = guard.entry(relay_url.clone()).or_default();
323 let is_new_repo = !relay_repos.contains_key(&repo_ref);
324
325 if is_new_repo {
326 relay_repos.insert(repo_ref.clone(), events.clone());
327 drop(guard);
328
329 // Send action to add filters
330 let mut repos_filters = HashMap::new();
331 repos_filters.insert(repo_ref.clone(), events.clone());
332
333 if let Err(e) = self
334 .action_tx
335 .send(RelayAction::AddFilters {
336 relay_url: relay_url.clone(),
337 repos_and_new_root_event: repos_filters,
338 })
339 .await
340 {
341 tracing::warn!("Failed to send AddFilters action: {}", e);
342 }
343 }
344 } else {
345 // New relay - add to sync_relays and spawn
346 let mut guard = self.sync_relays.write().await;
347 let mut repos = HashMap::new();
348 repos.insert(repo_ref.clone(), events.clone());
349 guard.insert(relay_url.clone(), repos.clone());
350 drop(guard);
351
352 tracing::info!("Discovered new relay to sync from: {}", relay_url);
353
354 // Send action to spawn relay
355 if let Err(e) = self
356 .action_tx
357 .send(RelayAction::SpawnRelay {
358 relay_url: relay_url.clone(),
359 repos_and_root_events: repos,
360 })
361 .await
362 {
363 tracing::warn!("Failed to send SpawnRelay action: {}", e);
364 }
365 }
366 }
367 }
368
369 /// Extract relay URLs from an announcement event.
370 ///
371 /// Looks for both 'relays' and 'clone' tags.
372 fn extract_relay_urls_from_announcement(event: &Event) -> Vec<String> {
373 let mut urls = Vec::new();
374
375 // Extract from 'relays' tag
376 for tag in event.tags.iter() {
377 if matches!(tag.kind(), TagKind::Relays) {
378 let vec = tag.clone().to_vec();
379 urls.extend(vec.into_iter().skip(1)); // Skip tag name
380 }
381 }
382
383 // Extract from 'clone' tag - parse URLs to get relay hints
384 // Clone URLs look like: http://domain/repo.git or git://domain/repo.git
385 // We want to construct ws://domain from these
386 for tag in event.tags.iter() {
387 if matches!(tag.kind(), TagKind::Clone) {
388 let vec = tag.clone().to_vec();
389 for url in vec.into_iter().skip(1) {
390 if let Some(relay_url) = Self::clone_url_to_relay_url(&url) {
391 if !urls.contains(&relay_url) {
392 urls.push(relay_url);
393 }
394 }
395 }
396 }
397 }
398
399 urls
400 }
401
402 /// Convert a clone URL to a potential relay URL.
403 ///
404 /// E.g., "http://127.0.0.1:8080/repo.git" -> "ws://127.0.0.1:8080"
405 fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
406 // Parse the URL to extract host:port
407 if let Ok(url) = url::Url::parse(clone_url) {
408 let host = url.host_str()?;
409 let port = url.port();
410 let scheme = if url.scheme() == "https" { "wss" } else { "ws" };
411
412 if let Some(port) = port {
413 Some(format!("{}://{}:{}", scheme, host, port))
414 } else {
415 Some(format!("{}://{}", scheme, host))
416 }
417 } else {
418 None
419 }
420 }
421
422 /// Check if event lists our service in the relays or clone tags.
423 fn lists_our_service(&self, event: &Event) -> bool {
424 // Check relays tag
425 for tag in event.tags.iter() {
426 if matches!(tag.kind(), TagKind::Relays) {
427 let vec = tag.clone().to_vec();
428 for url in vec.into_iter().skip(1) {
429 if self.is_own_relay(&url) {
430 return true;
431 }
432 }
433 }
434 }
435
436 // Check clone tag
437 for tag in event.tags.iter() {
438 if matches!(tag.kind(), TagKind::Clone) {
439 let vec = tag.clone().to_vec();
440 for url in vec.into_iter().skip(1) {
441 if url.contains(&self.relay_domain) {
442 return true;
443 }
444 }
445 }
446 }
447
448 false
449 }
450
451 /// Check if a relay URL matches our relay.
452 fn is_own_relay(&self, relay_url: &str) -> bool {
453 relay_url.contains(&self.relay_domain)
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 #[test]
462 fn test_clone_url_to_relay_url_http() {
463 let url = "http://127.0.0.1:8080/repo.git";
464 let relay = SelfSubscriber::clone_url_to_relay_url(url);
465 assert_eq!(relay, Some("ws://127.0.0.1:8080".to_string()));
466 }
467
468 #[test]
469 fn test_clone_url_to_relay_url_https() {
470 let url = "https://example.com/repo.git";
471 let relay = SelfSubscriber::clone_url_to_relay_url(url);
472 assert_eq!(relay, Some("wss://example.com".to_string()));
473 }
474
475 #[test]
476 fn test_clone_url_to_relay_url_invalid() {
477 let url = "not-a-valid-url";
478 let relay = SelfSubscriber::clone_url_to_relay_url(url);
479 assert_eq!(relay, None);
480 }
481
482 #[test]
483 fn test_get_batch_window_default() {
484 // Clear env var if set
485 std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS");
486 let window = SelfSubscriber::get_batch_window();
487 assert_eq!(window, Duration::from_secs(5));
488 }
489
490 #[test]
491 fn test_get_batch_window_from_env() {
492 std::env::set_var("NGIT_SYNC_BATCH_WINDOW_MS", "200");
493 let window = SelfSubscriber::get_batch_window();
494 assert_eq!(window, Duration::from_millis(200));
495 std::env::remove_var("NGIT_SYNC_BATCH_WINDOW_MS");
496 }
497}