diff options
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 3bc190d..97ea81a 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -25,6 +25,7 @@ | |||
| 25 | //! - Consolidation when filter count exceeds 150 | 25 | //! - Consolidation when filter count exceeds 150 |
| 26 | 26 | ||
| 27 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| 28 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
| 28 | use std::sync::Arc; | 29 | use std::sync::Arc; |
| 29 | use std::time::Duration; | 30 | use std::time::Duration; |
| 30 | 31 | ||
| @@ -36,13 +37,29 @@ use super::connection::{connect_with_retry, SyncedEvent}; | |||
| 36 | use super::filter::FilterService; | 37 | use super::filter::FilterService; |
| 37 | use super::health::RelayHealthTracker; | 38 | use super::health::RelayHealthTracker; |
| 38 | use super::metrics::SyncMetrics; | 39 | use super::metrics::SyncMetrics; |
| 39 | use super::SYNC_SOURCE_ADDR; | ||
| 40 | use crate::config::Config; | 40 | use crate::config::Config; |
| 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; | 41 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 42 | 42 | ||
| 43 | /// Maximum startup jitter in milliseconds (10 seconds) | 43 | /// Maximum startup jitter in milliseconds (10 seconds) |
| 44 | const MAX_STARTUP_JITTER_MS: u64 = 10_000; | 44 | const MAX_STARTUP_JITTER_MS: u64 = 10_000; |
| 45 | 45 | ||
| 46 | /// Default fallback address for sync source when bind_address cannot be parsed | ||
| 47 | /// | ||
| 48 | /// This distinguishes synced events from directly-submitted events in logs and metrics. | ||
| 49 | /// Uses 127.0.0.1:8080 as a recognizable default "synced event" marker. | ||
| 50 | pub const DEFAULT_SYNC_SOURCE_ADDR: SocketAddr = | ||
| 51 | SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); | ||
| 52 | |||
| 53 | /// Derive sync source address from config bind_address | ||
| 54 | /// | ||
| 55 | /// Parses the bind_address string and returns a SocketAddr. | ||
| 56 | /// Falls back to 127.0.0.1:8080 if parsing fails. | ||
| 57 | fn get_sync_source_addr(bind_address: &str) -> SocketAddr { | ||
| 58 | bind_address | ||
| 59 | .parse() | ||
| 60 | .unwrap_or(DEFAULT_SYNC_SOURCE_ADDR) | ||
| 61 | } | ||
| 62 | |||
| 46 | /// Coordinates proactive sync from configured and discovered relays | 63 | /// Coordinates proactive sync from configured and discovered relays |
| 47 | pub struct SyncManager { | 64 | pub struct SyncManager { |
| 48 | /// Initial relay URL to sync from (from config) | 65 | /// Initial relay URL to sync from (from config) |
| @@ -57,6 +74,8 @@ pub struct SyncManager { | |||
| 57 | health_tracker: Arc<RelayHealthTracker>, | 74 | health_tracker: Arc<RelayHealthTracker>, |
| 58 | /// Sync metrics for Prometheus | 75 | /// Sync metrics for Prometheus |
| 59 | metrics: Option<SyncMetrics>, | 76 | metrics: Option<SyncMetrics>, |
| 77 | /// Source address for synced events (derived from config.bind_address) | ||
| 78 | sync_source_addr: SocketAddr, | ||
| 60 | } | 79 | } |
| 61 | 80 | ||
| 62 | impl SyncManager { | 81 | impl SyncManager { |
| @@ -82,6 +101,7 @@ impl SyncManager { | |||
| 82 | write_policy, | 101 | write_policy, |
| 83 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 102 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 84 | metrics: None, | 103 | metrics: None, |
| 104 | sync_source_addr: get_sync_source_addr(&config.bind_address), | ||
| 85 | } | 105 | } |
| 86 | } | 106 | } |
| 87 | 107 | ||
| @@ -109,6 +129,7 @@ impl SyncManager { | |||
| 109 | write_policy, | 129 | write_policy, |
| 110 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 130 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 111 | metrics: Some(metrics), | 131 | metrics: Some(metrics), |
| 132 | sync_source_addr: get_sync_source_addr(&config.bind_address), | ||
| 112 | } | 133 | } |
| 113 | } | 134 | } |
| 114 | 135 | ||
| @@ -127,6 +148,7 @@ impl SyncManager { | |||
| 127 | write_policy, | 148 | write_policy, |
| 128 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), | 149 | health_tracker: Arc::new(RelayHealthTracker::with_defaults()), |
| 129 | metrics: None, | 150 | metrics: None, |
| 151 | sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, | ||
| 130 | } | 152 | } |
| 131 | } | 153 | } |
| 132 | 154 | ||
| @@ -320,8 +342,8 @@ impl SyncManager { | |||
| 320 | _ => {} | 342 | _ => {} |
| 321 | } | 343 | } |
| 322 | 344 | ||
| 323 | // Validate through write policy using SYNC_SOURCE_ADDR | 345 | // Validate through write policy using sync_source_addr derived from config |
| 324 | let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; | 346 | let result = self.write_policy.admit_event(event, &self.sync_source_addr).await; |
| 325 | 347 | ||
| 326 | match result { | 348 | match result { |
| 327 | PolicyResult::Accept => { | 349 | PolicyResult::Accept => { |