diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-05 16:37:09 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-05 16:37:09 +0000 |
| commit | b4da09689ee0bd6ac327a6ed7ffb01e2175e2596 (patch) | |
| tree | cb662fb7da60568a959c75526c501ba76c0f4043 /src/sync/connection.rs | |
| parent | b7aa7b72c189290b45fb388ec1826862bc8dda49 (diff) | |
remove stupid tests and methods
Diffstat (limited to 'src/sync/connection.rs')
| -rw-r--r-- | src/sync/connection.rs | 41 |
1 files changed, 19 insertions, 22 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs index e921185..61a33f8 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs | |||
| @@ -70,10 +70,8 @@ impl SyncConnection { | |||
| 70 | tracing::info!("Sync connection established to {}", url); | 70 | tracing::info!("Sync connection established to {}", url); |
| 71 | 71 | ||
| 72 | // Create subscription manager for this connection | 72 | // Create subscription manager for this connection |
| 73 | let subscription_manager = SubscriptionManager::new( | 73 | let subscription_manager = |
| 74 | filter_service.clone(), | 74 | SubscriptionManager::new(filter_service.clone(), remote_domain.to_string()); |
| 75 | remote_domain.to_string(), | ||
| 76 | ); | ||
| 77 | 75 | ||
| 78 | Ok(Self { | 76 | Ok(Self { |
| 79 | url: url.to_string(), | 77 | url: url.to_string(), |
| @@ -208,10 +206,8 @@ impl SyncConnection { | |||
| 208 | /// - kind 30617/30618: New announcement → add Layer 2 subscription | 206 | /// - kind 30617/30618: New announcement → add Layer 2 subscription |
| 209 | /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription | 207 | /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription |
| 210 | async fn handle_dynamic_subscription(&mut self, event: &Event) { | 208 | async fn handle_dynamic_subscription(&mut self, event: &Event) { |
| 211 | let kind = event.kind.as_u16(); | ||
| 212 | |||
| 213 | // Check if this is an announcement kind (triggers Layer 2 subscription) | 209 | // Check if this is an announcement kind (triggers Layer 2 subscription) |
| 214 | if SubscriptionManager::is_announcement_kind(kind) { | 210 | if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) { |
| 215 | if let Some(new_filters) = self.subscription_manager.add_announcement(event) { | 211 | if let Some(new_filters) = self.subscription_manager.add_announcement(event) { |
| 216 | tracing::info!( | 212 | tracing::info!( |
| 217 | "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", | 213 | "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", |
| @@ -224,8 +220,11 @@ impl SyncConnection { | |||
| 224 | } | 220 | } |
| 225 | } | 221 | } |
| 226 | 222 | ||
| 227 | // Check if this is a PR/Issue kind (triggers Layer 3 subscription) | 223 | // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription) |
| 228 | if SubscriptionManager::is_pr_issue_kind(kind) { | 224 | if matches!( |
| 225 | event.kind, | ||
| 226 | Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618) | ||
| 227 | ) { | ||
| 229 | if let Some(new_filters) = self.subscription_manager.add_event(event) { | 228 | if let Some(new_filters) = self.subscription_manager.add_event(event) { |
| 230 | tracing::info!( | 229 | tracing::info!( |
| 231 | "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", | 230 | "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", |
| @@ -366,11 +365,13 @@ pub async fn connect_with_retry( | |||
| 366 | ); | 365 | ); |
| 367 | } | 366 | } |
| 368 | 367 | ||
| 369 | match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { | 368 | match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()) |
| 369 | .await | ||
| 370 | { | ||
| 370 | Ok(conn) => { | 371 | Ok(conn) => { |
| 371 | // Record successful connection | 372 | // Record successful connection |
| 372 | health_tracker.record_success(url); | 373 | health_tracker.record_success(url); |
| 373 | 374 | ||
| 374 | // Record metrics | 375 | // Record metrics |
| 375 | if let Some(ref m) = metrics { | 376 | if let Some(ref m) = metrics { |
| 376 | m.record_connection_attempt(url, true); | 377 | m.record_connection_attempt(url, true); |
| @@ -379,7 +380,7 @@ pub async fn connect_with_retry( | |||
| 379 | m.record_health_state(url, health_tracker.get_state(url)); | 380 | m.record_health_state(url, health_tracker.get_state(url)); |
| 380 | m.record_failure_count(url, 0); | 381 | m.record_failure_count(url, 0); |
| 381 | } | 382 | } |
| 382 | 383 | ||
| 383 | tracing::info!("Sync connection established to {}", url); | 384 | tracing::info!("Sync connection established to {}", url); |
| 384 | 385 | ||
| 385 | // Run the connection (this blocks until disconnection) | 386 | // Run the connection (this blocks until disconnection) |
| @@ -388,7 +389,7 @@ pub async fn connect_with_retry( | |||
| 388 | // Connection ended - record as failure for reconnection backoff | 389 | // Connection ended - record as failure for reconnection backoff |
| 389 | // (The connection ending is considered a failure even if it worked for a while) | 390 | // (The connection ending is considered a failure even if it worked for a while) |
| 390 | health_tracker.record_failure(url); | 391 | health_tracker.record_failure(url); |
| 391 | 392 | ||
| 392 | // Update metrics for disconnection | 393 | // Update metrics for disconnection |
| 393 | if let Some(ref m) = metrics { | 394 | if let Some(ref m) = metrics { |
| 394 | m.set_relay_connected(url, false); | 395 | m.set_relay_connected(url, false); |
| @@ -396,7 +397,7 @@ pub async fn connect_with_retry( | |||
| 396 | m.record_health_state(url, health_tracker.get_state(url)); | 397 | m.record_health_state(url, health_tracker.get_state(url)); |
| 397 | m.record_failure_count(url, health_tracker.get_failure_count(url)); | 398 | m.record_failure_count(url, health_tracker.get_failure_count(url)); |
| 398 | } | 399 | } |
| 399 | 400 | ||
| 400 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | 401 | tracing::warn!("Sync connection to {} ended, will reconnect", url); |
| 401 | } | 402 | } |
| 402 | Err(e) => { | 403 | Err(e) => { |
| @@ -405,14 +406,14 @@ pub async fn connect_with_retry( | |||
| 405 | 406 | ||
| 406 | let failure_count = health_tracker.get_failure_count(url); | 407 | let failure_count = health_tracker.get_failure_count(url); |
| 407 | let state = health_tracker.get_state(url); | 408 | let state = health_tracker.get_state(url); |
| 408 | 409 | ||
| 409 | // Record metrics | 410 | // Record metrics |
| 410 | if let Some(ref m) = metrics { | 411 | if let Some(ref m) = metrics { |
| 411 | m.record_connection_attempt(url, false); | 412 | m.record_connection_attempt(url, false); |
| 412 | m.set_relay_connected(url, false); | 413 | m.set_relay_connected(url, false); |
| 413 | m.record_health_state(url, state); | 414 | m.record_health_state(url, state); |
| 414 | m.record_failure_count(url, failure_count); | 415 | m.record_failure_count(url, failure_count); |
| 415 | 416 | ||
| 416 | // Track dead relays | 417 | // Track dead relays |
| 417 | if state == super::health::HealthState::Dead { | 418 | if state == super::health::HealthState::Dead { |
| 418 | m.inc_dead_count(); | 419 | m.inc_dead_count(); |
| @@ -435,11 +436,7 @@ pub async fn connect_with_retry( | |||
| 435 | .get_remaining_backoff(url) | 436 | .get_remaining_backoff(url) |
| 436 | .unwrap_or(Duration::from_secs(5)); | 437 | .unwrap_or(Duration::from_secs(5)); |
| 437 | 438 | ||
| 438 | tracing::debug!( | 439 | tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url); |
| 439 | "Waiting {:?} before reconnecting to {}", | ||
| 440 | wait_duration, | ||
| 441 | url | ||
| 442 | ); | ||
| 443 | tokio::time::sleep(wait_duration).await; | 440 | tokio::time::sleep(wait_duration).await; |
| 444 | } | 441 | } |
| 445 | } | 442 | } |
| @@ -473,4 +470,4 @@ mod tests { | |||
| 473 | Some("relay.example.com".to_string()) | 470 | Some("relay.example.com".to_string()) |
| 474 | ); | 471 | ); |
| 475 | } | 472 | } |
| 476 | } \ No newline at end of file | 473 | } |