upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 15:17:04 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 15:24:19 +0000
commitfd0c87c787d0626b3546fa571541c9c809711821 (patch)
tree934f20d973127f380b807d2bd44b25c197cf349c /src
parent762cd8e815e797f173f541795de774fbbf978fc3 (diff)
add prometheus metrics
Diffstat (limited to 'src')
-rw-r--r--src/config.rs36
-rw-r--r--src/http/mod.rs84
-rw-r--r--src/http/nip11.rs6
-rw-r--r--src/lib.rs1
-rw-r--r--src/main.rs17
-rw-r--r--src/metrics/bandwidth.rs301
-rw-r--r--src/metrics/connection.rs337
-rw-r--r--src/metrics/mod.rs469
8 files changed, 1244 insertions, 7 deletions
diff --git a/src/config.rs b/src/config.rs
index d095178..025e020 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -71,6 +71,18 @@ pub struct Config {
71 /// Database backend type 71 /// Database backend type
72 #[arg(long, env = "NGIT_DATABASE_BACKEND", value_enum, default_value_t = DatabaseBackend::Lmdb)] 72 #[arg(long, env = "NGIT_DATABASE_BACKEND", value_enum, default_value_t = DatabaseBackend::Lmdb)]
73 pub database_backend: DatabaseBackend, 73 pub database_backend: DatabaseBackend,
74
75 /// Enable Prometheus metrics endpoint
76 #[arg(long, env = "NGIT_METRICS_ENABLED", default_value_t = true)]
77 pub metrics_enabled: bool,
78
79 /// Connections per IP before flagging as potential abuse in metrics (display only, no rate limiting)
80 #[arg(long = "metrics-connection-per-ip-abuse-threshold", env = "NGIT_METRICS_CONNECTION_PER_IP_ABUSE_THRESHOLD", default_value_t = 10)]
81 pub metrics_connection_per_ip_abuse_threshold: u32,
82
83 /// Number of top bandwidth repos to track in metrics
84 #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)]
85 pub metrics_top_n_repos: usize,
74} 86}
75 87
76impl Config { 88impl Config {
@@ -123,6 +135,9 @@ impl Config {
123 relay_data_path: "./test_data/relay".to_string(), 135 relay_data_path: "./test_data/relay".to_string(),
124 bind_address: "127.0.0.1:8080".to_string(), 136 bind_address: "127.0.0.1:8080".to_string(),
125 database_backend: DatabaseBackend::Memory, 137 database_backend: DatabaseBackend::Memory,
138 metrics_enabled: true,
139 metrics_connection_per_ip_abuse_threshold: 10,
140 metrics_top_n_repos: 10,
126 } 141 }
127 } 142 }
128} 143}
@@ -202,4 +217,25 @@ mod tests {
202 }; 217 };
203 assert!(config.owner_npub.is_none()); 218 assert!(config.owner_npub.is_none());
204 } 219 }
220
#[test]
fn test_metrics_config_defaults() {
    // The testing preset must keep metrics on with the documented defaults.
    let defaults = Config::for_testing();
    assert!(defaults.metrics_enabled);
    assert_eq!(
        (
            defaults.metrics_connection_per_ip_abuse_threshold,
            defaults.metrics_top_n_repos,
        ),
        (10, 10)
    );
}
228
#[test]
fn test_metrics_config_custom_values() {
    // Start from the testing preset and override only the metrics knobs.
    let mut custom = Config::for_testing();
    custom.metrics_enabled = false;
    custom.metrics_connection_per_ip_abuse_threshold = 50;
    custom.metrics_top_n_repos = 25;

    assert!(!custom.metrics_enabled);
    assert_eq!(
        (
            custom.metrics_connection_per_ip_abuse_threshold,
            custom.metrics_top_n_repos,
        ),
        (50, 25)
    );
}
205} 241}
diff --git a/src/http/mod.rs b/src/http/mod.rs
index 8b1f687..f584e03 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -7,6 +7,7 @@ pub mod nip11;
7use std::future::Future; 7use std::future::Future;
8use std::net::SocketAddr; 8use std::net::SocketAddr;
9use std::pin::Pin; 9use std::pin::Pin;
10use std::sync::Arc;
10 11
11use base64::Engine; 12use base64::Engine;
12use http_body_util::{BodyExt, Full}; 13use http_body_util::{BodyExt, Full};
@@ -24,6 +25,7 @@ use tokio::net::TcpListener;
24 25
25use crate::config::Config; 26use crate::config::Config;
26use crate::git; 27use crate::git;
28use crate::metrics::Metrics;
27use crate::nostr::builder::SharedDatabase; 29use crate::nostr::builder::SharedDatabase;
28 30
29/// CORS headers required by GRASP-01 specification (lines 40-47) 31/// CORS headers required by GRASP-01 specification (lines 40-47)
@@ -90,6 +92,8 @@ struct HttpService {
90 remote: SocketAddr, 92 remote: SocketAddr,
91 /// Database reference for direct queries (e.g., push authorization) 93 /// Database reference for direct queries (e.g., push authorization)
92 database: SharedDatabase, 94 database: SharedDatabase,
95 /// Optional metrics for Prometheus endpoint
96 metrics: Option<Arc<Metrics>>,
93} 97}
94 98
95impl HttpService { 99impl HttpService {
@@ -98,12 +102,14 @@ impl HttpService {
98 config: Config, 102 config: Config,
99 remote: SocketAddr, 103 remote: SocketAddr,
100 database: SharedDatabase, 104 database: SharedDatabase,
105 metrics: Option<Arc<Metrics>>,
101 ) -> Self { 106 ) -> Self {
102 Self { 107 Self {
103 relay, 108 relay,
104 config, 109 config,
105 remote, 110 remote,
106 database, 111 database,
112 metrics,
107 } 113 }
108 } 114 }
109} 115}
@@ -150,6 +156,7 @@ impl Service<Request<Incoming>> for HttpService {
150 ); 156 );
151 157
152 let repo_path = git::resolve_repo_path(&git_data_path, &npub, &identifier); 158 let repo_path = git::resolve_repo_path(&git_data_path, &npub, &identifier);
159 let metrics_clone = self.metrics.clone();
153 160
154 return Box::pin(async move { 161 return Box::pin(async move {
155 // Collect request body once before the match statement 162 // Collect request body once before the match statement
@@ -170,14 +177,31 @@ impl Service<Request<Incoming>> for HttpService {
170 .and_then(git::protocol::GitService::from_query_param); 177 .and_then(git::protocol::GitService::from_query_param);
171 178
172 match service { 179 match service {
173 Some(svc) => git::handlers::handle_info_refs(repo_path, svc).await, 180 Some(svc) => {
181 let result = git::handlers::handle_info_refs(repo_path, svc).await;
182 // Track operation
183 if let Some(ref m) = metrics_clone {
184 let status = if result.is_ok() { "success" } else { "error" };
185 let operation = match svc {
186 git::protocol::GitService::UploadPack => "fetch",
187 git::protocol::GitService::ReceivePack => "push",
188 };
189 m.record_git_operation(operation, status);
190 }
191 result
192 }
174 None => Err(git::handlers::GitError::RepositoryNotFound), 193 None => Err(git::handlers::GitError::RepositoryNotFound),
175 } 194 }
176 } 195 }
177 196
178 // POST /git-upload-pack (clone/fetch) 197 // POST /git-upload-pack (clone/fetch)
179 (m, "git-upload-pack") if m == Method::POST => { 198 (m, "git-upload-pack") if m == Method::POST => {
180 git::handlers::handle_upload_pack(repo_path, body_bytes).await 199 let result = git::handlers::handle_upload_pack(repo_path, body_bytes).await;
200 if let Some(ref m) = metrics_clone {
201 let status = if result.is_ok() { "success" } else { "error" };
202 m.record_git_operation("clone", status);
203 }
204 result
181 } 205 }
182 206
183 // POST /git-receive-pack (push) - with GRASP authorization via database 207 // POST /git-receive-pack (push) - with GRASP authorization via database
@@ -187,6 +211,10 @@ impl Service<Request<Incoming>> for HttpService {
187 Ok(pk) => pk.to_hex(), 211 Ok(pk) => pk.to_hex(),
188 Err(e) => { 212 Err(e) => {
189 tracing::warn!("Invalid npub in URL {}: {}", npub, e); 213 tracing::warn!("Invalid npub in URL {}: {}", npub, e);
214 // Track failed push due to invalid npub
215 if let Some(ref m) = metrics_clone {
216 m.record_git_operation("push", "error");
217 }
190 return Ok(add_cors_headers(Response::builder()) 218 return Ok(add_cors_headers(Response::builder())
191 .status(hyper::StatusCode::BAD_REQUEST) 219 .status(hyper::StatusCode::BAD_REQUEST)
192 .body(Full::new(Bytes::from(format!("Invalid npub: {}", e)))) 220 .body(Full::new(Bytes::from(format!("Invalid npub: {}", e))))
@@ -194,14 +222,21 @@ impl Service<Request<Incoming>> for HttpService {
194 } 222 }
195 }; 223 };
196 224
197 git::handlers::handle_receive_pack( 225 let result = git::handlers::handle_receive_pack(
198 repo_path, 226 repo_path,
199 body_bytes.clone(), 227 body_bytes.clone(),
200 Some(database.clone()), 228 Some(database.clone()),
201 &identifier, 229 &identifier,
202 &owner_pubkey_hex, 230 &owner_pubkey_hex,
203 ) 231 )
204 .await 232 .await;
233
234 if let Some(ref m) = metrics_clone {
235 let status = if result.is_ok() { "success" } else { "error" };
236 m.record_git_operation("push", status);
237 }
238
239 result
205 } 240 }
206 241
207 _ => Err(git::handlers::GitError::RepositoryNotFound), 242 _ => Err(git::handlers::GitError::RepositoryNotFound),
@@ -333,17 +368,27 @@ impl Service<Request<Incoming>> for HttpService {
333 368
334 let addr = self.remote; 369 let addr = self.remote;
335 let relay = self.relay.clone(); 370 let relay = self.relay.clone();
371 let metrics_clone = self.metrics.clone();
336 372
337 tokio::spawn(async move { 373 tokio::spawn(async move {
338 match hyper::upgrade::on(req).await { 374 match hyper::upgrade::on(req).await {
339 Ok(upgraded) => { 375 Ok(upgraded) => {
340 tracing::info!("WebSocket connection established from {}", addr); 376 tracing::info!("WebSocket connection established from {}", addr);
377 // Track connection
378 if let Some(ref m) = metrics_clone {
379 m.connection_tracker().on_connect(addr.ip());
380 m.record_websocket_connection();
381 }
341 if let Err(e) = 382 if let Err(e) =
342 relay.take_connection(TokioIo::new(upgraded), addr).await 383 relay.take_connection(TokioIo::new(upgraded), addr).await
343 { 384 {
344 tracing::error!("Relay error for {}: {}", addr, e); 385 tracing::error!("Relay error for {}: {}", addr, e);
345 } 386 }
346 tracing::info!("WebSocket connection closed for {}", addr); 387 tracing::info!("WebSocket connection closed for {}", addr);
388 // Untrack connection
389 if let Some(ref m) = metrics_clone {
390 m.connection_tracker().on_disconnect(addr.ip());
391 }
347 } 392 }
348 Err(e) => tracing::error!("Upgrade error: {}", e), 393 Err(e) => tracing::error!("Upgrade error: {}", e),
349 } 394 }
@@ -361,6 +406,33 @@ impl Service<Request<Incoming>> for HttpService {
361 } 406 }
362 } 407 }
363 408
409 // Serve Prometheus metrics if enabled
410 if path == "/metrics" {
411 if let Some(ref metrics) = self.metrics {
412 let metrics = metrics.clone();
413 return Box::pin(async move {
414 let output = metrics.render();
415 Ok(
416 add_cors_headers(Response::builder().header("server", "ngit-grasp"))
417 .status(200)
418 .header("content-type", "text/plain; version=0.0.4; charset=utf-8")
419 .body(Full::new(Bytes::from(output)))
420 .unwrap(),
421 )
422 });
423 } else {
424 // Metrics disabled
425 return Box::pin(async move {
426 Ok(
427 add_cors_headers(Response::builder().header("server", "ngit-grasp"))
428 .status(404)
429 .body(Full::new(Bytes::from("Metrics disabled")))
430 .unwrap(),
431 )
432 });
433 }
434 }
435
364 // Serve static icon at /icon.png 436 // Serve static icon at /icon.png
365 if path == "/icon.png" { 437 if path == "/icon.png" {
366 return Box::pin(async move { 438 return Box::pin(async move {
@@ -419,10 +491,12 @@ fn derive_accept_key(request_key: &[u8]) -> String {
419/// * `config` - Server configuration 491/// * `config` - Server configuration
420/// * `relay` - The LocalRelay for WebSocket connections 492/// * `relay` - The LocalRelay for WebSocket connections
421/// * `database` - The database for direct queries (e.g., push authorization) 493/// * `database` - The database for direct queries (e.g., push authorization)
494/// * `metrics` - Optional metrics for Prometheus endpoint
422pub async fn run_server( 495pub async fn run_server(
423 config: Config, 496 config: Config,
424 relay: LocalRelay, 497 relay: LocalRelay,
425 database: SharedDatabase, 498 database: SharedDatabase,
499 metrics: Option<Arc<Metrics>>,
426) -> anyhow::Result<()> { 500) -> anyhow::Result<()> {
427 let bind_addr: SocketAddr = config.bind_address.parse()?; 501 let bind_addr: SocketAddr = config.bind_address.parse()?;
428 502
@@ -435,7 +509,7 @@ pub async fn run_server(
435 loop { 509 loop {
436 let (socket, addr) = listener.accept().await?; 510 let (socket, addr) = listener.accept().await?;
437 let io = TokioIo::new(socket); 511 let io = TokioIo::new(socket);
438 let service = HttpService::new(relay.clone(), config.clone(), addr, database.clone()); 512 let service = HttpService::new(relay.clone(), config.clone(), addr, database.clone(), metrics.clone());
439 513
440 tokio::spawn(async move { 514 tokio::spawn(async move {
441 if let Err(e) = http1::Builder::new() 515 if let Err(e) = http1::Builder::new()
diff --git a/src/http/nip11.rs b/src/http/nip11.rs
index 1ed80de..1723601 100644
--- a/src/http/nip11.rs
+++ b/src/http/nip11.rs
@@ -102,6 +102,9 @@ mod tests {
102 relay_data_path: "./data/relay".to_string(), 102 relay_data_path: "./data/relay".to_string(),
103 bind_address: "127.0.0.1:8080".to_string(), 103 bind_address: "127.0.0.1:8080".to_string(),
104 database_backend: crate::config::DatabaseBackend::Memory, 104 database_backend: crate::config::DatabaseBackend::Memory,
105 metrics_enabled: true,
106 metrics_connection_per_ip_abuse_threshold: 10,
107 metrics_top_n_repos: 10,
105 }; 108 };
106 109
107 let doc = RelayInformationDocument::from_config(&config); 110 let doc = RelayInformationDocument::from_config(&config);
@@ -133,6 +136,9 @@ mod tests {
133 relay_data_path: "./data/relay".to_string(), 136 relay_data_path: "./data/relay".to_string(),
134 bind_address: "127.0.0.1:8080".to_string(), 137 bind_address: "127.0.0.1:8080".to_string(),
135 database_backend: crate::config::DatabaseBackend::Memory, 138 database_backend: crate::config::DatabaseBackend::Memory,
139 metrics_enabled: true,
140 metrics_connection_per_ip_abuse_threshold: 10,
141 metrics_top_n_repos: 10,
136 }; 142 };
137 143
138 let doc = RelayInformationDocument::from_config(&config); 144 let doc = RelayInformationDocument::from_config(&config);
diff --git a/src/lib.rs b/src/lib.rs
index 9ccc212..4d5aab0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,5 @@
1pub mod config; 1pub mod config;
2pub mod git; 2pub mod git;
3pub mod http; 3pub mod http;
4pub mod metrics;
4pub mod nostr; 5pub mod nostr;
diff --git a/src/main.rs b/src/main.rs
index f80e920..9200cc2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,14 @@
1use std::sync::Arc;
2
1use anyhow::Result; 3use anyhow::Result;
2use tracing::{info, Level}; 4use tracing::{info, Level};
3use tracing_subscriber::FmtSubscriber; 5use tracing_subscriber::FmtSubscriber;
4 6
5use ngit_grasp::{ 7use ngit_grasp::{
6 config::{Config, DatabaseBackend}, 8 config::{Config, DatabaseBackend},
7 http, nostr, 9 http,
10 metrics::Metrics,
11 nostr,
8}; 12};
9 13
10#[tokio::main] 14#[tokio::main]
@@ -29,6 +33,15 @@ async fn main() -> Result<()> {
29 } 33 }
30 info!("Database backend: {}", config.database_backend); 34 info!("Database backend: {}", config.database_backend);
31 35
36 // Initialize metrics if enabled
37 let metrics = if config.metrics_enabled {
38 info!("Metrics enabled on /metrics endpoint");
39 Some(Arc::new(Metrics::new(config.metrics_connection_per_ip_abuse_threshold)))
40 } else {
41 info!("Metrics disabled");
42 None
43 };
44
32 // Create Nostr relay with NIP-34 validation 45 // Create Nostr relay with NIP-34 validation
33 // Returns both the relay and database for direct queries in handlers 46 // Returns both the relay and database for direct queries in handlers
34 if let Ok(relay_with_db) = nostr::builder::create_relay(&config) { 47 if let Ok(relay_with_db) = nostr::builder::create_relay(&config) {
@@ -39,7 +52,7 @@ async fn main() -> Result<()> {
39 52
40 // Start HTTP server with integrated relay and database 53 // Start HTTP server with integrated relay and database
41 info!("Starting HTTP server on {}", config.bind_address); 54 info!("Starting HTTP server on {}", config.bind_address);
42 http::run_server(config, relay_with_db.relay, relay_with_db.database).await?; 55 http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?;
43 } 56 }
44 57
45 Ok(()) 58 Ok(())
diff --git a/src/metrics/bandwidth.rs b/src/metrics/bandwidth.rs
new file mode 100644
index 0000000..d2c53e8
--- /dev/null
+++ b/src/metrics/bandwidth.rs
@@ -0,0 +1,301 @@
1//! Repository bandwidth tracking with cardinality control.
2//!
3//! This module tracks bandwidth per repository but only exposes the top N
4//! repositories to Prometheus to prevent cardinality explosion with many repos.
5//!
6//! # Cardinality Control
7//!
8//! - All per-repo bandwidth is tracked internally in a `DashMap<RepoId, u64>`
9//! - At most once per refresh interval (60 seconds by default), the top N (10 by default) are recalculated and exposed to Prometheus
10//! - Previous repo labels are cleared before setting new ones
11//! - Prometheus only ever sees ~10 label values, keeping cardinality low
12
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use dashmap::DashMap;
17use prometheus::{GaugeVec, Opts, Registry};
18
19/// Default number of top repositories to expose in metrics
20const DEFAULT_TOP_N: usize = 10;
21
22/// Default refresh interval for top-N calculation (60 seconds)
23const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
24
25/// Tracks bandwidth per repository with top-N exposure to Prometheus.
26///
27/// # Design
28///
29/// All repositories are tracked internally for accurate total bandwidth,
30/// but only the top N by bytes transferred are exposed to Prometheus.
31/// This prevents cardinality explosion when hosting thousands of repositories.
32///
33/// # Thread Safety
34///
35/// Uses `DashMap` for lock-free concurrent access and atomics for
36/// the refresh timestamp.
37pub struct BandwidthTracker {
38 /// Internal: tracks ALL repos (memory only, not exposed)
39 all_repos: DashMap<String, u64>,
40
41 /// Exposed to Prometheus: only top N repos
42 top_repos_gauge: GaugeVec,
43
44 /// Last refresh timestamp (stored as nanos since some epoch)
45 last_refresh_nanos: AtomicU64,
46
47 /// Instant when the tracker was created (for relative timing)
48 start_instant: Instant,
49
50 /// Number of top repos to expose
51 top_n: usize,
52
53 /// Refresh interval
54 refresh_interval: Duration,
55}
56
57impl BandwidthTracker {
58 /// Creates a new BandwidthTracker and registers metrics with Prometheus.
59 ///
60 /// Uses default settings:
61 /// - Top 10 repositories exposed
62 /// - 60 second refresh interval
63 pub fn new(registry: &Registry) -> Self {
64 Self::with_config(registry, DEFAULT_TOP_N, DEFAULT_REFRESH_INTERVAL)
65 }
66
67 /// Creates a new BandwidthTracker with custom configuration.
68 ///
69 /// # Arguments
70 ///
71 /// * `registry` - Prometheus registry to register metrics with
72 /// * `top_n` - Number of top repositories to expose in metrics
73 /// * `refresh_interval` - How often to recalculate the top-N list
74 pub fn with_config(registry: &Registry, top_n: usize, refresh_interval: Duration) -> Self {
75 let top_repos_gauge = GaugeVec::new(
76 Opts::new(
77 "ngit_git_top_repos_bytes",
78 "Top repositories by bandwidth (refreshed periodically)",
79 ),
80 &["repo"],
81 )
82 .unwrap();
83 registry.register(Box::new(top_repos_gauge.clone())).unwrap();
84
85 Self {
86 all_repos: DashMap::new(),
87 top_repos_gauge,
88 last_refresh_nanos: AtomicU64::new(0),
89 start_instant: Instant::now(),
90 top_n,
91 refresh_interval,
92 }
93 }
94
95 /// Records bytes transferred for a repository.
96 ///
97 /// # Arguments
98 ///
99 /// * `repo_id` - Repository identifier (e.g., npub or repo name)
100 /// * `bytes` - Number of bytes transferred
101 pub fn record_transfer(&self, repo_id: &str, bytes: u64) {
102 self.all_repos
103 .entry(repo_id.to_string())
104 .and_modify(|v| *v = v.saturating_add(bytes))
105 .or_insert(bytes);
106 }
107
108 /// Conditionally refreshes the top-N list if the refresh interval has elapsed.
109 ///
110 /// This method is designed to be called frequently (e.g., on every
111 /// `/metrics` request) without performance impact - it only does work
112 /// when the refresh interval has elapsed.
113 pub fn maybe_refresh_top_n(&self) {
114 let elapsed_nanos = self.start_instant.elapsed().as_nanos() as u64;
115 let last_refresh = self.last_refresh_nanos.load(Ordering::Relaxed);
116 let interval_nanos = self.refresh_interval.as_nanos() as u64;
117
118 // Check if enough time has passed since last refresh
119 if elapsed_nanos.saturating_sub(last_refresh) >= interval_nanos {
120 // Try to update the timestamp atomically to prevent concurrent refreshes
121 if self
122 .last_refresh_nanos
123 .compare_exchange(last_refresh, elapsed_nanos, Ordering::SeqCst, Ordering::Relaxed)
124 .is_ok()
125 {
126 self.refresh_top_n();
127 }
128 }
129 }
130
131 /// Forces a refresh of the top-N list.
132 ///
133 /// This recalculates which repositories are in the top N by bandwidth
134 /// and updates the Prometheus gauges accordingly.
135 pub fn refresh_top_n(&self) {
136 // Collect all repo data
137 let mut sorted: Vec<_> = self
138 .all_repos
139 .iter()
140 .map(|r| (r.key().clone(), *r.value()))
141 .collect();
142
143 // Sort by bytes descending
144 sorted.sort_by(|a, b| b.1.cmp(&a.1));
145
146 // Clear old labels and set new top N
147 self.top_repos_gauge.reset();
148 for (repo, bytes) in sorted.into_iter().take(self.top_n) {
149 self.top_repos_gauge
150 .with_label_values(&[&repo])
151 .set(bytes as f64);
152 }
153 }
154
155 /// Returns the total bytes transferred for a specific repository.
156 ///
157 /// Returns `None` if the repository has not been seen.
158 pub fn get_repo_bytes(&self, repo_id: &str) -> Option<u64> {
159 self.all_repos.get(repo_id).map(|v| *v)
160 }
161
162 /// Returns the total bytes transferred across all repositories.
163 pub fn total_bytes(&self) -> u64 {
164 self.all_repos.iter().map(|r| *r.value()).sum()
165 }
166
167 /// Returns the number of repositories being tracked.
168 pub fn repo_count(&self) -> usize {
169 self.all_repos.len()
170 }
171
172 /// Returns the top N repositories by bandwidth.
173 ///
174 /// This is a snapshot and may not match the Prometheus gauges if
175 /// a refresh hasn't occurred recently.
176 pub fn get_top_repos(&self) -> Vec<(String, u64)> {
177 let mut sorted: Vec<_> = self
178 .all_repos
179 .iter()
180 .map(|r| (r.key().clone(), *r.value()))
181 .collect();
182
183 sorted.sort_by(|a, b| b.1.cmp(&a.1));
184 sorted.truncate(self.top_n);
185 sorted
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Each test gets its own registry so metric registration never collides.
    fn test_registry() -> Registry {
        Registry::new()
    }

    /// Reads the value currently published for `repo`.
    ///
    /// Looking up a label that has no series creates one with value 0.0, so
    /// a result of 0.0 means "not published by the last refresh".
    fn exported_bytes(tracker: &BandwidthTracker, repo: &str) -> f64 {
        tracker.top_repos_gauge.with_label_values(&[repo]).get()
    }

    #[test]
    fn test_bandwidth_tracking() {
        let registry = test_registry();
        let tracker = BandwidthTracker::new(&registry);

        // Record transfers
        tracker.record_transfer("repo-a", 1000);
        tracker.record_transfer("repo-b", 2000);
        tracker.record_transfer("repo-a", 500); // Additional transfer to repo-a

        assert_eq!(tracker.get_repo_bytes("repo-a"), Some(1500));
        assert_eq!(tracker.get_repo_bytes("repo-b"), Some(2000));
        assert_eq!(tracker.get_repo_bytes("repo-c"), None);
        assert_eq!(tracker.total_bytes(), 3500);
        assert_eq!(tracker.repo_count(), 2);
    }

    #[test]
    fn test_top_n_repos() {
        let registry = test_registry();
        let tracker = BandwidthTracker::with_config(&registry, 3, Duration::from_secs(60));

        // Create 5 repos with different bandwidth
        tracker.record_transfer("repo-1", 100);
        tracker.record_transfer("repo-2", 500);
        tracker.record_transfer("repo-3", 200);
        tracker.record_transfer("repo-4", 800);
        tracker.record_transfer("repo-5", 300);

        let top = tracker.get_top_repos();
        assert_eq!(top.len(), 3);
        assert_eq!(top[0], ("repo-4".to_string(), 800));
        assert_eq!(top[1], ("repo-2".to_string(), 500));
        assert_eq!(top[2], ("repo-5".to_string(), 300));
    }

    #[test]
    fn test_refresh_updates_gauge() {
        let registry = test_registry();
        let tracker = BandwidthTracker::new(&registry);

        tracker.record_transfer("high-bandwidth-repo", 10_000_000);
        tracker.record_transfer("low-bandwidth-repo", 1000);

        // Force a refresh
        tracker.refresh_top_n();

        // The gauge must now carry the recorded totals for both repos.
        assert_eq!(exported_bytes(&tracker, "high-bandwidth-repo"), 10_000_000.0);
        assert_eq!(exported_bytes(&tracker, "low-bandwidth-repo"), 1000.0);

        // Internal bookkeeping is unaffected by the refresh.
        assert_eq!(tracker.repo_count(), 2);
        assert_eq!(tracker.total_bytes(), 10_001_000);
    }

    #[test]
    fn test_saturating_add() {
        let registry = test_registry();
        let tracker = BandwidthTracker::new(&registry);

        // Test that we don't overflow
        tracker.record_transfer("huge-repo", u64::MAX - 100);
        tracker.record_transfer("huge-repo", 200);

        // Should saturate to MAX, not overflow
        assert_eq!(tracker.get_repo_bytes("huge-repo"), Some(u64::MAX));
    }

    #[test]
    fn test_maybe_refresh_respects_interval() {
        let registry = test_registry();
        let interval = Duration::from_millis(100);
        let past_interval = interval + Duration::from_millis(10);
        let tracker = BandwidthTracker::with_config(&registry, 10, interval);

        tracker.record_transfer("repo-a", 1000);

        // Let a full interval pass since construction so a refresh is due
        // no matter how the very first call is treated.
        std::thread::sleep(past_interval);
        tracker.maybe_refresh_top_n();
        assert_eq!(exported_bytes(&tracker, "repo-a"), 1000.0);

        // Add more data
        tracker.record_transfer("repo-b", 2000);

        // An immediate second call is inside the interval and must NOT
        // refresh: repo-b is tracked internally but not yet published.
        tracker.maybe_refresh_top_n();
        assert_eq!(tracker.repo_count(), 2);
        assert_eq!(exported_bytes(&tracker, "repo-b"), 0.0);

        // Wait for interval to pass
        std::thread::sleep(past_interval);

        // Now it should refresh and publish both repos
        tracker.maybe_refresh_top_n();
        assert_eq!(exported_bytes(&tracker, "repo-a"), 1000.0);
        assert_eq!(exported_bytes(&tracker, "repo-b"), 2000.0);
    }

    #[test]
    fn test_empty_tracker() {
        let registry = test_registry();
        let tracker = BandwidthTracker::new(&registry);

        assert_eq!(tracker.total_bytes(), 0);
        assert_eq!(tracker.repo_count(), 0);
        assert!(tracker.get_top_repos().is_empty());

        // Refresh should not panic on empty data
        tracker.refresh_top_n();
    }
}
diff --git a/src/metrics/connection.rs b/src/metrics/connection.rs
new file mode 100644
index 0000000..6a7f406
--- /dev/null
+++ b/src/metrics/connection.rs
@@ -0,0 +1,337 @@
1//! Connection tracking with privacy-preserving abuse detection.
2//!
3//! This module tracks WebSocket connections per IP address internally for abuse
4//! detection, but NEVER exposes IP addresses in Prometheus metrics. Only aggregate
5//! counts are exposed.
6//!
7//! # Privacy Model
8//!
9//! | Data | Location | Exposed? |
10//! |------|----------|----------|
11//! | Total connections | Prometheus | ✅ Yes |
12//! | Unique IP count | Prometheus | ✅ Yes |
13//! | Flagged abuser count | Prometheus | ✅ Yes |
14//! | Actual IP addresses | Internal DashMap | ❌ No |
15//! | IP + abuse flag | Logs (when flagged) | ⚠️ Logs only |
16
17use std::net::IpAddr;
18use std::time::Instant;
19
20use dashmap::DashMap;
21use prometheus::{IntGauge, Opts, Registry};
22use tracing::warn;
23
24/// Information about connections from a specific IP address.
25struct ConnectionInfo {
26 /// Number of active connections from this IP
27 count: u32,
28 /// When the first connection from this IP was established
29 first_seen: Instant,
30 /// Whether this IP has been flagged as potentially abusive
31 flagged_as_abuse: bool,
32}
33
34/// Tracks WebSocket connections per IP with abuse detection.
35///
36/// # Thread Safety
37///
38/// Uses `DashMap` for lock-free concurrent access, as connection tracking
39/// happens across multiple tokio tasks.
40///
41/// # Privacy
42///
43/// IP addresses are stored internally only for abuse detection and are
44/// NEVER exposed in Prometheus metrics. Only aggregate counts are exposed:
45/// - Total active connections
46/// - Number of unique IPs
47/// - Number of IPs flagged as potential abusers
48pub struct ConnectionTracker {
49 /// Active connections per IP (INTERNAL ONLY - never exposed to metrics)
50 connections: DashMap<IpAddr, ConnectionInfo>,
51
52 /// Threshold for abuse flagging (connections per IP)
53 abuse_threshold: u32,
54
55 /// Prometheus gauge: total active connections
56 active_connections: IntGauge,
57
58 /// Prometheus gauge: number of unique IPs connected
59 unique_ips: IntGauge,
60
61 /// Prometheus gauge: number of IPs flagged as potential abusers
62 flagged_abusers: IntGauge,
63}
64
65impl ConnectionTracker {
66 /// Creates a new ConnectionTracker and registers metrics with Prometheus.
67 ///
68 /// # Arguments
69 ///
70 /// * `abuse_threshold` - Number of connections from a single IP before flagging
71 /// * `registry` - Prometheus registry to register metrics with
72 pub fn new(abuse_threshold: u32, registry: &Registry) -> Self {
73 let active_connections = IntGauge::with_opts(
74 Opts::new(
75 "ngit_websocket_connections_active",
76 "Current active WebSocket connections",
77 )
78 ).unwrap();
79 registry.register(Box::new(active_connections.clone())).unwrap();
80
81 let unique_ips = IntGauge::with_opts(
82 Opts::new(
83 "ngit_websocket_unique_ips",
84 "Number of unique IP addresses connected (NOT the IPs themselves)",
85 )
86 ).unwrap();
87 registry.register(Box::new(unique_ips.clone())).unwrap();
88
89 let flagged_abusers = IntGauge::with_opts(
90 Opts::new(
91 "ngit_websocket_flagged_abusers",
92 "Number of IPs exceeding connection threshold",
93 )
94 ).unwrap();
95 registry.register(Box::new(flagged_abusers.clone())).unwrap();
96
97 Self {
98 connections: DashMap::new(),
99 abuse_threshold,
100 active_connections,
101 unique_ips,
102 flagged_abusers,
103 }
104 }
105
106 /// Called when a new WebSocket connection is established.
107 ///
108 /// This method:
109 /// 1. Increments the connection count for this IP
110 /// 2. Checks if the IP has exceeded the abuse threshold
111 /// 3. Logs a warning if abuse is detected (IP is logged here only)
112 /// 4. Updates Prometheus metrics (aggregate counts only)
113 ///
114 /// # Privacy
115 ///
116 /// The IP address is logged only when abuse is detected. It is NEVER
117 /// exposed in Prometheus metrics.
118 pub fn on_connect(&self, ip: IpAddr) {
119 let mut is_new_ip = false;
120 let mut newly_flagged = false;
121
122 self.connections
123 .entry(ip)
124 .and_modify(|info| {
125 info.count += 1;
126 // Check if this connection pushes us over the threshold
127 if !info.flagged_as_abuse && info.count >= self.abuse_threshold {
128 info.flagged_as_abuse = true;
129 newly_flagged = true;
130 }
131 })
132 .or_insert_with(|| {
133 is_new_ip = true;
134 ConnectionInfo {
135 count: 1,
136 first_seen: Instant::now(),
137 flagged_as_abuse: false,
138 }
139 });
140
141 // Update Prometheus metrics (aggregate counts only)
142 self.active_connections.inc();
143
144 if is_new_ip {
145 self.unique_ips.inc();
146 }
147
148 if newly_flagged {
149 self.flagged_abusers.inc();
150 // Log the abuse detection - IP is only exposed in logs, not metrics
151 warn!(
152 ip = %ip,
153 threshold = self.abuse_threshold,
154 "Potential abuse detected: IP exceeded connection threshold"
155 );
156 }
157 }
158
159 /// Called when a WebSocket connection is closed.
160 ///
161 /// This method:
162 /// 1. Decrements the connection count for this IP
163 /// 2. Removes the IP from tracking if count reaches 0
164 /// 3. Updates the abuse flag count if the IP was flagged
165 /// 4. Updates Prometheus metrics (aggregate counts only)
166 pub fn on_disconnect(&self, ip: IpAddr) {
167 let mut remove_entry = false;
168 let mut was_flagged = false;
169 let mut had_connection = false;
170
171 if let Some(mut entry) = self.connections.get_mut(&ip) {
172 had_connection = true;
173 entry.count = entry.count.saturating_sub(1);
174 if entry.count == 0 {
175 remove_entry = true;
176 was_flagged = entry.flagged_as_abuse;
177 }
178 }
179
180 // Remove the entry if count is 0
181 if remove_entry {
182 self.connections.remove(&ip);
183 self.unique_ips.dec();
184 if was_flagged {
185 self.flagged_abusers.dec();
186 }
187 }
188
189 // Update total connections only if this IP had a tracked connection
190 if had_connection {
191 self.active_connections.dec();
192 }
193 }
194
195 /// Returns the current number of active connections.
196 pub fn active_connections(&self) -> u64 {
197 self.active_connections.get() as u64
198 }
199
200 /// Returns the current number of unique IPs.
201 pub fn unique_ip_count(&self) -> u64 {
202 self.unique_ips.get() as u64
203 }
204
205 /// Returns the current number of flagged abusers.
206 pub fn flagged_abuser_count(&self) -> u64 {
207 self.flagged_abusers.get() as u64
208 }
209
/// Looks up how many connections are currently tracked for `ip`.
///
/// Returns `None` when the IP has no entry in the tracker. Compiled for
/// tests only.
///
/// # Privacy
///
/// Per-IP data is internal. It should NEVER be exposed in metrics or logs
/// without privacy consideration.
#[cfg(test)]
pub(crate) fn connection_count(&self, ip: &IpAddr) -> Option<u32> {
    let info = self.connections.get(ip)?;
    Some(info.count)
}
220
/// Reports whether `ip` is currently tracked and flagged as abusive.
///
/// An IP without an entry in the tracker is reported as not flagged.
/// Compiled for tests only.
#[cfg(test)]
pub(crate) fn is_flagged(&self, ip: &IpAddr) -> bool {
    match self.connections.get(ip) {
        Some(info) => info.flagged_as_abuse,
        None => false,
    }
}
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr};

    /// Gives each test its own registry so gauges are never shared.
    fn test_registry() -> Registry {
        Registry::new()
    }

    /// Shorthand for building an IPv4 `IpAddr` from its four octets.
    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    #[test]
    fn test_connection_tracking() {
        let registry = test_registry();
        let tracker = ConnectionTracker::new(5, &registry);
        let ip = v4(192, 168, 1, 1);

        // First connection from the IP
        tracker.on_connect(ip);
        assert_eq!(tracker.active_connections(), 1);
        assert_eq!(tracker.unique_ip_count(), 1);
        assert_eq!(tracker.connection_count(&ip), Some(1));

        // Second connection from the same IP: total grows, unique IPs do not
        tracker.on_connect(ip);
        assert_eq!(tracker.active_connections(), 2);
        assert_eq!(tracker.unique_ip_count(), 1);
        assert_eq!(tracker.connection_count(&ip), Some(2));

        // Closing one connection leaves the IP tracked
        tracker.on_disconnect(ip);
        assert_eq!(tracker.active_connections(), 1);
        assert_eq!(tracker.unique_ip_count(), 1);
        assert_eq!(tracker.connection_count(&ip), Some(1));

        // Closing the last connection removes the IP entirely
        tracker.on_disconnect(ip);
        assert_eq!(tracker.active_connections(), 0);
        assert_eq!(tracker.unique_ip_count(), 0);
        assert_eq!(tracker.connection_count(&ip), None);
    }

    #[test]
    fn test_multiple_ips() {
        let registry = test_registry();
        let tracker = ConnectionTracker::new(5, &registry);
        let first = v4(192, 168, 1, 1);
        let second = v4(192, 168, 1, 2);
        let third = IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1));

        for ip in [first, second, third] {
            tracker.on_connect(ip);
        }

        assert_eq!(tracker.active_connections(), 3);
        assert_eq!(tracker.unique_ip_count(), 3);

        tracker.on_disconnect(second);
        assert_eq!(tracker.active_connections(), 2);
        assert_eq!(tracker.unique_ip_count(), 2);
    }

    #[test]
    fn test_abuse_detection() {
        let registry = test_registry();
        let threshold = 3;
        let tracker = ConnectionTracker::new(threshold, &registry);
        let abuser_ip = v4(10, 0, 0, 1);
        let normal_ip = v4(10, 0, 0, 2);

        // A single connection from a normal user is never flagged
        tracker.on_connect(normal_ip);
        assert!(!tracker.is_flagged(&normal_ip));
        assert_eq!(tracker.flagged_abuser_count(), 0);

        // Two connections: still below the threshold
        for _ in 0..2 {
            tracker.on_connect(abuser_ip);
        }
        assert!(!tracker.is_flagged(&abuser_ip));
        assert_eq!(tracker.flagged_abuser_count(), 0);

        // Third connection reaches the threshold
        tracker.on_connect(abuser_ip);
        assert!(tracker.is_flagged(&abuser_ip));
        assert_eq!(tracker.flagged_abuser_count(), 1);

        // The normal user is unaffected
        assert!(!tracker.is_flagged(&normal_ip));

        // Once the abuser closes everything, the flagged count drops back
        for _ in 0..3 {
            tracker.on_disconnect(abuser_ip);
        }
        assert_eq!(tracker.flagged_abuser_count(), 0);
        assert_eq!(tracker.active_connections(), 1); // Only normal user remains
    }

    #[test]
    fn test_disconnect_without_connect() {
        let registry = test_registry();
        let tracker = ConnectionTracker::new(5, &registry);
        let ip = v4(192, 168, 1, 1);

        // An unmatched disconnect must neither panic nor push gauges negative
        tracker.on_disconnect(ip);
        assert_eq!(tracker.active_connections(), 0);
        assert_eq!(tracker.unique_ip_count(), 0);
    }
}
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
new file mode 100644
index 0000000..4a4fe57
--- /dev/null
+++ b/src/metrics/mod.rs
@@ -0,0 +1,469 @@
1//! Prometheus metrics for ngit-grasp relay.
2//!
3//! This module provides comprehensive monitoring metrics including:
4//! - WebSocket connection tracking (with privacy-preserving IP aggregation)
5//! - Git operation metrics (clone, fetch, push)
6//! - Repository bandwidth tracking (top-N only for cardinality control)
7//! - Nostr event metrics
8//!
9//! # Privacy
10//! IP addresses are NEVER exposed in metrics. The `ConnectionTracker` maintains
11//! per-IP counts internally only for abuse detection. Only aggregate counts
12//! are exposed to Prometheus.
13
14pub mod bandwidth;
15pub mod connection;
16
17use std::sync::Arc;
18use std::time::Instant;
19
20use lazy_static::lazy_static;
21use prometheus::{
22 Counter, CounterVec, Encoder, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts,
23 Registry, TextEncoder,
24};
25
26use bandwidth::BandwidthTracker;
27use connection::ConnectionTracker;
28
29lazy_static! {
30 /// Global Prometheus registry for ngit-grasp metrics
31 pub static ref REGISTRY: Registry = Registry::new();
32}
33
34/// Central metrics collection for ngit-grasp relay.
35///
36/// Thread-safe and designed for concurrent access from multiple tokio tasks.
37#[derive(Clone)]
38pub struct Metrics {
39 inner: Arc<MetricsInner>,
40}
41
42struct MetricsInner {
43 /// Connection tracking with abuse detection
44 pub connection_tracker: ConnectionTracker,
45
46 /// Repository bandwidth tracking (top-N only)
47 pub bandwidth_tracker: BandwidthTracker,
48
49 // === WebSocket Metrics ===
50 /// Total WebSocket connections since startup
51 pub websocket_connections_total: Counter,
52 /// Connection duration histogram
53 pub websocket_connection_duration: Histogram,
54 /// Messages received by type (REQ, EVENT, CLOSE)
55 pub websocket_messages_received: CounterVec,
56 /// Messages sent by type (EVENT, EOSE, OK, NOTICE)
57 pub websocket_messages_sent: CounterVec,
58
59 // === Git Operation Metrics ===
60 /// Git operations by type and status
61 pub git_operations_total: CounterVec,
62 /// Git operation duration histogram
63 pub git_operation_duration: HistogramVec,
64 /// Total bytes transferred
65 pub git_bytes_total: CounterVec,
66 /// Push authorization results
67 pub git_push_authorization: CounterVec,
68
69 // === Nostr Event Metrics ===
70 /// Events received by kind
71 pub events_received_total: CounterVec,
72 /// Events successfully stored by kind
73 pub events_stored_total: CounterVec,
74 /// Events rejected by kind and reason
75 pub events_rejected_total: CounterVec,
76
77 // === Repository Metrics ===
78 /// Total repositories hosted
79 pub repositories_total: Gauge,
80
81 // === System Health Metrics ===
82 /// Server start time for uptime calculation
83 pub start_time: Instant,
84 /// Build information gauge
85 pub build_info: GaugeVec,
86}
87
88impl Metrics {
89 /// Creates a new Metrics instance and registers all metrics with Prometheus.
90 ///
91 /// # Arguments
92 /// * `abuse_threshold` - Number of connections from a single IP before flagging as abuse
93 pub fn new(abuse_threshold: u32) -> Self {
94 let inner = MetricsInner::new(abuse_threshold);
95 Self {
96 inner: Arc::new(inner),
97 }
98 }
99
100 /// Returns the connection tracker for WebSocket connection management.
101 pub fn connection_tracker(&self) -> &ConnectionTracker {
102 &self.inner.connection_tracker
103 }
104
105 /// Returns the bandwidth tracker for repository bandwidth tracking.
106 pub fn bandwidth_tracker(&self) -> &BandwidthTracker {
107 &self.inner.bandwidth_tracker
108 }
109
110 // === WebSocket Recording Methods ===
111
112 /// Record a new WebSocket connection
113 pub fn record_websocket_connection(&self) {
114 self.inner.websocket_connections_total.inc();
115 }
116
117 /// Start timing a WebSocket connection, returns timer that records on drop
118 pub fn start_connection_timer(&self) -> HistogramTimer {
119 HistogramTimer::new(self.inner.websocket_connection_duration.clone())
120 }
121
122 /// Record a received WebSocket message
123 pub fn record_message_received(&self, msg_type: &str) {
124 self.inner
125 .websocket_messages_received
126 .with_label_values(&[msg_type])
127 .inc();
128 }
129
130 /// Record a sent WebSocket message
131 pub fn record_message_sent(&self, msg_type: &str) {
132 self.inner
133 .websocket_messages_sent
134 .with_label_values(&[msg_type])
135 .inc();
136 }
137
138 // === Git Operation Recording Methods ===
139
140 /// Record a git operation completion
141 pub fn record_git_operation(&self, operation: &str, status: &str) {
142 self.inner
143 .git_operations_total
144 .with_label_values(&[operation, status])
145 .inc();
146 }
147
148 /// Start timing a git operation, returns a timer
149 pub fn start_git_operation_timer(&self, operation: &str) -> GitOperationTimer {
150 GitOperationTimer::new(self.inner.git_operation_duration.clone(), operation.to_string())
151 }
152
153 /// Record bytes transferred for a git operation
154 pub fn record_git_bytes(&self, direction: &str, bytes: u64) {
155 self.inner
156 .git_bytes_total
157 .with_label_values(&[direction])
158 .inc_by(bytes as f64);
159 }
160
161 /// Record a push authorization result
162 pub fn record_push_authorization(&self, result: &str) {
163 self.inner
164 .git_push_authorization
165 .with_label_values(&[result])
166 .inc();
167 }
168
169 // === Nostr Event Recording Methods ===
170
171 /// Record a received Nostr event
172 pub fn record_event_received(&self, kind: u64) {
173 self.inner
174 .events_received_total
175 .with_label_values(&[&kind.to_string()])
176 .inc();
177 }
178
179 /// Record a stored Nostr event
180 pub fn record_event_stored(&self, kind: u64) {
181 self.inner
182 .events_stored_total
183 .with_label_values(&[&kind.to_string()])
184 .inc();
185 }
186
187 /// Record a rejected Nostr event
188 pub fn record_event_rejected(&self, kind: u64, reason: &str) {
189 self.inner
190 .events_rejected_total
191 .with_label_values(&[&kind.to_string(), reason])
192 .inc();
193 }
194
195 // === Repository Metrics ===
196
197 /// Set the total number of repositories
198 pub fn set_repositories_total(&self, count: u64) {
199 self.inner.repositories_total.set(count as f64);
200 }
201
202 /// Increment the repository count
203 pub fn inc_repositories_total(&self) {
204 self.inner.repositories_total.inc();
205 }
206
207 // === Rendering ===
208
209 /// Render all metrics in Prometheus text format.
210 ///
211 /// This method:
212 /// 1. Refreshes the top-N bandwidth metrics if needed
213 /// 2. Updates uptime
214 /// 3. Gathers all metrics from the registry
215 /// 4. Encodes them in Prometheus text format
216 pub fn render(&self) -> String {
217 // Refresh top-N bandwidth repos if needed
218 self.inner.bandwidth_tracker.maybe_refresh_top_n();
219
220 // Gather and encode metrics
221 let encoder = TextEncoder::new();
222 let metric_families = REGISTRY.gather();
223 let mut buffer = Vec::new();
224 encoder.encode(&metric_families, &mut buffer).unwrap();
225
226 // Add uptime as a comment (it's derived, not a registered metric)
227 let uptime = self.inner.start_time.elapsed().as_secs();
228 let mut output = String::from_utf8(buffer).unwrap();
229 output.push_str(&format!(
230 "\n# HELP ngit_uptime_seconds Seconds since server startup\n# TYPE ngit_uptime_seconds counter\nngit_uptime_seconds {}\n",
231 uptime
232 ));
233
234 output
235 }
236
237 /// Check if the system is under high load (for sync scheduling)
238 pub fn is_high_load(&self, threshold: u64) -> bool {
239 self.inner.connection_tracker.active_connections() > threshold
240 }
241}
242
243impl MetricsInner {
244 fn new(abuse_threshold: u32) -> Self {
245 // Create connection tracker
246 let connection_tracker = ConnectionTracker::new(abuse_threshold, &REGISTRY);
247
248 // Create bandwidth tracker
249 let bandwidth_tracker = BandwidthTracker::new(&REGISTRY);
250
251 // WebSocket metrics
252 let websocket_connections_total = Counter::with_opts(
253 Opts::new(
254 "ngit_websocket_connections_total",
255 "Total WebSocket connections since startup",
256 )
257 ).unwrap();
258 REGISTRY.register(Box::new(websocket_connections_total.clone())).unwrap();
259
260 let websocket_connection_duration = Histogram::with_opts(
261 HistogramOpts::new(
262 "ngit_websocket_connection_duration_seconds",
263 "Duration of WebSocket connections",
264 )
265 .buckets(vec![1.0, 5.0, 15.0, 30.0, 60.0, 300.0, 900.0, 3600.0]),
266 ).unwrap();
267 REGISTRY.register(Box::new(websocket_connection_duration.clone())).unwrap();
268
269 let websocket_messages_received = CounterVec::new(
270 Opts::new(
271 "ngit_websocket_messages_received_total",
272 "WebSocket messages received by type",
273 ),
274 &["type"],
275 ).unwrap();
276 REGISTRY.register(Box::new(websocket_messages_received.clone())).unwrap();
277
278 let websocket_messages_sent = CounterVec::new(
279 Opts::new(
280 "ngit_websocket_messages_sent_total",
281 "WebSocket messages sent by type",
282 ),
283 &["type"],
284 ).unwrap();
285 REGISTRY.register(Box::new(websocket_messages_sent.clone())).unwrap();
286
287 // Git operation metrics
288 let git_operations_total = CounterVec::new(
289 Opts::new(
290 "ngit_git_operations_total",
291 "Git operations by type and status",
292 ),
293 &["operation", "status"],
294 ).unwrap();
295 REGISTRY.register(Box::new(git_operations_total.clone())).unwrap();
296
297 let git_operation_duration = HistogramVec::new(
298 HistogramOpts::new(
299 "ngit_git_operation_duration_seconds",
300 "Duration of git operations",
301 )
302 .buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
303 &["operation"],
304 ).unwrap();
305 REGISTRY.register(Box::new(git_operation_duration.clone())).unwrap();
306
307 let git_bytes_total = CounterVec::new(
308 Opts::new(
309 "ngit_git_bytes_total",
310 "Total bytes transferred for git operations",
311 ),
312 &["direction"],
313 ).unwrap();
314 REGISTRY.register(Box::new(git_bytes_total.clone())).unwrap();
315
316 let git_push_authorization = CounterVec::new(
317 Opts::new(
318 "ngit_git_push_authorization_total",
319 "Push authorization results",
320 ),
321 &["result"],
322 ).unwrap();
323 REGISTRY.register(Box::new(git_push_authorization.clone())).unwrap();
324
325 // Nostr event metrics
326 let events_received_total = CounterVec::new(
327 Opts::new(
328 "ngit_events_received_total",
329 "Nostr events received by kind",
330 ),
331 &["kind"],
332 ).unwrap();
333 REGISTRY.register(Box::new(events_received_total.clone())).unwrap();
334
335 let events_stored_total = CounterVec::new(
336 Opts::new(
337 "ngit_events_stored_total",
338 "Nostr events successfully stored by kind",
339 ),
340 &["kind"],
341 ).unwrap();
342 REGISTRY.register(Box::new(events_stored_total.clone())).unwrap();
343
344 let events_rejected_total = CounterVec::new(
345 Opts::new(
346 "ngit_events_rejected_total",
347 "Nostr events rejected by kind and reason",
348 ),
349 &["kind", "reason"],
350 ).unwrap();
351 REGISTRY.register(Box::new(events_rejected_total.clone())).unwrap();
352
353 // Repository metrics
354 let repositories_total = Gauge::with_opts(
355 Opts::new(
356 "ngit_repositories_total",
357 "Total repositories hosted",
358 )
359 ).unwrap();
360 REGISTRY.register(Box::new(repositories_total.clone())).unwrap();
361
362 // Build info
363 let build_info = GaugeVec::new(
364 Opts::new(
365 "ngit_build_info",
366 "Build information",
367 ),
368 &["version", "commit"],
369 ).unwrap();
370 REGISTRY.register(Box::new(build_info.clone())).unwrap();
371
372 // Set build info gauge to 1 (it's just for labels)
373 build_info
374 .with_label_values(&[env!("CARGO_PKG_VERSION"), option_env!("GIT_HASH").unwrap_or("unknown")])
375 .set(1.0);
376
377 Self {
378 connection_tracker,
379 bandwidth_tracker,
380 websocket_connections_total,
381 websocket_connection_duration,
382 websocket_messages_received,
383 websocket_messages_sent,
384 git_operations_total,
385 git_operation_duration,
386 git_bytes_total,
387 git_push_authorization,
388 events_received_total,
389 events_stored_total,
390 events_rejected_total,
391 repositories_total,
392 start_time: Instant::now(),
393 build_info,
394 }
395 }
396}
397
398/// Timer for tracking WebSocket connection duration.
399/// Records the elapsed time when dropped.
400pub struct HistogramTimer {
401 histogram: Histogram,
402 start: Instant,
403}
404
405impl HistogramTimer {
406 fn new(histogram: Histogram) -> Self {
407 Self {
408 histogram,
409 start: Instant::now(),
410 }
411 }
412}
413
414impl Drop for HistogramTimer {
415 fn drop(&mut self) {
416 let elapsed = self.start.elapsed().as_secs_f64();
417 self.histogram.observe(elapsed);
418 }
419}
420
421/// Timer for tracking Git operation duration.
422/// Records the elapsed time when dropped.
423pub struct GitOperationTimer {
424 histogram_vec: HistogramVec,
425 operation: String,
426 start: Instant,
427}
428
429impl GitOperationTimer {
430 fn new(histogram_vec: HistogramVec, operation: String) -> Self {
431 Self {
432 histogram_vec,
433 operation,
434 start: Instant::now(),
435 }
436 }
437}
438
439impl Drop for GitOperationTimer {
440 fn drop(&mut self) {
441 let elapsed = self.start.elapsed().as_secs_f64();
442 self.histogram_vec
443 .with_label_values(&[&self.operation])
444 .observe(elapsed);
445 }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        // Note: collectors are registered with the global registry, so this
        // test may fail if another test in the same process also builds a
        // `Metrics`. A test-specific registry would avoid that.
        let metrics = Metrics::new(10);

        // WebSocket recorders must not panic
        metrics.record_websocket_connection();
        metrics.record_message_received("REQ");
        metrics.record_message_sent("EVENT");

        // Git recorders must not panic
        metrics.record_git_operation("clone", "success");
        metrics.record_git_bytes("in", 1024);

        // Nostr event recorders must not panic
        let kind = 1;
        metrics.record_event_received(kind);
        metrics.record_event_stored(kind);
        metrics.record_event_rejected(kind, "invalid_signature");

        // Repository gauge must not panic
        metrics.set_repositories_total(5);
    }
}