diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 15:17:04 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-04 15:24:19 +0000 |
| commit | fd0c87c787d0626b3546fa571541c9c809711821 (patch) | |
| tree | 934f20d973127f380b807d2bd44b25c197cf349c /src | |
| parent | 762cd8e815e797f173f541795de774fbbf978fc3 (diff) | |
add prometheus metrics
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 36 | ||||
| -rw-r--r-- | src/http/mod.rs | 84 | ||||
| -rw-r--r-- | src/http/nip11.rs | 6 | ||||
| -rw-r--r-- | src/lib.rs | 1 | ||||
| -rw-r--r-- | src/main.rs | 17 | ||||
| -rw-r--r-- | src/metrics/bandwidth.rs | 301 | ||||
| -rw-r--r-- | src/metrics/connection.rs | 337 | ||||
| -rw-r--r-- | src/metrics/mod.rs | 469 |
8 files changed, 1244 insertions, 7 deletions
diff --git a/src/config.rs b/src/config.rs index d095178..025e020 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -71,6 +71,18 @@ pub struct Config { | |||
| 71 | /// Database backend type | 71 | /// Database backend type |
| 72 | #[arg(long, env = "NGIT_DATABASE_BACKEND", value_enum, default_value_t = DatabaseBackend::Lmdb)] | 72 | #[arg(long, env = "NGIT_DATABASE_BACKEND", value_enum, default_value_t = DatabaseBackend::Lmdb)] |
| 73 | pub database_backend: DatabaseBackend, | 73 | pub database_backend: DatabaseBackend, |
| 74 | |||
| 75 | /// Enable Prometheus metrics endpoint | ||
| 76 | #[arg(long, env = "NGIT_METRICS_ENABLED", default_value_t = true)] | ||
| 77 | pub metrics_enabled: bool, | ||
| 78 | |||
| 79 | /// Connections per IP before flagging as potential abuse in metrics (display only, no rate limiting) | ||
| 80 | #[arg(long = "metrics-connection-per-ip-abuse-threshold", env = "NGIT_METRICS_CONNECTION_PER_IP_ABUSE_THRESHOLD", default_value_t = 10)] | ||
| 81 | pub metrics_connection_per_ip_abuse_threshold: u32, | ||
| 82 | |||
| 83 | /// Number of top bandwidth repos to track in metrics | ||
| 84 | #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] | ||
| 85 | pub metrics_top_n_repos: usize, | ||
| 74 | } | 86 | } |
| 75 | 87 | ||
| 76 | impl Config { | 88 | impl Config { |
| @@ -123,6 +135,9 @@ impl Config { | |||
| 123 | relay_data_path: "./test_data/relay".to_string(), | 135 | relay_data_path: "./test_data/relay".to_string(), |
| 124 | bind_address: "127.0.0.1:8080".to_string(), | 136 | bind_address: "127.0.0.1:8080".to_string(), |
| 125 | database_backend: DatabaseBackend::Memory, | 137 | database_backend: DatabaseBackend::Memory, |
| 138 | metrics_enabled: true, | ||
| 139 | metrics_connection_per_ip_abuse_threshold: 10, | ||
| 140 | metrics_top_n_repos: 10, | ||
| 126 | } | 141 | } |
| 127 | } | 142 | } |
| 128 | } | 143 | } |
| @@ -202,4 +217,25 @@ mod tests { | |||
| 202 | }; | 217 | }; |
| 203 | assert!(config.owner_npub.is_none()); | 218 | assert!(config.owner_npub.is_none()); |
| 204 | } | 219 | } |
| 220 | |||
| 221 | #[test] | ||
| 222 | fn test_metrics_config_defaults() { | ||
| 223 | let config = Config::for_testing(); | ||
| 224 | assert!(config.metrics_enabled); | ||
| 225 | assert_eq!(config.metrics_connection_per_ip_abuse_threshold, 10); | ||
| 226 | assert_eq!(config.metrics_top_n_repos, 10); | ||
| 227 | } | ||
| 228 | |||
| 229 | #[test] | ||
| 230 | fn test_metrics_config_custom_values() { | ||
| 231 | let config = Config { | ||
| 232 | metrics_enabled: false, | ||
| 233 | metrics_connection_per_ip_abuse_threshold: 50, | ||
| 234 | metrics_top_n_repos: 25, | ||
| 235 | ..Config::for_testing() | ||
| 236 | }; | ||
| 237 | assert!(!config.metrics_enabled); | ||
| 238 | assert_eq!(config.metrics_connection_per_ip_abuse_threshold, 50); | ||
| 239 | assert_eq!(config.metrics_top_n_repos, 25); | ||
| 240 | } | ||
| 205 | } | 241 | } |
diff --git a/src/http/mod.rs b/src/http/mod.rs index 8b1f687..f584e03 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs | |||
| @@ -7,6 +7,7 @@ pub mod nip11; | |||
| 7 | use std::future::Future; | 7 | use std::future::Future; |
| 8 | use std::net::SocketAddr; | 8 | use std::net::SocketAddr; |
| 9 | use std::pin::Pin; | 9 | use std::pin::Pin; |
| 10 | use std::sync::Arc; | ||
| 10 | 11 | ||
| 11 | use base64::Engine; | 12 | use base64::Engine; |
| 12 | use http_body_util::{BodyExt, Full}; | 13 | use http_body_util::{BodyExt, Full}; |
| @@ -24,6 +25,7 @@ use tokio::net::TcpListener; | |||
| 24 | 25 | ||
| 25 | use crate::config::Config; | 26 | use crate::config::Config; |
| 26 | use crate::git; | 27 | use crate::git; |
| 28 | use crate::metrics::Metrics; | ||
| 27 | use crate::nostr::builder::SharedDatabase; | 29 | use crate::nostr::builder::SharedDatabase; |
| 28 | 30 | ||
| 29 | /// CORS headers required by GRASP-01 specification (lines 40-47) | 31 | /// CORS headers required by GRASP-01 specification (lines 40-47) |
| @@ -90,6 +92,8 @@ struct HttpService { | |||
| 90 | remote: SocketAddr, | 92 | remote: SocketAddr, |
| 91 | /// Database reference for direct queries (e.g., push authorization) | 93 | /// Database reference for direct queries (e.g., push authorization) |
| 92 | database: SharedDatabase, | 94 | database: SharedDatabase, |
| 95 | /// Optional metrics for Prometheus endpoint | ||
| 96 | metrics: Option<Arc<Metrics>>, | ||
| 93 | } | 97 | } |
| 94 | 98 | ||
| 95 | impl HttpService { | 99 | impl HttpService { |
| @@ -98,12 +102,14 @@ impl HttpService { | |||
| 98 | config: Config, | 102 | config: Config, |
| 99 | remote: SocketAddr, | 103 | remote: SocketAddr, |
| 100 | database: SharedDatabase, | 104 | database: SharedDatabase, |
| 105 | metrics: Option<Arc<Metrics>>, | ||
| 101 | ) -> Self { | 106 | ) -> Self { |
| 102 | Self { | 107 | Self { |
| 103 | relay, | 108 | relay, |
| 104 | config, | 109 | config, |
| 105 | remote, | 110 | remote, |
| 106 | database, | 111 | database, |
| 112 | metrics, | ||
| 107 | } | 113 | } |
| 108 | } | 114 | } |
| 109 | } | 115 | } |
| @@ -150,6 +156,7 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 150 | ); | 156 | ); |
| 151 | 157 | ||
| 152 | let repo_path = git::resolve_repo_path(&git_data_path, &npub, &identifier); | 158 | let repo_path = git::resolve_repo_path(&git_data_path, &npub, &identifier); |
| 159 | let metrics_clone = self.metrics.clone(); | ||
| 153 | 160 | ||
| 154 | return Box::pin(async move { | 161 | return Box::pin(async move { |
| 155 | // Collect request body once before the match statement | 162 | // Collect request body once before the match statement |
| @@ -170,14 +177,31 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 170 | .and_then(git::protocol::GitService::from_query_param); | 177 | .and_then(git::protocol::GitService::from_query_param); |
| 171 | 178 | ||
| 172 | match service { | 179 | match service { |
| 173 | Some(svc) => git::handlers::handle_info_refs(repo_path, svc).await, | 180 | Some(svc) => { |
| 181 | let result = git::handlers::handle_info_refs(repo_path, svc).await; | ||
| 182 | // Track operation | ||
| 183 | if let Some(ref m) = metrics_clone { | ||
| 184 | let status = if result.is_ok() { "success" } else { "error" }; | ||
| 185 | let operation = match svc { | ||
| 186 | git::protocol::GitService::UploadPack => "fetch", | ||
| 187 | git::protocol::GitService::ReceivePack => "push", | ||
| 188 | }; | ||
| 189 | m.record_git_operation(operation, status); | ||
| 190 | } | ||
| 191 | result | ||
| 192 | } | ||
| 174 | None => Err(git::handlers::GitError::RepositoryNotFound), | 193 | None => Err(git::handlers::GitError::RepositoryNotFound), |
| 175 | } | 194 | } |
| 176 | } | 195 | } |
| 177 | 196 | ||
| 178 | // POST /git-upload-pack (clone/fetch) | 197 | // POST /git-upload-pack (clone/fetch) |
| 179 | (m, "git-upload-pack") if m == Method::POST => { | 198 | (m, "git-upload-pack") if m == Method::POST => { |
| 180 | git::handlers::handle_upload_pack(repo_path, body_bytes).await | 199 | let result = git::handlers::handle_upload_pack(repo_path, body_bytes).await; |
| 200 | if let Some(ref m) = metrics_clone { | ||
| 201 | let status = if result.is_ok() { "success" } else { "error" }; | ||
| 202 | m.record_git_operation("clone", status); | ||
| 203 | } | ||
| 204 | result | ||
| 181 | } | 205 | } |
| 182 | 206 | ||
| 183 | // POST /git-receive-pack (push) - with GRASP authorization via database | 207 | // POST /git-receive-pack (push) - with GRASP authorization via database |
| @@ -187,6 +211,10 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 187 | Ok(pk) => pk.to_hex(), | 211 | Ok(pk) => pk.to_hex(), |
| 188 | Err(e) => { | 212 | Err(e) => { |
| 189 | tracing::warn!("Invalid npub in URL {}: {}", npub, e); | 213 | tracing::warn!("Invalid npub in URL {}: {}", npub, e); |
| 214 | // Track failed push due to invalid npub | ||
| 215 | if let Some(ref m) = metrics_clone { | ||
| 216 | m.record_git_operation("push", "error"); | ||
| 217 | } | ||
| 190 | return Ok(add_cors_headers(Response::builder()) | 218 | return Ok(add_cors_headers(Response::builder()) |
| 191 | .status(hyper::StatusCode::BAD_REQUEST) | 219 | .status(hyper::StatusCode::BAD_REQUEST) |
| 192 | .body(Full::new(Bytes::from(format!("Invalid npub: {}", e)))) | 220 | .body(Full::new(Bytes::from(format!("Invalid npub: {}", e)))) |
| @@ -194,14 +222,21 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 194 | } | 222 | } |
| 195 | }; | 223 | }; |
| 196 | 224 | ||
| 197 | git::handlers::handle_receive_pack( | 225 | let result = git::handlers::handle_receive_pack( |
| 198 | repo_path, | 226 | repo_path, |
| 199 | body_bytes.clone(), | 227 | body_bytes.clone(), |
| 200 | Some(database.clone()), | 228 | Some(database.clone()), |
| 201 | &identifier, | 229 | &identifier, |
| 202 | &owner_pubkey_hex, | 230 | &owner_pubkey_hex, |
| 203 | ) | 231 | ) |
| 204 | .await | 232 | .await; |
| 233 | |||
| 234 | if let Some(ref m) = metrics_clone { | ||
| 235 | let status = if result.is_ok() { "success" } else { "error" }; | ||
| 236 | m.record_git_operation("push", status); | ||
| 237 | } | ||
| 238 | |||
| 239 | result | ||
| 205 | } | 240 | } |
| 206 | 241 | ||
| 207 | _ => Err(git::handlers::GitError::RepositoryNotFound), | 242 | _ => Err(git::handlers::GitError::RepositoryNotFound), |
| @@ -333,17 +368,27 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 333 | 368 | ||
| 334 | let addr = self.remote; | 369 | let addr = self.remote; |
| 335 | let relay = self.relay.clone(); | 370 | let relay = self.relay.clone(); |
| 371 | let metrics_clone = self.metrics.clone(); | ||
| 336 | 372 | ||
| 337 | tokio::spawn(async move { | 373 | tokio::spawn(async move { |
| 338 | match hyper::upgrade::on(req).await { | 374 | match hyper::upgrade::on(req).await { |
| 339 | Ok(upgraded) => { | 375 | Ok(upgraded) => { |
| 340 | tracing::info!("WebSocket connection established from {}", addr); | 376 | tracing::info!("WebSocket connection established from {}", addr); |
| 377 | // Track connection | ||
| 378 | if let Some(ref m) = metrics_clone { | ||
| 379 | m.connection_tracker().on_connect(addr.ip()); | ||
| 380 | m.record_websocket_connection(); | ||
| 381 | } | ||
| 341 | if let Err(e) = | 382 | if let Err(e) = |
| 342 | relay.take_connection(TokioIo::new(upgraded), addr).await | 383 | relay.take_connection(TokioIo::new(upgraded), addr).await |
| 343 | { | 384 | { |
| 344 | tracing::error!("Relay error for {}: {}", addr, e); | 385 | tracing::error!("Relay error for {}: {}", addr, e); |
| 345 | } | 386 | } |
| 346 | tracing::info!("WebSocket connection closed for {}", addr); | 387 | tracing::info!("WebSocket connection closed for {}", addr); |
| 388 | // Untrack connection | ||
| 389 | if let Some(ref m) = metrics_clone { | ||
| 390 | m.connection_tracker().on_disconnect(addr.ip()); | ||
| 391 | } | ||
| 347 | } | 392 | } |
| 348 | Err(e) => tracing::error!("Upgrade error: {}", e), | 393 | Err(e) => tracing::error!("Upgrade error: {}", e), |
| 349 | } | 394 | } |
| @@ -361,6 +406,33 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 361 | } | 406 | } |
| 362 | } | 407 | } |
| 363 | 408 | ||
| 409 | // Serve Prometheus metrics if enabled | ||
| 410 | if path == "/metrics" { | ||
| 411 | if let Some(ref metrics) = self.metrics { | ||
| 412 | let metrics = metrics.clone(); | ||
| 413 | return Box::pin(async move { | ||
| 414 | let output = metrics.render(); | ||
| 415 | Ok( | ||
| 416 | add_cors_headers(Response::builder().header("server", "ngit-grasp")) | ||
| 417 | .status(200) | ||
| 418 | .header("content-type", "text/plain; version=0.0.4; charset=utf-8") | ||
| 419 | .body(Full::new(Bytes::from(output))) | ||
| 420 | .unwrap(), | ||
| 421 | ) | ||
| 422 | }); | ||
| 423 | } else { | ||
| 424 | // Metrics disabled | ||
| 425 | return Box::pin(async move { | ||
| 426 | Ok( | ||
| 427 | add_cors_headers(Response::builder().header("server", "ngit-grasp")) | ||
| 428 | .status(404) | ||
| 429 | .body(Full::new(Bytes::from("Metrics disabled"))) | ||
| 430 | .unwrap(), | ||
| 431 | ) | ||
| 432 | }); | ||
| 433 | } | ||
| 434 | } | ||
| 435 | |||
| 364 | // Serve static icon at /icon.png | 436 | // Serve static icon at /icon.png |
| 365 | if path == "/icon.png" { | 437 | if path == "/icon.png" { |
| 366 | return Box::pin(async move { | 438 | return Box::pin(async move { |
| @@ -419,10 +491,12 @@ fn derive_accept_key(request_key: &[u8]) -> String { | |||
| 419 | /// * `config` - Server configuration | 491 | /// * `config` - Server configuration |
| 420 | /// * `relay` - The LocalRelay for WebSocket connections | 492 | /// * `relay` - The LocalRelay for WebSocket connections |
| 421 | /// * `database` - The database for direct queries (e.g., push authorization) | 493 | /// * `database` - The database for direct queries (e.g., push authorization) |
| 494 | /// * `metrics` - Optional metrics for Prometheus endpoint | ||
| 422 | pub async fn run_server( | 495 | pub async fn run_server( |
| 423 | config: Config, | 496 | config: Config, |
| 424 | relay: LocalRelay, | 497 | relay: LocalRelay, |
| 425 | database: SharedDatabase, | 498 | database: SharedDatabase, |
| 499 | metrics: Option<Arc<Metrics>>, | ||
| 426 | ) -> anyhow::Result<()> { | 500 | ) -> anyhow::Result<()> { |
| 427 | let bind_addr: SocketAddr = config.bind_address.parse()?; | 501 | let bind_addr: SocketAddr = config.bind_address.parse()?; |
| 428 | 502 | ||
| @@ -435,7 +509,7 @@ pub async fn run_server( | |||
| 435 | loop { | 509 | loop { |
| 436 | let (socket, addr) = listener.accept().await?; | 510 | let (socket, addr) = listener.accept().await?; |
| 437 | let io = TokioIo::new(socket); | 511 | let io = TokioIo::new(socket); |
| 438 | let service = HttpService::new(relay.clone(), config.clone(), addr, database.clone()); | 512 | let service = HttpService::new(relay.clone(), config.clone(), addr, database.clone(), metrics.clone()); |
| 439 | 513 | ||
| 440 | tokio::spawn(async move { | 514 | tokio::spawn(async move { |
| 441 | if let Err(e) = http1::Builder::new() | 515 | if let Err(e) = http1::Builder::new() |
diff --git a/src/http/nip11.rs b/src/http/nip11.rs index 1ed80de..1723601 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs | |||
| @@ -102,6 +102,9 @@ mod tests { | |||
| 102 | relay_data_path: "./data/relay".to_string(), | 102 | relay_data_path: "./data/relay".to_string(), |
| 103 | bind_address: "127.0.0.1:8080".to_string(), | 103 | bind_address: "127.0.0.1:8080".to_string(), |
| 104 | database_backend: crate::config::DatabaseBackend::Memory, | 104 | database_backend: crate::config::DatabaseBackend::Memory, |
| 105 | metrics_enabled: true, | ||
| 106 | metrics_connection_per_ip_abuse_threshold: 10, | ||
| 107 | metrics_top_n_repos: 10, | ||
| 105 | }; | 108 | }; |
| 106 | 109 | ||
| 107 | let doc = RelayInformationDocument::from_config(&config); | 110 | let doc = RelayInformationDocument::from_config(&config); |
| @@ -133,6 +136,9 @@ mod tests { | |||
| 133 | relay_data_path: "./data/relay".to_string(), | 136 | relay_data_path: "./data/relay".to_string(), |
| 134 | bind_address: "127.0.0.1:8080".to_string(), | 137 | bind_address: "127.0.0.1:8080".to_string(), |
| 135 | database_backend: crate::config::DatabaseBackend::Memory, | 138 | database_backend: crate::config::DatabaseBackend::Memory, |
| 139 | metrics_enabled: true, | ||
| 140 | metrics_connection_per_ip_abuse_threshold: 10, | ||
| 141 | metrics_top_n_repos: 10, | ||
| 136 | }; | 142 | }; |
| 137 | 143 | ||
| 138 | let doc = RelayInformationDocument::from_config(&config); | 144 | let doc = RelayInformationDocument::from_config(&config); |
| @@ -1,4 +1,5 @@ | |||
| 1 | pub mod config; | 1 | pub mod config; |
| 2 | pub mod git; | 2 | pub mod git; |
| 3 | pub mod http; | 3 | pub mod http; |
| 4 | pub mod metrics; | ||
| 4 | pub mod nostr; | 5 | pub mod nostr; |
diff --git a/src/main.rs b/src/main.rs index f80e920..9200cc2 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -1,10 +1,14 @@ | |||
| 1 | use std::sync::Arc; | ||
| 2 | |||
| 1 | use anyhow::Result; | 3 | use anyhow::Result; |
| 2 | use tracing::{info, Level}; | 4 | use tracing::{info, Level}; |
| 3 | use tracing_subscriber::FmtSubscriber; | 5 | use tracing_subscriber::FmtSubscriber; |
| 4 | 6 | ||
| 5 | use ngit_grasp::{ | 7 | use ngit_grasp::{ |
| 6 | config::{Config, DatabaseBackend}, | 8 | config::{Config, DatabaseBackend}, |
| 7 | http, nostr, | 9 | http, |
| 10 | metrics::Metrics, | ||
| 11 | nostr, | ||
| 8 | }; | 12 | }; |
| 9 | 13 | ||
| 10 | #[tokio::main] | 14 | #[tokio::main] |
| @@ -29,6 +33,15 @@ async fn main() -> Result<()> { | |||
| 29 | } | 33 | } |
| 30 | info!("Database backend: {}", config.database_backend); | 34 | info!("Database backend: {}", config.database_backend); |
| 31 | 35 | ||
| 36 | // Initialize metrics if enabled | ||
| 37 | let metrics = if config.metrics_enabled { | ||
| 38 | info!("Metrics enabled on /metrics endpoint"); | ||
| 39 | Some(Arc::new(Metrics::new(config.metrics_connection_per_ip_abuse_threshold))) | ||
| 40 | } else { | ||
| 41 | info!("Metrics disabled"); | ||
| 42 | None | ||
| 43 | }; | ||
| 44 | |||
| 32 | // Create Nostr relay with NIP-34 validation | 45 | // Create Nostr relay with NIP-34 validation |
| 33 | // Returns both the relay and database for direct queries in handlers | 46 | // Returns both the relay and database for direct queries in handlers |
| 34 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config) { | 47 | if let Ok(relay_with_db) = nostr::builder::create_relay(&config) { |
| @@ -39,7 +52,7 @@ async fn main() -> Result<()> { | |||
| 39 | 52 | ||
| 40 | // Start HTTP server with integrated relay and database | 53 | // Start HTTP server with integrated relay and database |
| 41 | info!("Starting HTTP server on {}", config.bind_address); | 54 | info!("Starting HTTP server on {}", config.bind_address); |
| 42 | http::run_server(config, relay_with_db.relay, relay_with_db.database).await?; | 55 | http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?; |
| 43 | } | 56 | } |
| 44 | 57 | ||
| 45 | Ok(()) | 58 | Ok(()) |
diff --git a/src/metrics/bandwidth.rs b/src/metrics/bandwidth.rs new file mode 100644 index 0000000..d2c53e8 --- /dev/null +++ b/src/metrics/bandwidth.rs | |||
| @@ -0,0 +1,301 @@ | |||
| 1 | //! Repository bandwidth tracking with cardinality control. | ||
| 2 | //! | ||
| 3 | //! This module tracks bandwidth per repository but only exposes the top N | ||
| 4 | //! repositories to Prometheus to prevent cardinality explosion with many repos. | ||
| 5 | //! | ||
| 6 | //! # Cardinality Control | ||
| 7 | //! | ||
| 8 | //! - All per-repo bandwidth is tracked internally in a `DashMap<RepoId, u64>` | ||
| 9 | //! - Every 60 seconds, the top 10 are calculated and exposed to Prometheus | ||
| 10 | //! - Previous repo labels are cleared before setting new ones | ||
| 11 | //! - Prometheus only ever sees ~10 label values, keeping cardinality low | ||
| 12 | |||
| 13 | use std::sync::atomic::{AtomicU64, Ordering}; | ||
| 14 | use std::time::{Duration, Instant}; | ||
| 15 | |||
| 16 | use dashmap::DashMap; | ||
| 17 | use prometheus::{GaugeVec, Opts, Registry}; | ||
| 18 | |||
| 19 | /// Default number of top repositories to expose in metrics | ||
| 20 | const DEFAULT_TOP_N: usize = 10; | ||
| 21 | |||
| 22 | /// Default refresh interval for top-N calculation (60 seconds) | ||
| 23 | const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(60); | ||
| 24 | |||
| 25 | /// Tracks bandwidth per repository with top-N exposure to Prometheus. | ||
| 26 | /// | ||
| 27 | /// # Design | ||
| 28 | /// | ||
| 29 | /// All repositories are tracked internally for accurate total bandwidth, | ||
| 30 | /// but only the top N by bytes transferred are exposed to Prometheus. | ||
| 31 | /// This prevents cardinality explosion when hosting thousands of repositories. | ||
| 32 | /// | ||
| 33 | /// # Thread Safety | ||
| 34 | /// | ||
| 35 | /// Uses `DashMap` for lock-free concurrent access and atomics for | ||
| 36 | /// the refresh timestamp. | ||
| 37 | pub struct BandwidthTracker { | ||
| 38 | /// Internal: tracks ALL repos (memory only, not exposed) | ||
| 39 | all_repos: DashMap<String, u64>, | ||
| 40 | |||
| 41 | /// Exposed to Prometheus: only top N repos | ||
| 42 | top_repos_gauge: GaugeVec, | ||
| 43 | |||
| 44 | /// Last refresh timestamp (stored as nanos since some epoch) | ||
| 45 | last_refresh_nanos: AtomicU64, | ||
| 46 | |||
| 47 | /// Instant when the tracker was created (for relative timing) | ||
| 48 | start_instant: Instant, | ||
| 49 | |||
| 50 | /// Number of top repos to expose | ||
| 51 | top_n: usize, | ||
| 52 | |||
| 53 | /// Refresh interval | ||
| 54 | refresh_interval: Duration, | ||
| 55 | } | ||
| 56 | |||
| 57 | impl BandwidthTracker { | ||
| 58 | /// Creates a new BandwidthTracker and registers metrics with Prometheus. | ||
| 59 | /// | ||
| 60 | /// Uses default settings: | ||
| 61 | /// - Top 10 repositories exposed | ||
| 62 | /// - 60 second refresh interval | ||
| 63 | pub fn new(registry: &Registry) -> Self { | ||
| 64 | Self::with_config(registry, DEFAULT_TOP_N, DEFAULT_REFRESH_INTERVAL) | ||
| 65 | } | ||
| 66 | |||
| 67 | /// Creates a new BandwidthTracker with custom configuration. | ||
| 68 | /// | ||
| 69 | /// # Arguments | ||
| 70 | /// | ||
| 71 | /// * `registry` - Prometheus registry to register metrics with | ||
| 72 | /// * `top_n` - Number of top repositories to expose in metrics | ||
| 73 | /// * `refresh_interval` - How often to recalculate the top-N list | ||
| 74 | pub fn with_config(registry: &Registry, top_n: usize, refresh_interval: Duration) -> Self { | ||
| 75 | let top_repos_gauge = GaugeVec::new( | ||
| 76 | Opts::new( | ||
| 77 | "ngit_git_top_repos_bytes", | ||
| 78 | "Top repositories by bandwidth (refreshed periodically)", | ||
| 79 | ), | ||
| 80 | &["repo"], | ||
| 81 | ) | ||
| 82 | .unwrap(); | ||
| 83 | registry.register(Box::new(top_repos_gauge.clone())).unwrap(); | ||
| 84 | |||
| 85 | Self { | ||
| 86 | all_repos: DashMap::new(), | ||
| 87 | top_repos_gauge, | ||
| 88 | last_refresh_nanos: AtomicU64::new(0), | ||
| 89 | start_instant: Instant::now(), | ||
| 90 | top_n, | ||
| 91 | refresh_interval, | ||
| 92 | } | ||
| 93 | } | ||
| 94 | |||
| 95 | /// Records bytes transferred for a repository. | ||
| 96 | /// | ||
| 97 | /// # Arguments | ||
| 98 | /// | ||
| 99 | /// * `repo_id` - Repository identifier (e.g., npub or repo name) | ||
| 100 | /// * `bytes` - Number of bytes transferred | ||
| 101 | pub fn record_transfer(&self, repo_id: &str, bytes: u64) { | ||
| 102 | self.all_repos | ||
| 103 | .entry(repo_id.to_string()) | ||
| 104 | .and_modify(|v| *v = v.saturating_add(bytes)) | ||
| 105 | .or_insert(bytes); | ||
| 106 | } | ||
| 107 | |||
| 108 | /// Conditionally refreshes the top-N list if the refresh interval has elapsed. | ||
| 109 | /// | ||
| 110 | /// This method is designed to be called frequently (e.g., on every | ||
| 111 | /// `/metrics` request) without performance impact - it only does work | ||
| 112 | /// when the refresh interval has elapsed. | ||
| 113 | pub fn maybe_refresh_top_n(&self) { | ||
| 114 | let elapsed_nanos = self.start_instant.elapsed().as_nanos() as u64; | ||
| 115 | let last_refresh = self.last_refresh_nanos.load(Ordering::Relaxed); | ||
| 116 | let interval_nanos = self.refresh_interval.as_nanos() as u64; | ||
| 117 | |||
| 118 | // Check if enough time has passed since last refresh | ||
| 119 | if elapsed_nanos.saturating_sub(last_refresh) >= interval_nanos { | ||
| 120 | // Try to update the timestamp atomically to prevent concurrent refreshes | ||
| 121 | if self | ||
| 122 | .last_refresh_nanos | ||
| 123 | .compare_exchange(last_refresh, elapsed_nanos, Ordering::SeqCst, Ordering::Relaxed) | ||
| 124 | .is_ok() | ||
| 125 | { | ||
| 126 | self.refresh_top_n(); | ||
| 127 | } | ||
| 128 | } | ||
| 129 | } | ||
| 130 | |||
| 131 | /// Forces a refresh of the top-N list. | ||
| 132 | /// | ||
| 133 | /// This recalculates which repositories are in the top N by bandwidth | ||
| 134 | /// and updates the Prometheus gauges accordingly. | ||
| 135 | pub fn refresh_top_n(&self) { | ||
| 136 | // Collect all repo data | ||
| 137 | let mut sorted: Vec<_> = self | ||
| 138 | .all_repos | ||
| 139 | .iter() | ||
| 140 | .map(|r| (r.key().clone(), *r.value())) | ||
| 141 | .collect(); | ||
| 142 | |||
| 143 | // Sort by bytes descending | ||
| 144 | sorted.sort_by(|a, b| b.1.cmp(&a.1)); | ||
| 145 | |||
| 146 | // Clear old labels and set new top N | ||
| 147 | self.top_repos_gauge.reset(); | ||
| 148 | for (repo, bytes) in sorted.into_iter().take(self.top_n) { | ||
| 149 | self.top_repos_gauge | ||
| 150 | .with_label_values(&[&repo]) | ||
| 151 | .set(bytes as f64); | ||
| 152 | } | ||
| 153 | } | ||
| 154 | |||
| 155 | /// Returns the total bytes transferred for a specific repository. | ||
| 156 | /// | ||
| 157 | /// Returns `None` if the repository has not been seen. | ||
| 158 | pub fn get_repo_bytes(&self, repo_id: &str) -> Option<u64> { | ||
| 159 | self.all_repos.get(repo_id).map(|v| *v) | ||
| 160 | } | ||
| 161 | |||
| 162 | /// Returns the total bytes transferred across all repositories. | ||
| 163 | pub fn total_bytes(&self) -> u64 { | ||
| 164 | self.all_repos.iter().map(|r| *r.value()).sum() | ||
| 165 | } | ||
| 166 | |||
| 167 | /// Returns the number of repositories being tracked. | ||
| 168 | pub fn repo_count(&self) -> usize { | ||
| 169 | self.all_repos.len() | ||
| 170 | } | ||
| 171 | |||
| 172 | /// Returns the top N repositories by bandwidth. | ||
| 173 | /// | ||
| 174 | /// This is a snapshot and may not match the Prometheus gauges if | ||
| 175 | /// a refresh hasn't occurred recently. | ||
| 176 | pub fn get_top_repos(&self) -> Vec<(String, u64)> { | ||
| 177 | let mut sorted: Vec<_> = self | ||
| 178 | .all_repos | ||
| 179 | .iter() | ||
| 180 | .map(|r| (r.key().clone(), *r.value())) | ||
| 181 | .collect(); | ||
| 182 | |||
| 183 | sorted.sort_by(|a, b| b.1.cmp(&a.1)); | ||
| 184 | sorted.truncate(self.top_n); | ||
| 185 | sorted | ||
| 186 | } | ||
| 187 | } | ||
| 188 | |||
| 189 | #[cfg(test)] | ||
| 190 | mod tests { | ||
| 191 | use super::*; | ||
| 192 | |||
| 193 | fn test_registry() -> Registry { | ||
| 194 | Registry::new() | ||
| 195 | } | ||
| 196 | |||
| 197 | #[test] | ||
| 198 | fn test_bandwidth_tracking() { | ||
| 199 | let registry = test_registry(); | ||
| 200 | let tracker = BandwidthTracker::new(®istry); | ||
| 201 | |||
| 202 | // Record transfers | ||
| 203 | tracker.record_transfer("repo-a", 1000); | ||
| 204 | tracker.record_transfer("repo-b", 2000); | ||
| 205 | tracker.record_transfer("repo-a", 500); // Additional transfer to repo-a | ||
| 206 | |||
| 207 | assert_eq!(tracker.get_repo_bytes("repo-a"), Some(1500)); | ||
| 208 | assert_eq!(tracker.get_repo_bytes("repo-b"), Some(2000)); | ||
| 209 | assert_eq!(tracker.get_repo_bytes("repo-c"), None); | ||
| 210 | assert_eq!(tracker.total_bytes(), 3500); | ||
| 211 | assert_eq!(tracker.repo_count(), 2); | ||
| 212 | } | ||
| 213 | |||
| 214 | #[test] | ||
| 215 | fn test_top_n_repos() { | ||
| 216 | let registry = test_registry(); | ||
| 217 | let tracker = BandwidthTracker::with_config(®istry, 3, Duration::from_secs(60)); | ||
| 218 | |||
| 219 | // Create 5 repos with different bandwidth | ||
| 220 | tracker.record_transfer("repo-1", 100); | ||
| 221 | tracker.record_transfer("repo-2", 500); | ||
| 222 | tracker.record_transfer("repo-3", 200); | ||
| 223 | tracker.record_transfer("repo-4", 800); | ||
| 224 | tracker.record_transfer("repo-5", 300); | ||
| 225 | |||
| 226 | let top = tracker.get_top_repos(); | ||
| 227 | assert_eq!(top.len(), 3); | ||
| 228 | assert_eq!(top[0], ("repo-4".to_string(), 800)); | ||
| 229 | assert_eq!(top[1], ("repo-2".to_string(), 500)); | ||
| 230 | assert_eq!(top[2], ("repo-5".to_string(), 300)); | ||
| 231 | } | ||
| 232 | |||
| 233 | #[test] | ||
| 234 | fn test_refresh_updates_gauge() { | ||
| 235 | let registry = test_registry(); | ||
| 236 | let tracker = BandwidthTracker::new(®istry); | ||
| 237 | |||
| 238 | tracker.record_transfer("high-bandwidth-repo", 10_000_000); | ||
| 239 | tracker.record_transfer("low-bandwidth-repo", 1000); | ||
| 240 | |||
| 241 | // Force a refresh | ||
| 242 | tracker.refresh_top_n(); | ||
| 243 | |||
| 244 | // Verify the gauge values (we can't easily access them directly, | ||
| 245 | // but we can verify the tracker state is correct) | ||
| 246 | assert_eq!(tracker.repo_count(), 2); | ||
| 247 | assert_eq!(tracker.total_bytes(), 10_001_000); | ||
| 248 | } | ||
| 249 | |||
| 250 | #[test] | ||
| 251 | fn test_saturating_add() { | ||
| 252 | let registry = test_registry(); | ||
| 253 | let tracker = BandwidthTracker::new(®istry); | ||
| 254 | |||
| 255 | // Test that we don't overflow | ||
| 256 | tracker.record_transfer("huge-repo", u64::MAX - 100); | ||
| 257 | tracker.record_transfer("huge-repo", 200); | ||
| 258 | |||
| 259 | // Should saturate to MAX, not overflow | ||
| 260 | assert_eq!(tracker.get_repo_bytes("huge-repo"), Some(u64::MAX)); | ||
| 261 | } | ||
| 262 | |||
| 263 | #[test] | ||
| 264 | fn test_maybe_refresh_respects_interval() { | ||
| 265 | let registry = test_registry(); | ||
| 266 | // Use a very short interval for testing | ||
| 267 | let tracker = BandwidthTracker::with_config(®istry, 10, Duration::from_millis(10)); | ||
| 268 | |||
| 269 | tracker.record_transfer("repo-a", 1000); | ||
| 270 | |||
| 271 | // First call should trigger refresh (no previous refresh) | ||
| 272 | tracker.maybe_refresh_top_n(); | ||
| 273 | |||
| 274 | // Add more data | ||
| 275 | tracker.record_transfer("repo-b", 2000); | ||
| 276 | |||
| 277 | // Immediate second call should NOT trigger refresh | ||
| 278 | let count_before = tracker.repo_count(); | ||
| 279 | tracker.maybe_refresh_top_n(); | ||
| 280 | assert_eq!(tracker.repo_count(), count_before); | ||
| 281 | |||
| 282 | // Wait for interval to pass | ||
| 283 | std::thread::sleep(Duration::from_millis(15)); | ||
| 284 | |||
| 285 | // Now it should refresh | ||
| 286 | tracker.maybe_refresh_top_n(); | ||
| 287 | } | ||
| 288 | |||
| 289 | #[test] | ||
| 290 | fn test_empty_tracker() { | ||
| 291 | let registry = test_registry(); | ||
| 292 | let tracker = BandwidthTracker::new(®istry); | ||
| 293 | |||
| 294 | assert_eq!(tracker.total_bytes(), 0); | ||
| 295 | assert_eq!(tracker.repo_count(), 0); | ||
| 296 | assert!(tracker.get_top_repos().is_empty()); | ||
| 297 | |||
| 298 | // Refresh should not panic on empty data | ||
| 299 | tracker.refresh_top_n(); | ||
| 300 | } | ||
| 301 | } \ No newline at end of file | ||
diff --git a/src/metrics/connection.rs b/src/metrics/connection.rs new file mode 100644 index 0000000..6a7f406 --- /dev/null +++ b/src/metrics/connection.rs | |||
| @@ -0,0 +1,337 @@ | |||
| 1 | //! Connection tracking with privacy-preserving abuse detection. | ||
| 2 | //! | ||
| 3 | //! This module tracks WebSocket connections per IP address internally for abuse | ||
| 4 | //! detection, but NEVER exposes IP addresses in Prometheus metrics. Only aggregate | ||
| 5 | //! counts are exposed. | ||
| 6 | //! | ||
| 7 | //! # Privacy Model | ||
| 8 | //! | ||
| 9 | //! | Data | Location | Exposed? | | ||
| 10 | //! |------|----------|----------| | ||
| 11 | //! | Total connections | Prometheus | ✅ Yes | | ||
| 12 | //! | Unique IP count | Prometheus | ✅ Yes | | ||
| 13 | //! | Flagged abuser count | Prometheus | ✅ Yes | | ||
| 14 | //! | Actual IP addresses | Internal HashMap | ❌ No | | ||
| 15 | //! | IP + abuse flag | Logs (when flagged) | ⚠️ Logs only | | ||
| 16 | |||
| 17 | use std::net::IpAddr; | ||
| 18 | use std::time::Instant; | ||
| 19 | |||
| 20 | use dashmap::DashMap; | ||
| 21 | use prometheus::{IntGauge, Opts, Registry}; | ||
| 22 | use tracing::warn; | ||
| 23 | |||
| 24 | /// Information about connections from a specific IP address. | ||
| 25 | struct ConnectionInfo { | ||
| 26 | /// Number of active connections from this IP | ||
| 27 | count: u32, | ||
| 28 | /// When the first connection from this IP was established | ||
| 29 | first_seen: Instant, | ||
| 30 | /// Whether this IP has been flagged as potentially abusive | ||
| 31 | flagged_as_abuse: bool, | ||
| 32 | } | ||
| 33 | |||
| 34 | /// Tracks WebSocket connections per IP with abuse detection. | ||
| 35 | /// | ||
| 36 | /// # Thread Safety | ||
| 37 | /// | ||
| 38 | /// Uses `DashMap` for lock-free concurrent access, as connection tracking | ||
| 39 | /// happens across multiple tokio tasks. | ||
| 40 | /// | ||
| 41 | /// # Privacy | ||
| 42 | /// | ||
| 43 | /// IP addresses are stored internally only for abuse detection and are | ||
| 44 | /// NEVER exposed in Prometheus metrics. Only aggregate counts are exposed: | ||
| 45 | /// - Total active connections | ||
| 46 | /// - Number of unique IPs | ||
| 47 | /// - Number of IPs flagged as potential abusers | ||
| 48 | pub struct ConnectionTracker { | ||
| 49 | /// Active connections per IP (INTERNAL ONLY - never exposed to metrics) | ||
| 50 | connections: DashMap<IpAddr, ConnectionInfo>, | ||
| 51 | |||
| 52 | /// Threshold for abuse flagging (connections per IP) | ||
| 53 | abuse_threshold: u32, | ||
| 54 | |||
| 55 | /// Prometheus gauge: total active connections | ||
| 56 | active_connections: IntGauge, | ||
| 57 | |||
| 58 | /// Prometheus gauge: number of unique IPs connected | ||
| 59 | unique_ips: IntGauge, | ||
| 60 | |||
| 61 | /// Prometheus gauge: number of IPs flagged as potential abusers | ||
| 62 | flagged_abusers: IntGauge, | ||
| 63 | } | ||
| 64 | |||
| 65 | impl ConnectionTracker { | ||
| 66 | /// Creates a new ConnectionTracker and registers metrics with Prometheus. | ||
| 67 | /// | ||
| 68 | /// # Arguments | ||
| 69 | /// | ||
| 70 | /// * `abuse_threshold` - Number of connections from a single IP before flagging | ||
| 71 | /// * `registry` - Prometheus registry to register metrics with | ||
| 72 | pub fn new(abuse_threshold: u32, registry: &Registry) -> Self { | ||
| 73 | let active_connections = IntGauge::with_opts( | ||
| 74 | Opts::new( | ||
| 75 | "ngit_websocket_connections_active", | ||
| 76 | "Current active WebSocket connections", | ||
| 77 | ) | ||
| 78 | ).unwrap(); | ||
| 79 | registry.register(Box::new(active_connections.clone())).unwrap(); | ||
| 80 | |||
| 81 | let unique_ips = IntGauge::with_opts( | ||
| 82 | Opts::new( | ||
| 83 | "ngit_websocket_unique_ips", | ||
| 84 | "Number of unique IP addresses connected (NOT the IPs themselves)", | ||
| 85 | ) | ||
| 86 | ).unwrap(); | ||
| 87 | registry.register(Box::new(unique_ips.clone())).unwrap(); | ||
| 88 | |||
| 89 | let flagged_abusers = IntGauge::with_opts( | ||
| 90 | Opts::new( | ||
| 91 | "ngit_websocket_flagged_abusers", | ||
| 92 | "Number of IPs exceeding connection threshold", | ||
| 93 | ) | ||
| 94 | ).unwrap(); | ||
| 95 | registry.register(Box::new(flagged_abusers.clone())).unwrap(); | ||
| 96 | |||
| 97 | Self { | ||
| 98 | connections: DashMap::new(), | ||
| 99 | abuse_threshold, | ||
| 100 | active_connections, | ||
| 101 | unique_ips, | ||
| 102 | flagged_abusers, | ||
| 103 | } | ||
| 104 | } | ||
| 105 | |||
| 106 | /// Called when a new WebSocket connection is established. | ||
| 107 | /// | ||
| 108 | /// This method: | ||
| 109 | /// 1. Increments the connection count for this IP | ||
| 110 | /// 2. Checks if the IP has exceeded the abuse threshold | ||
| 111 | /// 3. Logs a warning if abuse is detected (IP is logged here only) | ||
| 112 | /// 4. Updates Prometheus metrics (aggregate counts only) | ||
| 113 | /// | ||
| 114 | /// # Privacy | ||
| 115 | /// | ||
| 116 | /// The IP address is logged only when abuse is detected. It is NEVER | ||
| 117 | /// exposed in Prometheus metrics. | ||
| 118 | pub fn on_connect(&self, ip: IpAddr) { | ||
| 119 | let mut is_new_ip = false; | ||
| 120 | let mut newly_flagged = false; | ||
| 121 | |||
| 122 | self.connections | ||
| 123 | .entry(ip) | ||
| 124 | .and_modify(|info| { | ||
| 125 | info.count += 1; | ||
| 126 | // Check if this connection pushes us over the threshold | ||
| 127 | if !info.flagged_as_abuse && info.count >= self.abuse_threshold { | ||
| 128 | info.flagged_as_abuse = true; | ||
| 129 | newly_flagged = true; | ||
| 130 | } | ||
| 131 | }) | ||
| 132 | .or_insert_with(|| { | ||
| 133 | is_new_ip = true; | ||
| 134 | ConnectionInfo { | ||
| 135 | count: 1, | ||
| 136 | first_seen: Instant::now(), | ||
| 137 | flagged_as_abuse: false, | ||
| 138 | } | ||
| 139 | }); | ||
| 140 | |||
| 141 | // Update Prometheus metrics (aggregate counts only) | ||
| 142 | self.active_connections.inc(); | ||
| 143 | |||
| 144 | if is_new_ip { | ||
| 145 | self.unique_ips.inc(); | ||
| 146 | } | ||
| 147 | |||
| 148 | if newly_flagged { | ||
| 149 | self.flagged_abusers.inc(); | ||
| 150 | // Log the abuse detection - IP is only exposed in logs, not metrics | ||
| 151 | warn!( | ||
| 152 | ip = %ip, | ||
| 153 | threshold = self.abuse_threshold, | ||
| 154 | "Potential abuse detected: IP exceeded connection threshold" | ||
| 155 | ); | ||
| 156 | } | ||
| 157 | } | ||
| 158 | |||
| 159 | /// Called when a WebSocket connection is closed. | ||
| 160 | /// | ||
| 161 | /// This method: | ||
| 162 | /// 1. Decrements the connection count for this IP | ||
| 163 | /// 2. Removes the IP from tracking if count reaches 0 | ||
| 164 | /// 3. Updates the abuse flag count if the IP was flagged | ||
| 165 | /// 4. Updates Prometheus metrics (aggregate counts only) | ||
| 166 | pub fn on_disconnect(&self, ip: IpAddr) { | ||
| 167 | let mut remove_entry = false; | ||
| 168 | let mut was_flagged = false; | ||
| 169 | let mut had_connection = false; | ||
| 170 | |||
| 171 | if let Some(mut entry) = self.connections.get_mut(&ip) { | ||
| 172 | had_connection = true; | ||
| 173 | entry.count = entry.count.saturating_sub(1); | ||
| 174 | if entry.count == 0 { | ||
| 175 | remove_entry = true; | ||
| 176 | was_flagged = entry.flagged_as_abuse; | ||
| 177 | } | ||
| 178 | } | ||
| 179 | |||
| 180 | // Remove the entry if count is 0 | ||
| 181 | if remove_entry { | ||
| 182 | self.connections.remove(&ip); | ||
| 183 | self.unique_ips.dec(); | ||
| 184 | if was_flagged { | ||
| 185 | self.flagged_abusers.dec(); | ||
| 186 | } | ||
| 187 | } | ||
| 188 | |||
| 189 | // Update total connections only if this IP had a tracked connection | ||
| 190 | if had_connection { | ||
| 191 | self.active_connections.dec(); | ||
| 192 | } | ||
| 193 | } | ||
| 194 | |||
| 195 | /// Returns the current number of active connections. | ||
| 196 | pub fn active_connections(&self) -> u64 { | ||
| 197 | self.active_connections.get() as u64 | ||
| 198 | } | ||
| 199 | |||
| 200 | /// Returns the current number of unique IPs. | ||
| 201 | pub fn unique_ip_count(&self) -> u64 { | ||
| 202 | self.unique_ips.get() as u64 | ||
| 203 | } | ||
| 204 | |||
| 205 | /// Returns the current number of flagged abusers. | ||
| 206 | pub fn flagged_abuser_count(&self) -> u64 { | ||
| 207 | self.flagged_abusers.get() as u64 | ||
| 208 | } | ||
| 209 | |||
| 210 | /// Returns the connection count for a specific IP (for internal use only). | ||
| 211 | /// | ||
| 212 | /// # Privacy | ||
| 213 | /// | ||
| 214 | /// This is an internal method. The returned data should NEVER be exposed | ||
| 215 | /// in metrics or logs without privacy consideration. | ||
| 216 | #[cfg(test)] | ||
| 217 | pub(crate) fn connection_count(&self, ip: &IpAddr) -> Option<u32> { | ||
| 218 | self.connections.get(ip).map(|info| info.count) | ||
| 219 | } | ||
| 220 | |||
| 221 | /// Returns whether an IP is flagged as abusive (for internal use only). | ||
| 222 | #[cfg(test)] | ||
| 223 | pub(crate) fn is_flagged(&self, ip: &IpAddr) -> bool { | ||
| 224 | self.connections | ||
| 225 | .get(ip) | ||
| 226 | .map(|info| info.flagged_as_abuse) | ||
| 227 | .unwrap_or(false) | ||
| 228 | } | ||
| 229 | } | ||
| 230 | |||
| 231 | #[cfg(test)] | ||
| 232 | mod tests { | ||
| 233 | use super::*; | ||
| 234 | use std::net::{Ipv4Addr, Ipv6Addr}; | ||
| 235 | |||
| 236 | fn test_registry() -> Registry { | ||
| 237 | Registry::new() | ||
| 238 | } | ||
| 239 | |||
| 240 | #[test] | ||
| 241 | fn test_connection_tracking() { | ||
| 242 | let registry = test_registry(); | ||
| 243 | let tracker = ConnectionTracker::new(5, ®istry); | ||
| 244 | let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); | ||
| 245 | |||
| 246 | // Connect | ||
| 247 | tracker.on_connect(ip); | ||
| 248 | assert_eq!(tracker.active_connections(), 1); | ||
| 249 | assert_eq!(tracker.unique_ip_count(), 1); | ||
| 250 | assert_eq!(tracker.connection_count(&ip), Some(1)); | ||
| 251 | |||
| 252 | // Connect again from same IP | ||
| 253 | tracker.on_connect(ip); | ||
| 254 | assert_eq!(tracker.active_connections(), 2); | ||
| 255 | assert_eq!(tracker.unique_ip_count(), 1); // Still 1 unique IP | ||
| 256 | assert_eq!(tracker.connection_count(&ip), Some(2)); | ||
| 257 | |||
| 258 | // Disconnect one | ||
| 259 | tracker.on_disconnect(ip); | ||
| 260 | assert_eq!(tracker.active_connections(), 1); | ||
| 261 | assert_eq!(tracker.unique_ip_count(), 1); | ||
| 262 | assert_eq!(tracker.connection_count(&ip), Some(1)); | ||
| 263 | |||
| 264 | // Disconnect last | ||
| 265 | tracker.on_disconnect(ip); | ||
| 266 | assert_eq!(tracker.active_connections(), 0); | ||
| 267 | assert_eq!(tracker.unique_ip_count(), 0); | ||
| 268 | assert_eq!(tracker.connection_count(&ip), None); | ||
| 269 | } | ||
| 270 | |||
| 271 | #[test] | ||
| 272 | fn test_multiple_ips() { | ||
| 273 | let registry = test_registry(); | ||
| 274 | let tracker = ConnectionTracker::new(5, ®istry); | ||
| 275 | let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); | ||
| 276 | let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)); | ||
| 277 | let ip3 = IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)); | ||
| 278 | |||
| 279 | tracker.on_connect(ip1); | ||
| 280 | tracker.on_connect(ip2); | ||
| 281 | tracker.on_connect(ip3); | ||
| 282 | |||
| 283 | assert_eq!(tracker.active_connections(), 3); | ||
| 284 | assert_eq!(tracker.unique_ip_count(), 3); | ||
| 285 | |||
| 286 | tracker.on_disconnect(ip2); | ||
| 287 | assert_eq!(tracker.active_connections(), 2); | ||
| 288 | assert_eq!(tracker.unique_ip_count(), 2); | ||
| 289 | } | ||
| 290 | |||
| 291 | #[test] | ||
| 292 | fn test_abuse_detection() { | ||
| 293 | let registry = test_registry(); | ||
| 294 | let threshold = 3; | ||
| 295 | let tracker = ConnectionTracker::new(threshold, ®istry); | ||
| 296 | let abuser_ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); | ||
| 297 | let normal_ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); | ||
| 298 | |||
| 299 | // Normal user with 1 connection | ||
| 300 | tracker.on_connect(normal_ip); | ||
| 301 | assert!(!tracker.is_flagged(&normal_ip)); | ||
| 302 | assert_eq!(tracker.flagged_abuser_count(), 0); | ||
| 303 | |||
| 304 | // Abuser approaching threshold | ||
| 305 | tracker.on_connect(abuser_ip); | ||
| 306 | tracker.on_connect(abuser_ip); | ||
| 307 | assert!(!tracker.is_flagged(&abuser_ip)); | ||
| 308 | assert_eq!(tracker.flagged_abuser_count(), 0); | ||
| 309 | |||
| 310 | // Abuser hits threshold | ||
| 311 | tracker.on_connect(abuser_ip); | ||
| 312 | assert!(tracker.is_flagged(&abuser_ip)); | ||
| 313 | assert_eq!(tracker.flagged_abuser_count(), 1); | ||
| 314 | |||
| 315 | // Normal user still not flagged | ||
| 316 | assert!(!tracker.is_flagged(&normal_ip)); | ||
| 317 | |||
| 318 | // Abuser disconnects all - should be removed from flagged count | ||
| 319 | tracker.on_disconnect(abuser_ip); | ||
| 320 | tracker.on_disconnect(abuser_ip); | ||
| 321 | tracker.on_disconnect(abuser_ip); | ||
| 322 | assert_eq!(tracker.flagged_abuser_count(), 0); | ||
| 323 | assert_eq!(tracker.active_connections(), 1); // Only normal user remains | ||
| 324 | } | ||
| 325 | |||
| 326 | #[test] | ||
| 327 | fn test_disconnect_without_connect() { | ||
| 328 | let registry = test_registry(); | ||
| 329 | let tracker = ConnectionTracker::new(5, ®istry); | ||
| 330 | let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); | ||
| 331 | |||
| 332 | // Disconnect without connect should not panic or go negative | ||
| 333 | tracker.on_disconnect(ip); | ||
| 334 | assert_eq!(tracker.active_connections(), 0); | ||
| 335 | assert_eq!(tracker.unique_ip_count(), 0); | ||
| 336 | } | ||
| 337 | } \ No newline at end of file | ||
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..4a4fe57 --- /dev/null +++ b/src/metrics/mod.rs | |||
| @@ -0,0 +1,469 @@ | |||
| 1 | //! Prometheus metrics for ngit-grasp relay. | ||
| 2 | //! | ||
| 3 | //! This module provides comprehensive monitoring metrics including: | ||
| 4 | //! - WebSocket connection tracking (with privacy-preserving IP aggregation) | ||
| 5 | //! - Git operation metrics (clone, fetch, push) | ||
| 6 | //! - Repository bandwidth tracking (top-N only for cardinality control) | ||
| 7 | //! - Nostr event metrics | ||
| 8 | //! | ||
| 9 | //! # Privacy | ||
| 10 | //! IP addresses are NEVER exposed in metrics. The `ConnectionTracker` maintains | ||
| 11 | //! per-IP counts internally only for abuse detection. Only aggregate counts | ||
| 12 | //! are exposed to Prometheus. | ||
| 13 | |||
| 14 | pub mod bandwidth; | ||
| 15 | pub mod connection; | ||
| 16 | |||
| 17 | use std::sync::Arc; | ||
| 18 | use std::time::Instant; | ||
| 19 | |||
| 20 | use lazy_static::lazy_static; | ||
| 21 | use prometheus::{ | ||
| 22 | Counter, CounterVec, Encoder, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts, | ||
| 23 | Registry, TextEncoder, | ||
| 24 | }; | ||
| 25 | |||
| 26 | use bandwidth::BandwidthTracker; | ||
| 27 | use connection::ConnectionTracker; | ||
| 28 | |||
| 29 | lazy_static! { | ||
| 30 | /// Global Prometheus registry for ngit-grasp metrics | ||
| 31 | pub static ref REGISTRY: Registry = Registry::new(); | ||
| 32 | } | ||
| 33 | |||
| 34 | /// Central metrics collection for ngit-grasp relay. | ||
| 35 | /// | ||
| 36 | /// Thread-safe and designed for concurrent access from multiple tokio tasks. | ||
| 37 | #[derive(Clone)] | ||
| 38 | pub struct Metrics { | ||
| 39 | inner: Arc<MetricsInner>, | ||
| 40 | } | ||
| 41 | |||
| 42 | struct MetricsInner { | ||
| 43 | /// Connection tracking with abuse detection | ||
| 44 | pub connection_tracker: ConnectionTracker, | ||
| 45 | |||
| 46 | /// Repository bandwidth tracking (top-N only) | ||
| 47 | pub bandwidth_tracker: BandwidthTracker, | ||
| 48 | |||
| 49 | // === WebSocket Metrics === | ||
| 50 | /// Total WebSocket connections since startup | ||
| 51 | pub websocket_connections_total: Counter, | ||
| 52 | /// Connection duration histogram | ||
| 53 | pub websocket_connection_duration: Histogram, | ||
| 54 | /// Messages received by type (REQ, EVENT, CLOSE) | ||
| 55 | pub websocket_messages_received: CounterVec, | ||
| 56 | /// Messages sent by type (EVENT, EOSE, OK, NOTICE) | ||
| 57 | pub websocket_messages_sent: CounterVec, | ||
| 58 | |||
| 59 | // === Git Operation Metrics === | ||
| 60 | /// Git operations by type and status | ||
| 61 | pub git_operations_total: CounterVec, | ||
| 62 | /// Git operation duration histogram | ||
| 63 | pub git_operation_duration: HistogramVec, | ||
| 64 | /// Total bytes transferred | ||
| 65 | pub git_bytes_total: CounterVec, | ||
| 66 | /// Push authorization results | ||
| 67 | pub git_push_authorization: CounterVec, | ||
| 68 | |||
| 69 | // === Nostr Event Metrics === | ||
| 70 | /// Events received by kind | ||
| 71 | pub events_received_total: CounterVec, | ||
| 72 | /// Events successfully stored by kind | ||
| 73 | pub events_stored_total: CounterVec, | ||
| 74 | /// Events rejected by kind and reason | ||
| 75 | pub events_rejected_total: CounterVec, | ||
| 76 | |||
| 77 | // === Repository Metrics === | ||
| 78 | /// Total repositories hosted | ||
| 79 | pub repositories_total: Gauge, | ||
| 80 | |||
| 81 | // === System Health Metrics === | ||
| 82 | /// Server start time for uptime calculation | ||
| 83 | pub start_time: Instant, | ||
| 84 | /// Build information gauge | ||
| 85 | pub build_info: GaugeVec, | ||
| 86 | } | ||
| 87 | |||
| 88 | impl Metrics { | ||
| 89 | /// Creates a new Metrics instance and registers all metrics with Prometheus. | ||
| 90 | /// | ||
| 91 | /// # Arguments | ||
| 92 | /// * `abuse_threshold` - Number of connections from a single IP before flagging as abuse | ||
| 93 | pub fn new(abuse_threshold: u32) -> Self { | ||
| 94 | let inner = MetricsInner::new(abuse_threshold); | ||
| 95 | Self { | ||
| 96 | inner: Arc::new(inner), | ||
| 97 | } | ||
| 98 | } | ||
| 99 | |||
| 100 | /// Returns the connection tracker for WebSocket connection management. | ||
| 101 | pub fn connection_tracker(&self) -> &ConnectionTracker { | ||
| 102 | &self.inner.connection_tracker | ||
| 103 | } | ||
| 104 | |||
| 105 | /// Returns the bandwidth tracker for repository bandwidth tracking. | ||
| 106 | pub fn bandwidth_tracker(&self) -> &BandwidthTracker { | ||
| 107 | &self.inner.bandwidth_tracker | ||
| 108 | } | ||
| 109 | |||
| 110 | // === WebSocket Recording Methods === | ||
| 111 | |||
| 112 | /// Record a new WebSocket connection | ||
| 113 | pub fn record_websocket_connection(&self) { | ||
| 114 | self.inner.websocket_connections_total.inc(); | ||
| 115 | } | ||
| 116 | |||
| 117 | /// Start timing a WebSocket connection, returns timer that records on drop | ||
| 118 | pub fn start_connection_timer(&self) -> HistogramTimer { | ||
| 119 | HistogramTimer::new(self.inner.websocket_connection_duration.clone()) | ||
| 120 | } | ||
| 121 | |||
| 122 | /// Record a received WebSocket message | ||
| 123 | pub fn record_message_received(&self, msg_type: &str) { | ||
| 124 | self.inner | ||
| 125 | .websocket_messages_received | ||
| 126 | .with_label_values(&[msg_type]) | ||
| 127 | .inc(); | ||
| 128 | } | ||
| 129 | |||
| 130 | /// Record a sent WebSocket message | ||
| 131 | pub fn record_message_sent(&self, msg_type: &str) { | ||
| 132 | self.inner | ||
| 133 | .websocket_messages_sent | ||
| 134 | .with_label_values(&[msg_type]) | ||
| 135 | .inc(); | ||
| 136 | } | ||
| 137 | |||
| 138 | // === Git Operation Recording Methods === | ||
| 139 | |||
| 140 | /// Record a git operation completion | ||
| 141 | pub fn record_git_operation(&self, operation: &str, status: &str) { | ||
| 142 | self.inner | ||
| 143 | .git_operations_total | ||
| 144 | .with_label_values(&[operation, status]) | ||
| 145 | .inc(); | ||
| 146 | } | ||
| 147 | |||
| 148 | /// Start timing a git operation, returns a timer | ||
| 149 | pub fn start_git_operation_timer(&self, operation: &str) -> GitOperationTimer { | ||
| 150 | GitOperationTimer::new(self.inner.git_operation_duration.clone(), operation.to_string()) | ||
| 151 | } | ||
| 152 | |||
| 153 | /// Record bytes transferred for a git operation | ||
| 154 | pub fn record_git_bytes(&self, direction: &str, bytes: u64) { | ||
| 155 | self.inner | ||
| 156 | .git_bytes_total | ||
| 157 | .with_label_values(&[direction]) | ||
| 158 | .inc_by(bytes as f64); | ||
| 159 | } | ||
| 160 | |||
| 161 | /// Record a push authorization result | ||
| 162 | pub fn record_push_authorization(&self, result: &str) { | ||
| 163 | self.inner | ||
| 164 | .git_push_authorization | ||
| 165 | .with_label_values(&[result]) | ||
| 166 | .inc(); | ||
| 167 | } | ||
| 168 | |||
| 169 | // === Nostr Event Recording Methods === | ||
| 170 | |||
| 171 | /// Record a received Nostr event | ||
| 172 | pub fn record_event_received(&self, kind: u64) { | ||
| 173 | self.inner | ||
| 174 | .events_received_total | ||
| 175 | .with_label_values(&[&kind.to_string()]) | ||
| 176 | .inc(); | ||
| 177 | } | ||
| 178 | |||
| 179 | /// Record a stored Nostr event | ||
| 180 | pub fn record_event_stored(&self, kind: u64) { | ||
| 181 | self.inner | ||
| 182 | .events_stored_total | ||
| 183 | .with_label_values(&[&kind.to_string()]) | ||
| 184 | .inc(); | ||
| 185 | } | ||
| 186 | |||
| 187 | /// Record a rejected Nostr event | ||
| 188 | pub fn record_event_rejected(&self, kind: u64, reason: &str) { | ||
| 189 | self.inner | ||
| 190 | .events_rejected_total | ||
| 191 | .with_label_values(&[&kind.to_string(), reason]) | ||
| 192 | .inc(); | ||
| 193 | } | ||
| 194 | |||
| 195 | // === Repository Metrics === | ||
| 196 | |||
| 197 | /// Set the total number of repositories | ||
| 198 | pub fn set_repositories_total(&self, count: u64) { | ||
| 199 | self.inner.repositories_total.set(count as f64); | ||
| 200 | } | ||
| 201 | |||
| 202 | /// Increment the repository count | ||
| 203 | pub fn inc_repositories_total(&self) { | ||
| 204 | self.inner.repositories_total.inc(); | ||
| 205 | } | ||
| 206 | |||
| 207 | // === Rendering === | ||
| 208 | |||
| 209 | /// Render all metrics in Prometheus text format. | ||
| 210 | /// | ||
| 211 | /// This method: | ||
| 212 | /// 1. Refreshes the top-N bandwidth metrics if needed | ||
| 213 | /// 2. Updates uptime | ||
| 214 | /// 3. Gathers all metrics from the registry | ||
| 215 | /// 4. Encodes them in Prometheus text format | ||
| 216 | pub fn render(&self) -> String { | ||
| 217 | // Refresh top-N bandwidth repos if needed | ||
| 218 | self.inner.bandwidth_tracker.maybe_refresh_top_n(); | ||
| 219 | |||
| 220 | // Gather and encode metrics | ||
| 221 | let encoder = TextEncoder::new(); | ||
| 222 | let metric_families = REGISTRY.gather(); | ||
| 223 | let mut buffer = Vec::new(); | ||
| 224 | encoder.encode(&metric_families, &mut buffer).unwrap(); | ||
| 225 | |||
| 226 | // Add uptime as a comment (it's derived, not a registered metric) | ||
| 227 | let uptime = self.inner.start_time.elapsed().as_secs(); | ||
| 228 | let mut output = String::from_utf8(buffer).unwrap(); | ||
| 229 | output.push_str(&format!( | ||
| 230 | "\n# HELP ngit_uptime_seconds Seconds since server startup\n# TYPE ngit_uptime_seconds counter\nngit_uptime_seconds {}\n", | ||
| 231 | uptime | ||
| 232 | )); | ||
| 233 | |||
| 234 | output | ||
| 235 | } | ||
| 236 | |||
| 237 | /// Check if the system is under high load (for sync scheduling) | ||
| 238 | pub fn is_high_load(&self, threshold: u64) -> bool { | ||
| 239 | self.inner.connection_tracker.active_connections() > threshold | ||
| 240 | } | ||
| 241 | } | ||
| 242 | |||
| 243 | impl MetricsInner { | ||
| 244 | fn new(abuse_threshold: u32) -> Self { | ||
| 245 | // Create connection tracker | ||
| 246 | let connection_tracker = ConnectionTracker::new(abuse_threshold, ®ISTRY); | ||
| 247 | |||
| 248 | // Create bandwidth tracker | ||
| 249 | let bandwidth_tracker = BandwidthTracker::new(®ISTRY); | ||
| 250 | |||
| 251 | // WebSocket metrics | ||
| 252 | let websocket_connections_total = Counter::with_opts( | ||
| 253 | Opts::new( | ||
| 254 | "ngit_websocket_connections_total", | ||
| 255 | "Total WebSocket connections since startup", | ||
| 256 | ) | ||
| 257 | ).unwrap(); | ||
| 258 | REGISTRY.register(Box::new(websocket_connections_total.clone())).unwrap(); | ||
| 259 | |||
| 260 | let websocket_connection_duration = Histogram::with_opts( | ||
| 261 | HistogramOpts::new( | ||
| 262 | "ngit_websocket_connection_duration_seconds", | ||
| 263 | "Duration of WebSocket connections", | ||
| 264 | ) | ||
| 265 | .buckets(vec![1.0, 5.0, 15.0, 30.0, 60.0, 300.0, 900.0, 3600.0]), | ||
| 266 | ).unwrap(); | ||
| 267 | REGISTRY.register(Box::new(websocket_connection_duration.clone())).unwrap(); | ||
| 268 | |||
| 269 | let websocket_messages_received = CounterVec::new( | ||
| 270 | Opts::new( | ||
| 271 | "ngit_websocket_messages_received_total", | ||
| 272 | "WebSocket messages received by type", | ||
| 273 | ), | ||
| 274 | &["type"], | ||
| 275 | ).unwrap(); | ||
| 276 | REGISTRY.register(Box::new(websocket_messages_received.clone())).unwrap(); | ||
| 277 | |||
| 278 | let websocket_messages_sent = CounterVec::new( | ||
| 279 | Opts::new( | ||
| 280 | "ngit_websocket_messages_sent_total", | ||
| 281 | "WebSocket messages sent by type", | ||
| 282 | ), | ||
| 283 | &["type"], | ||
| 284 | ).unwrap(); | ||
| 285 | REGISTRY.register(Box::new(websocket_messages_sent.clone())).unwrap(); | ||
| 286 | |||
| 287 | // Git operation metrics | ||
| 288 | let git_operations_total = CounterVec::new( | ||
| 289 | Opts::new( | ||
| 290 | "ngit_git_operations_total", | ||
| 291 | "Git operations by type and status", | ||
| 292 | ), | ||
| 293 | &["operation", "status"], | ||
| 294 | ).unwrap(); | ||
| 295 | REGISTRY.register(Box::new(git_operations_total.clone())).unwrap(); | ||
| 296 | |||
| 297 | let git_operation_duration = HistogramVec::new( | ||
| 298 | HistogramOpts::new( | ||
| 299 | "ngit_git_operation_duration_seconds", | ||
| 300 | "Duration of git operations", | ||
| 301 | ) | ||
| 302 | .buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]), | ||
| 303 | &["operation"], | ||
| 304 | ).unwrap(); | ||
| 305 | REGISTRY.register(Box::new(git_operation_duration.clone())).unwrap(); | ||
| 306 | |||
| 307 | let git_bytes_total = CounterVec::new( | ||
| 308 | Opts::new( | ||
| 309 | "ngit_git_bytes_total", | ||
| 310 | "Total bytes transferred for git operations", | ||
| 311 | ), | ||
| 312 | &["direction"], | ||
| 313 | ).unwrap(); | ||
| 314 | REGISTRY.register(Box::new(git_bytes_total.clone())).unwrap(); | ||
| 315 | |||
| 316 | let git_push_authorization = CounterVec::new( | ||
| 317 | Opts::new( | ||
| 318 | "ngit_git_push_authorization_total", | ||
| 319 | "Push authorization results", | ||
| 320 | ), | ||
| 321 | &["result"], | ||
| 322 | ).unwrap(); | ||
| 323 | REGISTRY.register(Box::new(git_push_authorization.clone())).unwrap(); | ||
| 324 | |||
| 325 | // Nostr event metrics | ||
| 326 | let events_received_total = CounterVec::new( | ||
| 327 | Opts::new( | ||
| 328 | "ngit_events_received_total", | ||
| 329 | "Nostr events received by kind", | ||
| 330 | ), | ||
| 331 | &["kind"], | ||
| 332 | ).unwrap(); | ||
| 333 | REGISTRY.register(Box::new(events_received_total.clone())).unwrap(); | ||
| 334 | |||
| 335 | let events_stored_total = CounterVec::new( | ||
| 336 | Opts::new( | ||
| 337 | "ngit_events_stored_total", | ||
| 338 | "Nostr events successfully stored by kind", | ||
| 339 | ), | ||
| 340 | &["kind"], | ||
| 341 | ).unwrap(); | ||
| 342 | REGISTRY.register(Box::new(events_stored_total.clone())).unwrap(); | ||
| 343 | |||
| 344 | let events_rejected_total = CounterVec::new( | ||
| 345 | Opts::new( | ||
| 346 | "ngit_events_rejected_total", | ||
| 347 | "Nostr events rejected by kind and reason", | ||
| 348 | ), | ||
| 349 | &["kind", "reason"], | ||
| 350 | ).unwrap(); | ||
| 351 | REGISTRY.register(Box::new(events_rejected_total.clone())).unwrap(); | ||
| 352 | |||
| 353 | // Repository metrics | ||
| 354 | let repositories_total = Gauge::with_opts( | ||
| 355 | Opts::new( | ||
| 356 | "ngit_repositories_total", | ||
| 357 | "Total repositories hosted", | ||
| 358 | ) | ||
| 359 | ).unwrap(); | ||
| 360 | REGISTRY.register(Box::new(repositories_total.clone())).unwrap(); | ||
| 361 | |||
| 362 | // Build info | ||
| 363 | let build_info = GaugeVec::new( | ||
| 364 | Opts::new( | ||
| 365 | "ngit_build_info", | ||
| 366 | "Build information", | ||
| 367 | ), | ||
| 368 | &["version", "commit"], | ||
| 369 | ).unwrap(); | ||
| 370 | REGISTRY.register(Box::new(build_info.clone())).unwrap(); | ||
| 371 | |||
| 372 | // Set build info gauge to 1 (it's just for labels) | ||
| 373 | build_info | ||
| 374 | .with_label_values(&[env!("CARGO_PKG_VERSION"), option_env!("GIT_HASH").unwrap_or("unknown")]) | ||
| 375 | .set(1.0); | ||
| 376 | |||
| 377 | Self { | ||
| 378 | connection_tracker, | ||
| 379 | bandwidth_tracker, | ||
| 380 | websocket_connections_total, | ||
| 381 | websocket_connection_duration, | ||
| 382 | websocket_messages_received, | ||
| 383 | websocket_messages_sent, | ||
| 384 | git_operations_total, | ||
| 385 | git_operation_duration, | ||
| 386 | git_bytes_total, | ||
| 387 | git_push_authorization, | ||
| 388 | events_received_total, | ||
| 389 | events_stored_total, | ||
| 390 | events_rejected_total, | ||
| 391 | repositories_total, | ||
| 392 | start_time: Instant::now(), | ||
| 393 | build_info, | ||
| 394 | } | ||
| 395 | } | ||
| 396 | } | ||
| 397 | |||
| 398 | /// Timer for tracking WebSocket connection duration. | ||
| 399 | /// Records the elapsed time when dropped. | ||
| 400 | pub struct HistogramTimer { | ||
| 401 | histogram: Histogram, | ||
| 402 | start: Instant, | ||
| 403 | } | ||
| 404 | |||
| 405 | impl HistogramTimer { | ||
| 406 | fn new(histogram: Histogram) -> Self { | ||
| 407 | Self { | ||
| 408 | histogram, | ||
| 409 | start: Instant::now(), | ||
| 410 | } | ||
| 411 | } | ||
| 412 | } | ||
| 413 | |||
| 414 | impl Drop for HistogramTimer { | ||
| 415 | fn drop(&mut self) { | ||
| 416 | let elapsed = self.start.elapsed().as_secs_f64(); | ||
| 417 | self.histogram.observe(elapsed); | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | /// Timer for tracking Git operation duration. | ||
| 422 | /// Records the elapsed time when dropped. | ||
| 423 | pub struct GitOperationTimer { | ||
| 424 | histogram_vec: HistogramVec, | ||
| 425 | operation: String, | ||
| 426 | start: Instant, | ||
| 427 | } | ||
| 428 | |||
| 429 | impl GitOperationTimer { | ||
| 430 | fn new(histogram_vec: HistogramVec, operation: String) -> Self { | ||
| 431 | Self { | ||
| 432 | histogram_vec, | ||
| 433 | operation, | ||
| 434 | start: Instant::now(), | ||
| 435 | } | ||
| 436 | } | ||
| 437 | } | ||
| 438 | |||
| 439 | impl Drop for GitOperationTimer { | ||
| 440 | fn drop(&mut self) { | ||
| 441 | let elapsed = self.start.elapsed().as_secs_f64(); | ||
| 442 | self.histogram_vec | ||
| 443 | .with_label_values(&[&self.operation]) | ||
| 444 | .observe(elapsed); | ||
| 445 | } | ||
| 446 | } | ||
| 447 | |||
| 448 | #[cfg(test)] | ||
| 449 | mod tests { | ||
| 450 | use super::*; | ||
| 451 | |||
| 452 | #[test] | ||
| 453 | fn test_metrics_creation() { | ||
| 454 | // Note: This test may fail if run with other tests due to global registry | ||
| 455 | // In production, consider using a test-specific registry | ||
| 456 | let metrics = Metrics::new(10); | ||
| 457 | |||
| 458 | // Test that we can record metrics without panicking | ||
| 459 | metrics.record_websocket_connection(); | ||
| 460 | metrics.record_message_received("REQ"); | ||
| 461 | metrics.record_message_sent("EVENT"); | ||
| 462 | metrics.record_git_operation("clone", "success"); | ||
| 463 | metrics.record_git_bytes("in", 1024); | ||
| 464 | metrics.record_event_received(1); | ||
| 465 | metrics.record_event_stored(1); | ||
| 466 | metrics.record_event_rejected(1, "invalid_signature"); | ||
| 467 | metrics.set_repositories_total(5); | ||
| 468 | } | ||
| 469 | } \ No newline at end of file | ||