upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/relay_connection.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 11:07:50 +0000
commit39e782b12fce1776f2ad0b0f5430749533cb80ea (patch)
treed050a079a82898848da870d9307a98a83480629b /src/sync/relay_connection.rs
parent586fc2a7df1ce256469f0742d23f687ac4b075b1 (diff)
sync v4 mvp
Diffstat (limited to 'src/sync/relay_connection.rs')
-rw-r--r--src/sync/relay_connection.rs216
1 files changed, 216 insertions, 0 deletions
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
new file mode 100644
index 0000000..6499c27
--- /dev/null
+++ b/src/sync/relay_connection.rs
@@ -0,0 +1,216 @@
1//! Relay Connection Management for Proactive Sync
2//!
3//! This module provides relay connection management for external relay connections.
4//! Each RelayConnection manages a single connection to an external relay and handles
5//! subscriptions using the three-layer sync strategy.
6//!
7//! See `docs/explanation/grasp-02-proactive-sync-v4.md` for full design details.
8
9use nostr_sdk::prelude::*;
10use tokio::sync::mpsc;
11
12use super::filters::build_announcement_filter;
13
14/// Events from a relay connection
15#[derive(Debug)]
16pub enum RelayEvent {
17 /// A new event was received
18 Event(Event),
19 /// End of stored events for a subscription
20 EndOfStoredEvents(SubscriptionId),
21 /// Connection was closed
22 Closed(String),
23 /// Shutdown notification
24 Shutdown,
25}
26
27/// Manages connection to a single external relay
28///
29/// RelayConnection wraps a nostr-sdk Client to manage a WebSocket connection
30/// to an external relay. It handles:
31/// - Connection establishment
32/// - Layer 1 subscription (announcements)
33/// - Additional filter subscriptions (Layers 2 & 3)
34/// - Event notification loop
35pub struct RelayConnection {
36 /// The relay URL this connection is for
37 url: String,
38 /// The underlying nostr-sdk client
39 client: Client,
40}
41
42impl RelayConnection {
43 /// Create a new relay connection (not yet connected)
44 ///
45 /// # Arguments
46 /// * `url` - The relay URL to connect to (e.g., "wss://relay.example.com")
47 pub fn new(url: String) -> Self {
48 let client = Client::default();
49 Self { url, client }
50 }
51
52 /// Connect to the relay and subscribe to Layer 1 (announcements)
53 ///
54 /// This method:
55 /// 1. Adds the relay to the client
56 /// 2. Establishes the WebSocket connection
57 /// 3. Subscribes to Layer 1 filter (kinds 30617 + 30618)
58 ///
59 /// # Arguments
60 /// * `since` - Optional timestamp for incremental sync on reconnect
61 ///
62 /// # Returns
63 /// * `Ok(SubscriptionId)` - The subscription ID on successful connection
64 /// * `Err(String)` with error description on failure
65 pub async fn connect_and_subscribe(
66 &self,
67 since: Option<Timestamp>,
68 ) -> Result<SubscriptionId, String> {
69 // Add relay to client
70 self.client
71 .add_relay(&self.url)
72 .await
73 .map_err(|e| format!("Failed to add relay {}: {}", self.url, e))?;
74
75 // Establish connection
76 self.client.connect().await;
77
78 // Subscribe to Layer 1 (announcements)
79 let filter = build_announcement_filter(since);
80 let output = self
81 .client
82 .subscribe(filter, None)
83 .await
84 .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?;
85
86 tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)");
87 Ok(output.val)
88 }
89
90 /// Run the event loop, sending events through the provided channel
91 ///
92 /// This method blocks and processes notifications from the relay:
93 /// - `RelayPoolNotification::Event` -> sends `RelayEvent::Event`
94 /// - `RelayPoolNotification::Message` with EOSE -> sends `RelayEvent::EndOfStoredEvents`
95 /// - `RelayPoolNotification::Shutdown` -> sends `RelayEvent::Shutdown`
96 ///
97 /// The loop terminates when:
98 /// - The sender channel is closed (receiver dropped)
99 /// - A shutdown notification is received
100 /// - An error occurs receiving notifications
101 ///
102 /// # Arguments
103 /// * `event_sender` - Channel to send relay events through
104 pub async fn run_event_loop(self, event_sender: mpsc::Sender<RelayEvent>) {
105 let mut notifications = self.client.notifications();
106 let url = self.url.clone();
107
108 tracing::debug!(relay = %url, "Starting event loop");
109
110 while let Ok(notification) = notifications.recv().await {
111 match notification {
112 RelayPoolNotification::Event { event, .. } => {
113 tracing::trace!(relay = %url, event_id = %event.id, "Received event");
114 if event_sender.send(RelayEvent::Event(*event)).await.is_err() {
115 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
116 break;
117 }
118 }
119 RelayPoolNotification::Message { message, .. } => {
120 match message {
121 RelayMessage::EndOfStoredEvents(sub_id) => {
122 tracing::debug!(relay = %url, sub_id = ?sub_id, "Received EOSE");
123 // Convert Cow<SubscriptionId> to owned SubscriptionId
124 let owned_sub_id = sub_id.into_owned();
125 if event_sender
126 .send(RelayEvent::EndOfStoredEvents(owned_sub_id))
127 .await
128 .is_err()
129 {
130 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
131 break;
132 }
133 }
134 RelayMessage::Closed { message: msg, .. } => {
135 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
136 let _ = event_sender
137 .send(RelayEvent::Closed(msg.to_string()))
138 .await;
139 }
140 _ => {}
141 }
142 }
143 RelayPoolNotification::Shutdown => {
144 tracing::info!(relay = %url, "Relay pool shutdown");
145 let _ = event_sender.send(RelayEvent::Shutdown).await;
146 break;
147 }
148 }
149 }
150
151 tracing::debug!(relay = %url, "Event loop terminated");
152 }
153
154 /// Add additional filter subscription (for Layer 2 + 3)
155 ///
156 /// Use this to subscribe to:
157 /// - Layer 2: Events tagging our repos (a/A/q tags)
158 /// - Layer 3: Events tagging our root events (e/E/q tags)
159 ///
160 /// # Arguments
161 /// * `filter` - The filter to subscribe to
162 ///
163 /// # Returns
164 /// * `Ok(SubscriptionId)` - The subscription ID on success
165 /// * `Err(String)` - Error description on failure
166 pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> {
167 let output = self
168 .client
169 .subscribe(filter, None)
170 .await
171 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
172 Ok(output.val)
173 }
174
175 /// Subscribe to multiple filters at once
176 ///
177 /// Each filter creates its own subscription. Returns when all subscriptions
178 /// are established. This is useful for Layer 2 + 3 filters together.
179 ///
180 /// # Arguments
181 /// * `filters` - Vec of filters to subscribe to
182 ///
183 /// # Returns
184 /// * `Ok(Vec<SubscriptionId>)` - The subscription IDs on success
185 /// * `Err(String)` - Error description on failure
186 pub async fn subscribe_filters(
187 &self,
188 filters: Vec<Filter>,
189 ) -> Result<Vec<SubscriptionId>, String> {
190 if filters.is_empty() {
191 return Ok(vec![]);
192 }
193
194 let mut sub_ids = Vec::with_capacity(filters.len());
195 for filter in filters {
196 let output = self
197 .client
198 .subscribe(filter, None)
199 .await
200 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
201 sub_ids.push(output.val);
202 }
203 Ok(sub_ids)
204 }
205
206 /// Get the relay URL
207 pub fn url(&self) -> &str {
208 &self.url
209 }
210
211 /// Disconnect from the relay
212 pub async fn disconnect(&self) {
213 self.client.disconnect().await;
214 tracing::debug!(relay = %self.url, "Disconnected from relay");
215 }
216} \ No newline at end of file