diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/algorithms.rs | 6 | ||||
| -rw-r--r-- | src/sync/filters.rs | 2 | ||||
| -rw-r--r-- | src/sync/health.rs | 19 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 10 | ||||
| -rw-r--r-- | src/sync/mod.rs | 8 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 24 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 42 |
7 files changed, 64 insertions, 47 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 7d87411..3063516 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -65,9 +65,7 @@ pub fn derive_relay_targets( | |||
| 65 | 65 | ||
| 66 | for (repo_id, needs) in repo_index { | 66 | for (repo_id, needs) in repo_index { |
| 67 | for relay_url in &needs.relays { | 67 | for relay_url in &needs.relays { |
| 68 | let entry = relay_targets | 68 | let entry = relay_targets.entry(relay_url.clone()).or_default(); |
| 69 | .entry(relay_url.clone()) | ||
| 70 | .or_insert_with(RelaySyncNeeds::default); | ||
| 71 | 69 | ||
| 72 | entry.repos.insert(repo_id.clone()); | 70 | entry.repos.insert(repo_id.clone()); |
| 73 | entry.root_events.extend(needs.root_events.iter().cloned()); | 71 | entry.root_events.extend(needs.root_events.iter().cloned()); |
| @@ -586,4 +584,4 @@ mod tests { | |||
| 586 | ); | 584 | ); |
| 587 | assert_eq!(actions[0].relay_url, "wss://new-relay.com"); | 585 | assert_eq!(actions[0].relay_url, "wss://new-relay.com"); |
| 588 | } | 586 | } |
| 589 | } \ No newline at end of file | 587 | } |
diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 02d580e..24e9bb2 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs | |||
| @@ -337,4 +337,4 @@ mod tests { | |||
| 337 | 337 | ||
| 338 | assert_eq!(filters.len(), 6); | 338 | assert_eq!(filters.len(), 6); |
| 339 | } | 339 | } |
| 340 | } \ No newline at end of file | 340 | } |
diff --git a/src/sync/health.rs b/src/sync/health.rs index f9a5f3a..0ae7dee 100644 --- a/src/sync/health.rs +++ b/src/sync/health.rs | |||
| @@ -206,11 +206,7 @@ impl RelayHealthTracker { | |||
| 206 | health.next_retry_at = Some(now + backoff); | 206 | health.next_retry_at = Some(now + backoff); |
| 207 | 207 | ||
| 208 | if old_state != HealthState::Degraded { | 208 | if old_state != HealthState::Degraded { |
| 209 | tracing::warn!( | 209 | tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff); |
| 210 | "Relay {} degraded, backoff {:?}", | ||
| 211 | relay_url, | ||
| 212 | backoff | ||
| 213 | ); | ||
| 214 | } else { | 210 | } else { |
| 215 | tracing::debug!( | 211 | tracing::debug!( |
| 216 | "Relay {} failure #{}, backoff {:?}", | 212 | "Relay {} failure #{}, backoff {:?}", |
| @@ -308,12 +304,17 @@ impl RelayHealthTracker { | |||
| 308 | 304 | ||
| 309 | /// Get all tracked relay URLs | 305 | /// Get all tracked relay URLs |
| 310 | pub fn get_tracked_relays(&self) -> Vec<String> { | 306 | pub fn get_tracked_relays(&self) -> Vec<String> { |
| 311 | self.health.iter().map(|entry| entry.key().clone()).collect() | 307 | self.health |
| 308 | .iter() | ||
| 309 | .map(|entry| entry.key().clone()) | ||
| 310 | .collect() | ||
| 312 | } | 311 | } |
| 313 | 312 | ||
| 314 | /// Get a clone of the health info for a relay | 313 | /// Get a clone of the health info for a relay |
| 315 | pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> { | 314 | pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> { |
| 316 | self.health.get(relay_url).map(|entry| entry.value().clone()) | 315 | self.health |
| 316 | .get(relay_url) | ||
| 317 | .map(|entry| entry.value().clone()) | ||
| 317 | } | 318 | } |
| 318 | } | 319 | } |
| 319 | 320 | ||
| @@ -369,7 +370,7 @@ mod tests { | |||
| 369 | fn test_backoff_increases_exponentially() { | 370 | fn test_backoff_increases_exponentially() { |
| 370 | let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds | 371 | let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds |
| 371 | let max = 3600u64; | 372 | let max = 3600u64; |
| 372 | 373 | ||
| 373 | // failure 1: 5s (base * 2^0 = 5) | 374 | // failure 1: 5s (base * 2^0 = 5) |
| 374 | assert_eq!( | 375 | assert_eq!( |
| 375 | RelayHealthTracker::get_backoff_duration(1, base, max), | 376 | RelayHealthTracker::get_backoff_duration(1, base, max), |
| @@ -498,4 +499,4 @@ mod tests { | |||
| 498 | let health = tracker.get_health("wss://nonexistent.example.com"); | 499 | let health = tracker.get_health("wss://nonexistent.example.com"); |
| 499 | assert!(health.is_none()); | 500 | assert!(health.is_none()); |
| 500 | } | 501 | } |
| 501 | } \ No newline at end of file | 502 | } |
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 411ff63..d917dc0 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -207,7 +207,9 @@ impl SyncMetrics { | |||
| 207 | HealthState::Degraded => 2, | 207 | HealthState::Degraded => 2, |
| 208 | HealthState::Dead => 3, | 208 | HealthState::Dead => 3, |
| 209 | }; | 209 | }; |
| 210 | self.relay_status.with_label_values(&[relay]).set(state_value); | 210 | self.relay_status |
| 211 | .with_label_values(&[relay]) | ||
| 212 | .set(state_value); | ||
| 211 | } | 213 | } |
| 212 | 214 | ||
| 213 | /// Record relay failure count. | 215 | /// Record relay failure count. |
| @@ -259,9 +261,7 @@ impl SyncMetrics { | |||
| 259 | /// * `source` - The event source type (see [`record_event`](Self::record_event)) | 261 | /// * `source` - The event source type (see [`record_event`](Self::record_event)) |
| 260 | /// * `count` - Number of events to record | 262 | /// * `count` - Number of events to record |
| 261 | pub fn record_events(&self, source: &str, count: u64) { | 263 | pub fn record_events(&self, source: &str, count: u64) { |
| 262 | self.events_total | 264 | self.events_total.with_label_values(&[source]).inc_by(count); |
| 263 | .with_label_values(&[source]) | ||
| 264 | .inc_by(count); | ||
| 265 | } | 265 | } |
| 266 | 266 | ||
| 267 | /// Record a gap event filled during catchup. | 267 | /// Record a gap event filled during catchup. |
| @@ -451,4 +451,4 @@ mod tests { | |||
| 451 | let metrics2 = SyncMetrics::register(®istry); | 451 | let metrics2 = SyncMetrics::register(®istry); |
| 452 | assert!(metrics2.is_err()); | 452 | assert!(metrics2.is_err()); |
| 453 | } | 453 | } |
| 454 | } \ No newline at end of file | 454 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index c4c3c7f..fb59b3c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -512,8 +512,8 @@ impl SyncManager { | |||
| 512 | }; | 512 | }; |
| 513 | 513 | ||
| 514 | // Check if relay supports NIP-77 negentropy AND negentropy is not disabled | 514 | // Check if relay supports NIP-77 negentropy AND negentropy is not disabled |
| 515 | let use_negentropy = !self.config.sync_disable_negentropy | 515 | let use_negentropy = |
| 516 | && connection.supports_negentropy().await; | 516 | !self.config.sync_disable_negentropy && connection.supports_negentropy().await; |
| 517 | 517 | ||
| 518 | // Unsubscribe all current subscriptions | 518 | // Unsubscribe all current subscriptions |
| 519 | connection.unsubscribe_all().await; | 519 | connection.unsubscribe_all().await; |
| @@ -1657,12 +1657,12 @@ impl SyncManager { | |||
| 1657 | 1657 | ||
| 1658 | let layer1_filters = 1; | 1658 | let layer1_filters = 1; |
| 1659 | let layer2_filters = if repo_count > 0 { | 1659 | let layer2_filters = if repo_count > 0 { |
| 1660 | ((repo_count + 99) / 100) * 3 | 1660 | repo_count.div_ceil(100) * 3 |
| 1661 | } else { | 1661 | } else { |
| 1662 | 0 | 1662 | 0 |
| 1663 | }; | 1663 | }; |
| 1664 | let layer3_filters = if event_count > 0 { | 1664 | let layer3_filters = if event_count > 0 { |
| 1665 | ((event_count + 99) / 100) * 3 | 1665 | event_count.div_ceil(100) * 3 |
| 1666 | } else { | 1666 | } else { |
| 1667 | 0 | 1667 | 0 |
| 1668 | }; | 1668 | }; |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index fae179b..4167a0c 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -150,17 +150,21 @@ impl RelayConnection { | |||
| 150 | // | 150 | // |
| 151 | // See: nostr-sdk-0.44 Client::try_connect_relay documentation | 151 | // See: nostr-sdk-0.44 Client::try_connect_relay documentation |
| 152 | self.client | 152 | self.client |
| 153 | .try_connect_relay(&self.url, std::time::Duration::from_secs(connection_timeout_secs)) | 153 | .try_connect_relay( |
| 154 | &self.url, | ||
| 155 | std::time::Duration::from_secs(connection_timeout_secs), | ||
| 156 | ) | ||
| 154 | .await | 157 | .await |
| 155 | .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; | 158 | .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; |
| 156 | 159 | ||
| 157 | // Subscribe to Layer 1 (announcements) | 160 | // Subscribe to Layer 1 (announcements) |
| 158 | let filter = build_announcement_filter(since); | 161 | let filter = build_announcement_filter(since); |
| 159 | let output = self | 162 | let output = self.client.subscribe(filter, None).await.map_err(|e| { |
| 160 | .client | 163 | format!( |
| 161 | .subscribe(filter, None) | 164 | "Failed to subscribe to announcements on {}: {}", |
| 162 | .await | 165 | self.url, e |
| 163 | .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?; | 166 | ) |
| 167 | })?; | ||
| 164 | 168 | ||
| 165 | tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); | 169 | tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); |
| 166 | Ok(output.val) | 170 | Ok(output.val) |
| @@ -250,7 +254,8 @@ impl RelayConnection { | |||
| 250 | } | 254 | } |
| 251 | RelayMessage::Closed { message: msg, .. } => { | 255 | RelayMessage::Closed { message: msg, .. } => { |
| 252 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); | 256 | tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); |
| 253 | let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; | 257 | let _ = |
| 258 | event_sender.send(RelayEvent::Closed(msg.to_string())).await; | ||
| 254 | break; | 259 | break; |
| 255 | } | 260 | } |
| 256 | _ => {} | 261 | _ => {} |
| @@ -421,7 +426,10 @@ impl RelayConnection { | |||
| 421 | /// - Relay doesn't actually support NIP-77 (despite claiming to) | 426 | /// - Relay doesn't actually support NIP-77 (despite claiming to) |
| 422 | /// - Network errors during reconciliation | 427 | /// - Network errors during reconciliation |
| 423 | /// - Timeout during sync | 428 | /// - Timeout during sync |
| 424 | pub async fn negentropy_sync_filter(&self, filter: Filter) -> Result<NegentropySyncResult, String> { | 429 | pub async fn negentropy_sync_filter( |
| 430 | &self, | ||
| 431 | filter: Filter, | ||
| 432 | ) -> Result<NegentropySyncResult, String> { | ||
| 425 | // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange | 433 | // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange |
| 426 | let sync_opts = SyncOptions::default(); | 434 | let sync_opts = SyncOptions::default(); |
| 427 | 435 | ||
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index f83b081..e29e45b 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -49,7 +49,12 @@ impl PendingUpdates { | |||
| 49 | } | 49 | } |
| 50 | 50 | ||
| 51 | /// Add or update a repo with its relays and root events | 51 | /// Add or update a repo with its relays and root events |
| 52 | fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) { | 52 | fn add_repo( |
| 53 | &mut self, | ||
| 54 | repo_id: String, | ||
| 55 | relays: HashSet<String>, | ||
| 56 | root_events: HashSet<EventId>, | ||
| 57 | ) { | ||
| 53 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | 58 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { |
| 54 | relays: HashSet::new(), | 59 | relays: HashSet::new(), |
| 55 | root_events: HashSet::new(), | 60 | root_events: HashSet::new(), |
| @@ -251,9 +256,9 @@ impl SelfSubscriber { | |||
| 251 | /// | 256 | /// |
| 252 | /// Returns true if any extracted relay URL contains our domain | 257 | /// Returns true if any extracted relay URL contains our domain |
| 253 | fn lists_our_relay(&self, event: &Event) -> bool { | 258 | fn lists_our_relay(&self, event: &Event) -> bool { |
| 254 | Self::extract_relay_urls(event).iter().any(|url| { | 259 | Self::extract_relay_urls(event) |
| 255 | url.contains(&self.relay_domain) || url == &self.own_relay_url | 260 | .iter() |
| 256 | }) | 261 | .any(|url| url.contains(&self.relay_domain) || url == &self.own_relay_url) |
| 257 | } | 262 | } |
| 258 | 263 | ||
| 259 | /// Main run loop | 264 | /// Main run loop |
| @@ -413,21 +418,21 @@ impl SelfSubscriber { | |||
| 413 | if let Some(repo_sync) = index.get_mut(&repo_ref) { | 418 | if let Some(repo_sync) = index.get_mut(&repo_ref) { |
| 414 | // Add event.id to root_events set in the index (immediate availability) | 419 | // Add event.id to root_events set in the index (immediate availability) |
| 415 | repo_sync.root_events.insert(event.id); | 420 | repo_sync.root_events.insert(event.id); |
| 416 | 421 | ||
| 417 | // Clone the relays before releasing the lock - Layer 3 filters need to be | 422 | // Clone the relays before releasing the lock - Layer 3 filters need to be |
| 418 | // sent to the same relays as Layer 2 filters for this repo | 423 | // sent to the same relays as Layer 2 filters for this repo |
| 419 | let relays = repo_sync.relays.clone(); | 424 | let relays = repo_sync.relays.clone(); |
| 420 | 425 | ||
| 421 | // Release lock before modifying pending | 426 | // Release lock before modifying pending |
| 422 | drop(index); | 427 | drop(index); |
| 423 | 428 | ||
| 424 | // Also add root event to pending - this ensures batch processing runs | 429 | // Also add root event to pending - this ensures batch processing runs |
| 425 | // and creates Layer 3 filters for events referencing this root event. | 430 | // and creates Layer 3 filters for events referencing this root event. |
| 426 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! | 431 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! |
| 427 | let mut root_events = HashSet::new(); | 432 | let mut root_events = HashSet::new(); |
| 428 | root_events.insert(event.id); | 433 | root_events.insert(event.id); |
| 429 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); | 434 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); |
| 430 | 435 | ||
| 431 | tracing::debug!( | 436 | tracing::debug!( |
| 432 | event_id = %event.id, | 437 | event_id = %event.id, |
| 433 | repo_ref = %repo_ref, | 438 | repo_ref = %repo_ref, |
| @@ -475,10 +480,12 @@ impl SelfSubscriber { | |||
| 475 | 480 | ||
| 476 | for (repo_id, needs) in updates { | 481 | for (repo_id, needs) in updates { |
| 477 | // Merge with existing entry or insert new | 482 | // Merge with existing entry or insert new |
| 478 | let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { | 483 | let entry = index |
| 479 | relays: HashSet::new(), | 484 | .entry(repo_id.clone()) |
| 480 | root_events: HashSet::new(), | 485 | .or_insert_with(|| RepoSyncNeeds { |
| 481 | }); | 486 | relays: HashSet::new(), |
| 487 | root_events: HashSet::new(), | ||
| 488 | }); | ||
| 482 | entry.relays.extend(needs.relays); | 489 | entry.relays.extend(needs.relays); |
| 483 | entry.root_events.extend(needs.root_events); | 490 | entry.root_events.extend(needs.root_events); |
| 484 | 491 | ||
| @@ -556,7 +563,7 @@ fn clone_url_to_relay_url(clone_url: &str) -> Option<String> { | |||
| 556 | } else { | 563 | } else { |
| 557 | return None; | 564 | return None; |
| 558 | }; | 565 | }; |
| 559 | 566 | ||
| 560 | // Extract just the host:port part (everything before the first /) | 567 | // Extract just the host:port part (everything before the first /) |
| 561 | let host_port = rest.split('/').next()?; | 568 | let host_port = rest.split('/').next()?; |
| 562 | Some(format!("{}{}", ws_scheme, host_port)) | 569 | Some(format!("{}{}", ws_scheme, host_port)) |
| @@ -581,7 +588,7 @@ mod tests { | |||
| 581 | Some("ws://localhost:3000".to_string()) | 588 | Some("ws://localhost:3000".to_string()) |
| 582 | ); | 589 | ); |
| 583 | } | 590 | } |
| 584 | 591 | ||
| 585 | #[test] | 592 | #[test] |
| 586 | fn test_clone_url_to_relay_url_with_port() { | 593 | fn test_clone_url_to_relay_url_with_port() { |
| 587 | assert_eq!( | 594 | assert_eq!( |
| @@ -593,6 +600,9 @@ mod tests { | |||
| 593 | #[test] | 600 | #[test] |
| 594 | fn test_clone_url_to_relay_url_unsupported() { | 601 | fn test_clone_url_to_relay_url_unsupported() { |
| 595 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); | 602 | assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); |
| 596 | assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); | 603 | assert_eq!( |
| 604 | clone_url_to_relay_url("ssh://git@example.com/repo.git"), | ||
| 605 | None | ||
| 606 | ); | ||
| 597 | } | 607 | } |
| 598 | } \ No newline at end of file | 608 | } |