diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-03 14:50:22 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-03 15:18:23 +0000 |
| commit | 874a8abe1d076cfafd9baf919ec23d7d58200698 (patch) | |
| tree | dce0d0d36bddc496ff32f8555a8790d8dc7be7e4 /src/sync | |
| parent | 9fd4350c57bbe986ebf65bf3ea4c996572e81884 (diff) | |
| parent | 92a9a3bfe0bc522e8ae411991a366a3a6310d525 (diff) | |
Merge relay.ngit.dev migration: bug fixes and migration tooling
This merge includes critical bug fixes and comprehensive migration tooling
developed during the relay.ngit.dev migration effort.
Bug Fixes:
- Fix git protocol error handling to return HTTP 200 with ERR pkt-line
- Fix naughty list false positives and DNS failure identification
- Fix database query filters in load_existing_events (remove .since())
- Fix OID fetch tracking to distinguish 0 OIDs from successful fetches
- Fix purgatory event source tracking for filtered expiry logging
- Implement OID retry logic for 'not our ref' errors
Migration Tools & Documentation:
- Complete 5-phase migration analysis pipeline with orchestration script
- Phase 1: Event fetching from source relay
- Phase 2: Git sync verification
- Phase 3: Categorization and relay comparison
- Phase 4: Log extraction (parse failures, purgatory expiry)
- Phase 5: Action classification for migration decisions
- Comprehensive migration guide with lessons learned
- Troubleshooting guide for permission and corruption issues
Configuration:
- Add NGIT_LOG_LEVEL configuration option
- Update git throttle limits to 60/minute
- Improve logging throughout for better observability
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 3 | ||||
| -rw-r--r-- | src/sync/naughty_list.rs | 485 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 141 |
3 files changed, 430 insertions, 199 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index bc8c428..d6634ff 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -584,6 +584,7 @@ impl SyncManager { | |||
| 584 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | 585 | /// * `data_path` - Path to git data directory (for persistence) |
| 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 587 | #[allow(clippy::too_many_arguments)] | ||
| 587 | pub fn new( | 588 | pub fn new( |
| 588 | bootstrap_relay_url: Option<String>, | 589 | bootstrap_relay_url: Option<String>, |
| 589 | service_domain: String, | 590 | service_domain: String, |
| @@ -1442,6 +1443,7 @@ impl SyncManager { | |||
| 1442 | self.service_domain.clone(), | 1443 | self.service_domain.clone(), |
| 1443 | Arc::clone(&self.repo_sync_index), | 1444 | Arc::clone(&self.repo_sync_index), |
| 1444 | action_tx, | 1445 | action_tx, |
| 1446 | self.database.clone(), | ||
| 1445 | ); | 1447 | ); |
| 1446 | let subscriber_shutdown = shutdown_tx.subscribe(); | 1448 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 1447 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | 1449 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); |
| @@ -2811,6 +2813,7 @@ impl SyncManager { | |||
| 2811 | event_id = %event.id, | 2813 | event_id = %event.id, |
| 2812 | kind = %event.kind.as_u16(), | 2814 | kind = %event.kind.as_u16(), |
| 2813 | identifier = %identifier, | 2815 | identifier = %identifier, |
| 2816 | pubkey = %event.pubkey, | ||
| 2814 | "Added rejected announcement to two-tier index" | 2817 | "Added rejected announcement to two-tier index" |
| 2815 | ); | 2818 | ); |
| 2816 | } | 2819 | } |
diff --git a/src/sync/naughty_list.rs b/src/sync/naughty_list.rs index 35fcc0f..0abb986 100644 --- a/src/sync/naughty_list.rs +++ b/src/sync/naughty_list.rs | |||
| @@ -101,6 +101,69 @@ impl NaughtyListTracker { | |||
| 101 | Self::new(12) | 101 | Self::new(12) |
| 102 | } | 102 | } |
| 103 | 103 | ||
| 104 | /// Strip URLs from an error message to prevent false positives from URL components. | ||
| 105 | /// | ||
| 106 | /// URLs can contain path components, repository names, or user identifiers that | ||
| 107 | /// accidentally match error patterns (e.g., "my-openssl-project", "ssl-team", | ||
| 108 | /// "certificate-manager"). By stripping URLs before classification, we ensure | ||
| 109 | /// only the actual error message text is analyzed. | ||
| 110 | /// | ||
| 111 | /// Handles: http://, https://, git://, ws://, wss:// | ||
| 112 | fn strip_urls(error: &str) -> String { | ||
| 113 | let mut result = String::with_capacity(error.len()); | ||
| 114 | let mut chars = error.chars().peekable(); | ||
| 115 | |||
| 116 | while let Some(c) = chars.next() { | ||
| 117 | // Check for URL start patterns | ||
| 118 | let potential_url = match c { | ||
| 119 | 'h' => { | ||
| 120 | // Check for http:// or https:// | ||
| 121 | let rest: String = chars.clone().take(7).collect(); | ||
| 122 | rest.starts_with("ttp://") || rest.starts_with("ttps://") | ||
| 123 | } | ||
| 124 | 'g' => { | ||
| 125 | // Check for git:// | ||
| 126 | let rest: String = chars.clone().take(5).collect(); | ||
| 127 | rest.starts_with("it://") | ||
| 128 | } | ||
| 129 | 'w' => { | ||
| 130 | // Check for ws:// or wss:// | ||
| 131 | let rest: String = chars.clone().take(5).collect(); | ||
| 132 | rest.starts_with("s://") || rest.starts_with("ss://") | ||
| 133 | } | ||
| 134 | _ => false, | ||
| 135 | }; | ||
| 136 | |||
| 137 | if potential_url { | ||
| 138 | // Found URL start, consume until URL end | ||
| 139 | result.push_str("[URL]"); | ||
| 140 | |||
| 141 | // Skip until we hit a URL terminator | ||
| 142 | loop { | ||
| 143 | match chars.peek() { | ||
| 144 | Some(&ch) if Self::is_url_char(ch) => { | ||
| 145 | chars.next(); | ||
| 146 | } | ||
| 147 | _ => break, | ||
| 148 | } | ||
| 149 | } | ||
| 150 | } else { | ||
| 151 | result.push(c); | ||
| 152 | } | ||
| 153 | } | ||
| 154 | |||
| 155 | result | ||
| 156 | } | ||
| 157 | |||
| 158 | /// Check if a character can be part of a URL | ||
| 159 | #[inline] | ||
| 160 | fn is_url_char(c: char) -> bool { | ||
| 161 | // URLs end at whitespace, quotes, or certain brackets | ||
| 162 | // This is conservative - real URLs can contain more, but git errors | ||
| 163 | // typically have URLs followed by these terminators | ||
| 164 | !matches!(c, ' ' | '\t' | '\n' | '\r' | '"' | '\'' | '>' | ']' | ')') | ||
| 165 | } | ||
| 166 | |||
| 104 | /// Classify an error string into a naughty category or return None for transient errors | 167 | /// Classify an error string into a naughty category or return None for transient errors |
| 105 | /// | 168 | /// |
| 106 | /// # Arguments | 169 | /// # Arguments |
| @@ -112,21 +175,56 @@ impl NaughtyListTracker { | |||
| 112 | /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue | 175 | /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue |
| 113 | /// - `None` if the error is a transient network issue (use HealthTracker backoff) | 176 | /// - `None` if the error is a transient network issue (use HealthTracker backoff) |
| 114 | pub fn classify_error(error: &str) -> Option<NaughtyCategory> { | 177 | pub fn classify_error(error: &str) -> Option<NaughtyCategory> { |
| 115 | let error_lower = error.to_lowercase(); | 178 | // Filter out remote warnings - these are informational messages from the remote |
| 179 | // server that don't indicate infrastructure problems with the domain itself. | ||
| 180 | // Example: "remote: warning: unable to access '/root/.config/git/attributes': Permission denied" | ||
| 181 | // These warnings are about the remote server's internal configuration, not connectivity. | ||
| 182 | let filtered_error: String = error | ||
| 183 | .lines() | ||
| 184 | .filter(|line| { | ||
| 185 | let line_lower = line.to_lowercase(); | ||
| 186 | // Keep lines that are NOT remote warnings | ||
| 187 | !(line_lower.starts_with("remote: warning:") | ||
| 188 | || line_lower.starts_with("warning: remote")) | ||
| 189 | }) | ||
| 190 | .collect::<Vec<_>>() | ||
| 191 | .join("\n"); | ||
| 192 | |||
| 193 | // If after filtering we have no content, this was just warnings - not a real error | ||
| 194 | if filtered_error.trim().is_empty() { | ||
| 195 | return None; | ||
| 196 | } | ||
| 197 | |||
| 198 | // Strip URLs to prevent false positives from URL components | ||
| 199 | // (e.g., repository named "openssl-test" or path containing "certificate") | ||
| 200 | let url_stripped = Self::strip_urls(&filtered_error); | ||
| 201 | let error_lower = url_stripped.to_lowercase(); | ||
| 116 | 202 | ||
| 117 | // DNS lookup failures | 203 | // DNS lookup failures |
| 118 | if error_lower.contains("failed to lookup address") | 204 | if error_lower.contains("failed to lookup address") |
| 119 | || error_lower.contains("name or service not known") | 205 | || error_lower.contains("name or service not known") |
| 120 | || error_lower.contains("nodename nor servname provided") | 206 | || error_lower.contains("nodename nor servname provided") |
| 121 | || (error_lower.contains("dns") && !error_lower.contains("timeout")) | 207 | || error_lower.contains("dns error") |
| 208 | || error_lower.contains("dns lookup") | ||
| 209 | || error_lower.contains("dns resolution") | ||
| 210 | || error_lower.contains("getaddrinfo") | ||
| 122 | { | 211 | { |
| 123 | return Some(NaughtyCategory::DnsLookupFailed); | 212 | return Some(NaughtyCategory::DnsLookupFailed); |
| 124 | } | 213 | } |
| 125 | 214 | ||
| 126 | // TLS certificate errors | 215 | // TLS certificate errors |
| 127 | if error_lower.contains("certificate") | 216 | if error_lower.contains("certificate") |
| 128 | || error_lower.contains("ssl") | 217 | || error_lower.contains("ssl error") |
| 129 | || error_lower.contains("tls") | 218 | || error_lower.contains("ssl certificate") |
| 219 | || error_lower.contains("ssl handshake") | ||
| 220 | || error_lower.contains("ssl_error") | ||
| 221 | || error_lower.contains("tls error") | ||
| 222 | || error_lower.contains("tls handshake") | ||
| 223 | || error_lower.contains("tls alert") | ||
| 224 | || error_lower.contains("tls_error") | ||
| 225 | || error_lower.contains("openssl") | ||
| 226 | || error_lower.contains("schannel") | ||
| 227 | || error_lower.contains("secure channel") | ||
| 130 | { | 228 | { |
| 131 | // Exclude timeout errors that mention TLS | 229 | // Exclude timeout errors that mention TLS |
| 132 | if !error_lower.contains("timeout") && !error_lower.contains("timed out") { | 230 | if !error_lower.contains("timeout") && !error_lower.contains("timed out") { |
| @@ -134,12 +232,12 @@ impl NaughtyListTracker { | |||
| 134 | } | 232 | } |
| 135 | } | 233 | } |
| 136 | 234 | ||
| 137 | // Protocol errors | 235 | // Protocol errors - specifically WebSocket/Nostr protocol violations |
| 138 | if error_lower.contains("websocket") | 236 | // Note: We check for "websocket" specifically, NOT generic "protocol" keyword |
| 139 | || error_lower.contains("protocol") | 237 | // because git errors often contain "protocol error" (e.g., "fatal: protocol error: bad line length") |
| 140 | || error_lower.contains("invalid frame") | 238 | // which are transient network issues, not persistent infrastructure problems. |
| 141 | { | 239 | if error_lower.contains("websocket") || error_lower.contains("invalid frame") { |
| 142 | // Exclude connection errors | 240 | // Exclude connection errors (transient) |
| 143 | if !error_lower.contains("connection") | 241 | if !error_lower.contains("connection") |
| 144 | && !error_lower.contains("timeout") | 242 | && !error_lower.contains("timeout") |
| 145 | && !error_lower.contains("refused") | 243 | && !error_lower.contains("refused") |
| @@ -290,183 +388,216 @@ impl NaughtyListTracker { | |||
| 290 | mod tests { | 388 | mod tests { |
| 291 | use super::*; | 389 | use super::*; |
| 292 | 390 | ||
| 391 | // ========================================================================= | ||
| 392 | // URL STRIPPING TESTS | ||
| 393 | // ========================================================================= | ||
| 394 | |||
| 293 | #[test] | 395 | #[test] |
| 294 | fn test_classify_dns_errors() { | 396 | fn test_strip_urls_basic_protocols() { |
| 295 | assert_eq!( | 397 | // HTTP/HTTPS |
| 296 | NaughtyListTracker::classify_error("failed to lookup address information"), | ||
| 297 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 298 | ); | ||
| 299 | assert_eq!( | 398 | assert_eq!( |
| 300 | NaughtyListTracker::classify_error("Name or service not known"), | 399 | NaughtyListTracker::strip_urls("error: https://example.com/repo.git failed"), |
| 301 | Some(NaughtyCategory::DnsLookupFailed) | 400 | "error: [URL] failed" |
| 302 | ); | ||
| 303 | assert_eq!( | ||
| 304 | NaughtyListTracker::classify_error("nodename nor servname provided"), | ||
| 305 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 306 | ); | 401 | ); |
| 307 | assert_eq!( | 402 | assert_eq!( |
| 308 | NaughtyListTracker::classify_error("dns error: NXDOMAIN"), | 403 | NaughtyListTracker::strip_urls("error: http://example.com/path failed"), |
| 309 | Some(NaughtyCategory::DnsLookupFailed) | 404 | "error: [URL] failed" |
| 310 | ); | 405 | ); |
| 311 | } | ||
| 312 | 406 | ||
| 313 | #[test] | 407 | // Git protocol |
| 314 | fn test_classify_tls_errors() { | ||
| 315 | assert_eq!( | 408 | assert_eq!( |
| 316 | NaughtyListTracker::classify_error("certificate not valid for 'example.com'"), | 409 | NaughtyListTracker::strip_urls("fatal: git://github.com/user/repo.git not found"), |
| 317 | Some(NaughtyCategory::TlsCertificateInvalid) | 410 | "fatal: [URL] not found" |
| 318 | ); | 411 | ); |
| 412 | |||
| 413 | // WebSocket protocols (used for relay URLs) | ||
| 319 | assert_eq!( | 414 | assert_eq!( |
| 320 | NaughtyListTracker::classify_error("SSL certificate problem"), | 415 | NaughtyListTracker::strip_urls("error: wss://relay.example.com failed"), |
| 321 | Some(NaughtyCategory::TlsCertificateInvalid) | 416 | "error: [URL] failed" |
| 322 | ); | 417 | ); |
| 323 | assert_eq!( | 418 | assert_eq!( |
| 324 | NaughtyListTracker::classify_error("TLS handshake failed"), | 419 | NaughtyListTracker::strip_urls("error: ws://localhost:8080 failed"), |
| 325 | Some(NaughtyCategory::TlsCertificateInvalid) | 420 | "error: [URL] failed" |
| 326 | ); | 421 | ); |
| 422 | } | ||
| 327 | 423 | ||
| 328 | // TLS timeout should NOT be classified as naughty | 424 | #[test] |
| 329 | assert_eq!( | 425 | fn test_strip_urls_multiple() { |
| 330 | NaughtyListTracker::classify_error("TLS connection timed out"), | 426 | let error = "failed to clone https://a.com/repo.git and wss://relay.com"; |
| 331 | None | 427 | let stripped = NaughtyListTracker::strip_urls(error); |
| 332 | ); | 428 | assert_eq!(stripped, "failed to clone [URL] and [URL]"); |
| 333 | } | 429 | } |
| 334 | 430 | ||
| 335 | #[test] | 431 | #[test] |
| 336 | fn test_classify_protocol_errors() { | 432 | fn test_strip_urls_preserves_error_text() { |
| 337 | assert_eq!( | 433 | let error = |
| 338 | NaughtyListTracker::classify_error("websocket protocol error"), | 434 | "fatal: unable to access 'https://example.com/repo.git/': SSL certificate problem"; |
| 339 | Some(NaughtyCategory::ProtocolError) | 435 | let stripped = NaughtyListTracker::strip_urls(error); |
| 340 | ); | 436 | assert!(stripped.contains("SSL certificate problem")); |
| 437 | assert!(!stripped.contains("example.com")); | ||
| 438 | } | ||
| 439 | |||
| 440 | // ========================================================================= | ||
| 441 | // EDGE CASES: TIMEOUT/CONNECTION EXCEPTIONS | ||
| 442 | // These are the "unusual rules" where a pattern matches but should be excluded | ||
| 443 | // ========================================================================= | ||
| 444 | |||
| 445 | #[test] | ||
| 446 | fn test_tls_timeout_not_naughty() { | ||
| 447 | // TLS errors with timeout should NOT be classified as naughty | ||
| 448 | // (timeout is transient, not a certificate problem) | ||
| 341 | assert_eq!( | 449 | assert_eq!( |
| 342 | NaughtyListTracker::classify_error("invalid frame header"), | 450 | NaughtyListTracker::classify_error("TLS connection timed out"), |
| 343 | Some(NaughtyCategory::ProtocolError) | 451 | None |
| 344 | ); | 452 | ); |
| 345 | |||
| 346 | // WebSocket connection errors should NOT be classified as naughty | ||
| 347 | assert_eq!( | 453 | assert_eq!( |
| 348 | NaughtyListTracker::classify_error("websocket connection refused"), | 454 | NaughtyListTracker::classify_error("SSL handshake timeout"), |
| 349 | None | 455 | None |
| 350 | ); | 456 | ); |
| 351 | } | 457 | } |
| 352 | 458 | ||
| 353 | #[test] | 459 | #[test] |
| 354 | fn test_classify_transient_errors() { | 460 | fn test_websocket_connection_errors_not_naughty() { |
| 355 | // Timeouts are transient | 461 | // WebSocket connection errors are transient, not protocol violations |
| 356 | assert_eq!( | 462 | assert_eq!( |
| 357 | NaughtyListTracker::classify_error("connection timed out"), | 463 | NaughtyListTracker::classify_error("websocket connection refused"), |
| 358 | None | 464 | None |
| 359 | ); | 465 | ); |
| 360 | assert_eq!( | 466 | assert_eq!( |
| 361 | NaughtyListTracker::classify_error("operation timed out"), | 467 | NaughtyListTracker::classify_error("websocket connection timeout"), |
| 362 | None | 468 | None |
| 363 | ); | 469 | ); |
| 470 | } | ||
| 364 | 471 | ||
| 365 | // Connection refused is transient | 472 | #[test] |
| 473 | fn test_remote_warnings_filtered() { | ||
| 474 | // Remote warnings should be filtered out before classification | ||
| 475 | let warning_only = | ||
| 476 | "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"; | ||
| 477 | assert_eq!(NaughtyListTracker::classify_error(warning_only), None); | ||
| 478 | |||
| 479 | // But real errors after warnings should still be classified | ||
| 480 | let warning_with_error = "remote: warning: something\nfatal: failed to lookup address"; | ||
| 366 | assert_eq!( | 481 | assert_eq!( |
| 367 | NaughtyListTracker::classify_error("connection refused"), | 482 | NaughtyListTracker::classify_error(warning_with_error), |
| 368 | None | 483 | Some(NaughtyCategory::DnsLookupFailed) |
| 369 | ); | 484 | ); |
| 485 | } | ||
| 370 | 486 | ||
| 371 | // Generic network errors are transient | 487 | // ========================================================================= |
| 488 | // INTEGRATION: FULL CLASSIFICATION FLOW | ||
| 489 | // Verify URL stripping + classification work together correctly | ||
| 490 | // ========================================================================= | ||
| 491 | |||
| 492 | #[test] | ||
| 493 | fn test_url_with_keywords_not_false_positive() { | ||
| 494 | // URLs containing keywords should NOT trigger classification | ||
| 495 | let cases = [ | ||
| 496 | ("https://example.com/my-openssl-project.git", "not found"), | ||
| 497 | ("https://example.com/ssl-team/repo.git", "not found"), | ||
| 498 | ("https://example.com/certificate-manager.git", "not found"), | ||
| 499 | ("https://example.com/dns-tools.git", "not found"), | ||
| 500 | ("wss://relay-tls-test.example.com", "connection refused"), | ||
| 501 | ]; | ||
| 502 | |||
| 503 | for (url, suffix) in cases { | ||
| 504 | let error = format!("fatal: repository '{}/' {}", url, suffix); | ||
| 505 | assert_eq!( | ||
| 506 | NaughtyListTracker::classify_error(&error), | ||
| 507 | None, | ||
| 508 | "URL '{}' should not trigger false positive", | ||
| 509 | url | ||
| 510 | ); | ||
| 511 | } | ||
| 512 | } | ||
| 513 | |||
| 514 | #[test] | ||
| 515 | fn test_real_errors_still_detected() { | ||
| 516 | // Real errors in the message text (not URL) should still be detected | ||
| 372 | assert_eq!( | 517 | assert_eq!( |
| 373 | NaughtyListTracker::classify_error("network unreachable"), | 518 | NaughtyListTracker::classify_error( |
| 374 | None | 519 | "fatal: 'https://example.com/repo.git': SSL certificate problem" |
| 520 | ), | ||
| 521 | Some(NaughtyCategory::TlsCertificateInvalid) | ||
| 522 | ); | ||
| 523 | assert_eq!( | ||
| 524 | NaughtyListTracker::classify_error( | ||
| 525 | "fatal: 'https://example.com/repo.git': failed to lookup address" | ||
| 526 | ), | ||
| 527 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 528 | ); | ||
| 529 | assert_eq!( | ||
| 530 | NaughtyListTracker::classify_error("websocket protocol error"), | ||
| 531 | Some(NaughtyCategory::ProtocolError) | ||
| 375 | ); | 532 | ); |
| 376 | } | 533 | } |
| 377 | 534 | ||
| 378 | #[test] | 535 | #[test] |
| 379 | fn test_record_new_entry() { | 536 | fn test_url_with_keyword_and_real_error() { |
| 380 | let tracker = NaughtyListTracker::with_defaults(); | 537 | // URL contains keyword AND there's a real error - should detect the error |
| 381 | let url = "wss://bad-relay.example.com"; | 538 | let error = "fatal: 'https://example.com/ssl-tools/repo.git': SSL certificate problem"; |
| 382 | 539 | assert_eq!( | |
| 383 | let is_new = tracker.record( | 540 | NaughtyListTracker::classify_error(error), |
| 384 | url, | 541 | Some(NaughtyCategory::TlsCertificateInvalid) |
| 385 | NaughtyCategory::DnsLookupFailed, | ||
| 386 | "failed to lookup address".to_string(), | ||
| 387 | ); | 542 | ); |
| 388 | |||
| 389 | assert!(is_new); | ||
| 390 | assert!(tracker.is_naughty(url)); | ||
| 391 | |||
| 392 | let entry = tracker.get_entry(url).unwrap(); | ||
| 393 | assert_eq!(entry.category, NaughtyCategory::DnsLookupFailed); | ||
| 394 | assert_eq!(entry.occurrence_count, 1); | ||
| 395 | } | 543 | } |
| 396 | 544 | ||
| 545 | // ========================================================================= | ||
| 546 | // TRACKER FUNCTIONALITY | ||
| 547 | // ========================================================================= | ||
| 548 | |||
| 397 | #[test] | 549 | #[test] |
| 398 | fn test_record_updates_existing() { | 550 | fn test_tracker_record_and_update() { |
| 399 | let tracker = NaughtyListTracker::with_defaults(); | 551 | let tracker = NaughtyListTracker::with_defaults(); |
| 400 | let url = "wss://bad-relay.example.com"; | 552 | let url = "wss://bad-relay.example.com"; |
| 401 | 553 | ||
| 402 | // First occurrence | 554 | // First occurrence |
| 403 | let is_new1 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); | 555 | let is_new = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); |
| 404 | assert!(is_new1); | 556 | assert!(is_new); |
| 557 | assert!(tracker.is_naughty(url)); | ||
| 405 | 558 | ||
| 406 | // Second occurrence | 559 | // Second occurrence updates existing |
| 407 | let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); | 560 | let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); |
| 408 | assert!(!is_new2); | 561 | assert!(!is_new2); |
| 409 | 562 | ||
| 410 | let entry = tracker.get_entry(url).unwrap(); | 563 | let entry = tracker.get_entry(url).unwrap(); |
| 411 | assert_eq!(entry.occurrence_count, 2); | 564 | assert_eq!(entry.occurrence_count, 2); |
| 412 | assert_eq!(entry.reason, "error 2"); // Updated to latest | 565 | assert_eq!(entry.reason, "error 2"); |
| 413 | } | 566 | } |
| 414 | 567 | ||
| 415 | #[test] | 568 | #[test] |
| 416 | fn test_is_naughty() { | 569 | fn test_tracker_expiration() { |
| 417 | let tracker = NaughtyListTracker::with_defaults(); | 570 | let tracker = NaughtyListTracker::new(0); // Expire immediately |
| 418 | let url = "wss://bad-relay.example.com"; | ||
| 419 | |||
| 420 | assert!(!tracker.is_naughty(url)); | ||
| 421 | 571 | ||
| 422 | tracker.record( | 572 | tracker.record( |
| 423 | url, | 573 | "wss://relay.example.com", |
| 424 | NaughtyCategory::TlsCertificateInvalid, | 574 | NaughtyCategory::DnsLookupFailed, |
| 425 | "cert error".to_string(), | 575 | "error".to_string(), |
| 426 | ); | 576 | ); |
| 427 | 577 | ||
| 428 | assert!(tracker.is_naughty(url)); | 578 | // Entry exists but is expired |
| 429 | } | 579 | assert!(!tracker.is_naughty("wss://relay.example.com")); |
| 430 | 580 | ||
| 431 | #[test] | 581 | std::thread::sleep(std::time::Duration::from_millis(10)); |
| 432 | fn test_get_all() { | ||
| 433 | let tracker = NaughtyListTracker::with_defaults(); | ||
| 434 | |||
| 435 | tracker.record( | ||
| 436 | "wss://relay1.example.com", | ||
| 437 | NaughtyCategory::DnsLookupFailed, | ||
| 438 | "dns error".to_string(), | ||
| 439 | ); | ||
| 440 | tracker.record( | ||
| 441 | "wss://relay2.example.com", | ||
| 442 | NaughtyCategory::TlsCertificateInvalid, | ||
| 443 | "tls error".to_string(), | ||
| 444 | ); | ||
| 445 | 582 | ||
| 446 | let all = tracker.get_all(); | 583 | let expired = tracker.expire_old_entries(); |
| 447 | assert_eq!(all.len(), 2); | 584 | assert_eq!(expired.len(), 1); |
| 585 | assert_eq!(tracker.total_count(), 0); | ||
| 448 | } | 586 | } |
| 449 | 587 | ||
| 450 | #[test] | 588 | #[test] |
| 451 | fn test_count_by_category() { | 589 | fn test_tracker_counts() { |
| 452 | let tracker = NaughtyListTracker::with_defaults(); | 590 | let tracker = NaughtyListTracker::with_defaults(); |
| 453 | 591 | ||
| 592 | tracker.record("wss://r1.com", NaughtyCategory::DnsLookupFailed, "e".into()); | ||
| 593 | tracker.record("wss://r2.com", NaughtyCategory::DnsLookupFailed, "e".into()); | ||
| 454 | tracker.record( | 594 | tracker.record( |
| 455 | "wss://relay1.example.com", | 595 | "wss://r3.com", |
| 456 | NaughtyCategory::DnsLookupFailed, | ||
| 457 | "error".to_string(), | ||
| 458 | ); | ||
| 459 | tracker.record( | ||
| 460 | "wss://relay2.example.com", | ||
| 461 | NaughtyCategory::DnsLookupFailed, | ||
| 462 | "error".to_string(), | ||
| 463 | ); | ||
| 464 | tracker.record( | ||
| 465 | "wss://relay3.example.com", | ||
| 466 | NaughtyCategory::TlsCertificateInvalid, | 596 | NaughtyCategory::TlsCertificateInvalid, |
| 467 | "error".to_string(), | 597 | "e".into(), |
| 468 | ); | 598 | ); |
| 469 | 599 | ||
| 600 | assert_eq!(tracker.total_count(), 3); | ||
| 470 | assert_eq!( | 601 | assert_eq!( |
| 471 | tracker.count_by_category(NaughtyCategory::DnsLookupFailed), | 602 | tracker.count_by_category(NaughtyCategory::DnsLookupFailed), |
| 472 | 2 | 603 | 2 |
| @@ -475,82 +606,84 @@ mod tests { | |||
| 475 | tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), | 606 | tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), |
| 476 | 1 | 607 | 1 |
| 477 | ); | 608 | ); |
| 478 | assert_eq!(tracker.count_by_category(NaughtyCategory::ProtocolError), 0); | 609 | assert_eq!(tracker.get_all().len(), 3); |
| 479 | } | 610 | } |
| 480 | 611 | ||
| 481 | #[test] | 612 | #[test] |
| 482 | fn test_total_count() { | 613 | fn test_category_display() { |
| 483 | let tracker = NaughtyListTracker::with_defaults(); | 614 | assert_eq!( |
| 484 | assert_eq!(tracker.total_count(), 0); | 615 | NaughtyCategory::DnsLookupFailed.as_str(), |
| 485 | 616 | "dns_lookup_failed" | |
| 486 | tracker.record( | ||
| 487 | "wss://relay1.example.com", | ||
| 488 | NaughtyCategory::DnsLookupFailed, | ||
| 489 | "error".to_string(), | ||
| 490 | ); | 617 | ); |
| 491 | assert_eq!(tracker.total_count(), 1); | 618 | assert_eq!( |
| 492 | 619 | NaughtyCategory::TlsCertificateInvalid.as_str(), | |
| 493 | tracker.record( | 620 | "tls_certificate_invalid" |
| 494 | "wss://relay2.example.com", | ||
| 495 | NaughtyCategory::TlsCertificateInvalid, | ||
| 496 | "error".to_string(), | ||
| 497 | ); | 621 | ); |
| 498 | assert_eq!(tracker.total_count(), 2); | 622 | assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); |
| 499 | } | 623 | } |
| 624 | } | ||
| 500 | 625 | ||
| 501 | #[test] | 626 | #[cfg(test)] |
| 502 | fn test_expire_old_entries() { | 627 | mod production_tests { |
| 503 | // Use very short expiration for testing | 628 | use super::*; |
| 504 | let tracker = NaughtyListTracker::new(0); // Expire immediately (0 hours) | ||
| 505 | |||
| 506 | tracker.record( | ||
| 507 | "wss://relay1.example.com", | ||
| 508 | NaughtyCategory::DnsLookupFailed, | ||
| 509 | "error".to_string(), | ||
| 510 | ); | ||
| 511 | |||
| 512 | // Entry should exist in the map | ||
| 513 | assert_eq!(tracker.total_count(), 1); | ||
| 514 | |||
| 515 | // But is_naughty should return false since it's already expired (0 hours) | ||
| 516 | assert!(!tracker.is_naughty("wss://relay1.example.com")); | ||
| 517 | |||
| 518 | // Sleep to ensure time passes | ||
| 519 | std::thread::sleep(std::time::Duration::from_millis(10)); | ||
| 520 | 629 | ||
| 521 | // Expire old entries (should remove the 0-hour expired entry) | 630 | /// Production case from relay.ngit.dev - remote warning should not be classified |
| 522 | let expired = tracker.expire_old_entries(); | 631 | #[test] |
| 523 | assert_eq!(expired.len(), 1); | 632 | fn test_classify_production_relay_ngit_dev_warning() { |
| 524 | assert_eq!(expired[0], "wss://relay1.example.com"); | 633 | let error = |
| 634 | "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"; | ||
| 635 | assert_eq!(NaughtyListTracker::classify_error(error), None); | ||
| 636 | } | ||
| 525 | 637 | ||
| 526 | // Entry should be gone | 638 | /// Git protocol errors are transient, not persistent infrastructure issues |
| 527 | assert!(!tracker.is_naughty("wss://relay1.example.com")); | 639 | #[test] |
| 528 | assert_eq!(tracker.total_count(), 0); | 640 | fn test_git_protocol_errors_not_naughty() { |
| 641 | // These are common git protocol errors that should NOT be classified as naughty | ||
| 642 | let git_protocol_errors = [ | ||
| 643 | "fatal: protocol error: bad line length character: remo", | ||
| 644 | "fatal: protocol error: expected old/new/ref, got 'shallow", | ||
| 645 | "fatal: git upload-pack: protocol error", | ||
| 646 | "error: protocol error: bad pack header", | ||
| 647 | "fatal: protocol error: bad band #3", | ||
| 648 | ]; | ||
| 649 | |||
| 650 | for error in git_protocol_errors { | ||
| 651 | assert_eq!( | ||
| 652 | NaughtyListTracker::classify_error(error), | ||
| 653 | None, | ||
| 654 | "Git protocol error should not be classified as naughty: {}", | ||
| 655 | error | ||
| 656 | ); | ||
| 657 | } | ||
| 529 | } | 658 | } |
| 530 | 659 | ||
| 660 | /// Remote warning followed by git protocol error - both should be filtered/ignored | ||
| 531 | #[test] | 661 | #[test] |
| 532 | fn test_category_display() { | 662 | fn test_warning_with_git_protocol_error() { |
| 663 | let error = "remote: warning: unable to access '/root/.config/git/attributes': Permission denied\nfatal: protocol error: bad line length character: remo"; | ||
| 533 | assert_eq!( | 664 | assert_eq!( |
| 534 | NaughtyCategory::DnsLookupFailed.to_string(), | 665 | NaughtyListTracker::classify_error(error), |
| 535 | "dns_lookup_failed" | 666 | None, |
| 667 | "Warning + git protocol error should not be classified as naughty" | ||
| 536 | ); | 668 | ); |
| 537 | assert_eq!( | ||
| 538 | NaughtyCategory::TlsCertificateInvalid.to_string(), | ||
| 539 | "tls_certificate_invalid" | ||
| 540 | ); | ||
| 541 | assert_eq!(NaughtyCategory::ProtocolError.to_string(), "protocol_error"); | ||
| 542 | } | 669 | } |
| 543 | 670 | ||
| 671 | /// WebSocket protocol errors ARE naughty (persistent infrastructure issues) | ||
| 544 | #[test] | 672 | #[test] |
| 545 | fn test_category_as_str() { | 673 | fn test_websocket_errors_still_naughty() { |
| 546 | assert_eq!( | 674 | let websocket_errors = [ |
| 547 | NaughtyCategory::DnsLookupFailed.as_str(), | 675 | "websocket protocol error", |
| 548 | "dns_lookup_failed" | 676 | "websocket handshake failed", |
| 549 | ); | 677 | "invalid frame received", |
| 550 | assert_eq!( | 678 | ]; |
| 551 | NaughtyCategory::TlsCertificateInvalid.as_str(), | 679 | |
| 552 | "tls_certificate_invalid" | 680 | for error in websocket_errors { |
| 553 | ); | 681 | assert_eq!( |
| 554 | assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); | 682 | NaughtyListTracker::classify_error(error), |
| 683 | Some(NaughtyCategory::ProtocolError), | ||
| 684 | "WebSocket error should be classified as protocol_error: {}", | ||
| 685 | error | ||
| 686 | ); | ||
| 687 | } | ||
| 555 | } | 688 | } |
| 556 | } | 689 | } |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..86e4583 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -98,6 +100,8 @@ pub struct SelfSubscriber { | |||
| 98 | action_tx: mpsc::Sender<AddFilters>, | 100 | action_tx: mpsc::Sender<AddFilters>, |
| 99 | /// Last time we connected - used for since filter on reconnect | 101 | /// Last time we connected - used for since filter on reconnect |
| 100 | last_connected: Option<Timestamp>, | 102 | last_connected: Option<Timestamp>, |
| 103 | /// Database for querying existing events on startup | ||
| 104 | database: SharedDatabase, | ||
| 101 | } | 105 | } |
| 102 | 106 | ||
| 103 | impl SelfSubscriber { | 107 | impl SelfSubscriber { |
| @@ -108,11 +112,13 @@ impl SelfSubscriber { | |||
| 108 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 112 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 109 | /// * `repo_sync_index` - Shared index to update with discovered repos | 113 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 110 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 114 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 115 | /// * `database` - Database for querying existing events on startup | ||
| 111 | pub fn new( | 116 | pub fn new( |
| 112 | own_relay_url: String, | 117 | own_relay_url: String, |
| 113 | relay_domain: String, | 118 | relay_domain: String, |
| 114 | repo_sync_index: RepoSyncIndex, | 119 | repo_sync_index: RepoSyncIndex, |
| 115 | action_tx: mpsc::Sender<AddFilters>, | 120 | action_tx: mpsc::Sender<AddFilters>, |
| 121 | database: SharedDatabase, | ||
| 116 | ) -> Self { | 122 | ) -> Self { |
| 117 | Self { | 123 | Self { |
| 118 | own_relay_url, | 124 | own_relay_url, |
| @@ -120,6 +126,7 @@ impl SelfSubscriber { | |||
| 120 | repo_sync_index, | 126 | repo_sync_index, |
| 121 | action_tx, | 127 | action_tx, |
| 122 | last_connected: None, | 128 | last_connected: None, |
| 129 | database, | ||
| 123 | } | 130 | } |
| 124 | } | 131 | } |
| 125 | 132 | ||
| @@ -135,6 +142,101 @@ impl SelfSubscriber { | |||
| 135 | .unwrap_or(Duration::from_millis(5000)) | 142 | .unwrap_or(Duration::from_millis(5000)) |
| 136 | } | 143 | } |
| 137 | 144 | ||
| 145 | /// Load existing events from database on startup | ||
| 146 | /// | ||
| 147 | /// Queries the database with two separate queries to build the initial | ||
| 148 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 149 | /// created, not just those returned by the WebSocket subscription | ||
| 150 | /// (which has limits on the number of events returned). | ||
| 151 | /// | ||
| 152 | /// Query order: | ||
| 153 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 154 | /// with repos and their relays | ||
| 155 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 156 | /// to add root event IDs for Layer 3 filter creation | ||
| 157 | /// | ||
| 158 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 159 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 160 | let mut pending = PendingUpdates::new(); | ||
| 161 | |||
| 162 | tracing::info!("Loading all events from database"); | ||
| 163 | |||
| 164 | // First query: Get all announcements to populate repo_sync_index | ||
| 165 | let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 166 | |||
| 167 | let announcements = match self.database.query(announcement_filter).await { | ||
| 168 | Ok(events) => { | ||
| 169 | tracing::info!(count = events.len(), "Loaded announcements from database"); | ||
| 170 | events | ||
| 171 | } | ||
| 172 | Err(e) => { | ||
| 173 | tracing::error!( | ||
| 174 | error = %e, | ||
| 175 | "Failed to query announcements from database" | ||
| 176 | ); | ||
| 177 | return pending; | ||
| 178 | } | ||
| 179 | }; | ||
| 180 | |||
| 181 | // Process announcements | ||
| 182 | let mut announcements_loaded = 0; | ||
| 183 | for event in announcements.iter() { | ||
| 184 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 185 | let relays = Self::extract_relay_urls(event); | ||
| 186 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 187 | announcements_loaded += 1; | ||
| 188 | } | ||
| 189 | } | ||
| 190 | |||
| 191 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 192 | { | ||
| 193 | let mut index = self.repo_sync_index.write().await; | ||
| 194 | for (repo_id, needs) in &pending.repos { | ||
| 195 | let entry = index | ||
| 196 | .entry(repo_id.clone()) | ||
| 197 | .or_insert_with(|| RepoSyncNeeds { | ||
| 198 | relays: HashSet::new(), | ||
| 199 | root_events: HashSet::new(), | ||
| 200 | }); | ||
| 201 | entry.relays.extend(needs.relays.clone()); | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | // Second query: Get all root events for handle_root_event() | ||
| 206 | let root_filter = | ||
| 207 | Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]); | ||
| 208 | |||
| 209 | let root_events = match self.database.query(root_filter).await { | ||
| 210 | Ok(events) => { | ||
| 211 | tracing::info!(count = events.len(), "Loaded root events from database"); | ||
| 212 | events | ||
| 213 | } | ||
| 214 | Err(e) => { | ||
| 215 | tracing::error!( | ||
| 216 | error = %e, | ||
| 217 | "Failed to query root events from database" | ||
| 218 | ); | ||
| 219 | // Continue with just announcements | ||
| 220 | return pending; | ||
| 221 | } | ||
| 222 | }; | ||
| 223 | |||
| 224 | // Process root events | ||
| 225 | let mut root_events_processed = 0; | ||
| 226 | for event in root_events.iter() { | ||
| 227 | self.handle_root_event(event, &mut pending).await; | ||
| 228 | root_events_processed += 1; | ||
| 229 | } | ||
| 230 | |||
| 231 | tracing::info!( | ||
| 232 | announcements_loaded = announcements_loaded, | ||
| 233 | root_events_processed = root_events_processed, | ||
| 234 | "Processed existing events from database" | ||
| 235 | ); | ||
| 236 | |||
| 237 | pending | ||
| 238 | } | ||
| 239 | |||
| 138 | /// Process a relay pool notification | 240 | /// Process a relay pool notification |
| 139 | /// | 241 | /// |
| 140 | /// Handles incoming events from the subscription, queueing 30617 announcements | 242 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -276,33 +378,22 @@ impl SelfSubscriber { | |||
| 276 | // Subscribe to announcement and root event kinds | 378 | // Subscribe to announcement and root event kinds |
| 277 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 379 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 278 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 380 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 279 | // Check if we have a last_connected time for reconnect filtering | 381 | let mut filter = Filter::new().kinds(vec![ |
| 280 | let filter = if let Some(last) = self.last_connected { | 382 | Kind::GitRepoAnnouncement, |
| 383 | Kind::GitPatch, | ||
| 384 | Kind::GitIssue, | ||
| 385 | Kind::GitPullRequest, | ||
| 386 | Kind::GitUserGraspList, | ||
| 387 | ]); | ||
| 388 | if let Some(timestamp) = self.last_connected { | ||
| 281 | // Quick reconnect - use since filter (15 min buffer) | 389 | // Quick reconnect - use since filter (15 min buffer) |
| 282 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 390 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 283 | tracing::debug!( | 391 | tracing::debug!( |
| 284 | since = %since, | 392 | since = %since, |
| 285 | "Using since filter for reconnect" | 393 | "Using since filter for reconnect" |
| 286 | ); | 394 | ); |
| 287 | Filter::new() | 395 | filter = filter.since(since); |
| 288 | .kinds(vec![ | 396 | } |
| 289 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 290 | Kind::GitPatch, // Patches | ||
| 291 | Kind::GitIssue, // Issues | ||
| 292 | Kind::GitPullRequest, // Pull Requests | ||
| 293 | Kind::GitUserGraspList, // User Grasp List | ||
| 294 | ]) | ||
| 295 | .since(since) | ||
| 296 | } else { | ||
| 297 | // First connection - no since filter | ||
| 298 | Filter::new().kinds(vec![ | ||
| 299 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 300 | Kind::GitPatch, // Patches | ||
| 301 | Kind::GitIssue, // Issues | ||
| 302 | Kind::GitPullRequest, // Pull Requests | ||
| 303 | Kind::GitUserGraspList, // User Grasp List | ||
| 304 | ]) | ||
| 305 | }; | ||
| 306 | 397 | ||
| 307 | // Update last_connected AFTER creating filter but BEFORE subscribing | 398 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 308 | self.last_connected = Some(Timestamp::now()); | 399 | self.last_connected = Some(Timestamp::now()); |
| @@ -323,7 +414,11 @@ impl SelfSubscriber { | |||
| 323 | 414 | ||
| 324 | let mut notifications = client.notifications(); | 415 | let mut notifications = client.notifications(); |
| 325 | let batch_window = Self::get_batch_window(); | 416 | let batch_window = Self::get_batch_window(); |
| 326 | let mut pending = PendingUpdates::new(); | 417 | |
| 418 | // Load existing events from database on startup | ||
| 419 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 420 | // returned by the WebSocket subscription (which has limits) | ||
| 421 | let mut pending = self.load_existing_events().await; | ||
| 327 | 422 | ||
| 328 | // Timer does NOT reset on new events - use interval | 423 | // Timer does NOT reset on new events - use interval |
| 329 | let mut timer = tokio::time::interval(batch_window); | 424 | let mut timer = tokio::time::interval(batch_window); |