diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-05 11:04:00 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-05 11:04:00 +0000 |
| commit | ef7ba7c59b8e0b6369f63b8a46e202693963d92b (patch) | |
| tree | 9a3abe34e41995b02717292050cfb09d4b7d0de1 /src | |
| parent | 83ede29fb2ce563fe53ee4dc62334c03c67026cb (diff) | |
fix basic sync tests
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 5 | ||||
| -rw-r--r-- | src/sync/manager.rs | 31 |
2 files changed, 24 insertions, 12 deletions
diff --git a/src/config.rs b/src/config.rs index 0ca534c..07e67c8 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -103,6 +103,11 @@ pub struct Config { | |||
| 103 | /// Number of days to look back for reconnect catchup (default: 3) | 103 | /// Number of days to look back for reconnect catchup (default: 3) |
| 104 | #[arg(long, env = "NGIT_SYNC_RECONNECT_LOOKBACK_DAYS", default_value_t = 3)] | 104 | #[arg(long, env = "NGIT_SYNC_RECONNECT_LOOKBACK_DAYS", default_value_t = 3)] |
| 105 | pub sync_reconnect_lookback_days: u64, | 105 | pub sync_reconnect_lookback_days: u64, |
| 106 | |||
| 107 | /// Maximum startup jitter in milliseconds for sync connections (default: 10000 = 10 seconds) | ||
| 108 | /// Set to 0 to disable jitter (useful for testing) | ||
| 109 | #[arg(long, env = "NGIT_SYNC_STARTUP_JITTER_MS", default_value_t = 10_000)] | ||
| 110 | pub sync_startup_jitter_ms: u64, | ||
| 106 | } | 111 | } |
| 107 | 112 | ||
| 108 | impl Config { | 113 | impl Config { |
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 97ea81a..6fcfcd7 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -40,8 +40,6 @@ use super::metrics::SyncMetrics; | |||
| 40 | use crate::config::Config; | 40 | use crate::config::Config; |
| 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 42 | 42 | ||
| 43 | /// Maximum startup jitter in milliseconds (10 seconds) | ||
| 44 | const MAX_STARTUP_JITTER_MS: u64 = 10_000; | ||
| 45 | 43 | ||
| 46 | /// Default fallback address for sync source when bind_address cannot be parsed | 44 | /// Default fallback address for sync source when bind_address cannot be parsed |
| 47 | /// | 45 | /// |
| @@ -76,6 +74,8 @@ pub struct SyncManager { | |||
| 76 | metrics: Option<SyncMetrics>, | 74 | metrics: Option<SyncMetrics>, |
| 77 | /// Source address for synced events (derived from config.bind_address) | 75 | /// Source address for synced events (derived from config.bind_address) |
| 78 | sync_source_addr: SocketAddr, | 76 | sync_source_addr: SocketAddr, |
| 77 | /// Maximum startup jitter in milliseconds (from config) | ||
| 78 | startup_jitter_ms: u64, | ||
| 79 | } | 79 | } |
| 80 | 80 | ||
| 81 | impl SyncManager { | 81 | impl SyncManager { |
| @@ -102,6 +102,7 @@ impl SyncManager { | |||
| 102 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 102 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 103 | metrics: None, | 103 | metrics: None, |
| 104 | sync_source_addr: get_sync_source_addr(&config.bind_address), | 104 | sync_source_addr: get_sync_source_addr(&config.bind_address), |
| 105 | startup_jitter_ms: config.sync_startup_jitter_ms, | ||
| 105 | } | 106 | } |
| 106 | } | 107 | } |
| 107 | 108 | ||
| @@ -130,6 +131,7 @@ impl SyncManager { | |||
| 130 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 131 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 131 | metrics: Some(metrics), | 132 | metrics: Some(metrics), |
| 132 | sync_source_addr: get_sync_source_addr(&config.bind_address), | 133 | sync_source_addr: get_sync_source_addr(&config.bind_address), |
| 134 | startup_jitter_ms: config.sync_startup_jitter_ms, | ||
| 133 | } | 135 | } |
| 134 | } | 136 | } |
| 135 | 137 | ||
| @@ -149,6 +151,7 @@ impl SyncManager { | |||
| 149 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), | 151 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), |
| 150 | metrics: None, | 152 | metrics: None, |
| 151 | sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, | 153 | sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, |
| 154 | startup_jitter_ms: 10_000, // Default 10 seconds | ||
| 152 | } | 155 | } |
| 153 | } | 156 | } |
| 154 | 157 | ||
| @@ -265,8 +268,9 @@ impl SyncManager { | |||
| 265 | 268 | ||
| 266 | /// Spawn a connection task for a relay with startup jitter | 269 | /// Spawn a connection task for a relay with startup jitter |
| 267 | /// | 270 | /// |
| 268 | /// Adds a random delay (0-10s) before connecting to prevent thundering herd | 271 | /// Adds a random delay (0 to startup_jitter_ms) before connecting to prevent |
| 269 | /// on startup when multiple relays are configured. | 272 | /// thundering herd on startup when multiple relays are configured. |
| 273 | /// Set startup_jitter_ms to 0 to disable jitter (useful for testing). | ||
| 270 | fn spawn_connection_with_jitter( | 274 | fn spawn_connection_with_jitter( |
| 271 | &self, | 275 | &self, |
| 272 | url: String, | 276 | url: String, |
| @@ -276,16 +280,19 @@ impl SyncManager { | |||
| 276 | let domain = self.relay_domain.clone(); | 280 | let domain = self.relay_domain.clone(); |
| 277 | let health_tracker = self.health_tracker.clone(); | 281 | let health_tracker = self.health_tracker.clone(); |
| 278 | let metrics = self.metrics.clone(); | 282 | let metrics = self.metrics.clone(); |
| 283 | let max_jitter = self.startup_jitter_ms; | ||
| 279 | 284 | ||
| 280 | tokio::spawn(async move { | 285 | tokio::spawn(async move { |
| 281 | // Apply startup jitter | 286 | // Apply startup jitter (if configured) |
| 282 | let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS); | 287 | if max_jitter > 0 { |
| 283 | tracing::debug!( | 288 | let jitter_ms = rand::thread_rng().gen_range(0..max_jitter); |
| 284 | "Applying {}ms startup jitter before connecting to {}", | 289 | tracing::debug!( |
| 285 | jitter_ms, | 290 | "Applying {}ms startup jitter before connecting to {}", |
| 286 | url | 291 | jitter_ms, |
| 287 | ); | 292 | url |
| 288 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | 293 | ); |
| 294 | tokio::time::sleep(Duration::from_millis(jitter_ms)).await; | ||
| 295 | } | ||
| 289 | 296 | ||
| 290 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; | 297 | connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; |
| 291 | }); | 298 | }); |