upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r--src/sync/connection.rs41
1 files changed, 19 insertions, 22 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index e921185..61a33f8 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -70,10 +70,8 @@ impl SyncConnection {
70 tracing::info!("Sync connection established to {}", url); 70 tracing::info!("Sync connection established to {}", url);
71 71
72 // Create subscription manager for this connection 72 // Create subscription manager for this connection
73 let subscription_manager = SubscriptionManager::new( 73 let subscription_manager =
74 filter_service.clone(), 74 SubscriptionManager::new(filter_service.clone(), remote_domain.to_string());
75 remote_domain.to_string(),
76 );
77 75
78 Ok(Self { 76 Ok(Self {
79 url: url.to_string(), 77 url: url.to_string(),
@@ -208,10 +206,8 @@ impl SyncConnection {
208 /// - kind 30617/30618: New announcement → add Layer 2 subscription 206 /// - kind 30617/30618: New announcement → add Layer 2 subscription
209 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription 207 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
210 async fn handle_dynamic_subscription(&mut self, event: &Event) { 208 async fn handle_dynamic_subscription(&mut self, event: &Event) {
211 let kind = event.kind.as_u16();
212
213 // Check if this is an announcement kind (triggers Layer 2 subscription) 209 // Check if this is an announcement kind (triggers Layer 2 subscription)
214 if SubscriptionManager::is_announcement_kind(kind) { 210 if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
215 if let Some(new_filters) = self.subscription_manager.add_announcement(event) { 211 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
216 tracing::info!( 212 tracing::info!(
217 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", 213 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
@@ -224,8 +220,11 @@ impl SyncConnection {
224 } 220 }
225 } 221 }
226 222
227 // Check if this is a PR/Issue kind (triggers Layer 3 subscription) 223 // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription)
228 if SubscriptionManager::is_pr_issue_kind(kind) { 224 if matches!(
225 event.kind,
226 Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618)
227 ) {
229 if let Some(new_filters) = self.subscription_manager.add_event(event) { 228 if let Some(new_filters) = self.subscription_manager.add_event(event) {
230 tracing::info!( 229 tracing::info!(
231 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", 230 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
@@ -366,11 +365,13 @@ pub async fn connect_with_retry(
366 ); 365 );
367 } 366 }
368 367
369 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { 368 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone())
369 .await
370 {
370 Ok(conn) => { 371 Ok(conn) => {
371 // Record successful connection 372 // Record successful connection
372 health_tracker.record_success(url); 373 health_tracker.record_success(url);
373 374
374 // Record metrics 375 // Record metrics
375 if let Some(ref m) = metrics { 376 if let Some(ref m) = metrics {
376 m.record_connection_attempt(url, true); 377 m.record_connection_attempt(url, true);
@@ -379,7 +380,7 @@ pub async fn connect_with_retry(
379 m.record_health_state(url, health_tracker.get_state(url)); 380 m.record_health_state(url, health_tracker.get_state(url));
380 m.record_failure_count(url, 0); 381 m.record_failure_count(url, 0);
381 } 382 }
382 383
383 tracing::info!("Sync connection established to {}", url); 384 tracing::info!("Sync connection established to {}", url);
384 385
385 // Run the connection (this blocks until disconnection) 386 // Run the connection (this blocks until disconnection)
@@ -388,7 +389,7 @@ pub async fn connect_with_retry(
388 // Connection ended - record as failure for reconnection backoff 389 // Connection ended - record as failure for reconnection backoff
389 // (The connection ending is considered a failure even if it worked for a while) 390 // (The connection ending is considered a failure even if it worked for a while)
390 health_tracker.record_failure(url); 391 health_tracker.record_failure(url);
391 392
392 // Update metrics for disconnection 393 // Update metrics for disconnection
393 if let Some(ref m) = metrics { 394 if let Some(ref m) = metrics {
394 m.set_relay_connected(url, false); 395 m.set_relay_connected(url, false);
@@ -396,7 +397,7 @@ pub async fn connect_with_retry(
396 m.record_health_state(url, health_tracker.get_state(url)); 397 m.record_health_state(url, health_tracker.get_state(url));
397 m.record_failure_count(url, health_tracker.get_failure_count(url)); 398 m.record_failure_count(url, health_tracker.get_failure_count(url));
398 } 399 }
399 400
400 tracing::warn!("Sync connection to {} ended, will reconnect", url); 401 tracing::warn!("Sync connection to {} ended, will reconnect", url);
401 } 402 }
402 Err(e) => { 403 Err(e) => {
@@ -405,14 +406,14 @@ pub async fn connect_with_retry(
405 406
406 let failure_count = health_tracker.get_failure_count(url); 407 let failure_count = health_tracker.get_failure_count(url);
407 let state = health_tracker.get_state(url); 408 let state = health_tracker.get_state(url);
408 409
409 // Record metrics 410 // Record metrics
410 if let Some(ref m) = metrics { 411 if let Some(ref m) = metrics {
411 m.record_connection_attempt(url, false); 412 m.record_connection_attempt(url, false);
412 m.set_relay_connected(url, false); 413 m.set_relay_connected(url, false);
413 m.record_health_state(url, state); 414 m.record_health_state(url, state);
414 m.record_failure_count(url, failure_count); 415 m.record_failure_count(url, failure_count);
415 416
416 // Track dead relays 417 // Track dead relays
417 if state == super::health::HealthState::Dead { 418 if state == super::health::HealthState::Dead {
418 m.inc_dead_count(); 419 m.inc_dead_count();
@@ -435,11 +436,7 @@ pub async fn connect_with_retry(
435 .get_remaining_backoff(url) 436 .get_remaining_backoff(url)
436 .unwrap_or(Duration::from_secs(5)); 437 .unwrap_or(Duration::from_secs(5));
437 438
438 tracing::debug!( 439 tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url);
439 "Waiting {:?} before reconnecting to {}",
440 wait_duration,
441 url
442 );
443 tokio::time::sleep(wait_duration).await; 440 tokio::time::sleep(wait_duration).await;
444 } 441 }
445} 442}
@@ -473,4 +470,4 @@ mod tests {
473 Some("relay.example.com".to_string()) 470 Some("relay.example.com".to_string())
474 ); 471 );
475 } 472 }
476} \ No newline at end of file 473}