upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/metrics/connection.rs
blob: 2d42081896e52b27d7227b7bf5d26ef0332edc2f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
//! Connection tracking with privacy-preserving abuse detection.
//!
//! This module tracks WebSocket connections per IP address internally for abuse
//! detection, but NEVER exposes IP addresses in Prometheus metrics. Only aggregate
//! counts are exposed.
//!
//! # Privacy Model
//!
//! | Data | Location | Exposed? |
//! |------|----------|----------|
//! | Total connections | Prometheus | ✅ Yes |
//! | Unique IP count | Prometheus | ✅ Yes |
//! | Flagged abuser count | Prometheus | ✅ Yes |
//! | Actual IP addresses | Internal HashMap | ❌ No |
//! | IP + abuse flag | Logs (when flagged) | ⚠️ Logs only |

use std::net::IpAddr;
use std::time::Instant;

use dashmap::DashMap;
use prometheus::{IntGauge, Opts, Registry};
use tracing::warn;

/// Information about connections from a specific IP address.
struct ConnectionInfo {
    /// Number of active connections from this IP
    count: u32,
    /// When the first connection from this IP was established (for future rate limiting)
    #[allow(dead_code)]
    first_seen: Instant,
    /// Whether this IP has been flagged as potentially abusive
    flagged_as_abuse: bool,
}

/// Tracks WebSocket connections per IP with abuse detection.
///
/// # Thread Safety
///
/// Uses `DashMap` for lock-free concurrent access, as connection tracking
/// happens across multiple tokio tasks.
///
/// # Privacy
///
/// IP addresses are stored internally only for abuse detection and are
/// NEVER exposed in Prometheus metrics. Only aggregate counts are exposed:
/// - Total active connections
/// - Number of unique IPs
/// - Number of IPs flagged as potential abusers
pub struct ConnectionTracker {
    /// Active connections per IP (INTERNAL ONLY - never exposed to metrics)
    connections: DashMap<IpAddr, ConnectionInfo>,

    /// Threshold for abuse flagging (connections per IP)
    abuse_threshold: u32,

    /// Prometheus gauge: total active connections
    active_connections: IntGauge,

    /// Prometheus gauge: number of unique IPs connected
    unique_ips: IntGauge,

    /// Prometheus gauge: number of IPs flagged as potential abusers
    flagged_abusers: IntGauge,
}

impl ConnectionTracker {
    /// Creates a new ConnectionTracker and registers metrics with Prometheus.
    ///
    /// # Arguments
    ///
    /// * `abuse_threshold` - Number of connections from a single IP before flagging
    /// * `registry` - Prometheus registry to register metrics with
    pub fn new(abuse_threshold: u32, registry: &Registry) -> Self {
        let active_connections = IntGauge::with_opts(Opts::new(
            "ngit_websocket_connections_active",
            "Current active WebSocket connections",
        ))
        .unwrap();
        registry
            .register(Box::new(active_connections.clone()))
            .unwrap();

        let unique_ips = IntGauge::with_opts(Opts::new(
            "ngit_websocket_unique_ips",
            "Number of unique IP addresses connected (NOT the IPs themselves)",
        ))
        .unwrap();
        registry.register(Box::new(unique_ips.clone())).unwrap();

        let flagged_abusers = IntGauge::with_opts(Opts::new(
            "ngit_websocket_flagged_abusers",
            "Number of IPs exceeding connection threshold",
        ))
        .unwrap();
        registry
            .register(Box::new(flagged_abusers.clone()))
            .unwrap();

        Self {
            connections: DashMap::new(),
            abuse_threshold,
            active_connections,
            unique_ips,
            flagged_abusers,
        }
    }

    /// Called when a new WebSocket connection is established.
    ///
    /// This method:
    /// 1. Increments the connection count for this IP
    /// 2. Checks if the IP has exceeded the abuse threshold
    /// 3. Logs a warning if abuse is detected (IP is logged here only)
    /// 4. Updates Prometheus metrics (aggregate counts only)
    ///
    /// # Privacy
    ///
    /// The IP address is logged only when abuse is detected. It is NEVER
    /// exposed in Prometheus metrics.
    pub fn on_connect(&self, ip: IpAddr) {
        let mut is_new_ip = false;
        let mut newly_flagged = false;

        self.connections
            .entry(ip)
            .and_modify(|info| {
                info.count += 1;
                // Check if this connection pushes us over the threshold
                if !info.flagged_as_abuse && info.count >= self.abuse_threshold {
                    info.flagged_as_abuse = true;
                    newly_flagged = true;
                }
            })
            .or_insert_with(|| {
                is_new_ip = true;
                ConnectionInfo {
                    count: 1,
                    first_seen: Instant::now(),
                    flagged_as_abuse: false,
                }
            });

        // Update Prometheus metrics (aggregate counts only)
        self.active_connections.inc();

        if is_new_ip {
            self.unique_ips.inc();
        }

        if newly_flagged {
            self.flagged_abusers.inc();
            // Log the abuse detection - IP is only exposed in logs, not metrics
            warn!(
                ip = %ip,
                threshold = self.abuse_threshold,
                "Potential abuse detected: IP exceeded connection threshold"
            );
        }
    }

    /// Called when a WebSocket connection is closed.
    ///
    /// This method:
    /// 1. Decrements the connection count for this IP
    /// 2. Removes the IP from tracking if count reaches 0
    /// 3. Updates the abuse flag count if the IP was flagged
    /// 4. Updates Prometheus metrics (aggregate counts only)
    pub fn on_disconnect(&self, ip: IpAddr) {
        let mut remove_entry = false;
        let mut was_flagged = false;
        let mut had_connection = false;

        if let Some(mut entry) = self.connections.get_mut(&ip) {
            had_connection = true;
            entry.count = entry.count.saturating_sub(1);
            if entry.count == 0 {
                remove_entry = true;
                was_flagged = entry.flagged_as_abuse;
            }
        }

        // Remove the entry if count is 0
        if remove_entry {
            self.connections.remove(&ip);
            self.unique_ips.dec();
            if was_flagged {
                self.flagged_abusers.dec();
            }
        }

        // Update total connections only if this IP had a tracked connection
        if had_connection {
            self.active_connections.dec();
        }
    }

    /// Returns the current number of active connections.
    pub fn active_connections(&self) -> u64 {
        self.active_connections.get() as u64
    }

    /// Returns the current number of unique IPs.
    pub fn unique_ip_count(&self) -> u64 {
        self.unique_ips.get() as u64
    }

    /// Returns the current number of flagged abusers.
    pub fn flagged_abuser_count(&self) -> u64 {
        self.flagged_abusers.get() as u64
    }

    /// Returns the connection count for a specific IP (for internal use only).
    ///
    /// # Privacy
    ///
    /// This is an internal method. The returned data should NEVER be exposed
    /// in metrics or logs without privacy consideration.
    #[cfg(test)]
    pub(crate) fn connection_count(&self, ip: &IpAddr) -> Option<u32> {
        self.connections.get(ip).map(|info| info.count)
    }

    /// Returns whether an IP is flagged as abusive (for internal use only).
    #[cfg(test)]
    pub(crate) fn is_flagged(&self, ip: &IpAddr) -> bool {
        self.connections
            .get(ip)
            .map(|info| info.flagged_as_abuse)
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr};

    fn test_registry() -> Registry {
        Registry::new()
    }

    #[test]
    fn test_connection_tracking() {
        let registry = test_registry();
        let tracker = ConnectionTracker::new(5, &registry);
        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));

        // Connect
        tracker.on_connect(ip);
        assert_eq!(tracker.active_connections(), 1);
        assert_eq!(tracker.unique_ip_count(), 1);
        assert_eq!(tracker.connection_count(&ip), Some(1));

        // Connect again from same IP
        tracker.on_connect(ip);
        assert_eq!(tracker.active_connections(), 2);
        assert_eq!(tracker.unique_ip_count(), 1); // Still 1 unique IP
        assert_eq!(tracker.connection_count(&ip), Some(2));

        // Disconnect one
        tracker.on_disconnect(ip);
        assert_eq!(tracker.active_connections(), 1);
        assert_eq!(tracker.unique_ip_count(), 1);
        assert_eq!(tracker.connection_count(&ip), Some(1));

        // Disconnect last
        tracker.on_disconnect(ip);
        assert_eq!(tracker.active_connections(), 0);
        assert_eq!(tracker.unique_ip_count(), 0);
        assert_eq!(tracker.connection_count(&ip), None);
    }

    #[test]
    fn test_multiple_ips() {
        let registry = test_registry();
        let tracker = ConnectionTracker::new(5, &registry);
        let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
        let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2));
        let ip3 = IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1));

        tracker.on_connect(ip1);
        tracker.on_connect(ip2);
        tracker.on_connect(ip3);

        assert_eq!(tracker.active_connections(), 3);
        assert_eq!(tracker.unique_ip_count(), 3);

        tracker.on_disconnect(ip2);
        assert_eq!(tracker.active_connections(), 2);
        assert_eq!(tracker.unique_ip_count(), 2);
    }

    #[test]
    fn test_abuse_detection() {
        let registry = test_registry();
        let threshold = 3;
        let tracker = ConnectionTracker::new(threshold, &registry);
        let abuser_ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
        let normal_ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2));

        // Normal user with 1 connection
        tracker.on_connect(normal_ip);
        assert!(!tracker.is_flagged(&normal_ip));
        assert_eq!(tracker.flagged_abuser_count(), 0);

        // Abuser approaching threshold
        tracker.on_connect(abuser_ip);
        tracker.on_connect(abuser_ip);
        assert!(!tracker.is_flagged(&abuser_ip));
        assert_eq!(tracker.flagged_abuser_count(), 0);

        // Abuser hits threshold
        tracker.on_connect(abuser_ip);
        assert!(tracker.is_flagged(&abuser_ip));
        assert_eq!(tracker.flagged_abuser_count(), 1);

        // Normal user still not flagged
        assert!(!tracker.is_flagged(&normal_ip));

        // Abuser disconnects all - should be removed from flagged count
        tracker.on_disconnect(abuser_ip);
        tracker.on_disconnect(abuser_ip);
        tracker.on_disconnect(abuser_ip);
        assert_eq!(tracker.flagged_abuser_count(), 0);
        assert_eq!(tracker.active_connections(), 1); // Only normal user remains
    }

    #[test]
    fn test_disconnect_without_connect() {
        let registry = test_registry();
        let tracker = ConnectionTracker::new(5, &registry);
        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));

        // Disconnect without connect should not panic or go negative
        tracker.on_disconnect(ip);
        assert_eq!(tracker.active_connections(), 0);
        assert_eq!(tracker.unique_ip_count(), 0);
    }
}