diff options
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs new file mode 100644 index 0000000..8c883f5 --- /dev/null +++ b/src/sync/manager.rs | |||
| @@ -0,0 +1,101 @@ | |||
| 1 | //! SyncManager - Coordinates proactive sync operations | ||
| 2 | //! | ||
| 3 | //! The SyncManager spawns connections to configured relays, receives events, | ||
| 4 | //! validates them through the write policy, and stores accepted events. | ||
| 5 | |||
| 6 | use nostr_relay_builder::prelude::*; | ||
| 7 | use tokio::sync::mpsc; | ||
| 8 | |||
| 9 | use super::connection::{connect_with_retry, SyncedEvent}; | ||
| 10 | use super::SYNC_SOURCE_ADDR; | ||
| 11 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | ||
| 12 | |||
| 13 | /// Coordinates proactive sync from configured relays | ||
| 14 | pub struct SyncManager { | ||
| 15 | /// URL of the relay to sync from | ||
| 16 | sync_relay_url: String, | ||
| 17 | /// Database for storing accepted events | ||
| 18 | database: SharedDatabase, | ||
| 19 | /// Write policy for validating events | ||
| 20 | write_policy: Nip34WritePolicy, | ||
| 21 | } | ||
| 22 | |||
| 23 | impl SyncManager { | ||
| 24 | /// Create a new SyncManager | ||
| 25 | pub fn new( | ||
| 26 | sync_relay_url: String, | ||
| 27 | database: SharedDatabase, | ||
| 28 | write_policy: Nip34WritePolicy, | ||
| 29 | ) -> Self { | ||
| 30 | Self { | ||
| 31 | sync_relay_url, | ||
| 32 | database, | ||
| 33 | write_policy, | ||
| 34 | } | ||
| 35 | } | ||
| 36 | |||
| 37 | /// Run the sync manager | ||
| 38 | /// | ||
| 39 | /// This spawns a connection task and processes incoming events. | ||
| 40 | /// Runs indefinitely until the task is cancelled. | ||
| 41 | pub async fn run(self) { | ||
| 42 | tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url); | ||
| 43 | |||
| 44 | // Create channel for receiving events from connection | ||
| 45 | let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100); | ||
| 46 | |||
| 47 | // Spawn connection task with auto-reconnect | ||
| 48 | let url = self.sync_relay_url.clone(); | ||
| 49 | tokio::spawn(async move { | ||
| 50 | connect_with_retry(&url, tx).await; | ||
| 51 | }); | ||
| 52 | |||
| 53 | // Process incoming events | ||
| 54 | while let Some(synced_event) = rx.recv().await { | ||
| 55 | self.process_event(synced_event).await; | ||
| 56 | } | ||
| 57 | |||
| 58 | tracing::warn!("SyncManager event channel closed, shutting down"); | ||
| 59 | } | ||
| 60 | |||
| 61 | /// Process a single synced event | ||
| 62 | async fn process_event(&self, synced_event: SyncedEvent) { | ||
| 63 | let event = &synced_event.event; | ||
| 64 | let event_id = event.id.to_hex(); | ||
| 65 | |||
| 66 | tracing::debug!( | ||
| 67 | "Processing synced event {} (kind {}) from {}", | ||
| 68 | event_id, | ||
| 69 | event.kind.as_u16(), | ||
| 70 | synced_event.source_url | ||
| 71 | ); | ||
| 72 | |||
| 73 | // Validate through write policy using SYNC_SOURCE_ADDR | ||
| 74 | let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; | ||
| 75 | |||
| 76 | match result { | ||
| 77 | PolicyResult::Accept => { | ||
| 78 | tracing::info!( | ||
| 79 | "Synced event {} (kind {}) accepted, storing", | ||
| 80 | event_id, | ||
| 81 | event.kind.as_u16() | ||
| 82 | ); | ||
| 83 | |||
| 84 | // Store the event in the database | ||
| 85 | if let Err(e) = self.database.save_event(event).await { | ||
| 86 | tracing::error!("Failed to store synced event {}: {}", event_id, e); | ||
| 87 | } else { | ||
| 88 | tracing::debug!("Synced event {} stored successfully", event_id); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | PolicyResult::Reject(reason) => { | ||
| 92 | tracing::info!( | ||
| 93 | "Synced event {} (kind {}) rejected: {}", | ||
| 94 | event_id, | ||
| 95 | event.kind.as_u16(), | ||
| 96 | reason | ||
| 97 | ); | ||
| 98 | } | ||
| 99 | } | ||
| 100 | } | ||
| 101 | } \ No newline at end of file | ||