diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 16:53:03 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 16:53:03 +0000 |
| commit | 2a9160836bb87fdea3ae891563b0169c68d1c2ab (patch) | |
| tree | 583c890687beaf7f380fc0be131bdf17485f06fa /src | |
| parent | 52489d3b1a7d79e164b4cc901b53fd06c05ce1b1 (diff) | |
fix: resolve all fmt and clippy warnings
Main lib (src/):
- Add #[allow(dead_code)] for build_info field (stored to prevent Prometheus unregistration)
- Add #[allow(dead_code)] for first_seen field (reserved for future rate limiting)
- Replace .or_insert_with(RelaySyncNeeds::default) with .or_default()
- Replace manual div_ceil implementations with .div_ceil(100)
Test code (tests/):
- Replace .expect(&format!(...)) with .unwrap_or_else(|_| panic!(...))
- Remove needless borrows in fetch_metrics() calls
- Add #[allow(dead_code)] and #[allow(unused_imports)] to test helpers module
grasp-audit:
- Apply cargo fmt to fix formatting
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 18 | ||||
| -rw-r--r-- | src/http/landing.rs | 5 | ||||
| -rw-r--r-- | src/http/mod.rs | 8 | ||||
| -rw-r--r-- | src/main.rs | 9 | ||||
| -rw-r--r-- | src/metrics/bandwidth.rs | 13 | ||||
| -rw-r--r-- | src/metrics/connection.rs | 58 | ||||
| -rw-r--r-- | src/metrics/mod.rs | 127 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 22 | ||||
| -rw-r--r-- | src/nostr/policy/announcement.rs | 7 | ||||
| -rw-r--r-- | src/nostr/policy/mod.rs | 3 | ||||
| -rw-r--r-- | src/nostr/policy/pr_event.rs | 2 | ||||
| -rw-r--r-- | src/nostr/policy/related.rs | 7 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 7 | ||||
| -rw-r--r-- | src/sync/algorithms.rs | 6 | ||||
| -rw-r--r-- | src/sync/filters.rs | 2 | ||||
| -rw-r--r-- | src/sync/health.rs | 19 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 10 | ||||
| -rw-r--r-- | src/sync/mod.rs | 8 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 24 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 42 |
20 files changed, 238 insertions, 159 deletions
diff --git a/src/config.rs b/src/config.rs index 8c6de05..7834a3f 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -77,11 +77,19 @@ pub struct Config { | |||
| 77 | pub metrics_enabled: bool, | 77 | pub metrics_enabled: bool, |
| 78 | 78 | ||
| 79 | /// Connections per IP before flagging as potential abuse in metrics (display only, no rate limiting) | 79 | /// Connections per IP before flagging as potential abuse in metrics (display only, no rate limiting) |
| 80 | #[arg(long = "metrics-connection-per-ip-abuse-threshold", env = "NGIT_METRICS_CONNECTION_PER_IP_ABUSE_THRESHOLD", default_value_t = 10)] | 80 | #[arg( |
| 81 | long = "metrics-connection-per-ip-abuse-threshold", | ||
| 82 | env = "NGIT_METRICS_CONNECTION_PER_IP_ABUSE_THRESHOLD", | ||
| 83 | default_value_t = 10 | ||
| 84 | )] | ||
| 81 | pub metrics_connection_per_ip_abuse_threshold: u32, | 85 | pub metrics_connection_per_ip_abuse_threshold: u32, |
| 82 | 86 | ||
| 83 | /// Number of top bandwidth repos to track in metrics | 87 | /// Number of top bandwidth repos to track in metrics |
| 84 | #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] | 88 | #[arg( |
| 89 | long = "metrics-top-n-repos", | ||
| 90 | env = "NGIT_METRICS_TOP_N_REPOS", | ||
| 91 | default_value_t = 10 | ||
| 92 | )] | ||
| 85 | pub metrics_top_n_repos: usize, | 93 | pub metrics_top_n_repos: usize, |
| 86 | 94 | ||
| 87 | /// URL of bootstrap relay to sync from on startup (optional) | 95 | /// URL of bootstrap relay to sync from on startup (optional) |
| @@ -95,7 +103,11 @@ pub struct Config { | |||
| 95 | 103 | ||
| 96 | /// Interval in seconds for checking disconnected relays and attempting reconnection (default: 60) | 104 | /// Interval in seconds for checking disconnected relays and attempting reconnection (default: 60) |
| 97 | /// Set to lower value for faster reconnection testing | 105 | /// Set to lower value for faster reconnection testing |
| 98 | #[arg(long, env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", default_value_t = 60)] | 106 | #[arg( |
| 107 | long, | ||
| 108 | env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", | ||
| 109 | default_value_t = 60 | ||
| 110 | )] | ||
| 99 | pub sync_disconnect_check_interval_secs: u64, | 111 | pub sync_disconnect_check_interval_secs: u64, |
| 100 | 112 | ||
| 101 | /// Base backoff time in seconds for relay reconnection (default: 5) | 113 | /// Base backoff time in seconds for relay reconnection (default: 5) |
diff --git a/src/http/landing.rs b/src/http/landing.rs index 8ab4a68..5fc1e6e 100644 --- a/src/http/landing.rs +++ b/src/http/landing.rs | |||
| @@ -341,10 +341,7 @@ fn generate_hero_tags(nip11: &RelayInformationDocument) -> String { | |||
| 341 | 341 | ||
| 342 | // Add GRASP tags | 342 | // Add GRASP tags |
| 343 | for grasp in &nip11.supported_grasps { | 343 | for grasp in &nip11.supported_grasps { |
| 344 | html.push_str(&format!( | 344 | html.push_str(&format!(r#"<span class="tag tag-grasp">{}</span>"#, grasp)); |
| 345 | r#"<span class="tag tag-grasp">{}</span>"#, | ||
| 346 | grasp | ||
| 347 | )); | ||
| 348 | html.push('\n'); | 345 | html.push('\n'); |
| 349 | } | 346 | } |
| 350 | 347 | ||
diff --git a/src/http/mod.rs b/src/http/mod.rs index f584e03..91a6067 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs | |||
| @@ -509,7 +509,13 @@ pub async fn run_server( | |||
| 509 | loop { | 509 | loop { |
| 510 | let (socket, addr) = listener.accept().await?; | 510 | let (socket, addr) = listener.accept().await?; |
| 511 | let io = TokioIo::new(socket); | 511 | let io = TokioIo::new(socket); |
| 512 | let service = HttpService::new(relay.clone(), config.clone(), addr, database.clone(), metrics.clone()); | 512 | let service = HttpService::new( |
| 513 | relay.clone(), | ||
| 514 | config.clone(), | ||
| 515 | addr, | ||
| 516 | database.clone(), | ||
| 517 | metrics.clone(), | ||
| 518 | ); | ||
| 513 | 519 | ||
| 514 | tokio::spawn(async move { | 520 | tokio::spawn(async move { |
| 515 | if let Err(e) = http1::Builder::new() | 521 | if let Err(e) = http1::Builder::new() |
diff --git a/src/main.rs b/src/main.rs index 97a14eb..6d8b4dd 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -37,7 +37,9 @@ async fn main() -> Result<()> { | |||
| 37 | // Initialize metrics if enabled | 37 | // Initialize metrics if enabled |
| 38 | let metrics = if config.metrics_enabled { | 38 | let metrics = if config.metrics_enabled { |
| 39 | info!("Metrics enabled on /metrics endpoint"); | 39 | info!("Metrics enabled on /metrics endpoint"); |
| 40 | Some(Arc::new(Metrics::new(config.metrics_connection_per_ip_abuse_threshold))) | 40 | Some(Arc::new(Metrics::new( |
| 41 | config.metrics_connection_per_ip_abuse_threshold, | ||
| 42 | ))) | ||
| 41 | } else { | 43 | } else { |
| 42 | info!("Metrics disabled"); | 44 | info!("Metrics disabled"); |
| 43 | None | 45 | None |
| @@ -65,7 +67,10 @@ async fn main() -> Result<()> { | |||
| 65 | ); | 67 | ); |
| 66 | 68 | ||
| 67 | if config.sync_bootstrap_relay_url.is_some() { | 69 | if config.sync_bootstrap_relay_url.is_some() { |
| 68 | info!("Starting proactive sync with bootstrap relay: {:?}", config.sync_bootstrap_relay_url); | 70 | info!( |
| 71 | "Starting proactive sync with bootstrap relay: {:?}", | ||
| 72 | config.sync_bootstrap_relay_url | ||
| 73 | ); | ||
| 69 | } else { | 74 | } else { |
| 70 | info!("Proactive sync enabled (will discover relays from stored announcements)"); | 75 | info!("Proactive sync enabled (will discover relays from stored announcements)"); |
| 71 | } | 76 | } |
diff --git a/src/metrics/bandwidth.rs b/src/metrics/bandwidth.rs index d2c53e8..d51af12 100644 --- a/src/metrics/bandwidth.rs +++ b/src/metrics/bandwidth.rs | |||
| @@ -80,7 +80,9 @@ impl BandwidthTracker { | |||
| 80 | &["repo"], | 80 | &["repo"], |
| 81 | ) | 81 | ) |
| 82 | .unwrap(); | 82 | .unwrap(); |
| 83 | registry.register(Box::new(top_repos_gauge.clone())).unwrap(); | 83 | registry |
| 84 | .register(Box::new(top_repos_gauge.clone())) | ||
| 85 | .unwrap(); | ||
| 84 | 86 | ||
| 85 | Self { | 87 | Self { |
| 86 | all_repos: DashMap::new(), | 88 | all_repos: DashMap::new(), |
| @@ -120,7 +122,12 @@ impl BandwidthTracker { | |||
| 120 | // Try to update the timestamp atomically to prevent concurrent refreshes | 122 | // Try to update the timestamp atomically to prevent concurrent refreshes |
| 121 | if self | 123 | if self |
| 122 | .last_refresh_nanos | 124 | .last_refresh_nanos |
| 123 | .compare_exchange(last_refresh, elapsed_nanos, Ordering::SeqCst, Ordering::Relaxed) | 125 | .compare_exchange( |
| 126 | last_refresh, | ||
| 127 | elapsed_nanos, | ||
| 128 | Ordering::SeqCst, | ||
| 129 | Ordering::Relaxed, | ||
| 130 | ) | ||
| 124 | .is_ok() | 131 | .is_ok() |
| 125 | { | 132 | { |
| 126 | self.refresh_top_n(); | 133 | self.refresh_top_n(); |
| @@ -298,4 +305,4 @@ mod tests { | |||
| 298 | // Refresh should not panic on empty data | 305 | // Refresh should not panic on empty data |
| 299 | tracker.refresh_top_n(); | 306 | tracker.refresh_top_n(); |
| 300 | } | 307 | } |
| 301 | } \ No newline at end of file | 308 | } |
diff --git a/src/metrics/connection.rs b/src/metrics/connection.rs index 6a7f406..2d42081 100644 --- a/src/metrics/connection.rs +++ b/src/metrics/connection.rs | |||
| @@ -25,7 +25,8 @@ use tracing::warn; | |||
| 25 | struct ConnectionInfo { | 25 | struct ConnectionInfo { |
| 26 | /// Number of active connections from this IP | 26 | /// Number of active connections from this IP |
| 27 | count: u32, | 27 | count: u32, |
| 28 | /// When the first connection from this IP was established | 28 | /// When the first connection from this IP was established (for future rate limiting) |
| 29 | #[allow(dead_code)] | ||
| 29 | first_seen: Instant, | 30 | first_seen: Instant, |
| 30 | /// Whether this IP has been flagged as potentially abusive | 31 | /// Whether this IP has been flagged as potentially abusive |
| 31 | flagged_as_abuse: bool, | 32 | flagged_as_abuse: bool, |
| @@ -48,16 +49,16 @@ struct ConnectionInfo { | |||
| 48 | pub struct ConnectionTracker { | 49 | pub struct ConnectionTracker { |
| 49 | /// Active connections per IP (INTERNAL ONLY - never exposed to metrics) | 50 | /// Active connections per IP (INTERNAL ONLY - never exposed to metrics) |
| 50 | connections: DashMap<IpAddr, ConnectionInfo>, | 51 | connections: DashMap<IpAddr, ConnectionInfo>, |
| 51 | 52 | ||
| 52 | /// Threshold for abuse flagging (connections per IP) | 53 | /// Threshold for abuse flagging (connections per IP) |
| 53 | abuse_threshold: u32, | 54 | abuse_threshold: u32, |
| 54 | 55 | ||
| 55 | /// Prometheus gauge: total active connections | 56 | /// Prometheus gauge: total active connections |
| 56 | active_connections: IntGauge, | 57 | active_connections: IntGauge, |
| 57 | 58 | ||
| 58 | /// Prometheus gauge: number of unique IPs connected | 59 | /// Prometheus gauge: number of unique IPs connected |
| 59 | unique_ips: IntGauge, | 60 | unique_ips: IntGauge, |
| 60 | 61 | ||
| 61 | /// Prometheus gauge: number of IPs flagged as potential abusers | 62 | /// Prometheus gauge: number of IPs flagged as potential abusers |
| 62 | flagged_abusers: IntGauge, | 63 | flagged_abusers: IntGauge, |
| 63 | } | 64 | } |
| @@ -70,29 +71,30 @@ impl ConnectionTracker { | |||
| 70 | /// * `abuse_threshold` - Number of connections from a single IP before flagging | 71 | /// * `abuse_threshold` - Number of connections from a single IP before flagging |
| 71 | /// * `registry` - Prometheus registry to register metrics with | 72 | /// * `registry` - Prometheus registry to register metrics with |
| 72 | pub fn new(abuse_threshold: u32, registry: &Registry) -> Self { | 73 | pub fn new(abuse_threshold: u32, registry: &Registry) -> Self { |
| 73 | let active_connections = IntGauge::with_opts( | 74 | let active_connections = IntGauge::with_opts(Opts::new( |
| 74 | Opts::new( | 75 | "ngit_websocket_connections_active", |
| 75 | "ngit_websocket_connections_active", | 76 | "Current active WebSocket connections", |
| 76 | "Current active WebSocket connections", | 77 | )) |
| 77 | ) | 78 | .unwrap(); |
| 78 | ).unwrap(); | 79 | registry |
| 79 | registry.register(Box::new(active_connections.clone())).unwrap(); | 80 | .register(Box::new(active_connections.clone())) |
| 80 | 81 | .unwrap(); | |
| 81 | let unique_ips = IntGauge::with_opts( | 82 | |
| 82 | Opts::new( | 83 | let unique_ips = IntGauge::with_opts(Opts::new( |
| 83 | "ngit_websocket_unique_ips", | 84 | "ngit_websocket_unique_ips", |
| 84 | "Number of unique IP addresses connected (NOT the IPs themselves)", | 85 | "Number of unique IP addresses connected (NOT the IPs themselves)", |
| 85 | ) | 86 | )) |
| 86 | ).unwrap(); | 87 | .unwrap(); |
| 87 | registry.register(Box::new(unique_ips.clone())).unwrap(); | 88 | registry.register(Box::new(unique_ips.clone())).unwrap(); |
| 88 | 89 | ||
| 89 | let flagged_abusers = IntGauge::with_opts( | 90 | let flagged_abusers = IntGauge::with_opts(Opts::new( |
| 90 | Opts::new( | 91 | "ngit_websocket_flagged_abusers", |
| 91 | "ngit_websocket_flagged_abusers", | 92 | "Number of IPs exceeding connection threshold", |
| 92 | "Number of IPs exceeding connection threshold", | 93 | )) |
| 93 | ) | 94 | .unwrap(); |
| 94 | ).unwrap(); | 95 | registry |
| 95 | registry.register(Box::new(flagged_abusers.clone())).unwrap(); | 96 | .register(Box::new(flagged_abusers.clone())) |
| 97 | .unwrap(); | ||
| 96 | 98 | ||
| 97 | Self { | 99 | Self { |
| 98 | connections: DashMap::new(), | 100 | connections: DashMap::new(), |
| @@ -140,7 +142,7 @@ impl ConnectionTracker { | |||
| 140 | 142 | ||
| 141 | // Update Prometheus metrics (aggregate counts only) | 143 | // Update Prometheus metrics (aggregate counts only) |
| 142 | self.active_connections.inc(); | 144 | self.active_connections.inc(); |
| 143 | 145 | ||
| 144 | if is_new_ip { | 146 | if is_new_ip { |
| 145 | self.unique_ips.inc(); | 147 | self.unique_ips.inc(); |
| 146 | } | 148 | } |
| @@ -334,4 +336,4 @@ mod tests { | |||
| 334 | assert_eq!(tracker.active_connections(), 0); | 336 | assert_eq!(tracker.active_connections(), 0); |
| 335 | assert_eq!(tracker.unique_ip_count(), 0); | 337 | assert_eq!(tracker.unique_ip_count(), 0); |
| 336 | } | 338 | } |
| 337 | } \ No newline at end of file | 339 | } |
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 736414f..5420dfd 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs | |||
| @@ -87,7 +87,8 @@ struct MetricsInner { | |||
| 87 | // === System Health Metrics === | 87 | // === System Health Metrics === |
| 88 | /// Server start time for uptime calculation | 88 | /// Server start time for uptime calculation |
| 89 | pub start_time: Instant, | 89 | pub start_time: Instant, |
| 90 | /// Build information gauge | 90 | /// Build information gauge (stored to prevent unregistration from Prometheus) |
| 91 | #[allow(dead_code)] | ||
| 91 | pub build_info: GaugeVec, | 92 | pub build_info: GaugeVec, |
| 92 | } | 93 | } |
| 93 | 94 | ||
| @@ -158,7 +159,10 @@ impl Metrics { | |||
| 158 | 159 | ||
| 159 | /// Start timing a git operation, returns a timer | 160 | /// Start timing a git operation, returns a timer |
| 160 | pub fn start_git_operation_timer(&self, operation: &str) -> GitOperationTimer { | 161 | pub fn start_git_operation_timer(&self, operation: &str) -> GitOperationTimer { |
| 161 | GitOperationTimer::new(self.inner.git_operation_duration.clone(), operation.to_string()) | 162 | GitOperationTimer::new( |
| 163 | self.inner.git_operation_duration.clone(), | ||
| 164 | operation.to_string(), | ||
| 165 | ) | ||
| 162 | } | 166 | } |
| 163 | 167 | ||
| 164 | /// Record bytes transferred for a git operation | 168 | /// Record bytes transferred for a git operation |
| @@ -266,13 +270,14 @@ impl MetricsInner { | |||
| 266 | } | 270 | } |
| 267 | 271 | ||
| 268 | // WebSocket metrics | 272 | // WebSocket metrics |
| 269 | let websocket_connections_total = Counter::with_opts( | 273 | let websocket_connections_total = Counter::with_opts(Opts::new( |
| 270 | Opts::new( | 274 | "ngit_websocket_connections_total", |
| 271 | "ngit_websocket_connections_total", | 275 | "Total WebSocket connections since startup", |
| 272 | "Total WebSocket connections since startup", | 276 | )) |
| 273 | ) | 277 | .unwrap(); |
| 274 | ).unwrap(); | 278 | REGISTRY |
| 275 | REGISTRY.register(Box::new(websocket_connections_total.clone())).unwrap(); | 279 | .register(Box::new(websocket_connections_total.clone())) |
| 280 | .unwrap(); | ||
| 276 | 281 | ||
| 277 | let websocket_connection_duration = Histogram::with_opts( | 282 | let websocket_connection_duration = Histogram::with_opts( |
| 278 | HistogramOpts::new( | 283 | HistogramOpts::new( |
| @@ -280,8 +285,11 @@ impl MetricsInner { | |||
| 280 | "Duration of WebSocket connections", | 285 | "Duration of WebSocket connections", |
| 281 | ) | 286 | ) |
| 282 | .buckets(vec![1.0, 5.0, 15.0, 30.0, 60.0, 300.0, 900.0, 3600.0]), | 287 | .buckets(vec![1.0, 5.0, 15.0, 30.0, 60.0, 300.0, 900.0, 3600.0]), |
| 283 | ).unwrap(); | 288 | ) |
| 284 | REGISTRY.register(Box::new(websocket_connection_duration.clone())).unwrap(); | 289 | .unwrap(); |
| 290 | REGISTRY | ||
| 291 | .register(Box::new(websocket_connection_duration.clone())) | ||
| 292 | .unwrap(); | ||
| 285 | 293 | ||
| 286 | let websocket_messages_received = CounterVec::new( | 294 | let websocket_messages_received = CounterVec::new( |
| 287 | Opts::new( | 295 | Opts::new( |
| @@ -289,8 +297,11 @@ impl MetricsInner { | |||
| 289 | "WebSocket messages received by type", | 297 | "WebSocket messages received by type", |
| 290 | ), | 298 | ), |
| 291 | &["type"], | 299 | &["type"], |
| 292 | ).unwrap(); | 300 | ) |
| 293 | REGISTRY.register(Box::new(websocket_messages_received.clone())).unwrap(); | 301 | .unwrap(); |
| 302 | REGISTRY | ||
| 303 | .register(Box::new(websocket_messages_received.clone())) | ||
| 304 | .unwrap(); | ||
| 294 | 305 | ||
| 295 | let websocket_messages_sent = CounterVec::new( | 306 | let websocket_messages_sent = CounterVec::new( |
| 296 | Opts::new( | 307 | Opts::new( |
| @@ -298,8 +309,11 @@ impl MetricsInner { | |||
| 298 | "WebSocket messages sent by type", | 309 | "WebSocket messages sent by type", |
| 299 | ), | 310 | ), |
| 300 | &["type"], | 311 | &["type"], |
| 301 | ).unwrap(); | 312 | ) |
| 302 | REGISTRY.register(Box::new(websocket_messages_sent.clone())).unwrap(); | 313 | .unwrap(); |
| 314 | REGISTRY | ||
| 315 | .register(Box::new(websocket_messages_sent.clone())) | ||
| 316 | .unwrap(); | ||
| 303 | 317 | ||
| 304 | // Git operation metrics | 318 | // Git operation metrics |
| 305 | let git_operations_total = CounterVec::new( | 319 | let git_operations_total = CounterVec::new( |
| @@ -308,8 +322,11 @@ impl MetricsInner { | |||
| 308 | "Git operations by type and status", | 322 | "Git operations by type and status", |
| 309 | ), | 323 | ), |
| 310 | &["operation", "status"], | 324 | &["operation", "status"], |
| 311 | ).unwrap(); | 325 | ) |
| 312 | REGISTRY.register(Box::new(git_operations_total.clone())).unwrap(); | 326 | .unwrap(); |
| 327 | REGISTRY | ||
| 328 | .register(Box::new(git_operations_total.clone())) | ||
| 329 | .unwrap(); | ||
| 313 | 330 | ||
| 314 | let git_operation_duration = HistogramVec::new( | 331 | let git_operation_duration = HistogramVec::new( |
| 315 | HistogramOpts::new( | 332 | HistogramOpts::new( |
| @@ -318,8 +335,11 @@ impl MetricsInner { | |||
| 318 | ) | 335 | ) |
| 319 | .buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]), | 336 | .buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]), |
| 320 | &["operation"], | 337 | &["operation"], |
| 321 | ).unwrap(); | 338 | ) |
| 322 | REGISTRY.register(Box::new(git_operation_duration.clone())).unwrap(); | 339 | .unwrap(); |
| 340 | REGISTRY | ||
| 341 | .register(Box::new(git_operation_duration.clone())) | ||
| 342 | .unwrap(); | ||
| 323 | 343 | ||
| 324 | let git_bytes_total = CounterVec::new( | 344 | let git_bytes_total = CounterVec::new( |
| 325 | Opts::new( | 345 | Opts::new( |
| @@ -327,8 +347,11 @@ impl MetricsInner { | |||
| 327 | "Total bytes transferred for git operations", | 347 | "Total bytes transferred for git operations", |
| 328 | ), | 348 | ), |
| 329 | &["direction"], | 349 | &["direction"], |
| 330 | ).unwrap(); | 350 | ) |
| 331 | REGISTRY.register(Box::new(git_bytes_total.clone())).unwrap(); | 351 | .unwrap(); |
| 352 | REGISTRY | ||
| 353 | .register(Box::new(git_bytes_total.clone())) | ||
| 354 | .unwrap(); | ||
| 332 | 355 | ||
| 333 | let git_push_authorization = CounterVec::new( | 356 | let git_push_authorization = CounterVec::new( |
| 334 | Opts::new( | 357 | Opts::new( |
| @@ -336,8 +359,11 @@ impl MetricsInner { | |||
| 336 | "Push authorization results", | 359 | "Push authorization results", |
| 337 | ), | 360 | ), |
| 338 | &["result"], | 361 | &["result"], |
| 339 | ).unwrap(); | 362 | ) |
| 340 | REGISTRY.register(Box::new(git_push_authorization.clone())).unwrap(); | 363 | .unwrap(); |
| 364 | REGISTRY | ||
| 365 | .register(Box::new(git_push_authorization.clone())) | ||
| 366 | .unwrap(); | ||
| 341 | 367 | ||
| 342 | // Nostr event metrics | 368 | // Nostr event metrics |
| 343 | let events_received_total = CounterVec::new( | 369 | let events_received_total = CounterVec::new( |
| @@ -346,8 +372,11 @@ impl MetricsInner { | |||
| 346 | "Nostr events received by kind", | 372 | "Nostr events received by kind", |
| 347 | ), | 373 | ), |
| 348 | &["kind"], | 374 | &["kind"], |
| 349 | ).unwrap(); | 375 | ) |
| 350 | REGISTRY.register(Box::new(events_received_total.clone())).unwrap(); | 376 | .unwrap(); |
| 377 | REGISTRY | ||
| 378 | .register(Box::new(events_received_total.clone())) | ||
| 379 | .unwrap(); | ||
| 351 | 380 | ||
| 352 | let events_stored_total = CounterVec::new( | 381 | let events_stored_total = CounterVec::new( |
| 353 | Opts::new( | 382 | Opts::new( |
| @@ -355,8 +384,11 @@ impl MetricsInner { | |||
| 355 | "Nostr events successfully stored by kind", | 384 | "Nostr events successfully stored by kind", |
| 356 | ), | 385 | ), |
| 357 | &["kind"], | 386 | &["kind"], |
| 358 | ).unwrap(); | 387 | ) |
| 359 | REGISTRY.register(Box::new(events_stored_total.clone())).unwrap(); | 388 | .unwrap(); |
| 389 | REGISTRY | ||
| 390 | .register(Box::new(events_stored_total.clone())) | ||
| 391 | .unwrap(); | ||
| 360 | 392 | ||
| 361 | let events_rejected_total = CounterVec::new( | 393 | let events_rejected_total = CounterVec::new( |
| 362 | Opts::new( | 394 | Opts::new( |
| @@ -364,31 +396,36 @@ impl MetricsInner { | |||
| 364 | "Nostr events rejected by kind and reason", | 396 | "Nostr events rejected by kind and reason", |
| 365 | ), | 397 | ), |
| 366 | &["kind", "reason"], | 398 | &["kind", "reason"], |
| 367 | ).unwrap(); | 399 | ) |
| 368 | REGISTRY.register(Box::new(events_rejected_total.clone())).unwrap(); | 400 | .unwrap(); |
| 401 | REGISTRY | ||
| 402 | .register(Box::new(events_rejected_total.clone())) | ||
| 403 | .unwrap(); | ||
| 369 | 404 | ||
| 370 | // Repository metrics | 405 | // Repository metrics |
| 371 | let repositories_total = Gauge::with_opts( | 406 | let repositories_total = Gauge::with_opts(Opts::new( |
| 372 | Opts::new( | 407 | "ngit_repositories_total", |
| 373 | "ngit_repositories_total", | 408 | "Total repositories hosted", |
| 374 | "Total repositories hosted", | 409 | )) |
| 375 | ) | 410 | .unwrap(); |
| 376 | ).unwrap(); | 411 | REGISTRY |
| 377 | REGISTRY.register(Box::new(repositories_total.clone())).unwrap(); | 412 | .register(Box::new(repositories_total.clone())) |
| 413 | .unwrap(); | ||
| 378 | 414 | ||
| 379 | // Build info | 415 | // Build info |
| 380 | let build_info = GaugeVec::new( | 416 | let build_info = GaugeVec::new( |
| 381 | Opts::new( | 417 | Opts::new("ngit_build_info", "Build information"), |
| 382 | "ngit_build_info", | ||
| 383 | "Build information", | ||
| 384 | ), | ||
| 385 | &["version", "commit"], | 418 | &["version", "commit"], |
| 386 | ).unwrap(); | 419 | ) |
| 420 | .unwrap(); | ||
| 387 | REGISTRY.register(Box::new(build_info.clone())).unwrap(); | 421 | REGISTRY.register(Box::new(build_info.clone())).unwrap(); |
| 388 | 422 | ||
| 389 | // Set build info gauge to 1 (it's just for labels) | 423 | // Set build info gauge to 1 (it's just for labels) |
| 390 | build_info | 424 | build_info |
| 391 | .with_label_values(&[env!("CARGO_PKG_VERSION"), option_env!("GIT_HASH").unwrap_or("unknown")]) | 425 | .with_label_values(&[ |
| 426 | env!("CARGO_PKG_VERSION"), | ||
| 427 | option_env!("GIT_HASH").unwrap_or("unknown"), | ||
| 428 | ]) | ||
| 392 | .set(1.0); | 429 | .set(1.0); |
| 393 | 430 | ||
| 394 | Self { | 431 | Self { |
| @@ -472,7 +509,7 @@ mod tests { | |||
| 472 | // Note: This test may fail if run with other tests due to global registry | 509 | // Note: This test may fail if run with other tests due to global registry |
| 473 | // In production, consider using a test-specific registry | 510 | // In production, consider using a test-specific registry |
| 474 | let metrics = Metrics::new(10); | 511 | let metrics = Metrics::new(10); |
| 475 | 512 | ||
| 476 | // Test that we can record metrics without panicking | 513 | // Test that we can record metrics without panicking |
| 477 | metrics.record_websocket_connection(); | 514 | metrics.record_websocket_connection(); |
| 478 | metrics.record_message_received("REQ"); | 515 | metrics.record_message_received("REQ"); |
| @@ -484,4 +521,4 @@ mod tests { | |||
| 484 | metrics.record_event_rejected(1, "invalid_signature"); | 521 | metrics.record_event_rejected(1, "invalid_signature"); |
| 485 | metrics.set_repositories_total(5); | 522 | metrics.set_repositories_total(5); |
| 486 | } | 523 | } |
| 487 | } \ No newline at end of file | 524 | } |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 2284c18..c9bd1e1 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -16,8 +16,8 @@ use crate::nostr::events::{ | |||
| 16 | KIND_REPOSITORY_STATE, | 16 | KIND_REPOSITORY_STATE, |
| 17 | }; | 17 | }; |
| 18 | use crate::nostr::policy::{ | 18 | use crate::nostr::policy::{ |
| 19 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, RelatedEventPolicy, | 19 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, |
| 20 | ReferenceResult, StatePolicy, StateResult, | 20 | RelatedEventPolicy, StatePolicy, StateResult, |
| 21 | }; | 21 | }; |
| 22 | 22 | ||
| 23 | /// Type alias for the shared database used by the relay | 23 | /// Type alias for the shared database used by the relay |
| @@ -77,7 +77,9 @@ impl Nip34WritePolicy { | |||
| 77 | match RepositoryAnnouncement::from_event(event.clone()) { | 77 | match RepositoryAnnouncement::from_event(event.clone()) { |
| 78 | Ok(announcement) => { | 78 | Ok(announcement) => { |
| 79 | // Try to create bare repository if it doesn't exist | 79 | // Try to create bare repository if it doesn't exist |
| 80 | if let Err(e) = self.announcement_policy.ensure_bare_repository(&announcement) | 80 | if let Err(e) = self |
| 81 | .announcement_policy | ||
| 82 | .ensure_bare_repository(&announcement) | ||
| 81 | { | 83 | { |
| 82 | tracing::warn!( | 84 | tracing::warn!( |
| 83 | "Failed to create bare repository for {}: {}", | 85 | "Failed to create bare repository for {}: {}", |
| @@ -145,22 +147,14 @@ impl Nip34WritePolicy { | |||
| 145 | Ok(_state) => { | 147 | Ok(_state) => { |
| 146 | // Process state alignment asynchronously | 148 | // Process state alignment asynchronously |
| 147 | if let Err(e) = self.state_policy.process_state_event(event).await { | 149 | if let Err(e) = self.state_policy.process_state_event(event).await { |
| 148 | tracing::warn!( | 150 | tracing::warn!("Failed to process state event {}: {}", event_id_str, e); |
| 149 | "Failed to process state event {}: {}", | ||
| 150 | event_id_str, | ||
| 151 | e | ||
| 152 | ); | ||
| 153 | } | 151 | } |
| 154 | 152 | ||
| 155 | tracing::debug!("Accepted repository state: {}", event_id_str); | 153 | tracing::debug!("Accepted repository state: {}", event_id_str); |
| 156 | PolicyResult::Accept | 154 | PolicyResult::Accept |
| 157 | } | 155 | } |
| 158 | Err(e) => { | 156 | Err(e) => { |
| 159 | tracing::warn!( | 157 | tracing::warn!("Failed to parse repository state {}: {}", event_id_str, e); |
| 160 | "Failed to parse repository state {}: {}", | ||
| 161 | event_id_str, | ||
| 162 | e | ||
| 163 | ); | ||
| 164 | // Still accept the event even if we can't parse it | 158 | // Still accept the event even if we can't parse it |
| 165 | // The validation passed, so it's structurally valid | 159 | // The validation passed, so it's structurally valid |
| 166 | PolicyResult::Accept | 160 | PolicyResult::Accept |
| @@ -348,4 +342,4 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> { | |||
| 348 | database, | 342 | database, |
| 349 | write_policy, | 343 | write_policy, |
| 350 | }) | 344 | }) |
| 351 | } \ No newline at end of file | 345 | } |
diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 8d30baf..353738b 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs | |||
| @@ -72,7 +72,10 @@ impl AnnouncementPolicy { | |||
| 72 | 72 | ||
| 73 | /// Create a bare git repository if it doesn't exist | 73 | /// Create a bare git repository if it doesn't exist |
| 74 | /// Path format: <git_data_path>/<npub>/<identifier>.git | 74 | /// Path format: <git_data_path>/<npub>/<identifier>.git |
| 75 | pub fn ensure_bare_repository(&self, announcement: &RepositoryAnnouncement) -> Result<(), String> { | 75 | pub fn ensure_bare_repository( |
| 76 | &self, | ||
| 77 | announcement: &RepositoryAnnouncement, | ||
| 78 | ) -> Result<(), String> { | ||
| 76 | let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); | 79 | let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); |
| 77 | 80 | ||
| 78 | // Check if repository already exists | 81 | // Check if repository already exists |
| @@ -154,4 +157,4 @@ impl AnnouncementPolicy { | |||
| 154 | 157 | ||
| 155 | Ok(false) | 158 | Ok(false) |
| 156 | } | 159 | } |
| 157 | } \ No newline at end of file | 160 | } |
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 6d67394..19db5f6 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs | |||
| @@ -5,7 +5,6 @@ | |||
| 5 | /// - `StatePolicy` - State event validation + ref alignment | 5 | /// - `StatePolicy` - State event validation + ref alignment |
| 6 | /// - `PrEventPolicy` - PR/PR Update validation | 6 | /// - `PrEventPolicy` - PR/PR Update validation |
| 7 | /// - `RelatedEventPolicy` - Forward/backward reference checking | 7 | /// - `RelatedEventPolicy` - Forward/backward reference checking |
| 8 | |||
| 9 | mod announcement; | 8 | mod announcement; |
| 10 | mod pr_event; | 9 | mod pr_event; |
| 11 | mod related; | 10 | mod related; |
| @@ -38,4 +37,4 @@ impl PolicyContext { | |||
| 38 | git_data_path: git_data_path.into(), | 37 | git_data_path: git_data_path.into(), |
| 39 | } | 38 | } |
| 40 | } | 39 | } |
| 41 | } \ No newline at end of file | 40 | } |
diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index fee9a2a..53da369 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs | |||
| @@ -195,4 +195,4 @@ impl PrEventPolicy { | |||
| 195 | Ok(None) | 195 | Ok(None) |
| 196 | } | 196 | } |
| 197 | } | 197 | } |
| 198 | } \ No newline at end of file | 198 | } |
diff --git a/src/nostr/policy/related.rs b/src/nostr/policy/related.rs index 1937ca7..7ce87db 100644 --- a/src/nostr/policy/related.rs +++ b/src/nostr/policy/related.rs | |||
| @@ -169,10 +169,7 @@ impl RelatedEventPolicy { | |||
| 169 | 169 | ||
| 170 | /// Check if any events exist in database | 170 | /// Check if any events exist in database |
| 171 | /// Returns the first matching event ID found, or None if none match | 171 | /// Returns the first matching event ID found, or None if none match |
| 172 | async fn find_accepted_event( | 172 | async fn find_accepted_event(&self, event_ids: &[EventId]) -> Result<Option<EventId>, String> { |
| 173 | &self, | ||
| 174 | event_ids: &[EventId], | ||
| 175 | ) -> Result<Option<EventId>, String> { | ||
| 176 | if event_ids.is_empty() { | 173 | if event_ids.is_empty() { |
| 177 | return Ok(None); | 174 | return Ok(None); |
| 178 | } | 175 | } |
| @@ -273,4 +270,4 @@ impl RelatedEventPolicy { | |||
| 273 | 270 | ||
| 274 | Ok(false) | 271 | Ok(false) |
| 275 | } | 272 | } |
| 276 | } \ No newline at end of file | 273 | } |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 5692bd8..43349e2 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -239,7 +239,10 @@ impl StatePolicy { | |||
| 239 | } | 239 | } |
| 240 | 240 | ||
| 241 | // Build repository path: <git_data_path>/<owner_npub>/<identifier>.git | 241 | // Build repository path: <git_data_path>/<owner_npub>/<identifier>.git |
| 242 | let repo_path = self.ctx.git_data_path.join(announcement.repo_path().clone()); | 242 | let repo_path = self |
| 243 | .ctx | ||
| 244 | .git_data_path | ||
| 245 | .join(announcement.repo_path().clone()); | ||
| 243 | owner_repos.push((announcement, repo_path)); | 246 | owner_repos.push((announcement, repo_path)); |
| 244 | } | 247 | } |
| 245 | 248 | ||
| @@ -416,4 +419,4 @@ impl StatePolicy { | |||
| 416 | 419 | ||
| 417 | result | 420 | result |
| 418 | } | 421 | } |
| 419 | } \ No newline at end of file | 422 | } |
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 7d87411..3063516 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -65,9 +65,7 @@ pub fn derive_relay_targets( | |||
| 65 | 65 | ||
| 66 | for (repo_id, needs) in repo_index { | 66 | for (repo_id, needs) in repo_index { |
| 67 | for relay_url in &needs.relays { | 67 | for relay_url in &needs.relays { |
| 68 | let entry = relay_targets | 68 | let entry = relay_targets.entry(relay_url.clone()).or_default(); |
| 69 | .entry(relay_url.clone()) | ||
| 70 | .or_insert_with(RelaySyncNeeds::default); | ||
| 71 | 69 | ||
| 72 | entry.repos.insert(repo_id.clone()); | 70 | entry.repos.insert(repo_id.clone()); |
| 73 | entry.root_events.extend(needs.root_events.iter().cloned()); | 71 | entry.root_events.extend(needs.root_events.iter().cloned()); |
| @@ -586,4 +584,4 @@ mod tests { | |||
| 586 | ); | 584 | ); |
| 587 | assert_eq!(actions[0].relay_url, "wss://new-relay.com"); | 585 | assert_eq!(actions[0].relay_url, "wss://new-relay.com"); |
| 588 | } | 586 | } |
| 589 | } \ No newline at end of file | 587 | } |
diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 02d580e..24e9bb2 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs | |||
| @@ -337,4 +337,4 @@ mod tests { | |||
| 337 | 337 | ||
| 338 | assert_eq!(filters.len(), 6); | 338 | assert_eq!(filters.len(), 6); |
| 339 | } | 339 | } |
| 340 | } \ No newline at end of file | 340 | } |
diff --git a/src/sync/health.rs b/src/sync/health.rs index f9a5f3a..0ae7dee 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs | |||
| @@ -206,11 +206,7 @@ impl RelayHealthTracker { | |||
| 206 | health.next_retry_at = Some(now + backoff); | 206 | health.next_retry_at = Some(now + backoff); |
| 207 | 207 | ||
| 208 | if old_state != HealthState::Degraded { | 208 | if old_state != HealthState::Degraded { |
| 209 | tracing::warn!( | 209 | tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); |
| 210 | "Relay {} degraded, backoff {:?}", | ||
| 211 | relay_url, | ||
| 212 | backoff | ||
| 213 | ); | ||
| 214 | } else { | 210 | } else { |
| 215 | tracing::debug!( | 211 | tracing::debug!( |
| 216 | "Relay {} failure #{}, backoff {:?}", | 212 | "Relay {} failure #{}, backoff {:?}", |
| @@ -308,12 +304,17 @@ impl RelayHealthTracker { | |||
| 308 | 304 | ||
| 309 | /// Get all tracked relay URLs | 305 | /// Get all tracked relay URLs |
| 310 | pub fn get_tracked_relays(&self) -> Vec<String> { | 306 | pub fn get_tracked_relays(&self) -> Vec<String> { |
| 311 | self.health.iter().map(|entry| entry.key().clone()).collect() | 307 | self.health |
| 308 | .iter() | ||
| 309 | .map(|entry| entry.key().clone()) | ||
| 310 | .collect() | ||
| 312 | } | 311 | } |
| 313 | 312 | ||
| 314 | /// Get a clone of the health info for a relay | 313 | /// Get a clone of the health info for a relay |
| 315 | pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> { | 314 | pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> { |
| 316 | self.health.get(relay_url).map(|entry| entry.value().clone()) | 315 | self.health |
| 316 | .get(relay_url) | ||
| 317 | .map(|entry| entry.value().clone()) | ||
| 317 | } | 318 | } |
| 318 | } | 319 | } |
| 319 | 320 | ||
| @@ -369,7 +370,7 @@ mod tests { | |||
| 369 | fn test_backoff_increases_exponentially() { | 370 | fn test_backoff_increases_exponentially() { |
| 370 | let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds | 371 | let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds |
| 371 | let max = 3600u64; | 372 | let max = 3600u64; |
| 372 | 373 | ||
| 373 | // failure 1: 5s (base * 2^0 = 5) | 374 | // failure 1: 5s (base * 2^0 = 5) |
| 374 | assert_eq!( | 375 | assert_eq!( |
| 375 | RelayHealthTracker::get_backoff_duration(1, base, max), | 376 | RelayHealthTracker::get_backoff_duration(1, base, max), |
| @@ -498,4 +499,4 @@ mod tests { | |||
| 498 | let health = tracker.get_health("wss://nonexistent.example.com"); | 499 | let health = tracker.get_health("wss://nonexistent.example.com"); |
| 499 | assert!(health.is_none()); | 500 | assert!(health.is_none()); |
| 500 | } | 501 | } |
| 501 | } \ No newline at end of file | 502 | } |
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 411ff63..d917dc0 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -207,7 +207,9 @@ impl SyncMetrics { | |||
| 207 | HealthState::Degraded => 2, | 207 | HealthState::Degraded => 2, |
| 208 | HealthState::Dead => 3, | 208 | HealthState::Dead => 3, |
| 209 | }; | 209 | }; |
| 210 | self.relay_status.with_label_values(&[relay]).set(state_value); | 210 | self.relay_status |
| 211 | .with_label_values(&[relay]) | ||
| 212 | .set(state_value); | ||
| 211 | } | 213 | } |
| 212 | 214 | ||
| 213 | /// Record relay failure count. | 215 | /// Record relay failure count. |
| @@ -259,9 +261,7 @@ impl SyncMetrics { | |||
| 259 | /// * `source` - The event source type (see [`record_event`](Self::record_event)) | 261 | /// * `source` - The event source type (see [`record_event`](Self::record_event)) |
| 260 | /// * `count` - Number of events to record | 262 | /// * `count` - Number of events to record |
| 261 | pub fn record_events(&self, source: &str, count: u64) { | 263 | pub fn record_events(&self, source: &str, count: u64) { |
| 262 | self.events_total | 264 | self.events_total.with_label_values(&[source]).inc_by(count); |
| 263 | .with_label_values(&[source]) | ||
| 264 | .inc_by(count); | ||
| 265 | } | 265 | } |
| 266 | 266 | ||
| 267 | /// Record a gap event filled during catchup. | 267 | /// Record a gap event filled during catchup. |
| @@ -451,4 +451,4 @@ mod tests { | |||
| 451 | let metrics2 = SyncMetrics::register(®istry); | 451 | let metrics2 = SyncMetrics::register(®istry); |
| 452 | assert!(metrics2.is_err()); | 452 | assert!(metrics2.is_err()); |
| 453 | } | 453 | } |
| 454 | } \ No newline at end of file | 454 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c4c3c7f..fb59b3c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -512,8 +512,8 @@ impl SyncManager { | |||
| 512 | }; | 512 | }; |
| 513 | 513 | ||
| 514 | // Check if relay supports NIP-77 negentropy AND negentropy is not disabled | 514 | // Check if relay supports NIP-77 negentropy AND negentropy is not disabled |
| 515 | let use_negentropy = !self.config.sync_disable_negentropy | 515 | let use_negentropy = |
| 516 | && connection.supports_negentropy().await; | 516 | !self.config.sync_disable_negentropy && connection.supports_negentropy().await; |
| 517 | 517 | ||
| 518 | // Unsubscribe all current subscriptions | 518 | // Unsubscribe all current subscriptions |
| 519 | connection.unsubscribe_all().await; | 519 | connection.unsubscribe_all().await; |
| @@ -1657,12 +1657,12 @@ impl SyncManager { | |||
| 1657 | 1657 | ||
| 1658 | let layer1_filters = 1; | 1658 | let layer1_filters = 1; |
| 1659 | let layer2_filters = if repo_count > 0 { | 1659 | let layer2_filters = if repo_count > 0 { |
| 1660 | ((repo_count + 99) / 100) * 3 | 1660 | repo_count.div_ceil(100) * 3 |
| 1661 | } else { | 1661 | } else { |
| 1662 | 0 | 1662 | 0 |
| 1663 | }; | 1663 | }; |
| 1664 | let layer3_filters = if event_count > 0 { | 1664 | let layer3_filters = if event_count > 0 { |
| 1665 | ((event_count + 99) / 100) * 3 | 1665 | event_count.div_ceil(100) * 3 |
| 1666 | } else { | 1666 | } else { |
| 1667 | 0 | 1667 | 0 |
| 1668 | }; | 1668 | }; |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index fae179b..4167a0c 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -150,17 +150,21 @@ impl RelayConnection { | |||
| 150 | // | 150 | // |
| 151 | // See: nostr-sdk-0.44 Client::try_connect_relay documentation | 151 | // See: nostr-sdk-0.44 Client::try_connect_relay documentation |
| 152 | self.client | 152 | self.client |
| 153 | .try_connect_relay(&self.url, std::time::Duration::from_secs(connection_timeout_secs)) | 153 | .try_connect_relay( |
| 154 | &self.url, | ||
| 155 | std::time::Duration::from_secs(connection_timeout_secs), | ||
| 156 | ) | ||
| 154 | .await | 157 | .await |
| 155 | .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; | 158 | .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; |
| 156 | 159 | ||
| 157 | // Subscribe to Layer 1 (announcements) | 160 | // Subscribe to Layer 1 (announcements) |
| 158 | let filter = build_announcement_filter(since); | 161 | let filter = build_announcement_filter(since); |
| 159 | let output = self | 162 | let output = self.client.subscribe(filter, None).await.map_err(|e| { |
| 160 | .client | 163 | format!( |
| 161 | .subscribe(filter, None) | 164 | "Failed to subscribe to announcements on {}: {}", |
| 162 | .await | 165 | self.url, e |
| 163 | .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?; | 166 | ) |
| 167 | })?; | ||
| 164 | 168 | ||
| 165 | tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); | 169 | tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); |
| 166 | Ok(output.val) | 170 | Ok(output.val) |
| @@ -250,7 +254,8 @@ impl RelayConnection { | |||
| 250 | } | 254 | } |
| 251 | RelayMessage::Closed { message: msg, .. } => { | 255 | RelayMessage::Closed { message: msg, .. } => { |
| 252 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | 256 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); |
| 253 | let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; | 257 | let _ = |
| 258 | event_sender.send(RelayEvent::Closed(msg.to_string())).await; | ||
| 254 | break; | 259 | break; |
| 255 | } | 260 | } |
| 256 | _ => {} | 261 | _ => {} |
| @@ -421,7 +426,10 @@ impl RelayConnection { | |||
| 421 | /// - Relay doesn't actually support NIP-77 (despite claiming to) | 426 | /// - Relay doesn't actually support NIP-77 (despite claiming to) |
| 422 | /// - Network errors during reconciliation | 427 | /// - Network errors during reconciliation |
| 423 | /// - Timeout during sync | 428 | /// - Timeout during sync |
| 424 | pub async fn negentropy_sync_filter(&self, filter: Filter) -> Result<NegentropySyncResult, String> { | 429 | pub async fn negentropy_sync_filter( |
| 430 | &self, | ||
| 431 | filter: Filter, | ||
| 432 | ) -> Result<NegentropySyncResult, String> { | ||
| 425 | // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange | 433 | // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange |
| 426 | let sync_opts = SyncOptions::default(); | 434 | let sync_opts = SyncOptions::default(); |
| 427 | 435 | ||
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index f83b081..e29e45b 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -49,7 +49,12 @@ impl PendingUpdates { | |||
| 49 | } | 49 | } |
| 50 | 50 | ||
| 51 | /// Add or update a repo with its relays and root events | 51 | /// Add or update a repo with its relays and root events |
| 52 | fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) { | 52 | fn add_repo( |
| 53 | &mut self, | ||
| 54 | repo_id: String, | ||
| 55 | relays: HashSet<String>, | ||
| 56 | root_events: HashSet<EventId>, | ||
| 57 | ) { | ||
| 53 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | 58 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { |
| 54 | relays: HashSet::new(), | 59 | relays: HashSet::new(), |
| 55 | root_events: HashSet::new(), | 60 | root_events: HashSet::new(), |
| @@ -251,9 +256,9 @@ impl SelfSubscriber { | |||
| 251 | /// | 256 | /// |
| 252 | /// Returns true if any extracted relay URL contains our domain | 257 | /// Returns true if any extracted relay URL contains our domain |
| 253 | fn lists_our_relay(&self, event: &Event) -> bool { | 258 | fn lists_our_relay(&self, event: &Event) -> bool { |
| 254 | Self::extract_relay_urls(event).iter().any(|url| { | 259 | Self::extract_relay_urls(event) |
| 255 | url.contains(&self.relay_domain) || url == &self.own_relay_url | 260 | .iter() |
| 256 | }) | 261 | .any(|url| url.contains(&self.relay_domain) || url == &self.own_relay_url) |
| 257 | } | 262 | } |
| 258 | 263 | ||
| 259 | /// Main run loop | 264 | /// Main run loop |
| @@ -413,21 +418,21 @@ impl SelfSubscriber { | |||
| 413 | if let Some(repo_sync) = index.get_mut(&repo_ref) { | 418 | if let Some(repo_sync) = index.get_mut(&repo_ref) { |
| 414 | // Add event.id to root_events set in the index (immediate availability) | 419 | // Add event.id to root_events set in the index (immediate availability) |
| 415 | repo_sync.root_events.insert(event.id); | 420 | repo_sync.root_events.insert(event.id); |
| 416 | 421 | ||
| 417 | // Clone the relays before releasing the lock - Layer 3 filters need to be | 422 | // Clone the relays before releasing the lock - Layer 3 filters need to be |
| 418 | // sent to the same relays as Layer 2 filters for this repo | 423 | // sent to the same relays as Layer 2 filters for this repo |
| 419 | let relays = repo_sync.relays.clone(); | 424 | let relays = repo_sync.relays.clone(); |
| 420 | 425 | ||
| 421 | // Release lock before modifying pending | 426 | // Release lock before modifying pending |
| 422 | drop(index); | 427 | drop(index); |
| 423 | 428 | ||
| 424 | // Also add root event to pending - this ensures batch processing runs | 429 | // Also add root event to pending - this ensures batch processing runs |
| 425 | // and creates Layer 3 filters for events referencing this root event. | 430 | // and creates Layer 3 filters for events referencing this root event. |
| 426 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! | 431 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! |
| 427 | let mut root_events = HashSet::new(); | 432 | let mut root_events = HashSet::new(); |
| 428 | root_events.insert(event.id); | 433 | root_events.insert(event.id); |
| 429 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); | 434 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); |
| 430 | 435 | ||
| 431 | tracing::debug!( | 436 | tracing::debug!( |
| 432 | event_id = %event.id, | 437 | event_id = %event.id, |
| 433 | repo_ref = %repo_ref, | 438 | repo_ref = %repo_ref, |
| @@ -475,10 +480,12 @@ impl SelfSubscriber { | |||
| 475 | 480 | ||
| 476 | for (repo_id, needs) in updates { | 481 | for (repo_id, needs) in updates { |
| 477 | // Merge with existing entry or insert new | 482 | // Merge with existing entry or insert new |
| 478 | let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { | 483 | let entry = index |
| 479 | relays: HashSet::new(), | 484 | .entry(repo_id.clone()) |
| 480 | root_events: HashSet::new(), | 485 | .or_insert_with(|| RepoSyncNeeds { |
| 481 | }); | 486 | relays: HashSet::new(), |
| 487 | root_events: HashSet::new(), | ||
| 488 | }); | ||
| 482 | entry.relays.extend(needs.relays); | 489 | entry.relays.extend(needs.relays); |
| 483 | entry.root_events.extend(needs.root_events); | 490 | entry.root_events.extend(needs.root_events); |
| 484 | 491 | ||
| @@ -556,7 +563,7 @@ fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | |||
| 556 | } else { | 563 | } else { |
| 557 | return None; | 564 | return None; |
| 558 | }; | 565 | }; |
| 559 | 566 | ||
| 560 | // Extract just the host:port part (everything before the first /) | 567 | // Extract just the host:port part (everything before the first /) |
| 561 | let host_port = rest.split('/').next()?; | 568 | let host_port = rest.split('/').next()?; |
| 562 | Some(format!("{}{}", ws_scheme, host_port)) | 569 | Some(format!("{}{}", ws_scheme, host_port)) |
| @@ -581,7 +588,7 @@ mod tests { | |||
| 581 | Some("ws://localhost:3000".to_string()) | 588 | Some("ws://localhost:3000".to_string()) |
| 582 | ); | 589 | ); |
| 583 | } | 590 | } |
| 584 | 591 | ||
| 585 | #[test] | 592 | #[test] |
| 586 | fn test_clone_url_to_relay_url_with_port() { | 593 | fn test_clone_url_to_relay_url_with_port() { |
| 587 | assert_eq!( | 594 | assert_eq!( |
| @@ -593,6 +600,9 @@ mod tests { | |||
| 593 | #[test] | 600 | #[test] |
| 594 | fn test_clone_url_to_relay_url_unsupported() { | 601 | fn test_clone_url_to_relay_url_unsupported() { |
| 595 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); | 602 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); |
| 596 | assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); | 603 | assert_eq!( |
| 604 | clone_url_to_relay_url("ssh://git@example.com/repo.git"), | ||
| 605 | None | ||
| 606 | ); | ||
| 597 | } | 607 | } |
| 598 | } \ No newline at end of file | 608 | } |