upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config.rs5
-rw-r--r--src/http/nip11.rs2
-rw-r--r--src/lib.rs1
-rw-r--r--src/main.rs16
-rw-r--r--src/nostr/builder.rs13
-rw-r--r--src/sync/connection.rs143
-rw-r--r--src/sync/manager.rs101
-rw-r--r--src/sync/mod.rs22
8 files changed, 297 insertions, 6 deletions
diff --git a/src/config.rs b/src/config.rs
index 025e020..a2a27be 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -83,6 +83,10 @@ pub struct Config {
83 /// Number of top bandwidth repos to track in metrics 83 /// Number of top bandwidth repos to track in metrics
84 #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] 84 #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)]
85 pub metrics_top_n_repos: usize, 85 pub metrics_top_n_repos: usize,
86
87 /// URL of relay to sync kind 30617 events from (optional, enables proactive sync)
88 #[arg(long, env = "NGIT_SYNC_RELAY_URL")]
89 pub sync_relay_url: Option<String>,
86} 90}
87 91
88impl Config { 92impl Config {
@@ -138,6 +142,7 @@ impl Config {
138 metrics_enabled: true, 142 metrics_enabled: true,
139 metrics_connection_per_ip_abuse_threshold: 10, 143 metrics_connection_per_ip_abuse_threshold: 10,
140 metrics_top_n_repos: 10, 144 metrics_top_n_repos: 10,
145 sync_relay_url: None,
141 } 146 }
142 } 147 }
143} 148}
diff --git a/src/http/nip11.rs b/src/http/nip11.rs
index 1723601..e9e1c25 100644
--- a/src/http/nip11.rs
+++ b/src/http/nip11.rs
@@ -105,6 +105,7 @@ mod tests {
105 metrics_enabled: true, 105 metrics_enabled: true,
106 metrics_connection_per_ip_abuse_threshold: 10, 106 metrics_connection_per_ip_abuse_threshold: 10,
107 metrics_top_n_repos: 10, 107 metrics_top_n_repos: 10,
108 sync_relay_url: None,
108 }; 109 };
109 110
110 let doc = RelayInformationDocument::from_config(&config); 111 let doc = RelayInformationDocument::from_config(&config);
@@ -139,6 +140,7 @@ mod tests {
139 metrics_enabled: true, 140 metrics_enabled: true,
140 metrics_connection_per_ip_abuse_threshold: 10, 141 metrics_connection_per_ip_abuse_threshold: 10,
141 metrics_top_n_repos: 10, 142 metrics_top_n_repos: 10,
143 sync_relay_url: None,
142 }; 144 };
143 145
144 let doc = RelayInformationDocument::from_config(&config); 146 let doc = RelayInformationDocument::from_config(&config);
diff --git a/src/lib.rs b/src/lib.rs
index 4d5aab0..a1306c4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,3 +3,4 @@ pub mod git;
3pub mod http; 3pub mod http;
4pub mod metrics; 4pub mod metrics;
5pub mod nostr; 5pub mod nostr;
6pub mod sync;
diff --git a/src/main.rs b/src/main.rs
index 9200cc2..21b351f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,6 +9,7 @@ use ngit_grasp::{
9 http, 9 http,
10 metrics::Metrics, 10 metrics::Metrics,
11 nostr, 11 nostr,
12 sync::SyncManager,
12}; 13};
13 14
14#[tokio::main] 15#[tokio::main]
@@ -50,6 +51,21 @@ async fn main() -> Result<()> {
50 config.domain 51 config.domain
51 ); 52 );
52 53
54 // Start SyncManager if sync_relay_url is configured
55 if let Some(ref sync_url) = config.sync_relay_url {
56 info!("Starting proactive sync from: {}", sync_url);
57 let sync_manager = SyncManager::new(
58 sync_url.clone(),
59 relay_with_db.database.clone(),
60 relay_with_db.write_policy.clone(),
61 );
62 tokio::spawn(async move {
63 sync_manager.run().await;
64 });
65 } else {
66 info!("Proactive sync disabled (no NGIT_SYNC_RELAY_URL configured)");
67 }
68
53 // Start HTTP server with integrated relay and database 69 // Start HTTP server with integrated relay and database
54 info!("Starting HTTP server on {}", config.bind_address); 70 info!("Starting HTTP server on {}", config.bind_address);
55 http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?; 71 http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?;
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 15ff083..2284c18 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -269,12 +269,14 @@ impl WritePolicy for Nip34WritePolicy {
269 } 269 }
270} 270}
271 271
272/// Result of creating a relay - includes both the relay and database 272/// Result of creating a relay - includes relay, database, and write policy
273pub struct RelayWithDatabase { 273pub struct RelayWithDatabase {
274 /// The local relay instance 274 /// The local relay instance
275 pub relay: LocalRelay, 275 pub relay: LocalRelay,
276 /// The database Arc that can be used for direct queries 276 /// The database Arc that can be used for direct queries
277 pub database: SharedDatabase, 277 pub database: SharedDatabase,
278 /// The write policy used for event validation
279 pub write_policy: Nip34WritePolicy,
278} 280}
279 281
280/// Create a configured LocalRelay with full GRASP-01 validation 282/// Create a configured LocalRelay with full GRASP-01 validation
@@ -330,13 +332,11 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> {
330 // Build relay with GRASP-01 validation 332 // Build relay with GRASP-01 validation
331 // Clone Arc for the write policy so both relay and policy can access the database 333 // Clone Arc for the write policy so both relay and policy can access the database
332 let git_data_path = config.effective_git_data_path(); 334 let git_data_path = config.effective_git_data_path();
335 let write_policy = Nip34WritePolicy::new(&config.domain, database.clone(), &git_data_path);
336
333 let builder = RelayBuilder::default() 337 let builder = RelayBuilder::default()
334 .database(database.clone()) 338 .database(database.clone())
335 .write_policy(Nip34WritePolicy::new( 339 .write_policy(write_policy.clone());
336 &config.domain,
337 database.clone(),
338 &git_data_path,
339 ));
340 340
341 tracing::info!( 341 tracing::info!(
342 "Relay configured with GRASP-01 validation for domain: {}", 342 "Relay configured with GRASP-01 validation for domain: {}",
@@ -346,5 +346,6 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> {
346 Ok(RelayWithDatabase { 346 Ok(RelayWithDatabase {
347 relay: LocalRelay::new(builder), 347 relay: LocalRelay::new(builder),
348 database, 348 database,
349 write_policy,
349 }) 350 })
350} \ No newline at end of file 351} \ No newline at end of file
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
new file mode 100644
index 0000000..4a79128
--- /dev/null
+++ b/src/sync/connection.rs
@@ -0,0 +1,143 @@
1//! WebSocket connection handling for sync
2//!
3//! Manages the connection to a source relay, subscribes to kind 30617 events,
4//! and passes them through validation.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use super::KIND_REPOSITORY_STATE;
12
13/// Event received from the sync connection
14#[derive(Debug, Clone)]
15pub struct SyncedEvent {
16 pub event: Event,
17 pub source_url: String,
18}
19
20/// Manages a WebSocket connection to a single relay for syncing
21pub struct SyncConnection {
22 url: String,
23 client: Client,
24}
25
26impl SyncConnection {
27 /// Create a new sync connection to the given relay URL
28 pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
29 let client = Client::default();
30
31 // Add the relay
32 client.add_relay(url).await?;
33
34 // Connect to the relay
35 client.connect().await;
36
37 tracing::info!("Sync connection established to {}", url);
38
39 Ok(Self {
40 url: url.to_string(),
41 client,
42 })
43 }
44
45 /// Start receiving events and send them through the channel
46 ///
47 /// This method runs indefinitely, reconnecting as needed.
48 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) {
49 // Create filter for kind 30617 (repository state) events
50 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE));
51
52 // Subscribe to events
53 match self.client.subscribe(filter, None).await {
54 Ok(output) => {
55 tracing::info!(
56 "Subscribed to kind {} events on {} (subscription: {})",
57 KIND_REPOSITORY_STATE,
58 self.url,
59 output.id()
60 );
61 }
62 Err(e) => {
63 tracing::error!("Failed to subscribe on {}: {}", self.url, e);
64 return;
65 }
66 };
67
68 // Handle incoming notifications
69 let url = self.url.clone();
70 self.client
71 .handle_notifications(|notification| {
72 let tx = tx.clone();
73 let url = url.clone();
74 async move {
75 match notification {
76 RelayPoolNotification::Event { event, .. } => {
77 tracing::debug!(
78 "Received event {} from {} (kind {})",
79 event.id,
80 url,
81 event.kind.as_u16()
82 );
83
84 // Send the event to the manager for processing
85 let synced = SyncedEvent {
86 event: (*event).clone(),
87 source_url: url.clone(),
88 };
89
90 if let Err(e) = tx.send(synced).await {
91 tracing::warn!("Failed to send synced event: {}", e);
92 return Ok(true); // Stop if channel is closed
93 }
94 }
95 RelayPoolNotification::Shutdown => {
96 tracing::warn!("Relay connection shutdown for {}", url);
97 return Ok(true); // Stop on shutdown
98 }
99 RelayPoolNotification::Message { message, .. } => {
100 tracing::trace!("Received message from {}: {:?}", url, message);
101 }
102 }
103 Ok(false) // Continue processing
104 }
105 })
106 .await
107 .ok();
108 }
109
110}
111
112/// Reconnect loop with exponential backoff
113pub async fn connect_with_retry(
114 url: &str,
115 tx: mpsc::Sender<SyncedEvent>,
116) {
117 let mut backoff = Duration::from_secs(1);
118 let max_backoff = Duration::from_secs(60);
119
120 loop {
121 match SyncConnection::new(url).await {
122 Ok(conn) => {
123 backoff = Duration::from_secs(1); // Reset backoff on successful connection
124 conn.run(tx.clone()).await;
125 tracing::warn!("Sync connection to {} ended, will reconnect", url);
126 }
127 Err(e) => {
128 tracing::error!(
129 "Failed to connect to sync relay {}: {} (retrying in {:?})",
130 url,
131 e,
132 backoff
133 );
134 }
135 }
136
137 // Wait before reconnecting
138 tokio::time::sleep(backoff).await;
139
140 // Exponential backoff
141 backoff = std::cmp::min(backoff * 2, max_backoff);
142 }
143} \ No newline at end of file
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
new file mode 100644
index 0000000..8c883f5
--- /dev/null
+++ b/src/sync/manager.rs
@@ -0,0 +1,101 @@
1//! SyncManager - Coordinates proactive sync operations
2//!
3//! The SyncManager spawns connections to configured relays, receives events,
4//! validates them through the write policy, and stores accepted events.
5
6use nostr_relay_builder::prelude::*;
7use tokio::sync::mpsc;
8
9use super::connection::{connect_with_retry, SyncedEvent};
10use super::SYNC_SOURCE_ADDR;
11use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
12
13/// Coordinates proactive sync from configured relays
14pub struct SyncManager {
15 /// URL of the relay to sync from
16 sync_relay_url: String,
17 /// Database for storing accepted events
18 database: SharedDatabase,
19 /// Write policy for validating events
20 write_policy: Nip34WritePolicy,
21}
22
23impl SyncManager {
24 /// Create a new SyncManager
25 pub fn new(
26 sync_relay_url: String,
27 database: SharedDatabase,
28 write_policy: Nip34WritePolicy,
29 ) -> Self {
30 Self {
31 sync_relay_url,
32 database,
33 write_policy,
34 }
35 }
36
37 /// Run the sync manager
38 ///
39 /// This spawns a connection task and processes incoming events.
40 /// Runs indefinitely until the task is cancelled.
41 pub async fn run(self) {
42 tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url);
43
44 // Create channel for receiving events from connection
45 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
46
47 // Spawn connection task with auto-reconnect
48 let url = self.sync_relay_url.clone();
49 tokio::spawn(async move {
50 connect_with_retry(&url, tx).await;
51 });
52
53 // Process incoming events
54 while let Some(synced_event) = rx.recv().await {
55 self.process_event(synced_event).await;
56 }
57
58 tracing::warn!("SyncManager event channel closed, shutting down");
59 }
60
61 /// Process a single synced event
62 async fn process_event(&self, synced_event: SyncedEvent) {
63 let event = &synced_event.event;
64 let event_id = event.id.to_hex();
65
66 tracing::debug!(
67 "Processing synced event {} (kind {}) from {}",
68 event_id,
69 event.kind.as_u16(),
70 synced_event.source_url
71 );
72
73 // Validate through write policy using SYNC_SOURCE_ADDR
74 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
75
76 match result {
77 PolicyResult::Accept => {
78 tracing::info!(
79 "Synced event {} (kind {}) accepted, storing",
80 event_id,
81 event.kind.as_u16()
82 );
83
84 // Store the event in the database
85 if let Err(e) = self.database.save_event(event).await {
86 tracing::error!("Failed to store synced event {}: {}", event_id, e);
87 } else {
88 tracing::debug!("Synced event {} stored successfully", event_id);
89 }
90 }
91 PolicyResult::Reject(reason) => {
92 tracing::info!(
93 "Synced event {} (kind {}) rejected: {}",
94 event_id,
95 event.kind.as_u16(),
96 reason
97 );
98 }
99 }
100 }
101} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
new file mode 100644
index 0000000..279471b
--- /dev/null
+++ b/src/sync/mod.rs
@@ -0,0 +1,22 @@
1//! Proactive Sync Module for GRASP-02
2//!
3//! This module implements proactive synchronization of kind 30617 (repository state)
4//! events from configured relay(s). Events are validated through the same write policy
5//! as directly-submitted events.
6
7mod connection;
8mod manager;
9
10pub use manager::SyncManager;
11
12use std::net::SocketAddr;
13
14/// Synthetic source address used for synced events
15///
16/// This distinguishes synced events from directly-submitted events in logs and metrics.
17/// Uses 127.0.0.2:0 as a recognizable "synced event" marker.
18pub const SYNC_SOURCE_ADDR: SocketAddr =
19 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0);
20
21/// Kind for repository state events (NIP-34)
22pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file