From b167f1b2ae7edbcab95554b5203d22d9e372c8b5 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 17:03:40 +0000 Subject: feat(sync): Phase 1 MVP - single relay proactive sync - Add src/sync/ module with SyncManager - Add NGIT_SYNC_RELAY_URL config option - Subscribe to kind 30617 on configured relay - Validate synced events through Nip34WritePolicy - Integration test with two TestRelay instances --- src/config.rs | 5 ++ src/http/nip11.rs | 2 + src/lib.rs | 1 + src/main.rs | 16 ++++++ src/nostr/builder.rs | 13 ++--- src/sync/connection.rs | 143 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sync/manager.rs | 101 ++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 22 ++++++++ 8 files changed, 297 insertions(+), 6 deletions(-) create mode 100644 src/sync/connection.rs create mode 100644 src/sync/manager.rs create mode 100644 src/sync/mod.rs (limited to 'src') diff --git a/src/config.rs b/src/config.rs index 025e020..a2a27be 100644 --- a/src/config.rs +++ b/src/config.rs @@ -83,6 +83,10 @@ pub struct Config { /// Number of top bandwidth repos to track in metrics #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] pub metrics_top_n_repos: usize, + + /// URL of relay to sync kind 30617 events from (optional, enables proactive sync) + #[arg(long, env = "NGIT_SYNC_RELAY_URL")] + pub sync_relay_url: Option, } impl Config { @@ -138,6 +142,7 @@ impl Config { metrics_enabled: true, metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, + sync_relay_url: None, } } } diff --git a/src/http/nip11.rs b/src/http/nip11.rs index 1723601..e9e1c25 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs @@ -105,6 +105,7 @@ mod tests { metrics_enabled: true, metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, + sync_relay_url: None, }; let doc = RelayInformationDocument::from_config(&config); @@ -139,6 +140,7 @@ mod tests { metrics_enabled: true, metrics_connection_per_ip_abuse_threshold: 10, metrics_top_n_repos: 10, + sync_relay_url: None, }; let doc = RelayInformationDocument::from_config(&config); diff --git a/src/lib.rs b/src/lib.rs index 4d5aab0..a1306c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,3 +3,4 @@ pub mod git; pub mod http; pub mod metrics; pub mod nostr; +pub mod sync; diff --git a/src/main.rs b/src/main.rs index 9200cc2..21b351f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use ngit_grasp::{ http, metrics::Metrics, nostr, + sync::SyncManager, }; #[tokio::main] @@ -50,6 +51,21 @@ async fn main() -> Result<()> { config.domain ); + // Start SyncManager if sync_relay_url is configured + if let Some(ref sync_url) = config.sync_relay_url { + info!("Starting proactive sync from: {}", sync_url); + let sync_manager = SyncManager::new( + sync_url.clone(), + relay_with_db.database.clone(), + relay_with_db.write_policy.clone(), + ); + tokio::spawn(async move { + sync_manager.run().await; + }); + } else { + info!("Proactive sync disabled (no NGIT_SYNC_RELAY_URL configured)"); + } + // Start HTTP server with integrated relay and database info!("Starting HTTP server on {}", config.bind_address); http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?; diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 15ff083..2284c18 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -269,12 +269,14 @@ impl WritePolicy for Nip34WritePolicy { } } -/// Result of creating a relay - includes both the relay and database +/// Result of creating a relay - includes relay, database, and write policy pub struct RelayWithDatabase { /// The local relay instance pub relay: LocalRelay, /// The database Arc that can be used for direct queries pub database: SharedDatabase, + /// The write policy used for event validation + pub write_policy: Nip34WritePolicy, } /// Create a configured LocalRelay with full GRASP-01 validation @@ -330,13 +332,11 @@ pub fn create_relay(config: &Config) -> Result { // Build relay with GRASP-01 validation // Clone Arc for the write policy so both relay and policy can access the database let git_data_path = config.effective_git_data_path(); + let write_policy = Nip34WritePolicy::new(&config.domain, database.clone(), &git_data_path); + let builder = RelayBuilder::default() .database(database.clone()) - .write_policy(Nip34WritePolicy::new( - &config.domain, - database.clone(), - &git_data_path, - )); + .write_policy(write_policy.clone()); tracing::info!( "Relay configured with GRASP-01 validation for domain: {}", @@ -346,5 +346,6 @@ pub fn create_relay(config: &Config) -> Result { Ok(RelayWithDatabase { relay: LocalRelay::new(builder), database, + write_policy, }) } \ No newline at end of file diff --git a/src/sync/connection.rs b/src/sync/connection.rs new file mode 100644 index 0000000..4a79128 --- /dev/null +++ b/src/sync/connection.rs @@ -0,0 +1,143 @@ +//! WebSocket connection handling for sync +//! +//! Manages the connection to a source relay, subscribes to kind 30617 events, +//! and passes them through validation. + +use std::time::Duration; + +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use super::KIND_REPOSITORY_STATE; + +/// Event received from the sync connection +#[derive(Debug, Clone)] +pub struct SyncedEvent { + pub event: Event, + pub source_url: String, +} + +/// Manages a WebSocket connection to a single relay for syncing +pub struct SyncConnection { + url: String, + client: Client, +} + +impl SyncConnection { + /// Create a new sync connection to the given relay URL + pub async fn new(url: &str) -> Result> { + let client = Client::default(); + + // Add the relay + client.add_relay(url).await?; + + // Connect to the relay + client.connect().await; + + tracing::info!("Sync connection established to {}", url); + + Ok(Self { + url: url.to_string(), + client, + }) + } + + /// Start receiving events and send them through the channel + /// + /// This method runs indefinitely, reconnecting as needed. + pub async fn run(self, tx: mpsc::Sender) { + // Create filter for kind 30617 (repository state) events + let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); + + // Subscribe to events + match self.client.subscribe(filter, None).await { + Ok(output) => { + tracing::info!( + "Subscribed to kind {} events on {} (subscription: {})", + KIND_REPOSITORY_STATE, + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!("Failed to subscribe on {}: {}", self.url, e); + return; + } + }; + + // Handle incoming notifications + let url = self.url.clone(); + self.client + .handle_notifications(|notification| { + let tx = tx.clone(); + let url = url.clone(); + async move { + match notification { + RelayPoolNotification::Event { event, .. } => { + tracing::debug!( + "Received event {} from {} (kind {})", + event.id, + url, + event.kind.as_u16() + ); + + // Send the event to the manager for processing + let synced = SyncedEvent { + event: (*event).clone(), + source_url: url.clone(), + }; + + if let Err(e) = tx.send(synced).await { + tracing::warn!("Failed to send synced event: {}", e); + return Ok(true); // Stop if channel is closed + } + } + RelayPoolNotification::Shutdown => { + tracing::warn!("Relay connection shutdown for {}", url); + return Ok(true); // Stop on shutdown + } + RelayPoolNotification::Message { message, .. } => { + tracing::trace!("Received message from {}: {:?}", url, message); + } + } + Ok(false) // Continue processing + } + }) + .await + .ok(); + } + +} + +/// Reconnect loop with exponential backoff +pub async fn connect_with_retry( + url: &str, + tx: mpsc::Sender, +) { + let mut backoff = Duration::from_secs(1); + let max_backoff = Duration::from_secs(60); + + loop { + match SyncConnection::new(url).await { + Ok(conn) => { + backoff = Duration::from_secs(1); // Reset backoff on successful connection + conn.run(tx.clone()).await; + tracing::warn!("Sync connection to {} ended, will reconnect", url); + } + Err(e) => { + tracing::error!( + "Failed to connect to sync relay {}: {} (retrying in {:?})", + url, + e, + backoff + ); + } + } + + // Wait before reconnecting + tokio::time::sleep(backoff).await; + + // Exponential backoff + backoff = std::cmp::min(backoff * 2, max_backoff); + } +} \ No newline at end of file diff --git a/src/sync/manager.rs b/src/sync/manager.rs new file mode 100644 index 0000000..8c883f5 --- /dev/null +++ b/src/sync/manager.rs @@ -0,0 +1,101 @@ +//! SyncManager - Coordinates proactive sync operations +//! +//! The SyncManager spawns connections to configured relays, receives events, +//! validates them through the write policy, and stores accepted events. + +use nostr_relay_builder::prelude::*; +use tokio::sync::mpsc; + +use super::connection::{connect_with_retry, SyncedEvent}; +use super::SYNC_SOURCE_ADDR; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; + +/// Coordinates proactive sync from configured relays +pub struct SyncManager { + /// URL of the relay to sync from + sync_relay_url: String, + /// Database for storing accepted events + database: SharedDatabase, + /// Write policy for validating events + write_policy: Nip34WritePolicy, +} + +impl SyncManager { + /// Create a new SyncManager + pub fn new( + sync_relay_url: String, + database: SharedDatabase, + write_policy: Nip34WritePolicy, + ) -> Self { + Self { + sync_relay_url, + database, + write_policy, + } + } + + /// Run the sync manager + /// + /// This spawns a connection task and processes incoming events. + /// Runs indefinitely until the task is cancelled. + pub async fn run(self) { + tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); + + // Create channel for receiving events from connection + let (tx, mut rx) = mpsc::channel::(100); + + // Spawn connection task with auto-reconnect + let url = self.sync_relay_url.clone(); + tokio::spawn(async move { + connect_with_retry(&url, tx).await; + }); + + // Process incoming events + while let Some(synced_event) = rx.recv().await { + self.process_event(synced_event).await; + } + + tracing::warn!("SyncManager event channel closed, shutting down"); + } + + /// Process a single synced event + async fn process_event(&self, synced_event: SyncedEvent) { + let event = &synced_event.event; + let event_id = event.id.to_hex(); + + tracing::debug!( + "Processing synced event {} (kind {}) from {}", + event_id, + event.kind.as_u16(), + synced_event.source_url + ); + + // Validate through write policy using SYNC_SOURCE_ADDR + let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; + + match result { + PolicyResult::Accept => { + tracing::info!( + "Synced event {} (kind {}) accepted, storing", + event_id, + event.kind.as_u16() + ); + + // Store the event in the database + if let Err(e) = self.database.save_event(event).await { + tracing::error!("Failed to store synced event {}: {}", event_id, e); + } else { + tracing::debug!("Synced event {} stored successfully", event_id); + } + } + PolicyResult::Reject(reason) => { + tracing::info!( + "Synced event {} (kind {}) rejected: {}", + event_id, + event.kind.as_u16(), + reason + ); + } + } + } +} \ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..279471b --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,22 @@ +//! Proactive Sync Module for GRASP-02 +//! +//! This module implements proactive synchronization of kind 30617 (repository state) +//! events from configured relay(s). Events are validated through the same write policy +//! as directly-submitted events. + +mod connection; +mod manager; + +pub use manager::SyncManager; + +use std::net::SocketAddr; + +/// Synthetic source address used for synced events +/// +/// This distinguishes synced events from directly-submitted events in logs and metrics. +/// Uses 127.0.0.2:0 as a recognizable "synced event" marker. +pub const SYNC_SOURCE_ADDR: SocketAddr = + SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0); + +/// Kind for repository state events (NIP-34) +pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file -- cgit v1.2.3