upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/manager.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:43:49 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:43:49 +0000
commitdd403b17e7c74db9443d0891a9de1f0f0f9f89eb (patch)
tree177dd9f664dde3565492c1d11016dabfeda28bbc /src/sync/manager.rs
parent950c2e4e68448d2abcad90a31bfffaca6d7bc47e (diff)
feat(sync): Phase 6 - observability and production readiness
- Add SyncMetrics with full Prometheus integration - Track sync gaps via catchup events - Update Grafana dashboard with sync panels - Document all sync configuration options - Update design doc with implementation notes
Diffstat (limited to 'src/sync/manager.rs')
-rw-r--r--src/sync/manager.rs59
1 files changed, 57 insertions, 2 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index f594454..3bc190d 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -35,6 +35,7 @@ use tokio::sync::mpsc;
35use super::connection::{connect_with_retry, SyncedEvent}; 35use super::connection::{connect_with_retry, SyncedEvent};
36use super::filter::FilterService; 36use super::filter::FilterService;
37use super::health::RelayHealthTracker; 37use super::health::RelayHealthTracker;
38use super::metrics::SyncMetrics;
38use super::SYNC_SOURCE_ADDR; 39use super::SYNC_SOURCE_ADDR;
39use crate::config::Config; 40use crate::config::Config;
40use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
@@ -54,6 +55,8 @@ pub struct SyncManager {
54 write_policy: Nip34WritePolicy, 55 write_policy: Nip34WritePolicy,
55 /// Health tracker for relay connections 56 /// Health tracker for relay connections
56 health_tracker: Arc<RelayHealthTracker>, 57 health_tracker: Arc<RelayHealthTracker>,
58 /// Sync metrics for Prometheus
59 metrics: Option<SyncMetrics>,
57} 60}
58 61
59impl SyncManager { 62impl SyncManager {
@@ -78,6 +81,34 @@ impl SyncManager {
78 database, 81 database,
79 write_policy, 82 write_policy,
80 health_tracker: Arc::new(RelayHealthTracker::new(config)), 83 health_tracker: Arc::new(RelayHealthTracker::new(config)),
84 metrics: None,
85 }
86 }
87
88 /// Create a new SyncManager with metrics
89 ///
90 /// # Arguments
91 /// * `initial_relay_url` - Optional initial relay URL from config
92 /// * `relay_domain` - Our relay's domain (used to exclude self from sync)
93 /// * `database` - Shared database for storing events and querying announcements
94 /// * `write_policy` - Write policy for validating synced events
95 /// * `config` - Configuration for health tracking settings
96 /// * `metrics` - Sync metrics for Prometheus
97 pub fn with_metrics(
98 initial_relay_url: Option<String>,
99 relay_domain: String,
100 database: SharedDatabase,
101 write_policy: Nip34WritePolicy,
102 config: &Config,
103 metrics: SyncMetrics,
104 ) -> Self {
105 Self {
106 initial_relay_url,
107 relay_domain,
108 database,
109 write_policy,
110 health_tracker: Arc::new(RelayHealthTracker::new(config)),
111 metrics: Some(metrics),
81 } 112 }
82 } 113 }
83 114
@@ -95,9 +126,20 @@ impl SyncManager {
95 database, 126 database,
96 write_policy, 127 write_policy,
97 health_tracker: Arc::new(RelayHealthTracker::with_defaults()), 128 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
129 metrics: None,
98 } 130 }
99 } 131 }
100 132
133 /// Set metrics for the sync manager
134 pub fn set_metrics(&mut self, metrics: SyncMetrics) {
135 self.metrics = Some(metrics);
136 }
137
138 /// Get a reference to the metrics
139 pub fn metrics(&self) -> Option<&SyncMetrics> {
140 self.metrics.as_ref()
141 }
142
101 /// Get a reference to the health tracker 143 /// Get a reference to the health tracker
102 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { 144 pub fn health_tracker(&self) -> Arc<RelayHealthTracker> {
103 self.health_tracker.clone() 145 self.health_tracker.clone()
@@ -148,6 +190,11 @@ impl SyncManager {
148 } 190 }
149 } 191 }
150 192
193 // Record initial tracked relay count
194 if let Some(ref metrics) = self.metrics {
195 metrics.set_tracked_count(active_relays.len() as i64);
196 }
197
151 // Spawn connections with startup jitter to prevent thundering herd 198 // Spawn connections with startup jitter to prevent thundering herd
152 for url in relays_to_connect { 199 for url in relays_to_connect {
153 tracing::info!("Scheduling connection to sync relay: {}", url); 200 tracing::info!("Scheduling connection to sync relay: {}", url);
@@ -172,6 +219,12 @@ impl SyncManager {
172 if !active_relays.contains(&url) && !self.is_own_relay(&url) { 219 if !active_relays.contains(&url) && !self.is_own_relay(&url) {
173 tracing::info!("Discovered new relay from event, connecting: {}", url); 220 tracing::info!("Discovered new relay from event, connecting: {}", url);
174 active_relays.insert(url.clone()); 221 active_relays.insert(url.clone());
222
223 // Update tracked relay count
224 if let Some(ref metrics) = self.metrics {
225 metrics.inc_tracked_count();
226 }
227
175 // New relays discovered during runtime don't need jitter 228 // New relays discovered during runtime don't need jitter
176 self.spawn_connection(url, tx.clone(), filter_service.clone()); 229 self.spawn_connection(url, tx.clone(), filter_service.clone());
177 } 230 }
@@ -200,6 +253,7 @@ impl SyncManager {
200 ) { 253 ) {
201 let domain = self.relay_domain.clone(); 254 let domain = self.relay_domain.clone();
202 let health_tracker = self.health_tracker.clone(); 255 let health_tracker = self.health_tracker.clone();
256 let metrics = self.metrics.clone();
203 257
204 tokio::spawn(async move { 258 tokio::spawn(async move {
205 // Apply startup jitter 259 // Apply startup jitter
@@ -211,7 +265,7 @@ impl SyncManager {
211 ); 265 );
212 tokio::time::sleep(Duration::from_millis(jitter_ms)).await; 266 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
213 267
214 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; 268 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
215 }); 269 });
216 } 270 }
217 271
@@ -226,9 +280,10 @@ impl SyncManager {
226 ) { 280 ) {
227 let domain = self.relay_domain.clone(); 281 let domain = self.relay_domain.clone();
228 let health_tracker = self.health_tracker.clone(); 282 let health_tracker = self.health_tracker.clone();
283 let metrics = self.metrics.clone();
229 284
230 tokio::spawn(async move { 285 tokio::spawn(async move {
231 connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; 286 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
232 }); 287 });
233 } 288 }
234 289