upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/manager.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:03:40 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 17:03:40 +0000
commitb167f1b2ae7edbcab95554b5203d22d9e372c8b5 (patch)
tree39b3bb879302cb6a4eaabded4a5d20f7d0d68ffc /src/sync/manager.rs
parentfdbc8895e1e9e712882bd854908295a95e7afcb9 (diff)
feat(sync): Phase 1 MVP - single relay proactive sync
- Add src/sync/ module with SyncManager - Add NGIT_SYNC_RELAY_URL config option - Subscribe to kind 30617 on configured relay - Validate synced events through Nip34WritePolicy - Integration test with two TestRelay instances
Diffstat (limited to 'src/sync/manager.rs')
-rw-r--r--src/sync/manager.rs101
1 files changed, 101 insertions, 0 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
new file mode 100644
index 0000000..8c883f5
--- /dev/null
+++ b/src/sync/manager.rs
@@ -0,0 +1,101 @@
1//! SyncManager - Coordinates proactive sync operations
2//!
3//! The SyncManager spawns connections to configured relays, receives events,
4//! validates them through the write policy, and stores accepted events.
5
6use nostr_relay_builder::prelude::*;
7use tokio::sync::mpsc;
8
9use super::connection::{connect_with_retry, SyncedEvent};
10use super::SYNC_SOURCE_ADDR;
11use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
12
13/// Coordinates proactive sync from configured relays
14pub struct SyncManager {
15 /// URL of the relay to sync from
16 sync_relay_url: String,
17 /// Database for storing accepted events
18 database: SharedDatabase,
19 /// Write policy for validating events
20 write_policy: Nip34WritePolicy,
21}
22
23impl SyncManager {
24 /// Create a new SyncManager
25 pub fn new(
26 sync_relay_url: String,
27 database: SharedDatabase,
28 write_policy: Nip34WritePolicy,
29 ) -> Self {
30 Self {
31 sync_relay_url,
32 database,
33 write_policy,
34 }
35 }
36
37 /// Run the sync manager
38 ///
39 /// This spawns a connection task and processes incoming events.
40 /// Runs indefinitely until the task is cancelled.
41 pub async fn run(self) {
42 tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url);
43
44 // Create channel for receiving events from connection
45 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
46
47 // Spawn connection task with auto-reconnect
48 let url = self.sync_relay_url.clone();
49 tokio::spawn(async move {
50 connect_with_retry(&url, tx).await;
51 });
52
53 // Process incoming events
54 while let Some(synced_event) = rx.recv().await {
55 self.process_event(synced_event).await;
56 }
57
58 tracing::warn!("SyncManager event channel closed, shutting down");
59 }
60
61 /// Process a single synced event
62 async fn process_event(&self, synced_event: SyncedEvent) {
63 let event = &synced_event.event;
64 let event_id = event.id.to_hex();
65
66 tracing::debug!(
67 "Processing synced event {} (kind {}) from {}",
68 event_id,
69 event.kind.as_u16(),
70 synced_event.source_url
71 );
72
73 // Validate through write policy using SYNC_SOURCE_ADDR
74 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
75
76 match result {
77 PolicyResult::Accept => {
78 tracing::info!(
79 "Synced event {} (kind {}) accepted, storing",
80 event_id,
81 event.kind.as_u16()
82 );
83
84 // Store the event in the database
85 if let Err(e) = self.database.save_event(event).await {
86 tracing::error!("Failed to store synced event {}: {}", event_id, e);
87 } else {
88 tracing::debug!("Synced event {} stored successfully", event_id);
89 }
90 }
91 PolicyResult::Reject(reason) => {
92 tracing::info!(
93 "Synced event {} (kind {}) rejected: {}",
94 event_id,
95 event.kind.as_u16(),
96 reason
97 );
98 }
99 }
100 }
101} \ No newline at end of file