diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-05 16:37:09 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-05 16:37:09 +0000 |
| commit | b4da09689ee0bd6ac327a6ed7ffb01e2175e2596 (patch) | |
| tree | cb662fb7da60568a959c75526c501ba76c0f4043 /src/sync | |
| parent | b7aa7b72c189290b45fb388ec1826862bc8dda49 (diff) | |
remove stupid tests and methods
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/connection.rs | 41 | ||||
| -rw-r--r-- | src/sync/subscription.rs | 71 |
2 files changed, 30 insertions, 82 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs index e921185..61a33f8 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs | |||
| @@ -70,10 +70,8 @@ impl SyncConnection { | |||
| 70 | tracing::info!("Sync connection established to {}", url); | 70 | tracing::info!("Sync connection established to {}", url); |
| 71 | 71 | ||
| 72 | // Create subscription manager for this connection | 72 | // Create subscription manager for this connection |
| 73 | let subscription_manager = SubscriptionManager::new( | 73 | let subscription_manager = |
| 74 | filter_service.clone(), | 74 | SubscriptionManager::new(filter_service.clone(), remote_domain.to_string()); |
| 75 | remote_domain.to_string(), | ||
| 76 | ); | ||
| 77 | 75 | ||
| 78 | Ok(Self { | 76 | Ok(Self { |
| 79 | url: url.to_string(), | 77 | url: url.to_string(), |
| @@ -208,10 +206,8 @@ impl SyncConnection { | |||
| 208 | /// - kind 30617/30618: New announcement → add Layer 2 subscription | 206 | /// - kind 30617/30618: New announcement → add Layer 2 subscription |
| 209 | /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription | 207 | /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription |
| 210 | async fn handle_dynamic_subscription(&mut self, event: &Event) { | 208 | async fn handle_dynamic_subscription(&mut self, event: &Event) { |
| 211 | let kind = event.kind.as_u16(); | ||
| 212 | |||
| 213 | // Check if this is an announcement kind (triggers Layer 2 subscription) | 209 | // Check if this is an announcement kind (triggers Layer 2 subscription) |
| 214 | if SubscriptionManager::is_announcement_kind(kind) { | 210 | if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) { |
| 215 | if let Some(new_filters) = self.subscription_manager.add_announcement(event) { | 211 | if let Some(new_filters) = self.subscription_manager.add_announcement(event) { |
| 216 | tracing::info!( | 212 | tracing::info!( |
| 217 | "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", | 213 | "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", |
| @@ -224,8 +220,11 @@ impl SyncConnection { | |||
| 224 | } | 220 | } |
| 225 | } | 221 | } |
| 226 | 222 | ||
| 227 | // Check if this is a PR/Issue kind (triggers Layer 3 subscription) | 223 | // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription) |
| 228 | if SubscriptionManager::is_pr_issue_kind(kind) { | 224 | if matches!( |
| 225 | event.kind, | ||
| 226 | Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618) | ||
| 227 | ) { | ||
| 229 | if let Some(new_filters) = self.subscription_manager.add_event(event) { | 228 | if let Some(new_filters) = self.subscription_manager.add_event(event) { |
| 230 | tracing::info!( | 229 | tracing::info!( |
| 231 | "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", | 230 | "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", |
| @@ -366,11 +365,13 @@ pub async fn connect_with_retry( | |||
| 366 | ); | 365 | ); |
| 367 | } | 366 | } |
| 368 | 367 | ||
| 369 | match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { | 368 | match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()) |
| 369 | .await | ||
| 370 | { | ||
| 370 | Ok(conn) => { | 371 | Ok(conn) => { |
| 371 | // Record successful connection | 372 | // Record successful connection |
| 372 | health_tracker.record_success(url); | 373 | health_tracker.record_success(url); |
| 373 | 374 | ||
| 374 | // Record metrics | 375 | // Record metrics |
| 375 | if let Some(ref m) = metrics { | 376 | if let Some(ref m) = metrics { |
| 376 | m.record_connection_attempt(url, true); | 377 | m.record_connection_attempt(url, true); |
| @@ -379,7 +380,7 @@ pub async fn connect_with_retry( | |||
| 379 | m.record_health_state(url, health_tracker.get_state(url)); | 380 | m.record_health_state(url, health_tracker.get_state(url)); |
| 380 | m.record_failure_count(url, 0); | 381 | m.record_failure_count(url, 0); |
| 381 | } | 382 | } |
| 382 | 383 | ||
| 383 | tracing::info!("Sync connection established to {}", url); | 384 | tracing::info!("Sync connection established to {}", url); |
| 384 | 385 | ||
| 385 | // Run the connection (this blocks until disconnection) | 386 | // Run the connection (this blocks until disconnection) |
| @@ -388,7 +389,7 @@ pub async fn connect_with_retry( | |||
| 388 | // Connection ended - record as failure for reconnection backoff | 389 | // Connection ended - record as failure for reconnection backoff |
| 389 | // (The connection ending is considered a failure even if it worked for a while) | 390 | // (The connection ending is considered a failure even if it worked for a while) |
| 390 | health_tracker.record_failure(url); | 391 | health_tracker.record_failure(url); |
| 391 | 392 | ||
| 392 | // Update metrics for disconnection | 393 | // Update metrics for disconnection |
| 393 | if let Some(ref m) = metrics { | 394 | if let Some(ref m) = metrics { |
| 394 | m.set_relay_connected(url, false); | 395 | m.set_relay_connected(url, false); |
| @@ -396,7 +397,7 @@ pub async fn connect_with_retry( | |||
| 396 | m.record_health_state(url, health_tracker.get_state(url)); | 397 | m.record_health_state(url, health_tracker.get_state(url)); |
| 397 | m.record_failure_count(url, health_tracker.get_failure_count(url)); | 398 | m.record_failure_count(url, health_tracker.get_failure_count(url)); |
| 398 | } | 399 | } |
| 399 | 400 | ||
| 400 | tracing::warn!("Sync connection to {} ended, will reconnect", url); | 401 | tracing::warn!("Sync connection to {} ended, will reconnect", url); |
| 401 | } | 402 | } |
| 402 | Err(e) => { | 403 | Err(e) => { |
| @@ -405,14 +406,14 @@ pub async fn connect_with_retry( | |||
| 405 | 406 | ||
| 406 | let failure_count = health_tracker.get_failure_count(url); | 407 | let failure_count = health_tracker.get_failure_count(url); |
| 407 | let state = health_tracker.get_state(url); | 408 | let state = health_tracker.get_state(url); |
| 408 | 409 | ||
| 409 | // Record metrics | 410 | // Record metrics |
| 410 | if let Some(ref m) = metrics { | 411 | if let Some(ref m) = metrics { |
| 411 | m.record_connection_attempt(url, false); | 412 | m.record_connection_attempt(url, false); |
| 412 | m.set_relay_connected(url, false); | 413 | m.set_relay_connected(url, false); |
| 413 | m.record_health_state(url, state); | 414 | m.record_health_state(url, state); |
| 414 | m.record_failure_count(url, failure_count); | 415 | m.record_failure_count(url, failure_count); |
| 415 | 416 | ||
| 416 | // Track dead relays | 417 | // Track dead relays |
| 417 | if state == super::health::HealthState::Dead { | 418 | if state == super::health::HealthState::Dead { |
| 418 | m.inc_dead_count(); | 419 | m.inc_dead_count(); |
| @@ -435,11 +436,7 @@ pub async fn connect_with_retry( | |||
| 435 | .get_remaining_backoff(url) | 436 | .get_remaining_backoff(url) |
| 436 | .unwrap_or(Duration::from_secs(5)); | 437 | .unwrap_or(Duration::from_secs(5)); |
| 437 | 438 | ||
| 438 | tracing::debug!( | 439 | tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url); |
| 439 | "Waiting {:?} before reconnecting to {}", | ||
| 440 | wait_duration, | ||
| 441 | url | ||
| 442 | ); | ||
| 443 | tokio::time::sleep(wait_duration).await; | 440 | tokio::time::sleep(wait_duration).await; |
| 444 | } | 441 | } |
| 445 | } | 442 | } |
| @@ -473,4 +470,4 @@ mod tests { | |||
| 473 | Some("relay.example.com".to_string()) | 470 | Some("relay.example.com".to_string()) |
| 474 | ); | 471 | ); |
| 475 | } | 472 | } |
| 476 | } \ No newline at end of file | 473 | } |
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs index c37404f..bbeaa2a 100644 --- a/src/sync/subscription.rs +++ b/src/sync/subscription.rs | |||
| @@ -26,12 +26,6 @@ use super::filter::FilterService; | |||
| 26 | /// Maximum number of filters before consolidation is triggered | 26 | /// Maximum number of filters before consolidation is triggered |
| 27 | const CONSOLIDATION_THRESHOLD: usize = 150; | 27 | const CONSOLIDATION_THRESHOLD: usize = 150; |
| 28 | 28 | ||
| 29 | /// Kind 30617 - Repository Announcement (NIP-34) | ||
| 30 | const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617; | ||
| 31 | |||
| 32 | /// Kind 30618 - Maintainer List (NIP-34) | ||
| 33 | const KIND_MAINTAINER_LIST: u16 = 30618; | ||
| 34 | |||
| 35 | /// Manages subscriptions for a single relay connection | 29 | /// Manages subscriptions for a single relay connection |
| 36 | /// | 30 | /// |
| 37 | /// Tracks which announcements and events have been subscribed to, | 31 | /// Tracks which announcements and events have been subscribed to, |
| @@ -113,10 +107,7 @@ impl SubscriptionManager { | |||
| 113 | 107 | ||
| 114 | // Build Layer 3 filter for this event | 108 | // Build Layer 3 filter for this event |
| 115 | // Layer 3 filters target events with 'e' tags pointing to this event | 109 | // Layer 3 filters target events with 'e' tags pointing to this event |
| 116 | let filter = Filter::new().custom_tag( | 110 | let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id); |
| 117 | SingleLetterTag::lowercase(Alphabet::E), | ||
| 118 | event_id, | ||
| 119 | ); | ||
| 120 | 111 | ||
| 121 | Some(vec![filter]) | 112 | Some(vec![filter]) |
| 122 | } | 113 | } |
| @@ -212,67 +203,27 @@ impl SubscriptionManager { | |||
| 212 | } | 203 | } |
| 213 | }; | 204 | }; |
| 214 | 205 | ||
| 215 | // Determine the kind for the coordinate | 206 | // Verify this is an announcement kind |
| 216 | let kind = event.kind.as_u16(); | 207 | if !matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) { |
| 217 | if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST { | ||
| 218 | tracing::warn!( | 208 | tracing::warn!( |
| 219 | "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", | 209 | "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", |
| 220 | event.id.to_hex(), | 210 | event.id.to_hex(), |
| 221 | kind | 211 | event.kind |
| 222 | ); | 212 | ); |
| 223 | return Vec::new(); | 213 | return Vec::new(); |
| 224 | } | 214 | } |
| 225 | 215 | ||
| 226 | // Build the addressable coordinate: kind:pubkey:identifier | 216 | // Build the addressable coordinate: kind:pubkey:identifier |
| 227 | let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier); | 217 | let coord = format!( |
| 218 | "{}:{}:{}", | ||
| 219 | event.kind.as_u16(), | ||
| 220 | event.pubkey.to_hex(), | ||
| 221 | identifier | ||
| 222 | ); | ||
| 228 | 223 | ||
| 229 | // Create filter with 'a' tag for this coordinate | 224 | // Create filter with 'a' tag for this coordinate |
| 230 | let filter = Filter::new().custom_tag( | 225 | let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord); |
| 231 | SingleLetterTag::lowercase(Alphabet::A), | ||
| 232 | coord, | ||
| 233 | ); | ||
| 234 | 226 | ||
| 235 | vec![filter] | 227 | vec![filter] |
| 236 | } | 228 | } |
| 237 | |||
| 238 | /// Check if an event kind is an announcement kind | ||
| 239 | pub fn is_announcement_kind(kind: u16) -> bool { | ||
| 240 | kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST | ||
| 241 | } | ||
| 242 | |||
| 243 | /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3 | ||
| 244 | pub fn is_pr_issue_kind(kind: u16) -> bool { | ||
| 245 | matches!( | ||
| 246 | kind, | ||
| 247 | 1617 | // Patch proposal (NIP-34) | ||
| 248 | 1618 | // PR | ||
| 249 | 1619 | // PR Update | ||
| 250 | 1621 | // Issue | ||
| 251 | 1622 // Reply | ||
| 252 | ) | ||
| 253 | } | ||
| 254 | } | 229 | } |
| 255 | |||
| 256 | #[cfg(test)] | ||
| 257 | mod tests { | ||
| 258 | use super::SubscriptionManager; | ||
| 259 | |||
| 260 | #[test] | ||
| 261 | fn test_is_announcement_kind() { | ||
| 262 | assert!(SubscriptionManager::is_announcement_kind(30617)); | ||
| 263 | assert!(SubscriptionManager::is_announcement_kind(30618)); | ||
| 264 | assert!(!SubscriptionManager::is_announcement_kind(1)); | ||
| 265 | assert!(!SubscriptionManager::is_announcement_kind(1617)); | ||
| 266 | } | ||
| 267 | |||
| 268 | #[test] | ||
| 269 | fn test_is_pr_issue_kind() { | ||
| 270 | assert!(SubscriptionManager::is_pr_issue_kind(1617)); | ||
| 271 | assert!(SubscriptionManager::is_pr_issue_kind(1618)); | ||
| 272 | assert!(SubscriptionManager::is_pr_issue_kind(1619)); | ||
| 273 | assert!(SubscriptionManager::is_pr_issue_kind(1621)); | ||
| 274 | assert!(SubscriptionManager::is_pr_issue_kind(1622)); | ||
| 275 | assert!(!SubscriptionManager::is_pr_issue_kind(30617)); | ||
| 276 | assert!(!SubscriptionManager::is_pr_issue_kind(1)); | ||
| 277 | } | ||
| 278 | } \ No newline at end of file | ||