upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r--src/sync/connection.rs143
1 files changed, 143 insertions, 0 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
new file mode 100644
index 0000000..4a79128
--- /dev/null
+++ b/src/sync/connection.rs
@@ -0,0 +1,143 @@
1//! WebSocket connection handling for sync
2//!
3//! Manages the connection to a source relay, subscribes to kind 30617 events,
4//! and passes them through validation.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use super::KIND_REPOSITORY_STATE;
12
13/// Event received from the sync connection
14#[derive(Debug, Clone)]
15pub struct SyncedEvent {
16 pub event: Event,
17 pub source_url: String,
18}
19
20/// Manages a WebSocket connection to a single relay for syncing
21pub struct SyncConnection {
22 url: String,
23 client: Client,
24}
25
26impl SyncConnection {
27 /// Create a new sync connection to the given relay URL
28 pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
29 let client = Client::default();
30
31 // Add the relay
32 client.add_relay(url).await?;
33
34 // Connect to the relay
35 client.connect().await;
36
37 tracing::info!("Sync connection established to {}", url);
38
39 Ok(Self {
40 url: url.to_string(),
41 client,
42 })
43 }
44
45 /// Start receiving events and send them through the channel
46 ///
47 /// This method runs indefinitely, reconnecting as needed.
48 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) {
49 // Create filter for kind 30617 (repository state) events
50 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE));
51
52 // Subscribe to events
53 match self.client.subscribe(filter, None).await {
54 Ok(output) => {
55 tracing::info!(
56 "Subscribed to kind {} events on {} (subscription: {})",
57 KIND_REPOSITORY_STATE,
58 self.url,
59 output.id()
60 );
61 }
62 Err(e) => {
63 tracing::error!("Failed to subscribe on {}: {}", self.url, e);
64 return;
65 }
66 };
67
68 // Handle incoming notifications
69 let url = self.url.clone();
70 self.client
71 .handle_notifications(|notification| {
72 let tx = tx.clone();
73 let url = url.clone();
74 async move {
75 match notification {
76 RelayPoolNotification::Event { event, .. } => {
77 tracing::debug!(
78 "Received event {} from {} (kind {})",
79 event.id,
80 url,
81 event.kind.as_u16()
82 );
83
84 // Send the event to the manager for processing
85 let synced = SyncedEvent {
86 event: (*event).clone(),
87 source_url: url.clone(),
88 };
89
90 if let Err(e) = tx.send(synced).await {
91 tracing::warn!("Failed to send synced event: {}", e);
92 return Ok(true); // Stop if channel is closed
93 }
94 }
95 RelayPoolNotification::Shutdown => {
96 tracing::warn!("Relay connection shutdown for {}", url);
97 return Ok(true); // Stop on shutdown
98 }
99 RelayPoolNotification::Message { message, .. } => {
100 tracing::trace!("Received message from {}: {:?}", url, message);
101 }
102 }
103 Ok(false) // Continue processing
104 }
105 })
106 .await
107 .ok();
108 }
109
110}
111
112/// Reconnect loop with exponential backoff
113pub async fn connect_with_retry(
114 url: &str,
115 tx: mpsc::Sender<SyncedEvent>,
116) {
117 let mut backoff = Duration::from_secs(1);
118 let max_backoff = Duration::from_secs(60);
119
120 loop {
121 match SyncConnection::new(url).await {
122 Ok(conn) => {
123 backoff = Duration::from_secs(1); // Reset backoff on successful connection
124 conn.run(tx.clone()).await;
125 tracing::warn!("Sync connection to {} ended, will reconnect", url);
126 }
127 Err(e) => {
128 tracing::error!(
129 "Failed to connect to sync relay {}: {} (retrying in {:?})",
130 url,
131 e,
132 backoff
133 );
134 }
135 }
136
137 // Wait before reconnecting
138 tokio::time::sleep(backoff).await;
139
140 // Exponential backoff
141 backoff = std::cmp::min(backoff * 2, max_backoff);
142 }
143} \ No newline at end of file