diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 5 | ||||
| -rw-r--r-- | src/http/nip11.rs | 2 | ||||
| -rw-r--r-- | src/lib.rs | 1 | ||||
| -rw-r--r-- | src/main.rs | 16 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 13 | ||||
| -rw-r--r-- | src/sync/connection.rs | 143 | ||||
| -rw-r--r-- | src/sync/manager.rs | 101 | ||||
| -rw-r--r-- | src/sync/mod.rs | 22 |
8 files changed, 297 insertions, 6 deletions
diff --git a/src/config.rs b/src/config.rs index 025e020..a2a27be 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -83,6 +83,10 @@ pub struct Config { | |||
| 83 | /// Number of top bandwidth repos to track in metrics | 83 | /// Number of top bandwidth repos to track in metrics |
| 84 | #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] | 84 | #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] |
| 85 | pub metrics_top_n_repos: usize, | 85 | pub metrics_top_n_repos: usize, |
| 86 | |||
| 87 | /// URL of relay to sync kind 30617 events from (optional, enables proactive sync) | ||
| 88 | #[arg(long, env = "NGIT_SYNC_RELAY_URL")] | ||
| 89 | pub sync_relay_url: Option<String>, | ||
| 86 | } | 90 | } |
| 87 | 91 | ||
| 88 | impl Config { | 92 | impl Config { |
| @@ -138,6 +142,7 @@ impl Config { | |||
| 138 | metrics_enabled: true, | 142 | metrics_enabled: true, |
| 139 | metrics_connection_per_ip_abuse_threshold: 10, | 143 | metrics_connection_per_ip_abuse_threshold: 10, |
| 140 | metrics_top_n_repos: 10, | 144 | metrics_top_n_repos: 10, |
| 145 | sync_relay_url: None, | ||
| 141 | } | 146 | } |
| 142 | } | 147 | } |
| 143 | } | 148 | } |
diff --git a/src/http/nip11.rs b/src/http/nip11.rs index 1723601..e9e1c25 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs | |||
| @@ -105,6 +105,7 @@ mod tests { | |||
| 105 | metrics_enabled: true, | 105 | metrics_enabled: true, |
| 106 | metrics_connection_per_ip_abuse_threshold: 10, | 106 | metrics_connection_per_ip_abuse_threshold: 10, |
| 107 | metrics_top_n_repos: 10, | 107 | metrics_top_n_repos: 10, |
| 108 | sync_relay_url: None, | ||
| 108 | }; | 109 | }; |
| 109 | 110 | ||
| 110 | let doc = RelayInformationDocument::from_config(&config); | 111 | let doc = RelayInformationDocument::from_config(&config); |
| @@ -139,6 +140,7 @@ mod tests { | |||
| 139 | metrics_enabled: true, | 140 | metrics_enabled: true, |
| 140 | metrics_connection_per_ip_abuse_threshold: 10, | 141 | metrics_connection_per_ip_abuse_threshold: 10, |
| 141 | metrics_top_n_repos: 10, | 142 | metrics_top_n_repos: 10, |
| 143 | sync_relay_url: None, | ||
| 142 | }; | 144 | }; |
| 143 | 145 | ||
| 144 | let doc = RelayInformationDocument::from_config(&config); | 146 | let doc = RelayInformationDocument::from_config(&config); |
| @@ -3,3 +3,4 @@ pub mod git; | |||
| 3 | pub mod http; | 3 | pub mod http; |
| 4 | pub mod metrics; | 4 | pub mod metrics; |
| 5 | pub mod nostr; | 5 | pub mod nostr; |
| 6 | pub mod sync; | ||
diff --git a/src/main.rs b/src/main.rs index 9200cc2..21b351f 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -9,6 +9,7 @@ use ngit_grasp::{ | |||
| 9 | http, | 9 | http, |
| 10 | metrics::Metrics, | 10 | metrics::Metrics, |
| 11 | nostr, | 11 | nostr, |
| 12 | sync::SyncManager, | ||
| 12 | }; | 13 | }; |
| 13 | 14 | ||
| 14 | #[tokio::main] | 15 | #[tokio::main] |
| @@ -50,6 +51,21 @@ async fn main() -> Result<()> { | |||
| 50 | config.domain | 51 | config.domain |
| 51 | ); | 52 | ); |
| 52 | 53 | ||
| 54 | // Start SyncManager if sync_relay_url is configured | ||
| 55 | if let Some(ref sync_url) = config.sync_relay_url { | ||
| 56 | info!("Starting proactive sync from: {}", sync_url); | ||
| 57 | let sync_manager = SyncManager::new( | ||
| 58 | sync_url.clone(), | ||
| 59 | relay_with_db.database.clone(), | ||
| 60 | relay_with_db.write_policy.clone(), | ||
| 61 | ); | ||
| 62 | tokio::spawn(async move { | ||
| 63 | sync_manager.run().await; | ||
| 64 | }); | ||
| 65 | } else { | ||
| 66 | info!("Proactive sync disabled (no NGIT_SYNC_RELAY_URL configured)"); | ||
| 67 | } | ||
| 68 | |||
| 53 | // Start HTTP server with integrated relay and database | 69 | // Start HTTP server with integrated relay and database |
| 54 | info!("Starting HTTP server on {}", config.bind_address); | 70 | info!("Starting HTTP server on {}", config.bind_address); |
| 55 | http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?; | 71 | http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?; |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 15ff083..2284c18 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -269,12 +269,14 @@ impl WritePolicy for Nip34WritePolicy { | |||
| 269 | } | 269 | } |
| 270 | } | 270 | } |
| 271 | 271 | ||
| 272 | /// Result of creating a relay - includes both the relay and database | 272 | /// Result of creating a relay - includes relay, database, and write policy |
| 273 | pub struct RelayWithDatabase { | 273 | pub struct RelayWithDatabase { |
| 274 | /// The local relay instance | 274 | /// The local relay instance |
| 275 | pub relay: LocalRelay, | 275 | pub relay: LocalRelay, |
| 276 | /// The database Arc that can be used for direct queries | 276 | /// The database Arc that can be used for direct queries |
| 277 | pub database: SharedDatabase, | 277 | pub database: SharedDatabase, |
| 278 | /// The write policy used for event validation | ||
| 279 | pub write_policy: Nip34WritePolicy, | ||
| 278 | } | 280 | } |
| 279 | 281 | ||
| 280 | /// Create a configured LocalRelay with full GRASP-01 validation | 282 | /// Create a configured LocalRelay with full GRASP-01 validation |
| @@ -330,13 +332,11 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> { | |||
| 330 | // Build relay with GRASP-01 validation | 332 | // Build relay with GRASP-01 validation |
| 331 | // Clone Arc for the write policy so both relay and policy can access the database | 333 | // Clone Arc for the write policy so both relay and policy can access the database |
| 332 | let git_data_path = config.effective_git_data_path(); | 334 | let git_data_path = config.effective_git_data_path(); |
| 335 | let write_policy = Nip34WritePolicy::new(&config.domain, database.clone(), &git_data_path); | ||
| 336 | |||
| 333 | let builder = RelayBuilder::default() | 337 | let builder = RelayBuilder::default() |
| 334 | .database(database.clone()) | 338 | .database(database.clone()) |
| 335 | .write_policy(Nip34WritePolicy::new( | 339 | .write_policy(write_policy.clone()); |
| 336 | &config.domain, | ||
| 337 | database.clone(), | ||
| 338 | &git_data_path, | ||
| 339 | )); | ||
| 340 | 340 | ||
| 341 | tracing::info!( | 341 | tracing::info!( |
| 342 | "Relay configured with GRASP-01 validation for domain: {}", | 342 | "Relay configured with GRASP-01 validation for domain: {}", |
| @@ -346,5 +346,6 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> { | |||
| 346 | Ok(RelayWithDatabase { | 346 | Ok(RelayWithDatabase { |
| 347 | relay: LocalRelay::new(builder), | 347 | relay: LocalRelay::new(builder), |
| 348 | database, | 348 | database, |
| 349 | write_policy, | ||
| 349 | }) | 350 | }) |
| 350 | } \ No newline at end of file | 351 | } \ No newline at end of file |
diff --git a/src/sync/connection.rs b/src/sync/connection.rs new file mode 100644 index 0000000..4a79128 --- /dev/null +++ b/src/sync/connection.rs | |||
| @@ -0,0 +1,143 @@ | |||
| 1 | //! WebSocket connection handling for sync | ||
| 2 | //! | ||
| 3 | //! Manages the connection to a source relay, subscribes to kind 30617 events, | ||
| 4 | //! and passes them through validation. | ||
| 5 | |||
| 6 | use std::time::Duration; | ||
| 7 | |||
| 8 | use nostr_sdk::prelude::*; | ||
| 9 | use tokio::sync::mpsc; | ||
| 10 | |||
| 11 | use super::KIND_REPOSITORY_STATE; | ||
| 12 | |||
| 13 | /// Event received from the sync connection | ||
| 14 | #[derive(Debug, Clone)] | ||
| 15 | pub struct SyncedEvent { | ||
| 16 | pub event: Event, | ||
| 17 | pub source_url: String, | ||
| 18 | } | ||
| 19 | |||
| 20 | /// Manages a WebSocket connection to a single relay for syncing | ||
| 21 | pub struct SyncConnection { | ||
| 22 | url: String, | ||
| 23 | client: Client, | ||
| 24 | } | ||
| 25 | |||
| 26 | impl SyncConnection { | ||
| 27 | /// Create a new sync connection to the given relay URL | ||
| 28 | pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { | ||
| 29 | let client = Client::default(); | ||
| 30 | |||
| 31 | // Add the relay | ||
| 32 | client.add_relay(url).await?; | ||
| 33 | |||
| 34 | // Connect to the relay | ||
| 35 | client.connect().await; | ||
| 36 | |||
| 37 | tracing::info!("Sync connection established to {}", url); | ||
| 38 | |||
| 39 | Ok(Self { | ||
| 40 | url: url.to_string(), | ||
| 41 | client, | ||
| 42 | }) | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Start receiving events and send them through the channel | ||
| 46 | /// | ||
| 47 | /// This method runs indefinitely, reconnecting as needed. | ||
| 48 | pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { | ||
| 49 | // Create filter for kind 30617 (repository state) events | ||
| 50 | let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE)); | ||
| 51 | |||
| 52 | // Subscribe to events | ||
| 53 | match self.client.subscribe(filter, None).await { | ||
| 54 | Ok(output) => { | ||
| 55 | tracing::info!( | ||
| 56 | "Subscribed to kind {} events on {} (subscription: {})", | ||
| 57 | KIND_REPOSITORY_STATE, | ||
| 58 | self.url, | ||
| 59 | output.id() | ||
| 60 | ); | ||
| 61 | } | ||
| 62 | Err(e) => { | ||
| 63 | tracing::error!("Failed to subscribe on {}: {}", self.url, e); | ||
| 64 | return; | ||
| 65 | } | ||
| 66 | }; | ||
| 67 | |||
| 68 | // Handle incoming notifications | ||
| 69 | let url = self.url.clone(); | ||
| 70 | self.client | ||
| 71 | .handle_notifications(|notification| { | ||
| 72 | let tx = tx.clone(); | ||
| 73 | let url = url.clone(); | ||
| 74 | async move { | ||
| 75 | match notification { | ||
| 76 | RelayPoolNotification::Event { event, .. } => { | ||
| 77 | tracing::debug!( | ||
| 78 | "Received event {} from {} (kind {})", | ||
| 79 | event.id, | ||
| 80 | url, | ||
| 81 | event.kind.as_u16() | ||
| 82 | ); | ||
| 83 | |||
| 84 | // Send the event to the manager for processing | ||
| 85 | let synced = SyncedEvent { | ||
| 86 | event: (*event).clone(), | ||
| 87 | source_url: url.clone(), | ||
| 88 | }; | ||
| 89 | |||
| 90 | if let Err(e) = tx.send(synced).await { | ||
| 91 | tracing::warn!("Failed to send synced event: {}", e); | ||
| 92 | return Ok(true); // Stop if channel is closed | ||
| 93 | } | ||
| 94 | } | ||
| 95 | RelayPoolNotification::Shutdown => { | ||
| 96 | tracing::warn!("Relay connection shutdown for {}", url); | ||
| 97 | return Ok(true); // Stop on shutdown | ||
| 98 | } | ||
| 99 | RelayPoolNotification::Message { message, .. } => { | ||
| 100 | tracing::trace!("Received message from {}: {:?}", url, message); | ||
| 101 | } | ||
| 102 | } | ||
| 103 | Ok(false) // Continue processing | ||
| 104 | } | ||
| 105 | }) | ||
| 106 | .await | ||
| 107 | .ok(); | ||
| 108 | } | ||
| 109 | |||
| 110 | } | ||
| 111 | |||
| 112 | /// Reconnect loop with exponential backoff | ||
| 113 | pub async fn connect_with_retry( | ||
| 114 | url: &str, | ||
| 115 | tx: mpsc::Sender<SyncedEvent>, | ||
| 116 | ) { | ||
| 117 | let mut backoff = Duration::from_secs(1); | ||
| 118 | let max_backoff = Duration::from_secs(60); | ||
| 119 | |||
| 120 | loop { | ||
| 121 | match SyncConnection::new(url).await { | ||
| 122 | Ok(conn) => { | ||
| 123 | backoff = Duration::from_secs(1); // Reset backoff on successful connection | ||
| 124 | conn.run(tx.clone()).await; | ||
| 125 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | ||
| 126 | } | ||
| 127 | Err(e) => { | ||
| 128 | tracing::error!( | ||
| 129 | "Failed to connect to sync relay {}: {} (retrying in {:?})", | ||
| 130 | url, | ||
| 131 | e, | ||
| 132 | backoff | ||
| 133 | ); | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | // Wait before reconnecting | ||
| 138 | tokio::time::sleep(backoff).await; | ||
| 139 | |||
| 140 | // Exponential backoff | ||
| 141 | backoff = std::cmp::min(backoff * 2, max_backoff); | ||
| 142 | } | ||
| 143 | } \ No newline at end of file | ||
diff --git a/src/sync/manager.rs b/src/sync/manager.rs new file mode 100644 index 0000000..8c883f5 --- /dev/null +++ b/src/sync/manager.rs | |||
| @@ -0,0 +1,101 @@ | |||
| 1 | //! SyncManager - Coordinates proactive sync operations | ||
| 2 | //! | ||
| 3 | //! The SyncManager spawns connections to configured relays, receives events, | ||
| 4 | //! validates them through the write policy, and stores accepted events. | ||
| 5 | |||
| 6 | use nostr_relay_builder::prelude::*; | ||
| 7 | use tokio::sync::mpsc; | ||
| 8 | |||
| 9 | use super::connection::{connect_with_retry, SyncedEvent}; | ||
| 10 | use super::SYNC_SOURCE_ADDR; | ||
| 11 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | ||
| 12 | |||
| 13 | /// Coordinates proactive sync from configured relays | ||
| 14 | pub struct SyncManager { | ||
| 15 | /// URL of the relay to sync from | ||
| 16 | sync_relay_url: String, | ||
| 17 | /// Database for storing accepted events | ||
| 18 | database: SharedDatabase, | ||
| 19 | /// Write policy for validating events | ||
| 20 | write_policy: Nip34WritePolicy, | ||
| 21 | } | ||
| 22 | |||
| 23 | impl SyncManager { | ||
| 24 | /// Create a new SyncManager | ||
| 25 | pub fn new( | ||
| 26 | sync_relay_url: String, | ||
| 27 | database: SharedDatabase, | ||
| 28 | write_policy: Nip34WritePolicy, | ||
| 29 | ) -> Self { | ||
| 30 | Self { | ||
| 31 | sync_relay_url, | ||
| 32 | database, | ||
| 33 | write_policy, | ||
| 34 | } | ||
| 35 | } | ||
| 36 | |||
| 37 | /// Run the sync manager | ||
| 38 | /// | ||
| 39 | /// This spawns a connection task and processes incoming events. | ||
| 40 | /// Runs indefinitely until the task is cancelled. | ||
| 41 | pub async fn run(self) { | ||
| 42 | tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); | ||
| 43 | |||
| 44 | // Create channel for receiving events from connection | ||
| 45 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); | ||
| 46 | |||
| 47 | // Spawn connection task with auto-reconnect | ||
| 48 | let url = self.sync_relay_url.clone(); | ||
| 49 | tokio::spawn(async move { | ||
| 50 | connect_with_retry(&url, tx).await; | ||
| 51 | }); | ||
| 52 | |||
| 53 | // Process incoming events | ||
| 54 | while let Some(synced_event) = rx.recv().await { | ||
| 55 | self.process_event(synced_event).await; | ||
| 56 | } | ||
| 57 | |||
| 58 | tracing::warn!("SyncManager event channel closed, shutting down"); | ||
| 59 | } | ||
| 60 | |||
| 61 | /// Process a single synced event | ||
| 62 | async fn process_event(&self, synced_event: SyncedEvent) { | ||
| 63 | let event = &synced_event.event; | ||
| 64 | let event_id = event.id.to_hex(); | ||
| 65 | |||
| 66 | tracing::debug!( | ||
| 67 | "Processing synced event {} (kind {}) from {}", | ||
| 68 | event_id, | ||
| 69 | event.kind.as_u16(), | ||
| 70 | synced_event.source_url | ||
| 71 | ); | ||
| 72 | |||
| 73 | // Validate through write policy using SYNC_SOURCE_ADDR | ||
| 74 | let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; | ||
| 75 | |||
| 76 | match result { | ||
| 77 | PolicyResult::Accept => { | ||
| 78 | tracing::info!( | ||
| 79 | "Synced event {} (kind {}) accepted, storing", | ||
| 80 | event_id, | ||
| 81 | event.kind.as_u16() | ||
| 82 | ); | ||
| 83 | |||
| 84 | // Store the event in the database | ||
| 85 | if let Err(e) = self.database.save_event(event).await { | ||
| 86 | tracing::error!("Failed to store synced event {}: {}", event_id, e); | ||
| 87 | } else { | ||
| 88 | tracing::debug!("Synced event {} stored successfully", event_id); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | PolicyResult::Reject(reason) => { | ||
| 92 | tracing::info!( | ||
| 93 | "Synced event {} (kind {}) rejected: {}", | ||
| 94 | event_id, | ||
| 95 | event.kind.as_u16(), | ||
| 96 | reason | ||
| 97 | ); | ||
| 98 | } | ||
| 99 | } | ||
| 100 | } | ||
| 101 | } \ No newline at end of file | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..279471b --- /dev/null +++ b/src/sync/mod.rs | |||
| @@ -0,0 +1,22 @@ | |||
| 1 | //! Proactive Sync Module for GRASP-02 | ||
| 2 | //! | ||
| 3 | //! This module implements proactive synchronization of kind 30617 (repository state) | ||
| 4 | //! events from configured relay(s). Events are validated through the same write policy | ||
| 5 | //! as directly-submitted events. | ||
| 6 | |||
| 7 | mod connection; | ||
| 8 | mod manager; | ||
| 9 | |||
| 10 | pub use manager::SyncManager; | ||
| 11 | |||
| 12 | use std::net::SocketAddr; | ||
| 13 | |||
| 14 | /// Synthetic source address used for synced events | ||
| 15 | /// | ||
| 16 | /// This distinguishes synced events from directly-submitted events in logs and metrics. | ||
| 17 | /// Uses 127.0.0.2:0 as a recognizable "synced event" marker. | ||
| 18 | pub const SYNC_SOURCE_ADDR: SocketAddr = | ||
| 19 | SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0); | ||
| 20 | |||
| 21 | /// Kind for repository state events (NIP-34) | ||
| 22 | pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file | ||