upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/connection.rs143
-rw-r--r--src/sync/manager.rs101
-rw-r--r--src/sync/mod.rs22
3 files changed, 266 insertions, 0 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
new file mode 100644
index 0000000..4a79128
--- /dev/null
+++ b/src/sync/connection.rs
@@ -0,0 +1,143 @@
1//! WebSocket connection handling for sync
2//!
3//! Manages the connection to a source relay, subscribes to kind 30617 events,
4//! and passes them through validation.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use super::KIND_REPOSITORY_STATE;
12
13/// Event received from the sync connection
14#[derive(Debug, Clone)]
15pub struct SyncedEvent {
16 pub event: Event,
17 pub source_url: String,
18}
19
20/// Manages a WebSocket connection to a single relay for syncing
21pub struct SyncConnection {
22 url: String,
23 client: Client,
24}
25
26impl SyncConnection {
27 /// Create a new sync connection to the given relay URL
28 pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
29 let client = Client::default();
30
31 // Add the relay
32 client.add_relay(url).await?;
33
34 // Connect to the relay
35 client.connect().await;
36
37 tracing::info!("Sync connection established to {}", url);
38
39 Ok(Self {
40 url: url.to_string(),
41 client,
42 })
43 }
44
45 /// Start receiving events and send them through the channel
46 ///
47 /// This method runs indefinitely, reconnecting as needed.
48 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) {
49 // Create filter for kind 30617 (repository state) events
50 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE));
51
52 // Subscribe to events
53 match self.client.subscribe(filter, None).await {
54 Ok(output) => {
55 tracing::info!(
56 "Subscribed to kind {} events on {} (subscription: {})",
57 KIND_REPOSITORY_STATE,
58 self.url,
59 output.id()
60 );
61 }
62 Err(e) => {
63 tracing::error!("Failed to subscribe on {}: {}", self.url, e);
64 return;
65 }
66 };
67
68 // Handle incoming notifications
69 let url = self.url.clone();
70 self.client
71 .handle_notifications(|notification| {
72 let tx = tx.clone();
73 let url = url.clone();
74 async move {
75 match notification {
76 RelayPoolNotification::Event { event, .. } => {
77 tracing::debug!(
78 "Received event {} from {} (kind {})",
79 event.id,
80 url,
81 event.kind.as_u16()
82 );
83
84 // Send the event to the manager for processing
85 let synced = SyncedEvent {
86 event: (*event).clone(),
87 source_url: url.clone(),
88 };
89
90 if let Err(e) = tx.send(synced).await {
91 tracing::warn!("Failed to send synced event: {}", e);
92 return Ok(true); // Stop if channel is closed
93 }
94 }
95 RelayPoolNotification::Shutdown => {
96 tracing::warn!("Relay connection shutdown for {}", url);
97 return Ok(true); // Stop on shutdown
98 }
99 RelayPoolNotification::Message { message, .. } => {
100 tracing::trace!("Received message from {}: {:?}", url, message);
101 }
102 }
103 Ok(false) // Continue processing
104 }
105 })
106 .await
107 .ok();
108 }
109
110}
111
112/// Reconnect loop with exponential backoff
113pub async fn connect_with_retry(
114 url: &str,
115 tx: mpsc::Sender<SyncedEvent>,
116) {
117 let mut backoff = Duration::from_secs(1);
118 let max_backoff = Duration::from_secs(60);
119
120 loop {
121 match SyncConnection::new(url).await {
122 Ok(conn) => {
123 backoff = Duration::from_secs(1); // Reset backoff on successful connection
124 conn.run(tx.clone()).await;
125 tracing::warn!("Sync connection to {} ended, will reconnect", url);
126 }
127 Err(e) => {
128 tracing::error!(
129 "Failed to connect to sync relay {}: {} (retrying in {:?})",
130 url,
131 e,
132 backoff
133 );
134 }
135 }
136
137 // Wait before reconnecting
138 tokio::time::sleep(backoff).await;
139
140 // Exponential backoff
141 backoff = std::cmp::min(backoff * 2, max_backoff);
142 }
143} \ No newline at end of file
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
new file mode 100644
index 0000000..8c883f5
--- /dev/null
+++ b/src/sync/manager.rs
@@ -0,0 +1,101 @@
1//! SyncManager - Coordinates proactive sync operations
2//!
3//! The SyncManager spawns connections to configured relays, receives events,
4//! validates them through the write policy, and stores accepted events.
5
6use nostr_relay_builder::prelude::*;
7use tokio::sync::mpsc;
8
9use super::connection::{connect_with_retry, SyncedEvent};
10use super::SYNC_SOURCE_ADDR;
11use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
12
13/// Coordinates proactive sync from configured relays
14pub struct SyncManager {
15 /// URL of the relay to sync from
16 sync_relay_url: String,
17 /// Database for storing accepted events
18 database: SharedDatabase,
19 /// Write policy for validating events
20 write_policy: Nip34WritePolicy,
21}
22
23impl SyncManager {
24 /// Create a new SyncManager
25 pub fn new(
26 sync_relay_url: String,
27 database: SharedDatabase,
28 write_policy: Nip34WritePolicy,
29 ) -> Self {
30 Self {
31 sync_relay_url,
32 database,
33 write_policy,
34 }
35 }
36
37 /// Run the sync manager
38 ///
39 /// This spawns a connection task and processes incoming events.
40 /// Runs indefinitely until the task is cancelled.
41 pub async fn run(self) {
42 tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url);
43
44 // Create channel for receiving events from connection
45 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
46
47 // Spawn connection task with auto-reconnect
48 let url = self.sync_relay_url.clone();
49 tokio::spawn(async move {
50 connect_with_retry(&url, tx).await;
51 });
52
53 // Process incoming events
54 while let Some(synced_event) = rx.recv().await {
55 self.process_event(synced_event).await;
56 }
57
58 tracing::warn!("SyncManager event channel closed, shutting down");
59 }
60
61 /// Process a single synced event
62 async fn process_event(&self, synced_event: SyncedEvent) {
63 let event = &synced_event.event;
64 let event_id = event.id.to_hex();
65
66 tracing::debug!(
67 "Processing synced event {} (kind {}) from {}",
68 event_id,
69 event.kind.as_u16(),
70 synced_event.source_url
71 );
72
73 // Validate through write policy using SYNC_SOURCE_ADDR
74 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
75
76 match result {
77 PolicyResult::Accept => {
78 tracing::info!(
79 "Synced event {} (kind {}) accepted, storing",
80 event_id,
81 event.kind.as_u16()
82 );
83
84 // Store the event in the database
85 if let Err(e) = self.database.save_event(event).await {
86 tracing::error!("Failed to store synced event {}: {}", event_id, e);
87 } else {
88 tracing::debug!("Synced event {} stored successfully", event_id);
89 }
90 }
91 PolicyResult::Reject(reason) => {
92 tracing::info!(
93 "Synced event {} (kind {}) rejected: {}",
94 event_id,
95 event.kind.as_u16(),
96 reason
97 );
98 }
99 }
100 }
101} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
new file mode 100644
index 0000000..279471b
--- /dev/null
+++ b/src/sync/mod.rs
@@ -0,0 +1,22 @@
1//! Proactive Sync Module for GRASP-02
2//!
3//! This module implements proactive synchronization of kind 30617 (repository state)
4//! events from configured relay(s). Events are validated through the same write policy
5//! as directly-submitted events.
6
7mod connection;
8mod manager;
9
10pub use manager::SyncManager;
11
12use std::net::SocketAddr;
13
14/// Synthetic source address used for synced events
15///
16/// This distinguishes synced events from directly-submitted events in logs and metrics.
17/// Uses 127.0.0.2:0 as a recognizable "synced event" marker.
18pub const SYNC_SOURCE_ADDR: SocketAddr =
19 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0);
20
21/// Kind for repository state events (NIP-34)
22pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file