From 2a9160836bb87fdea3ae891563b0169c68d1c2ab Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 11 Dec 2025 16:53:03 +0000 Subject: fix: resolve all fmt and clippy warnings Main lib (src/): - Add #[allow(dead_code)] for build_info field (stored to prevent Prometheus unregistration) - Add #[allow(dead_code)] for first_seen field (reserved for future rate limiting) - Replace .or_insert_with(RelaySyncNeeds::default) with .or_default() - Replace manual div_ceil implementations with .div_ceil(100) Test code (tests/): - Replace .expect(&format!(...)) with .unwrap_or_else(|_| panic!(...)) - Remove needless borrows in fetch_metrics() calls - Add #[allow(dead_code)] and #[allow(unused_imports)] to test helpers module grasp-audit: - Apply cargo fmt to fix formatting --- src/config.rs | 18 +++++- src/http/landing.rs | 5 +- src/http/mod.rs | 8 ++- src/main.rs | 9 ++- src/metrics/bandwidth.rs | 13 +++- src/metrics/connection.rs | 58 +++++++++--------- src/metrics/mod.rs | 127 +++++++++++++++++++++++++-------------- src/nostr/builder.rs | 22 +++---- src/nostr/policy/announcement.rs | 7 ++- src/nostr/policy/mod.rs | 3 +- src/nostr/policy/pr_event.rs | 2 +- src/nostr/policy/related.rs | 7 +-- src/nostr/policy/state.rs | 7 ++- src/sync/algorithms.rs | 6 +- src/sync/filters.rs | 2 +- src/sync/health.rs | 19 +++--- src/sync/metrics.rs | 10 +-- src/sync/mod.rs | 8 +-- src/sync/relay_connection.rs | 24 +++++--- src/sync/self_subscriber.rs | 42 ++++++++----- 20 files changed, 238 insertions(+), 159 deletions(-) (limited to 'src') diff --git a/src/config.rs b/src/config.rs index 8c6de05..7834a3f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -77,11 +77,19 @@ pub struct Config { pub metrics_enabled: bool, /// Connections per IP before flagging as potential abuse in metrics (display only, no rate limiting) - #[arg(long = "metrics-connection-per-ip-abuse-threshold", env = "NGIT_METRICS_CONNECTION_PER_IP_ABUSE_THRESHOLD", default_value_t = 10)] + #[arg( + long = "metrics-connection-per-ip-abuse-threshold", + env = "NGIT_METRICS_CONNECTION_PER_IP_ABUSE_THRESHOLD", + default_value_t = 10 + )] pub metrics_connection_per_ip_abuse_threshold: u32, /// Number of top bandwidth repos to track in metrics - #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] + #[arg( + long = "metrics-top-n-repos", + env = "NGIT_METRICS_TOP_N_REPOS", + default_value_t = 10 + )] pub metrics_top_n_repos: usize, /// URL of bootstrap relay to sync from on startup (optional) @@ -95,7 +103,11 @@ pub struct Config { /// Interval in seconds for checking disconnected relays and attempting reconnection (default: 60) /// Set to lower value for faster reconnection testing - #[arg(long, env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", default_value_t = 60)] + #[arg( + long, + env = "NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", + default_value_t = 60 + )] pub sync_disconnect_check_interval_secs: u64, /// Base backoff time in seconds for relay reconnection (default: 5) diff --git a/src/http/landing.rs b/src/http/landing.rs index 8ab4a68..5fc1e6e 100644 --- a/src/http/landing.rs +++ b/src/http/landing.rs @@ -341,10 +341,7 @@ fn generate_hero_tags(nip11: &RelayInformationDocument) -> String { // Add GRASP tags for grasp in &nip11.supported_grasps { - html.push_str(&format!( - r#"{}"#, - grasp - )); + html.push_str(&format!(r#"{}"#, grasp)); html.push('\n'); } diff --git a/src/http/mod.rs b/src/http/mod.rs index f584e03..91a6067 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -509,7 +509,13 @@ pub async fn run_server( loop { let (socket, addr) = listener.accept().await?; let io = TokioIo::new(socket); - let service = HttpService::new(relay.clone(), config.clone(), addr, database.clone(), metrics.clone()); + let service = HttpService::new( + relay.clone(), + config.clone(), + addr, + database.clone(), + metrics.clone(), + ); tokio::spawn(async move { if let Err(e) = http1::Builder::new() diff --git a/src/main.rs b/src/main.rs index 97a14eb..6d8b4dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,7 +37,9 @@ async fn main() -> Result<()> { // Initialize metrics if enabled let metrics = if config.metrics_enabled { info!("Metrics enabled on /metrics endpoint"); - Some(Arc::new(Metrics::new(config.metrics_connection_per_ip_abuse_threshold))) + Some(Arc::new(Metrics::new( + config.metrics_connection_per_ip_abuse_threshold, + ))) } else { info!("Metrics disabled"); None @@ -65,7 +67,10 @@ async fn main() -> Result<()> { ); if config.sync_bootstrap_relay_url.is_some() { - info!("Starting proactive sync with bootstrap relay: {:?}", config.sync_bootstrap_relay_url); + info!( + "Starting proactive sync with bootstrap relay: {:?}", + config.sync_bootstrap_relay_url + ); } else { info!("Proactive sync enabled (will discover relays from stored announcements)"); } diff --git a/src/metrics/bandwidth.rs b/src/metrics/bandwidth.rs index d2c53e8..d51af12 100644 --- a/src/metrics/bandwidth.rs +++ b/src/metrics/bandwidth.rs @@ -80,7 +80,9 @@ impl BandwidthTracker { &["repo"], ) .unwrap(); - registry.register(Box::new(top_repos_gauge.clone())).unwrap(); + registry + .register(Box::new(top_repos_gauge.clone())) + .unwrap(); Self { all_repos: DashMap::new(), @@ -120,7 +122,12 @@ impl BandwidthTracker { // Try to update the timestamp atomically to prevent concurrent refreshes if self .last_refresh_nanos - .compare_exchange(last_refresh, elapsed_nanos, Ordering::SeqCst, Ordering::Relaxed) + .compare_exchange( + last_refresh, + elapsed_nanos, + Ordering::SeqCst, + Ordering::Relaxed, + ) .is_ok() { self.refresh_top_n(); @@ -298,4 +305,4 @@ mod tests { // Refresh should not panic on empty data tracker.refresh_top_n(); } -} \ No newline at end of file +} diff --git a/src/metrics/connection.rs b/src/metrics/connection.rs index 6a7f406..2d42081 100644 --- a/src/metrics/connection.rs +++ b/src/metrics/connection.rs @@ -25,7 +25,8 @@ use tracing::warn; struct ConnectionInfo { /// Number of active connections from this IP count: u32, - /// When the first connection from this IP was established + /// When the first connection from this IP was established (for future rate limiting) + #[allow(dead_code)] first_seen: Instant, /// Whether this IP has been flagged as potentially abusive flagged_as_abuse: bool, @@ -48,16 +49,16 @@ struct ConnectionInfo { pub struct ConnectionTracker { /// Active connections per IP (INTERNAL ONLY - never exposed to metrics) connections: DashMap, - + /// Threshold for abuse flagging (connections per IP) abuse_threshold: u32, - + /// Prometheus gauge: total active connections active_connections: IntGauge, - + /// Prometheus gauge: number of unique IPs connected unique_ips: IntGauge, - + /// Prometheus gauge: number of IPs flagged as potential abusers flagged_abusers: IntGauge, } @@ -70,29 +71,30 @@ impl ConnectionTracker { /// * `abuse_threshold` - Number of connections from a single IP before flagging /// * `registry` - Prometheus registry to register metrics with pub fn new(abuse_threshold: u32, registry: &Registry) -> Self { - let active_connections = IntGauge::with_opts( - Opts::new( - "ngit_websocket_connections_active", - "Current active WebSocket connections", - ) - ).unwrap(); - registry.register(Box::new(active_connections.clone())).unwrap(); - - let unique_ips = IntGauge::with_opts( - Opts::new( - "ngit_websocket_unique_ips", - "Number of unique IP addresses connected (NOT the IPs themselves)", - ) - ).unwrap(); + let active_connections = IntGauge::with_opts(Opts::new( + "ngit_websocket_connections_active", + "Current active WebSocket connections", + )) + .unwrap(); + registry + .register(Box::new(active_connections.clone())) + .unwrap(); + + let unique_ips = IntGauge::with_opts(Opts::new( + "ngit_websocket_unique_ips", + "Number of unique IP addresses connected (NOT the IPs themselves)", + )) + .unwrap(); registry.register(Box::new(unique_ips.clone())).unwrap(); - let flagged_abusers = IntGauge::with_opts( - Opts::new( - "ngit_websocket_flagged_abusers", - "Number of IPs exceeding connection threshold", - ) - ).unwrap(); - registry.register(Box::new(flagged_abusers.clone())).unwrap(); + let flagged_abusers = IntGauge::with_opts(Opts::new( + "ngit_websocket_flagged_abusers", + "Number of IPs exceeding connection threshold", + )) + .unwrap(); + registry + .register(Box::new(flagged_abusers.clone())) + .unwrap(); Self { connections: DashMap::new(), @@ -140,7 +142,7 @@ impl ConnectionTracker { // Update Prometheus metrics (aggregate counts only) self.active_connections.inc(); - + if is_new_ip { self.unique_ips.inc(); } @@ -334,4 +336,4 @@ mod tests { assert_eq!(tracker.active_connections(), 0); assert_eq!(tracker.unique_ip_count(), 0); } -} \ No newline at end of file +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 736414f..5420dfd 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -87,7 +87,8 @@ struct MetricsInner { // === System Health Metrics === /// Server start time for uptime calculation pub start_time: Instant, - /// Build information gauge + /// Build information gauge (stored to prevent unregistration from Prometheus) + #[allow(dead_code)] pub build_info: GaugeVec, } @@ -158,7 +159,10 @@ impl Metrics { /// Start timing a git operation, returns a timer pub fn start_git_operation_timer(&self, operation: &str) -> GitOperationTimer { - GitOperationTimer::new(self.inner.git_operation_duration.clone(), operation.to_string()) + GitOperationTimer::new( + self.inner.git_operation_duration.clone(), + operation.to_string(), + ) } /// Record bytes transferred for a git operation @@ -266,13 +270,14 @@ impl MetricsInner { } // WebSocket metrics - let websocket_connections_total = Counter::with_opts( - Opts::new( - "ngit_websocket_connections_total", - "Total WebSocket connections since startup", - ) - ).unwrap(); - REGISTRY.register(Box::new(websocket_connections_total.clone())).unwrap(); + let websocket_connections_total = Counter::with_opts(Opts::new( + "ngit_websocket_connections_total", + "Total WebSocket connections since startup", + )) + .unwrap(); + REGISTRY + .register(Box::new(websocket_connections_total.clone())) + .unwrap(); let websocket_connection_duration = Histogram::with_opts( HistogramOpts::new( @@ -280,8 +285,11 @@ impl MetricsInner { "Duration of WebSocket connections", ) .buckets(vec![1.0, 5.0, 15.0, 30.0, 60.0, 300.0, 900.0, 3600.0]), - ).unwrap(); - REGISTRY.register(Box::new(websocket_connection_duration.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(websocket_connection_duration.clone())) + .unwrap(); let websocket_messages_received = CounterVec::new( Opts::new( @@ -289,8 +297,11 @@ impl MetricsInner { "WebSocket messages received by type", ), &["type"], - ).unwrap(); - REGISTRY.register(Box::new(websocket_messages_received.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(websocket_messages_received.clone())) + .unwrap(); let websocket_messages_sent = CounterVec::new( Opts::new( @@ -298,8 +309,11 @@ impl MetricsInner { "WebSocket messages sent by type", ), &["type"], - ).unwrap(); - REGISTRY.register(Box::new(websocket_messages_sent.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(websocket_messages_sent.clone())) + .unwrap(); // Git operation metrics let git_operations_total = CounterVec::new( @@ -308,8 +322,11 @@ impl MetricsInner { "Git operations by type and status", ), &["operation", "status"], - ).unwrap(); - REGISTRY.register(Box::new(git_operations_total.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(git_operations_total.clone())) + .unwrap(); let git_operation_duration = HistogramVec::new( HistogramOpts::new( @@ -318,8 +335,11 @@ impl MetricsInner { ) .buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]), &["operation"], - ).unwrap(); - REGISTRY.register(Box::new(git_operation_duration.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(git_operation_duration.clone())) + .unwrap(); let git_bytes_total = CounterVec::new( Opts::new( @@ -327,8 +347,11 @@ impl MetricsInner { "Total bytes transferred for git operations", ), &["direction"], - ).unwrap(); - REGISTRY.register(Box::new(git_bytes_total.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(git_bytes_total.clone())) + .unwrap(); let git_push_authorization = CounterVec::new( Opts::new( @@ -336,8 +359,11 @@ impl MetricsInner { "Push authorization results", ), &["result"], - ).unwrap(); - REGISTRY.register(Box::new(git_push_authorization.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(git_push_authorization.clone())) + .unwrap(); // Nostr event metrics let events_received_total = CounterVec::new( @@ -346,8 +372,11 @@ impl MetricsInner { "Nostr events received by kind", ), &["kind"], - ).unwrap(); - REGISTRY.register(Box::new(events_received_total.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(events_received_total.clone())) + .unwrap(); let events_stored_total = CounterVec::new( Opts::new( @@ -355,8 +384,11 @@ impl MetricsInner { "Nostr events successfully stored by kind", ), &["kind"], - ).unwrap(); - REGISTRY.register(Box::new(events_stored_total.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(events_stored_total.clone())) + .unwrap(); let events_rejected_total = CounterVec::new( Opts::new( @@ -364,31 +396,36 @@ impl MetricsInner { "Nostr events rejected by kind and reason", ), &["kind", "reason"], - ).unwrap(); - REGISTRY.register(Box::new(events_rejected_total.clone())).unwrap(); + ) + .unwrap(); + REGISTRY + .register(Box::new(events_rejected_total.clone())) + .unwrap(); // Repository metrics - let repositories_total = Gauge::with_opts( - Opts::new( - "ngit_repositories_total", - "Total repositories hosted", - ) - ).unwrap(); - REGISTRY.register(Box::new(repositories_total.clone())).unwrap(); + let repositories_total = Gauge::with_opts(Opts::new( + "ngit_repositories_total", + "Total repositories hosted", + )) + .unwrap(); + REGISTRY + .register(Box::new(repositories_total.clone())) + .unwrap(); // Build info let build_info = GaugeVec::new( - Opts::new( - "ngit_build_info", - "Build information", - ), + Opts::new("ngit_build_info", "Build information"), &["version", "commit"], - ).unwrap(); + ) + .unwrap(); REGISTRY.register(Box::new(build_info.clone())).unwrap(); - + // Set build info gauge to 1 (it's just for labels) build_info - .with_label_values(&[env!("CARGO_PKG_VERSION"), option_env!("GIT_HASH").unwrap_or("unknown")]) + .with_label_values(&[ + env!("CARGO_PKG_VERSION"), + option_env!("GIT_HASH").unwrap_or("unknown"), + ]) .set(1.0); Self { @@ -472,7 +509,7 @@ mod tests { // Note: This test may fail if run with other tests due to global registry // In production, consider using a test-specific registry let metrics = Metrics::new(10); - + // Test that we can record metrics without panicking metrics.record_websocket_connection(); metrics.record_message_received("REQ"); @@ -484,4 +521,4 @@ mod tests { metrics.record_event_rejected(1, "invalid_signature"); metrics.set_repositories_total(5); } -} \ No newline at end of file +} diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 2284c18..c9bd1e1 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -16,8 +16,8 @@ use crate::nostr::events::{ KIND_REPOSITORY_STATE, }; use crate::nostr::policy::{ - AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, RelatedEventPolicy, - ReferenceResult, StatePolicy, StateResult, + AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, + RelatedEventPolicy, StatePolicy, StateResult, }; /// Type alias for the shared database used by the relay @@ -77,7 +77,9 @@ impl Nip34WritePolicy { match RepositoryAnnouncement::from_event(event.clone()) { Ok(announcement) => { // Try to create bare repository if it doesn't exist - if let Err(e) = self.announcement_policy.ensure_bare_repository(&announcement) + if let Err(e) = self + .announcement_policy + .ensure_bare_repository(&announcement) { tracing::warn!( "Failed to create bare repository for {}: {}", @@ -145,22 +147,14 @@ impl Nip34WritePolicy { Ok(_state) => { // Process state alignment asynchronously if let Err(e) = self.state_policy.process_state_event(event).await { - tracing::warn!( - "Failed to process state event {}: {}", - event_id_str, - e - ); + tracing::warn!("Failed to process state event {}: {}", event_id_str, e); } tracing::debug!("Accepted repository state: {}", event_id_str); PolicyResult::Accept } Err(e) => { - tracing::warn!( - "Failed to parse repository state {}: {}", - event_id_str, - e - ); + tracing::warn!("Failed to parse repository state {}: {}", event_id_str, e); // Still accept the event even if we can't parse it // The validation passed, so it's structurally valid PolicyResult::Accept @@ -348,4 +342,4 @@ pub fn create_relay(config: &Config) -> Result { database, write_policy, }) -} \ No newline at end of file +} diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 8d30baf..353738b 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -72,7 +72,10 @@ impl AnnouncementPolicy { /// Create a bare git repository if it doesn't exist /// Path format: //.git - pub fn ensure_bare_repository(&self, announcement: &RepositoryAnnouncement) -> Result<(), String> { + pub fn ensure_bare_repository( + &self, + announcement: &RepositoryAnnouncement, + ) -> Result<(), String> { let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); // Check if repository already exists @@ -154,4 +157,4 @@ impl AnnouncementPolicy { Ok(false) } -} \ No newline at end of file +} diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 6d67394..19db5f6 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -5,7 +5,6 @@ /// - `StatePolicy` - State event validation + ref alignment /// - `PrEventPolicy` - PR/PR Update validation /// - `RelatedEventPolicy` - Forward/backward reference checking - mod announcement; mod pr_event; mod related; @@ -38,4 +37,4 @@ impl PolicyContext { git_data_path: git_data_path.into(), } } -} \ No newline at end of file +} diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index fee9a2a..53da369 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs @@ -195,4 +195,4 @@ impl PrEventPolicy { Ok(None) } } -} \ No newline at end of file +} diff --git a/src/nostr/policy/related.rs b/src/nostr/policy/related.rs index 1937ca7..7ce87db 100644 --- a/src/nostr/policy/related.rs +++ b/src/nostr/policy/related.rs @@ -169,10 +169,7 @@ impl RelatedEventPolicy { /// Check if any events exist in database /// Returns the first matching event ID found, or None if none match - async fn find_accepted_event( - &self, - event_ids: &[EventId], - ) -> Result, String> { + async fn find_accepted_event(&self, event_ids: &[EventId]) -> Result, String> { if event_ids.is_empty() { return Ok(None); } @@ -273,4 +270,4 @@ impl RelatedEventPolicy { Ok(false) } -} \ No newline at end of file +} diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 5692bd8..43349e2 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -239,7 +239,10 @@ impl StatePolicy { } // Build repository path: //.git - let repo_path = self.ctx.git_data_path.join(announcement.repo_path().clone()); + let repo_path = self + .ctx + .git_data_path + .join(announcement.repo_path().clone()); owner_repos.push((announcement, repo_path)); } @@ -416,4 +419,4 @@ impl StatePolicy { result } -} \ No newline at end of file +} diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 7d87411..3063516 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -65,9 +65,7 @@ pub fn derive_relay_targets( for (repo_id, needs) in repo_index { for relay_url in &needs.relays { - let entry = relay_targets - .entry(relay_url.clone()) - .or_insert_with(RelaySyncNeeds::default); + let entry = relay_targets.entry(relay_url.clone()).or_default(); entry.repos.insert(repo_id.clone()); entry.root_events.extend(needs.root_events.iter().cloned()); @@ -586,4 +584,4 @@ mod tests { ); assert_eq!(actions[0].relay_url, "wss://new-relay.com"); } -} \ No newline at end of file +} diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 02d580e..24e9bb2 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs @@ -337,4 +337,4 @@ mod tests { assert_eq!(filters.len(), 6); } -} \ No newline at end of file +} diff --git a/src/sync/health.rs b/src/sync/health.rs index f9a5f3a..0ae7dee 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs @@ -206,11 +206,7 @@ impl RelayHealthTracker { health.next_retry_at = Some(now + backoff); if old_state != HealthState::Degraded { - tracing::warn!( - "Relay {} degraded, backoff {:?}", - relay_url, - backoff - ); + tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); } else { tracing::debug!( "Relay {} failure #{}, backoff {:?}", @@ -308,12 +304,17 @@ impl RelayHealthTracker { /// Get all tracked relay URLs pub fn get_tracked_relays(&self) -> Vec { - self.health.iter().map(|entry| entry.key().clone()).collect() + self.health + .iter() + .map(|entry| entry.key().clone()) + .collect() } /// Get a clone of the health info for a relay pub fn get_health(&self, relay_url: &str) -> Option { - self.health.get(relay_url).map(|entry| entry.value().clone()) + self.health + .get(relay_url) + .map(|entry| entry.value().clone()) } } @@ -369,7 +370,7 @@ mod tests { fn test_backoff_increases_exponentially() { let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds let max = 3600u64; - + // failure 1: 5s (base * 2^0 = 5) assert_eq!( RelayHealthTracker::get_backoff_duration(1, base, max), @@ -498,4 +499,4 @@ mod tests { let health = tracker.get_health("wss://nonexistent.example.com"); assert!(health.is_none()); } -} \ No newline at end of file +} diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 411ff63..d917dc0 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs @@ -207,7 +207,9 @@ impl SyncMetrics { HealthState::Degraded => 2, HealthState::Dead => 3, }; - self.relay_status.with_label_values(&[relay]).set(state_value); + self.relay_status + .with_label_values(&[relay]) + .set(state_value); } /// Record relay failure count. @@ -259,9 +261,7 @@ impl SyncMetrics { /// * `source` - The event source type (see [`record_event`](Self::record_event)) /// * `count` - Number of events to record pub fn record_events(&self, source: &str, count: u64) { - self.events_total - .with_label_values(&[source]) - .inc_by(count); + self.events_total.with_label_values(&[source]).inc_by(count); } /// Record a gap event filled during catchup. @@ -451,4 +451,4 @@ mod tests { let metrics2 = SyncMetrics::register(®istry); assert!(metrics2.is_err()); } -} \ No newline at end of file +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c4c3c7f..fb59b3c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -512,8 +512,8 @@ impl SyncManager { }; // Check if relay supports NIP-77 negentropy AND negentropy is not disabled - let use_negentropy = !self.config.sync_disable_negentropy - && connection.supports_negentropy().await; + let use_negentropy = + !self.config.sync_disable_negentropy && connection.supports_negentropy().await; // Unsubscribe all current subscriptions connection.unsubscribe_all().await; @@ -1657,12 +1657,12 @@ impl SyncManager { let layer1_filters = 1; let layer2_filters = if repo_count > 0 { - ((repo_count + 99) / 100) * 3 + repo_count.div_ceil(100) * 3 } else { 0 }; let layer3_filters = if event_count > 0 { - ((event_count + 99) / 100) * 3 + event_count.div_ceil(100) * 3 } else { 0 }; diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index fae179b..4167a0c 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -150,17 +150,21 @@ impl RelayConnection { // // See: nostr-sdk-0.44 Client::try_connect_relay documentation self.client - .try_connect_relay(&self.url, std::time::Duration::from_secs(connection_timeout_secs)) + .try_connect_relay( + &self.url, + std::time::Duration::from_secs(connection_timeout_secs), + ) .await .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; // Subscribe to Layer 1 (announcements) let filter = build_announcement_filter(since); - let output = self - .client - .subscribe(filter, None) - .await - .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?; + let output = self.client.subscribe(filter, None).await.map_err(|e| { + format!( + "Failed to subscribe to announcements on {}: {}", + self.url, e + ) + })?; tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); Ok(output.val) @@ -250,7 +254,8 @@ impl RelayConnection { } RelayMessage::Closed { message: msg, .. } => { tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); - let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; + let _ = + event_sender.send(RelayEvent::Closed(msg.to_string())).await; break; } _ => {} @@ -421,7 +426,10 @@ impl RelayConnection { /// - Relay doesn't actually support NIP-77 (despite claiming to) /// - Network errors during reconciliation /// - Timeout during sync - pub async fn negentropy_sync_filter(&self, filter: Filter) -> Result { + pub async fn negentropy_sync_filter( + &self, + filter: Filter, + ) -> Result { // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange let sync_opts = SyncOptions::default(); diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index f83b081..e29e45b 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -49,7 +49,12 @@ impl PendingUpdates { } /// Add or update a repo with its relays and root events - fn add_repo(&mut self, repo_id: String, relays: HashSet, root_events: HashSet) { + fn add_repo( + &mut self, + repo_id: String, + relays: HashSet, + root_events: HashSet, + ) { let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), @@ -251,9 +256,9 @@ impl SelfSubscriber { /// /// Returns true if any extracted relay URL contains our domain fn lists_our_relay(&self, event: &Event) -> bool { - Self::extract_relay_urls(event).iter().any(|url| { - url.contains(&self.relay_domain) || url == &self.own_relay_url - }) + Self::extract_relay_urls(event) + .iter() + .any(|url| url.contains(&self.relay_domain) || url == &self.own_relay_url) } /// Main run loop @@ -413,21 +418,21 @@ impl SelfSubscriber { if let Some(repo_sync) = index.get_mut(&repo_ref) { // Add event.id to root_events set in the index (immediate availability) repo_sync.root_events.insert(event.id); - + // Clone the relays before releasing the lock - Layer 3 filters need to be // sent to the same relays as Layer 2 filters for this repo let relays = repo_sync.relays.clone(); - + // Release lock before modifying pending drop(index); - + // Also add root event to pending - this ensures batch processing runs // and creates Layer 3 filters for events referencing this root event. // CRITICAL: Include relays so derive_relay_targets knows where to send filters! let mut root_events = HashSet::new(); root_events.insert(event.id); pending.add_repo(repo_ref.clone(), relays.clone(), root_events); - + tracing::debug!( event_id = %event.id, repo_ref = %repo_ref, @@ -475,10 +480,12 @@ impl SelfSubscriber { for (repo_id, needs) in updates { // Merge with existing entry or insert new - let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { - relays: HashSet::new(), - root_events: HashSet::new(), - }); + let entry = index + .entry(repo_id.clone()) + .or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + root_events: HashSet::new(), + }); entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); @@ -556,7 +563,7 @@ fn clone_url_to_relay_url(clone_url: &str) -> Option { } else { return None; }; - + // Extract just the host:port part (everything before the first /) let host_port = rest.split('/').next()?; Some(format!("{}{}", ws_scheme, host_port)) @@ -581,7 +588,7 @@ mod tests { Some("ws://localhost:3000".to_string()) ); } - + #[test] fn test_clone_url_to_relay_url_with_port() { assert_eq!( @@ -593,6 +600,9 @@ mod tests { #[test] fn test_clone_url_to_relay_url_unsupported() { assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); - assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); + assert_eq!( + clone_url_to_relay_url("ssh://git@example.com/repo.git"), + None + ); } -} \ No newline at end of file +} -- cgit v1.2.3