diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/connection.rs | 143 | ||||
| -rw-r--r-- | src/sync/manager.rs | 101 | ||||
| -rw-r--r-- | src/sync/mod.rs | 22 |
3 files changed, 266 insertions, 0 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs new file mode 100644 index 0000000..4a79128 --- /dev/null +++ b/src/sync/connection.rs | |||
| @@ -0,0 +1,143 @@ | |||
| 1 | //! WebSocket connection handling for sync | ||
| 2 | //! | ||
| 3 | //! Manages the connection to a source relay, subscribes to kind 30617 events, | ||
| 4 | //! and passes them through validation. | ||
| 5 | |||
| 6 | use std::time::Duration; | ||
| 7 | |||
| 8 | use nostr_sdk::prelude::*; | ||
| 9 | use tokio::sync::mpsc; | ||
| 10 | |||
| 11 | use super::KIND_REPOSITORY_STATE; | ||
| 12 | |||
| 13 | /// Event received from the sync connection | ||
| 14 | #[derive(Debug, Clone)] | ||
| 15 | pub struct SyncedEvent { | ||
| 16 | pub event: Event, | ||
| 17 | pub source_url: String, | ||
| 18 | } | ||
| 19 | |||
| 20 | /// Manages a WebSocket connection to a single relay for syncing | ||
| 21 | pub struct SyncConnection { | ||
| 22 | url: String, | ||
| 23 | client: Client, | ||
| 24 | } | ||
| 25 | |||
| 26 | impl SyncConnection { | ||
| 27 | /// Create a new sync connection to the given relay URL | ||
| 28 | pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { | ||
| 29 | let client = Client::default(); | ||
| 30 | |||
| 31 | // Add the relay | ||
| 32 | client.add_relay(url).await?; | ||
| 33 | |||
| 34 | // Connect to the relay | ||
| 35 | client.connect().await; | ||
| 36 | |||
| 37 | tracing::info!("Sync connection established to {}", url); | ||
| 38 | |||
| 39 | Ok(Self { | ||
| 40 | url: url.to_string(), | ||
| 41 | client, | ||
| 42 | }) | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Start receiving events and send them through the channel | ||
| 46 | /// | ||
| 47 | /// This method runs indefinitely, reconnecting as needed. | ||
| 48 | pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { | ||
| 49 | // Create filter for kind 30617 (repository state) events | ||
| 50 | let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); | ||
| 51 | |||
| 52 | // Subscribe to events | ||
| 53 | match self.client.subscribe(filter, None).await { | ||
| 54 | Ok(output) => { | ||
| 55 | tracing::info!( | ||
| 56 | "Subscribed to kind {} events on {} (subscription: {})", | ||
| 57 | KIND_REPOSITORY_STATE, | ||
| 58 | self.url, | ||
| 59 | output.id() | ||
| 60 | ); | ||
| 61 | } | ||
| 62 | Err(e) => { | ||
| 63 | tracing::error!("Failed to subscribe on {}: {}", self.url, e); | ||
| 64 | return; | ||
| 65 | } | ||
| 66 | }; | ||
| 67 | |||
| 68 | // Handle incoming notifications | ||
| 69 | let url = self.url.clone(); | ||
| 70 | self.client | ||
| 71 | .handle_notifications(|notification| { | ||
| 72 | let tx = tx.clone(); | ||
| 73 | let url = url.clone(); | ||
| 74 | async move { | ||
| 75 | match notification { | ||
| 76 | RelayPoolNotification::Event { event, .. } => { | ||
| 77 | tracing::debug!( | ||
| 78 | "Received event {} from {} (kind {})", | ||
| 79 | event.id, | ||
| 80 | url, | ||
| 81 | event.kind.as_u16() | ||
| 82 | ); | ||
| 83 | |||
| 84 | // Send the event to the manager for processing | ||
| 85 | let synced = SyncedEvent { | ||
| 86 | event: (*event).clone(), | ||
| 87 | source_url: url.clone(), | ||
| 88 | }; | ||
| 89 | |||
| 90 | if let Err(e) = tx.send(synced).await { | ||
| 91 | tracing::warn!("Failed to send synced event: {}", e); | ||
| 92 | return Ok(true); // Stop if channel is closed | ||
| 93 | } | ||
| 94 | } | ||
| 95 | RelayPoolNotification::Shutdown => { | ||
| 96 | tracing::warn!("Relay connection shutdown for {}", url); | ||
| 97 | return Ok(true); // Stop on shutdown | ||
| 98 | } | ||
| 99 | RelayPoolNotification::Message { message, .. } => { | ||
| 100 | tracing::trace!("Received message from {}: {:?}", url, message); | ||
| 101 | } | ||
| 102 | } | ||
| 103 | Ok(false) // Continue processing | ||
| 104 | } | ||
| 105 | }) | ||
| 106 | .await | ||
| 107 | .ok(); | ||
| 108 | } | ||
| 109 | |||
| 110 | } | ||
| 111 | |||
| 112 | /// Reconnect loop with exponential backoff | ||
| 113 | pub async fn connect_with_retry( | ||
| 114 | url: &str, | ||
| 115 | tx: mpsc::Sender<SyncedEvent>, | ||
| 116 | ) { | ||
| 117 | let mut backoff = Duration::from_secs(1); | ||
| 118 | let max_backoff = Duration::from_secs(60); | ||
| 119 | |||
| 120 | loop { | ||
| 121 | match SyncConnection::new(url).await { | ||
| 122 | Ok(conn) => { | ||
| 123 | backoff = Duration::from_secs(1); // Reset backoff on successful connection | ||
| 124 | conn.run(tx.clone()).await; | ||
| 125 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | ||
| 126 | } | ||
| 127 | Err(e) => { | ||
| 128 | tracing::error!( | ||
| 129 | "Failed to connect to sync relay {}: {} (retrying in {:?})", | ||
| 130 | url, | ||
| 131 | e, | ||
| 132 | backoff | ||
| 133 | ); | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | // Wait before reconnecting | ||
| 138 | tokio::time::sleep(backoff).await; | ||
| 139 | |||
| 140 | // Exponential backoff | ||
| 141 | backoff = std::cmp::min(backoff * 2, max_backoff); | ||
| 142 | } | ||
| 143 | } \ No newline at end of file | ||
diff --git a/src/sync/manager.rs b/src/sync/manager.rs new file mode 100644 index 0000000..8c883f5 --- /dev/null +++ b/src/sync/manager.rs | |||
| @@ -0,0 +1,101 @@ | |||
| 1 | //! SyncManager - Coordinates proactive sync operations | ||
| 2 | //! | ||
| 3 | //! The SyncManager spawns connections to configured relays, receives events, | ||
| 4 | //! validates them through the write policy, and stores accepted events. | ||
| 5 | |||
| 6 | use nostr_relay_builder::prelude::*; | ||
| 7 | use tokio::sync::mpsc; | ||
| 8 | |||
| 9 | use super::connection::{connect_with_retry, SyncedEvent}; | ||
| 10 | use super::SYNC_SOURCE_ADDR; | ||
| 11 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | ||
| 12 | |||
| 13 | /// Coordinates proactive sync from configured relays | ||
| 14 | pub struct SyncManager { | ||
| 15 | /// URL of the relay to sync from | ||
| 16 | sync_relay_url: String, | ||
| 17 | /// Database for storing accepted events | ||
| 18 | database: SharedDatabase, | ||
| 19 | /// Write policy for validating events | ||
| 20 | write_policy: Nip34WritePolicy, | ||
| 21 | } | ||
| 22 | |||
| 23 | impl SyncManager { | ||
| 24 | /// Create a new SyncManager | ||
| 25 | pub fn new( | ||
| 26 | sync_relay_url: String, | ||
| 27 | database: SharedDatabase, | ||
| 28 | write_policy: Nip34WritePolicy, | ||
| 29 | ) -> Self { | ||
| 30 | Self { | ||
| 31 | sync_relay_url, | ||
| 32 | database, | ||
| 33 | write_policy, | ||
| 34 | } | ||
| 35 | } | ||
| 36 | |||
| 37 | /// Run the sync manager | ||
| 38 | /// | ||
| 39 | /// This spawns a connection task and processes incoming events. | ||
| 40 | /// Runs indefinitely until the task is cancelled. | ||
| 41 | pub async fn run(self) { | ||
| 42 | tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); | ||
| 43 | |||
| 44 | // Create channel for receiving events from connection | ||
| 45 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); | ||
| 46 | |||
| 47 | // Spawn connection task with auto-reconnect | ||
| 48 | let url = self.sync_relay_url.clone(); | ||
| 49 | tokio::spawn(async move { | ||
| 50 | connect_with_retry(&url, tx).await; | ||
| 51 | }); | ||
| 52 | |||
| 53 | // Process incoming events | ||
| 54 | while let Some(synced_event) = rx.recv().await { | ||
| 55 | self.process_event(synced_event).await; | ||
| 56 | } | ||
| 57 | |||
| 58 | tracing::warn!("SyncManager event channel closed, shutting down"); | ||
| 59 | } | ||
| 60 | |||
| 61 | /// Process a single synced event | ||
| 62 | async fn process_event(&self, synced_event: SyncedEvent) { | ||
| 63 | let event = &synced_event.event; | ||
| 64 | let event_id = event.id.to_hex(); | ||
| 65 | |||
| 66 | tracing::debug!( | ||
| 67 | "Processing synced event {} (kind {}) from {}", | ||
| 68 | event_id, | ||
| 69 | event.kind.as_u16(), | ||
| 70 | synced_event.source_url | ||
| 71 | ); | ||
| 72 | |||
| 73 | // Validate through write policy using SYNC_SOURCE_ADDR | ||
| 74 | let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; | ||
| 75 | |||
| 76 | match result { | ||
| 77 | PolicyResult::Accept => { | ||
| 78 | tracing::info!( | ||
| 79 | "Synced event {} (kind {}) accepted, storing", | ||
| 80 | event_id, | ||
| 81 | event.kind.as_u16() | ||
| 82 | ); | ||
| 83 | |||
| 84 | // Store the event in the database | ||
| 85 | if let Err(e) = self.database.save_event(event).await { | ||
| 86 | tracing::error!("Failed to store synced event {}: {}", event_id, e); | ||
| 87 | } else { | ||
| 88 | tracing::debug!("Synced event {} stored successfully", event_id); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | PolicyResult::Reject(reason) => { | ||
| 92 | tracing::info!( | ||
| 93 | "Synced event {} (kind {}) rejected: {}", | ||
| 94 | event_id, | ||
| 95 | event.kind.as_u16(), | ||
| 96 | reason | ||
| 97 | ); | ||
| 98 | } | ||
| 99 | } | ||
| 100 | } | ||
| 101 | } \ No newline at end of file | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..279471b --- /dev/null +++ b/src/sync/mod.rs | |||
| @@ -0,0 +1,22 @@ | |||
| 1 | //! Proactive Sync Module for GRASP-02 | ||
| 2 | //! | ||
| 3 | //! This module implements proactive synchronization of kind 30617 (repository state) | ||
| 4 | //! events from configured relay(s). Events are validated through the same write policy | ||
| 5 | //! as directly-submitted events. | ||
| 6 | |||
| 7 | mod connection; | ||
| 8 | mod manager; | ||
| 9 | |||
| 10 | pub use manager::SyncManager; | ||
| 11 | |||
| 12 | use std::net::SocketAddr; | ||
| 13 | |||
| 14 | /// Synthetic source address used for synced events | ||
| 15 | /// | ||
| 16 | /// This distinguishes synced events from directly-submitted events in logs and metrics. | ||
| 17 | /// Uses 127.0.0.2:0 as a recognizable "synced event" marker. | ||
| 18 | pub const SYNC_SOURCE_ADDR: SocketAddr = | ||
| 19 | SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0); | ||
| 20 | |||
| 21 | /// Kind for repository state events (NIP-34) | ||
| 22 | pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file | ||