From b167f1b2ae7edbcab95554b5203d22d9e372c8b5 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:03:40 +0000 Subject: feat(sync): Phase 1 MVP - single relay proactive sync - Add src/sync/ module with SyncManager - Add NGIT_SYNC_RELAY_URL config option - Subscribe to kind 30617 on configured relay - Validate synced events through Nip34WritePolicy - Integration test with two TestRelay instances --- src/sync/connection.rs | 143 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/manager.rs | 101 ++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 22 ++++++++ 3 files changed, 266 insertions(+) create mode 100644 src/sync/connection.rs create mode 100644 src/sync/manager.rs create mode 100644 src/sync/mod.rs (limited to 'src/sync') diff --git a/src/sync/connection.rs b/src/sync/connection.rs new file mode 100644 index 0000000..4a79128 --- /dev/null +++ b/src/sync/connection.rs @@ -0,0 +1,143 @@ +//! WebSocket connection handling for sync +//! +//! Manages the connection to a source relay, subscribes to kind 30617 events, +//! and passes them through validation. + +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use super::KIND_REPOSITORY_STATE; + +/// Event received from the sync connection +#[derive(Debug, Clone)] +pub struct SyncedEvent { + pub event: Event, + pub source_url: String, +} + +/// Manages a WebSocket connection to a single relay for syncing +pub struct SyncConnection { + url: String, + client: Client, +} + +impl SyncConnection { + /// Create a new sync connection to the given relay URL + pub async fn new(url: &str) -> Result> { + let client = Client::default(); + + // Add the relay + client.add_relay(url).await?; + + // Connect to the relay + client.connect().await; + + tracing::info!("Sync connection established to {}", url); + + Ok(Self { + url: url.to_string(), + client, + }) + } + + /// Start receiving events and send them through the channel + /// + /// This method runs indefinitely, reconnecting as needed. + pub async fn run(self, tx: mpsc::Sender) { + // Create filter for kind 30617 (repository state) events + let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); + + // Subscribe to events + match self.client.subscribe(filter, None).await { + Ok(output) => { + tracing::info!( + "Subscribed to kind {} events on {} (subscription: {})", + KIND_REPOSITORY_STATE, + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!("Failed to subscribe on {}: {}", self.url, e); + return; + } + }; + + // Handle incoming notifications + let url = self.url.clone(); + self.client + .handle_notifications(|notification| { + let tx = tx.clone(); + let url = url.clone(); + async move { + match notification { + RelayPoolNotification::Event { event, .. } => { + tracing::debug!( + "Received event {} from {} (kind {})", + event.id, + url, + event.kind.as_u16() + ); + + // Send the event to the manager for processing + let synced = SyncedEvent { + event: (*event).clone(), + source_url: url.clone(), + }; + + if let Err(e) = tx.send(synced).await { + tracing::warn!("Failed to send synced event: {}", e); + return Ok(true); // Stop if channel is closed + } + } + RelayPoolNotification::Shutdown => { + tracing::warn!("Relay connection shutdown for {}", url); + return Ok(true); // Stop on shutdown + } + RelayPoolNotification::Message { message, .. } => { + tracing::trace!("Received message from {}: {:?}", url, message); + } + } + Ok(false) // Continue processing + } + }) + .await + .ok(); + } + +} + +/// Reconnect loop with exponential backoff +pub async fn connect_with_retry( + url: &str, + tx: mpsc::Sender, +) { + let mut backoff = Duration::from_secs(1); + let max_backoff = Duration::from_secs(60); + + loop { + match SyncConnection::new(url).await { + Ok(conn) => { + backoff = Duration::from_secs(1); // Reset backoff on successful connection + conn.run(tx.clone()).await; + tracing::warn!("Sync connection to {} ended, will reconnect", url); + } + Err(e) => { + tracing::error!( + "Failed to connect to sync relay {}: {} (retrying in {:?})", + url, + e, + backoff + ); + } + } + + // Wait before reconnecting + tokio::time::sleep(backoff).await; + + // Exponential backoff + backoff = std::cmp::min(backoff * 2, max_backoff); + } +} \ No newline at end of file diff --git a/src/sync/manager.rs b/src/sync/manager.rs new file mode 100644 index 0000000..8c883f5 --- /dev/null +++ b/src/sync/manager.rs @@ -0,0 +1,101 @@ +//! SyncManager - Coordinates proactive sync operations +//! +//! The SyncManager spawns connections to configured relays, receives events, +//! validates them through the write policy, and stores accepted events. + +use nostr_relay_builder::prelude::*; +use tokio::sync::mpsc; + +use super::connection::{connect_with_retry, SyncedEvent}; +use super::SYNC_SOURCE_ADDR; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; + +/// Coordinates proactive sync from configured relays +pub struct SyncManager { + /// URL of the relay to sync from + sync_relay_url: String, + /// Database for storing accepted events + database: SharedDatabase, + /// Write policy for validating events + write_policy: Nip34WritePolicy, +} + +impl SyncManager { + /// Create a new SyncManager + pub fn new( + sync_relay_url: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + ) -> Self { + Self { + sync_relay_url, + database, + write_policy, + } + } + + /// Run the sync manager + /// + /// This spawns a connection task and processes incoming events. + /// Runs indefinitely until the task is cancelled. + pub async fn run(self) { + tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); + + // Create channel for receiving events from connection + let (tx, mut rx) = mpsc::channel::(100); + + // Spawn connection task with auto-reconnect + let url = self.sync_relay_url.clone(); + tokio::spawn(async move { + connect_with_retry(&url, tx).await; + }); + + // Process incoming events + while let Some(synced_event) = rx.recv().await { + self.process_event(synced_event).await; + } + + tracing::warn!("SyncManager event channel closed, shutting down"); + } + + /// Process a single synced event + async fn process_event(&self, synced_event: SyncedEvent) { + let event = &synced_event.event; + let event_id = event.id.to_hex(); + + tracing::debug!( + "Processing synced event {} (kind {}) from {}", + event_id, + event.kind.as_u16(), + synced_event.source_url + ); + + // Validate through write policy using SYNC_SOURCE_ADDR + let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; + + match result { + PolicyResult::Accept => { + tracing::info!( + "Synced event {} (kind {}) accepted, storing", + event_id, + event.kind.as_u16() + ); + + // Store the event in the database + if let Err(e) = self.database.save_event(event).await { + tracing::error!("Failed to store synced event {}: {}", event_id, e); + } else { + tracing::debug!("Synced event {} stored successfully", event_id); + } + } + PolicyResult::Reject(reason) => { + tracing::info!( + "Synced event {} (kind {}) rejected: {}", + event_id, + event.kind.as_u16(), + reason + ); + } + } + } +} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..279471b --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,22 @@ +//! Proactive Sync Module for GRASP-02 +//! +//! This module implements proactive synchronization of kind 30617 (repository state) +//! events from configured relay(s). Events are validated through the same write policy +//! as directly-submitted events. + +mod connection; +mod manager; + +pub use manager::SyncManager; + +use std::net::SocketAddr; + +/// Synthetic source address used for synced events +/// +/// This distinguishes synced events from directly-submitted events in logs and metrics. +/// Uses 127.0.0.2:0 as a recognizable "synced event" marker. +pub const SYNC_SOURCE_ADDR: SocketAddr = + SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0); + +/// Kind for repository state events (NIP-34) +pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file -- cgit v1.2.3