diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:03:40 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 17:03:40 +0000 |
| commit | b167f1b2ae7edbcab95554b5203d22d9e372c8b5 (patch) | |
| tree | 39b3bb879302cb6a4eaabded4a5d20f7d0d68ffc /src/sync/connection.rs | |
| parent | fdbc8895e1e9e712882bd854908295a95e7afcb9 (diff) | |
feat(sync): Phase 1 MVP - single relay proactive sync
- Add src/sync/ module with SyncManager
- Add NGIT_SYNC_RELAY_URL config option
- Subscribe to kind 30617 on configured relay
- Validate synced events through Nip34WritePolicy
- Integration test with two TestRelay instances
Diffstat (limited to 'src/sync/connection.rs')
| -rw-r--r-- | src/sync/connection.rs | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs new file mode 100644 index 0000000..4a79128 --- /dev/null +++ b/src/sync/connection.rs | |||
| @@ -0,0 +1,143 @@ | |||
| 1 | //! WebSocket connection handling for sync | ||
| 2 | //! | ||
| 3 | //! Manages the connection to a source relay, subscribes to kind 30617 events, | ||
| 4 | //! and passes them through validation. | ||
| 5 | |||
| 6 | use std::time::Duration; | ||
| 7 | |||
| 8 | use nostr_sdk::prelude::*; | ||
| 9 | use tokio::sync::mpsc; | ||
| 10 | |||
| 11 | use super::KIND_REPOSITORY_STATE; | ||
| 12 | |||
| 13 | /// Event received from the sync connection | ||
| 14 | #[derive(Debug, Clone)] | ||
| 15 | pub struct SyncedEvent { | ||
| 16 | pub event: Event, | ||
| 17 | pub source_url: String, | ||
| 18 | } | ||
| 19 | |||
| 20 | /// Manages a WebSocket connection to a single relay for syncing | ||
| 21 | pub struct SyncConnection { | ||
| 22 | url: String, | ||
| 23 | client: Client, | ||
| 24 | } | ||
| 25 | |||
| 26 | impl SyncConnection { | ||
| 27 | /// Create a new sync connection to the given relay URL | ||
| 28 | pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { | ||
| 29 | let client = Client::default(); | ||
| 30 | |||
| 31 | // Add the relay | ||
| 32 | client.add_relay(url).await?; | ||
| 33 | |||
| 34 | // Connect to the relay | ||
| 35 | client.connect().await; | ||
| 36 | |||
| 37 | tracing::info!("Sync connection established to {}", url); | ||
| 38 | |||
| 39 | Ok(Self { | ||
| 40 | url: url.to_string(), | ||
| 41 | client, | ||
| 42 | }) | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Start receiving events and send them through the channel | ||
| 46 | /// | ||
| 47 | /// This method runs indefinitely, reconnecting as needed. | ||
| 48 | pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { | ||
| 49 | // Create filter for kind 30617 (repository state) events | ||
| 50 | let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); | ||
| 51 | |||
| 52 | // Subscribe to events | ||
| 53 | match self.client.subscribe(filter, None).await { | ||
| 54 | Ok(output) => { | ||
| 55 | tracing::info!( | ||
| 56 | "Subscribed to kind {} events on {} (subscription: {})", | ||
| 57 | KIND_REPOSITORY_STATE, | ||
| 58 | self.url, | ||
| 59 | output.id() | ||
| 60 | ); | ||
| 61 | } | ||
| 62 | Err(e) => { | ||
| 63 | tracing::error!("Failed to subscribe on {}: {}", self.url, e); | ||
| 64 | return; | ||
| 65 | } | ||
| 66 | }; | ||
| 67 | |||
| 68 | // Handle incoming notifications | ||
| 69 | let url = self.url.clone(); | ||
| 70 | self.client | ||
| 71 | .handle_notifications(|notification| { | ||
| 72 | let tx = tx.clone(); | ||
| 73 | let url = url.clone(); | ||
| 74 | async move { | ||
| 75 | match notification { | ||
| 76 | RelayPoolNotification::Event { event, .. } => { | ||
| 77 | tracing::debug!( | ||
| 78 | "Received event {} from {} (kind {})", | ||
| 79 | event.id, | ||
| 80 | url, | ||
| 81 | event.kind.as_u16() | ||
| 82 | ); | ||
| 83 | |||
| 84 | // Send the event to the manager for processing | ||
| 85 | let synced = SyncedEvent { | ||
| 86 | event: (*event).clone(), | ||
| 87 | source_url: url.clone(), | ||
| 88 | }; | ||
| 89 | |||
| 90 | if let Err(e) = tx.send(synced).await { | ||
| 91 | tracing::warn!("Failed to send synced event: {}", e); | ||
| 92 | return Ok(true); // Stop if channel is closed | ||
| 93 | } | ||
| 94 | } | ||
| 95 | RelayPoolNotification::Shutdown => { | ||
| 96 | tracing::warn!("Relay connection shutdown for {}", url); | ||
| 97 | return Ok(true); // Stop on shutdown | ||
| 98 | } | ||
| 99 | RelayPoolNotification::Message { message, .. } => { | ||
| 100 | tracing::trace!("Received message from {}: {:?}", url, message); | ||
| 101 | } | ||
| 102 | } | ||
| 103 | Ok(false) // Continue processing | ||
| 104 | } | ||
| 105 | }) | ||
| 106 | .await | ||
| 107 | .ok(); | ||
| 108 | } | ||
| 109 | |||
| 110 | } | ||
| 111 | |||
| 112 | /// Reconnect loop with exponential backoff | ||
| 113 | pub async fn connect_with_retry( | ||
| 114 | url: &str, | ||
| 115 | tx: mpsc::Sender<SyncedEvent>, | ||
| 116 | ) { | ||
| 117 | let mut backoff = Duration::from_secs(1); | ||
| 118 | let max_backoff = Duration::from_secs(60); | ||
| 119 | |||
| 120 | loop { | ||
| 121 | match SyncConnection::new(url).await { | ||
| 122 | Ok(conn) => { | ||
| 123 | backoff = Duration::from_secs(1); // Reset backoff on successful connection | ||
| 124 | conn.run(tx.clone()).await; | ||
| 125 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | ||
| 126 | } | ||
| 127 | Err(e) => { | ||
| 128 | tracing::error!( | ||
| 129 | "Failed to connect to sync relay {}: {} (retrying in {:?})", | ||
| 130 | url, | ||
| 131 | e, | ||
| 132 | backoff | ||
| 133 | ); | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | // Wait before reconnecting | ||
| 138 | tokio::time::sleep(backoff).await; | ||
| 139 | |||
| 140 | // Exponential backoff | ||
| 141 | backoff = std::cmp::min(backoff * 2, max_backoff); | ||
| 142 | } | ||
| 143 | } \ No newline at end of file | ||