diff options
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index f594454..3bc190d 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -35,6 +35,7 @@ use tokio::sync::mpsc; | |||
| 35 | use super::connection::{connect_with_retry, SyncedEvent}; | 35 | use super::connection::{connect_with_retry, SyncedEvent}; |
| 36 | use super::filter::FilterService; | 36 | use super::filter::FilterService; |
| 37 | use super::health::RelayHealthTracker; | 37 | use super::health::RelayHealthTracker; |
| 38 | use super::metrics::SyncMetrics; | ||
| 38 | use super::SYNC_SOURCE_ADDR; | 39 | use super::SYNC_SOURCE_ADDR; |
| 39 | use crate::config::Config; | 40 | use crate::config::Config; |
| 40 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| @@ -54,6 +55,8 @@ pub struct SyncManager { | |||
| 54 | write_policy: Nip34WritePolicy, | 55 | write_policy: Nip34WritePolicy, |
| 55 | /// Health tracker for relay connections | 56 | /// Health tracker for relay connections |
| 56 | health_tracker: Arc<RelayHealthTracker>, | 57 | health_tracker: Arc<RelayHealthTracker>, |
| 58 | /// Sync metrics for Prometheus | ||
| 59 | metrics: Option<SyncMetrics>, | ||
| 57 | } | 60 | } |
| 58 | 61 | ||
| 59 | impl SyncManager { | 62 | impl SyncManager { |
| @@ -78,6 +81,34 @@ impl SyncManager { | |||
| 78 | database, | 81 | database, |
| 79 | write_policy, | 82 | write_policy, |
| 80 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 83 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 84 | metrics: None, | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | /// Create a new SyncManager with metrics | ||
| 89 | /// | ||
| 90 | /// # Arguments | ||
| 91 | /// * `initial_relay_url` - Optional initial relay URL from config | ||
| 92 | /// * `relay_domain` - Our relay's domain (used to exclude self from sync) | ||
| 93 | /// * `database` - Shared database for storing events and querying announcements | ||
| 94 | /// * `write_policy` - Write policy for validating synced events | ||
| 95 | /// * `config` - Configuration for health tracking settings | ||
| 96 | /// * `metrics` - Sync metrics for Prometheus | ||
| 97 | pub fn with_metrics( | ||
| 98 | initial_relay_url: Option<String>, | ||
| 99 | relay_domain: String, | ||
| 100 | database: SharedDatabase, | ||
| 101 | write_policy: Nip34WritePolicy, | ||
| 102 | config: &Config, | ||
| 103 | metrics: SyncMetrics, | ||
| 104 | ) -> Self { | ||
| 105 | Self { | ||
| 106 | initial_relay_url, | ||
| 107 | relay_domain, | ||
| 108 | database, | ||
| 109 | write_policy, | ||
| 110 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | ||
| 111 | metrics: Some(metrics), | ||
| 81 | } | 112 | } |
| 82 | } | 113 | } |
| 83 | 114 | ||
| @@ -95,9 +126,20 @@ impl SyncManager { | |||
| 95 | database, | 126 | database, |
| 96 | write_policy, | 127 | write_policy, |
| 97 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), | 128 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), |
| 129 | metrics: None, | ||
| 98 | } | 130 | } |
| 99 | } | 131 | } |
| 100 | 132 | ||
| 133 | /// Set metrics for the sync manager | ||
| 134 | pub fn set_metrics(&mut self, metrics: SyncMetrics) { | ||
| 135 | self.metrics = Some(metrics); | ||
| 136 | } | ||
| 137 | |||
| 138 | /// Get a reference to the metrics | ||
| 139 | pub fn metrics(&self) -> Option<&SyncMetrics> { | ||
| 140 | self.metrics.as_ref() | ||
| 141 | } | ||
| 142 | |||
| 101 | /// Get a reference to the health tracker | 143 | /// Get a reference to the health tracker |
| 102 | pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { | 144 | pub fn health_tracker(&self) -> Arc<RelayHealthTracker> { |
| 103 | self.health_tracker.clone() | 145 | self.health_tracker.clone() |
| @@ -148,6 +190,11 @@ impl SyncManager { | |||
| 148 | } | 190 | } |
| 149 | } | 191 | } |
| 150 | 192 | ||
| 193 | // Record initial tracked relay count | ||
| 194 | if let Some(ref metrics) = self.metrics { | ||
| 195 | metrics.set_tracked_count(active_relays.len() as i64); | ||
| 196 | } | ||
| 197 | |||
| 151 | // Spawn connections with startup jitter to prevent thundering herd | 198 | // Spawn connections with startup jitter to prevent thundering herd |
| 152 | for url in relays_to_connect { | 199 | for url in relays_to_connect { |
| 153 | tracing::info!("Scheduling connection to sync relay: {}", url); | 200 | tracing::info!("Scheduling connection to sync relay: {}", url); |
| @@ -172,6 +219,12 @@ impl SyncManager { | |||
| 172 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { | 219 | if !active_relays.contains(&url) && !self.is_own_relay(&url) { |
| 173 | tracing::info!("Discovered new relay from event, connecting: {}", url); | 220 | tracing::info!("Discovered new relay from event, connecting: {}", url); |
| 174 | active_relays.insert(url.clone()); | 221 | active_relays.insert(url.clone()); |
| 222 | |||
| 223 | // Update tracked relay count | ||
| 224 | if let Some(ref metrics) = self.metrics { | ||
| 225 | metrics.inc_tracked_count(); | ||
| 226 | } | ||
| 227 | |||
| 175 | // New relays discovered during runtime don't need jitter | 228 | // New relays discovered during runtime don't need jitter |
| 176 | self.spawn_connection(url, tx.clone(), filter_service.clone()); | 229 | self.spawn_connection(url, tx.clone(), filter_service.clone()); |
| 177 | } | 230 | } |
| @@ -200,6 +253,7 @@ impl SyncManager { | |||
| 200 | ) { | 253 | ) { |
| 201 | let domain = self.relay_domain.clone(); | 254 | let domain = self.relay_domain.clone(); |
| 202 | let health_tracker = self.health_tracker.clone(); | 255 | let health_tracker = self.health_tracker.clone(); |
| 256 | let metrics = self.metrics.clone(); | ||
| 203 | 257 | ||
| 204 | tokio::spawn(async move { | 258 | tokio::spawn(async move { |
| 205 | // Apply startup jitter | 259 | // Apply startup jitter |
| @@ -211,7 +265,7 @@ impl SyncManager { | |||
| 211 | ); | 265 | ); |
| 212 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | 266 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; |
| 213 | 267 | ||
| 214 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; | 268 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; |
| 215 | }); | 269 | }); |
| 216 | } | 270 | } |
| 217 | 271 | ||
| @@ -226,9 +280,10 @@ impl SyncManager { | |||
| 226 | ) { | 280 | ) { |
| 227 | let domain = self.relay_domain.clone(); | 281 | let domain = self.relay_domain.clone(); |
| 228 | let health_tracker = self.health_tracker.clone(); | 282 | let health_tracker = self.health_tracker.clone(); |
| 283 | let metrics = self.metrics.clone(); | ||
| 229 | 284 | ||
| 230 | tokio::spawn(async move { | 285 | tokio::spawn(async move { |
| 231 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker).await; | 286 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; |
| 232 | }); | 287 | }); |
| 233 | } | 288 | } |
| 234 | 289 | ||