1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
//! SyncManager - Coordinates proactive sync operations
//!
//! The SyncManager spawns a connection task for the configured relay, receives
//! the events it forwards, validates each one through the write policy, and
//! stores the events that are accepted.
use nostr_relay_builder::prelude::*;
use tokio::sync::mpsc;
use super::connection::{connect_with_retry, SyncedEvent};
use super::SYNC_SOURCE_ADDR;
use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
/// Coordinates proactive sync from a configured relay.
///
/// Holds everything needed to pull events from one upstream relay, check each
/// event against the write policy, and save the accepted ones to the database.
pub struct SyncManager {
/// URL of the relay to sync from; handed to the connection task in `run`.
sync_relay_url: String,
/// Database that accepted events are saved into.
database: SharedDatabase,
/// Write policy every synced event must pass before it is stored.
write_policy: Nip34WritePolicy,
}
impl SyncManager {
/// Create a new SyncManager
pub fn new(
sync_relay_url: String,
database: SharedDatabase,
write_policy: Nip34WritePolicy,
) -> Self {
Self {
sync_relay_url,
database,
write_policy,
}
}
/// Run the sync manager
///
/// This spawns a connection task and processes incoming events.
/// Runs indefinitely until the task is cancelled.
pub async fn run(self) {
tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url);
// Create channel for receiving events from connection
let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
// Spawn connection task with auto-reconnect
let url = self.sync_relay_url.clone();
tokio::spawn(async move {
connect_with_retry(&url, tx).await;
});
// Process incoming events
while let Some(synced_event) = rx.recv().await {
self.process_event(synced_event).await;
}
tracing::warn!("SyncManager event channel closed, shutting down");
}
/// Process a single synced event
async fn process_event(&self, synced_event: SyncedEvent) {
let event = &synced_event.event;
let event_id = event.id.to_hex();
tracing::debug!(
"Processing synced event {} (kind {}) from {}",
event_id,
event.kind.as_u16(),
synced_event.source_url
);
// Validate through write policy using SYNC_SOURCE_ADDR
let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
match result {
PolicyResult::Accept => {
tracing::info!(
"Synced event {} (kind {}) accepted, storing",
event_id,
event.kind.as_u16()
);
// Store the event in the database
if let Err(e) = self.database.save_event(event).await {
tracing::error!("Failed to store synced event {}: {}", event_id, e);
} else {
tracing::debug!("Synced event {} stored successfully", event_id);
}
}
PolicyResult::Reject(reason) => {
tracing::info!(
"Synced event {} (kind {}) rejected: {}",
event_id,
event.kind.as_u16(),
reason
);
}
}
}
}
|