upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/relay_connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/relay_connection.rs')
-rw-r--r--src/sync/relay_connection.rs185
1 files changed, 185 insertions, 0 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
new file mode 100644
index 0000000..71b5d51
--- /dev/null
+++ b/src/sync/relay_connection.rs
@@ -0,0 +1,185 @@
1//! Relay Connection for Proactive Sync
2//!
3//! This module handles connecting to external relays and receiving events
4//! for the proactive sync system.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use crate::nostr::events::{KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE};
12
13/// Events received from a relay connection
14#[derive(Debug)]
15pub enum RelayEvent {
16 /// A nostr event was received
17 Event(Event),
18 /// End of stored events (EOSE) received
19 EndOfStoredEvents,
20 /// Connection was closed
21 Closed(String),
22}
23
24/// Connection to an external relay for syncing events.
25///
26/// RelayConnection handles:
27/// - Connecting to the relay
28/// - Subscribing with appropriate filters (Layer 1 for bootstrap)
29/// - Receiving events and sending them through a channel
30pub struct RelayConnection {
31 /// The relay URL
32 url: String,
33 /// The nostr-sdk client
34 client: Client,
35}
36
37impl RelayConnection {
38 /// Create a new relay connection.
39 ///
40 /// # Arguments
41 ///
42 /// * `url` - The WebSocket URL of the relay to connect to
43 pub fn new(url: String) -> Self {
44 // Create a client with generated keys (we're just subscribing, not publishing)
45 let keys = Keys::generate();
46 let client = Client::new(keys);
47
48 Self { url, client }
49 }
50
51 /// Connect to the relay and subscribe with Layer 1 filter.
52 ///
53 /// Layer 1 filter syncs announcement events (30617, 30618) which are
54 /// the foundation for discovering repository relationships.
55 ///
56 /// Returns the notification stream for receiving events.
57 pub async fn connect_and_subscribe(&self) -> Result<(), String> {
58 // Add the relay
59 self.client
60 .add_relay(&self.url)
61 .await
62 .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?;
63
64 // Connect to relay
65 self.client.connect().await;
66
67 // Wait for connection to establish
68 let mut connected = false;
69 for _ in 0..30 {
70 tokio::time::sleep(Duration::from_millis(100)).await;
71 let relays = self.client.relays().await;
72 if relays.values().any(|r| r.is_connected()) {
73 connected = true;
74 break;
75 }
76 }
77
78 if !connected {
79 return Err(format!(
80 "Failed to connect to relay {} after 3 seconds",
81 self.url
82 ));
83 }
84
85 tracing::info!("Connected to bootstrap relay: {}", self.url);
86
87 // Layer 1 filter: Repository announcements and state events
88 // These are addressable events that define repositories
89 let filter = Filter::new().kinds([
90 Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), // 30617
91 Kind::Custom(KIND_REPOSITORY_STATE), // 30618
92 ]);
93
94 // Subscribe to the filter
95 self.client
96 .subscribe(filter, None)
97 .await
98 .map_err(|e| format!("Failed to subscribe: {}", e))?;
99
100 tracing::debug!(
101 "Subscribed to Layer 1 events (kinds 30617, 30618) from {}",
102 self.url
103 );
104
105 Ok(())
106 }
107
108 /// Run the event loop, sending received events through the channel.
109 ///
110 /// This method runs until the connection is closed or an error occurs.
111 ///
112 /// # Arguments
113 ///
114 /// * `event_sender` - Channel to send received events
115 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
116 tracing::debug!("Starting event loop for relay: {}", self.url);
117
118 // Handle notifications
119 self.client
120 .handle_notifications(|notification| async {
121 match notification {
122 RelayPoolNotification::Event { event, .. } => {
123 tracing::debug!(
124 "Received event {} (kind {}) from {}",
125 event.id,
126 event.kind.as_u16(),
127 self.url
128 );
129 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
130 tracing::warn!("Event channel closed, stopping relay connection");
131 return Ok(true); // Stop handling
132 }
133 }
134 RelayPoolNotification::Message { message, .. } => {
135 if let RelayMessage::EndOfStoredEvents(_) = message {
136 tracing::debug!("EOSE received from {}", self.url);
137 if event_sender
138 .send(RelayEvent::EndOfStoredEvents)
139 .await
140 .is_err()
141 {
142 return Ok(true); // Stop handling
143 }
144 }
145 }
146 RelayPoolNotification::Shutdown => {
147 tracing::info!("Relay {} shutting down", self.url);
148 let _ = event_sender
149 .send(RelayEvent::Closed("Shutdown".to_string()))
150 .await;
151 return Ok(true); // Stop handling
152 }
153 }
154 Ok(false) // Continue handling
155 })
156 .await
157 .ok(); // Ignore errors on shutdown
158
159 // Disconnect when done
160 self.client.disconnect().await;
161 tracing::info!("Disconnected from relay: {}", self.url);
162 }
163
164 /// Get the relay URL
165 pub fn url(&self) -> &str {
166 &self.url
167 }
168
169 /// Subscribe to an additional filter.
170 ///
171 /// This is used to add Layer 2 filters for repo-related events after
172 /// the initial connection is established.
173 pub async fn subscribe_filter(&self, filter: Filter) -> Result<(), String> {
174 self.client
175 .subscribe(filter, None)
176 .await
177 .map_err(|e| format!("Failed to subscribe with filter: {}", e))?;
178 Ok(())
179 }
180
181 /// Get a reference to the client for additional operations.
182 pub fn client(&self) -> &Client {
183 &self.client
184 }
185}