upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 16:53:03 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 16:53:03 +0000
commit2a9160836bb87fdea3ae891563b0169c68d1c2ab (patch)
tree583c890687beaf7f380fc0be131bdf17485f06fa /src/sync
parent52489d3b1a7d79e164b4cc901b53fd06c05ce1b1 (diff)
fix: resolve all fmt and clippy warnings
Main lib (src/): - Add #[allow(dead_code)] for build_info field (stored to prevent Prometheus unregistration) - Add #[allow(dead_code)] for first_seen field (reserved for future rate limiting) - Replace .or_insert_with(RelaySyncNeeds::default) with .or_default() - Replace manual div_ceil implementations with .div_ceil(100) Test code (tests/): - Replace .expect(&format!(...)) with .unwrap_or_else(|_| panic!(...)) - Remove needless borrows in fetch_metrics() calls - Add #[allow(dead_code)] and #[allow(unused_imports)] to test helpers module grasp-audit: - Apply cargo fmt to fix formatting
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/algorithms.rs6
-rw-r--r--src/sync/filters.rs2
-rw-r--r--src/sync/health.rs19
-rw-r--r--src/sync/metrics.rs10
-rw-r--r--src/sync/mod.rs8
-rw-r--r--src/sync/relay_connection.rs24
-rw-r--r--src/sync/self_subscriber.rs42
7 files changed, 64 insertions, 47 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 7d87411..3063516 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -65,9 +65,7 @@ pub fn derive_relay_targets(
65 65
66 for (repo_id, needs) in repo_index { 66 for (repo_id, needs) in repo_index {
67 for relay_url in &needs.relays { 67 for relay_url in &needs.relays {
68 let entry = relay_targets 68 let entry = relay_targets.entry(relay_url.clone()).or_default();
69 .entry(relay_url.clone())
70 .or_insert_with(RelaySyncNeeds::default);
71 69
72 entry.repos.insert(repo_id.clone()); 70 entry.repos.insert(repo_id.clone());
73 entry.root_events.extend(needs.root_events.iter().cloned()); 71 entry.root_events.extend(needs.root_events.iter().cloned());
@@ -586,4 +584,4 @@ mod tests {
586 ); 584 );
587 assert_eq!(actions[0].relay_url, "wss://new-relay.com"); 585 assert_eq!(actions[0].relay_url, "wss://new-relay.com");
588 } 586 }
589} \ No newline at end of file 587}
diff --git a/src/sync/filters.rs b/src/sync/filters.rs
index 02d580e..24e9bb2 100644
--- a/src/sync/filters.rs
+++ b/src/sync/filters.rs
@@ -337,4 +337,4 @@ mod tests {
337 337
338 assert_eq!(filters.len(), 6); 338 assert_eq!(filters.len(), 6);
339 } 339 }
340} \ No newline at end of file 340}
diff --git a/src/sync/health.rs b/src/sync/health.rs
index f9a5f3a..0ae7dee 100644
--- a/src/sync/health.rs
+++ b/src/sync/health.rs
@@ -206,11 +206,7 @@ impl RelayHealthTracker {
206 health.next_retry_at = Some(now + backoff); 206 health.next_retry_at = Some(now + backoff);
207 207
208 if old_state != HealthState::Degraded { 208 if old_state != HealthState::Degraded {
209 tracing::warn!( 209 tracing::warn!("Relay {} degraded, backoff {:?}", relay_url, backoff);
210 "Relay {} degraded, backoff {:?}",
211 relay_url,
212 backoff
213 );
214 } else { 210 } else {
215 tracing::debug!( 211 tracing::debug!(
216 "Relay {} failure #{}, backoff {:?}", 212 "Relay {} failure #{}, backoff {:?}",
@@ -308,12 +304,17 @@ impl RelayHealthTracker {
308 304
309 /// Get all tracked relay URLs 305 /// Get all tracked relay URLs
310 pub fn get_tracked_relays(&self) -> Vec<String> { 306 pub fn get_tracked_relays(&self) -> Vec<String> {
311 self.health.iter().map(|entry| entry.key().clone()).collect() 307 self.health
308 .iter()
309 .map(|entry| entry.key().clone())
310 .collect()
312 } 311 }
313 312
314 /// Get a clone of the health info for a relay 313 /// Get a clone of the health info for a relay
315 pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> { 314 pub fn get_health(&self, relay_url: &str) -> Option<RelayHealth> {
316 self.health.get(relay_url).map(|entry| entry.value().clone()) 315 self.health
316 .get(relay_url)
317 .map(|entry| entry.value().clone())
317 } 318 }
318} 319}
319 320
@@ -369,7 +370,7 @@ mod tests {
369 fn test_backoff_increases_exponentially() { 370 fn test_backoff_increases_exponentially() {
370 let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds 371 let base = DEFAULT_BASE_BACKOFF_SECS; // 5 seconds
371 let max = 3600u64; 372 let max = 3600u64;
372 373
373 // failure 1: 5s (base * 2^0 = 5) 374 // failure 1: 5s (base * 2^0 = 5)
374 assert_eq!( 375 assert_eq!(
375 RelayHealthTracker::get_backoff_duration(1, base, max), 376 RelayHealthTracker::get_backoff_duration(1, base, max),
@@ -498,4 +499,4 @@ mod tests {
498 let health = tracker.get_health("wss://nonexistent.example.com"); 499 let health = tracker.get_health("wss://nonexistent.example.com");
499 assert!(health.is_none()); 500 assert!(health.is_none());
500 } 501 }
501} \ No newline at end of file 502}
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index 411ff63..d917dc0 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -207,7 +207,9 @@ impl SyncMetrics {
207 HealthState::Degraded => 2, 207 HealthState::Degraded => 2,
208 HealthState::Dead => 3, 208 HealthState::Dead => 3,
209 }; 209 };
210 self.relay_status.with_label_values(&[relay]).set(state_value); 210 self.relay_status
211 .with_label_values(&[relay])
212 .set(state_value);
211 } 213 }
212 214
213 /// Record relay failure count. 215 /// Record relay failure count.
@@ -259,9 +261,7 @@ impl SyncMetrics {
259 /// * `source` - The event source type (see [`record_event`](Self::record_event)) 261 /// * `source` - The event source type (see [`record_event`](Self::record_event))
260 /// * `count` - Number of events to record 262 /// * `count` - Number of events to record
261 pub fn record_events(&self, source: &str, count: u64) { 263 pub fn record_events(&self, source: &str, count: u64) {
262 self.events_total 264 self.events_total.with_label_values(&[source]).inc_by(count);
263 .with_label_values(&[source])
264 .inc_by(count);
265 } 265 }
266 266
267 /// Record a gap event filled during catchup. 267 /// Record a gap event filled during catchup.
@@ -451,4 +451,4 @@ mod tests {
451 let metrics2 = SyncMetrics::register(&registry); 451 let metrics2 = SyncMetrics::register(&registry);
452 assert!(metrics2.is_err()); 452 assert!(metrics2.is_err());
453 } 453 }
454} \ No newline at end of file 454}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index c4c3c7f..fb59b3c 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -512,8 +512,8 @@ impl SyncManager {
512 }; 512 };
513 513
514 // Check if relay supports NIP-77 negentropy AND negentropy is not disabled 514 // Check if relay supports NIP-77 negentropy AND negentropy is not disabled
515 let use_negentropy = !self.config.sync_disable_negentropy 515 let use_negentropy =
516 && connection.supports_negentropy().await; 516 !self.config.sync_disable_negentropy && connection.supports_negentropy().await;
517 517
518 // Unsubscribe all current subscriptions 518 // Unsubscribe all current subscriptions
519 connection.unsubscribe_all().await; 519 connection.unsubscribe_all().await;
@@ -1657,12 +1657,12 @@ impl SyncManager {
1657 1657
1658 let layer1_filters = 1; 1658 let layer1_filters = 1;
1659 let layer2_filters = if repo_count > 0 { 1659 let layer2_filters = if repo_count > 0 {
1660 ((repo_count + 99) / 100) * 3 1660 repo_count.div_ceil(100) * 3
1661 } else { 1661 } else {
1662 0 1662 0
1663 }; 1663 };
1664 let layer3_filters = if event_count > 0 { 1664 let layer3_filters = if event_count > 0 {
1665 ((event_count + 99) / 100) * 3 1665 event_count.div_ceil(100) * 3
1666 } else { 1666 } else {
1667 0 1667 0
1668 }; 1668 };
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index fae179b..4167a0c 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -150,17 +150,21 @@ impl RelayConnection {
150 // 150 //
151 // See: nostr-sdk-0.44 Client::try_connect_relay documentation 151 // See: nostr-sdk-0.44 Client::try_connect_relay documentation
152 self.client 152 self.client
153 .try_connect_relay(&self.url, std::time::Duration::from_secs(connection_timeout_secs)) 153 .try_connect_relay(
154 &self.url,
155 std::time::Duration::from_secs(connection_timeout_secs),
156 )
154 .await 157 .await
155 .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?; 158 .map_err(|e| format!("Failed to connect to relay {}: {}", self.url, e))?;
156 159
157 // Subscribe to Layer 1 (announcements) 160 // Subscribe to Layer 1 (announcements)
158 let filter = build_announcement_filter(since); 161 let filter = build_announcement_filter(since);
159 let output = self 162 let output = self.client.subscribe(filter, None).await.map_err(|e| {
160 .client 163 format!(
161 .subscribe(filter, None) 164 "Failed to subscribe to announcements on {}: {}",
162 .await 165 self.url, e
163 .map_err(|e| format!("Failed to subscribe to announcements on {}: {}", self.url, e))?; 166 )
167 })?;
164 168
165 tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)"); 169 tracing::info!(url = %self.url, sub_id = %output.val, "Connected and subscribed to Layer 1 (announcements)");
166 Ok(output.val) 170 Ok(output.val)
@@ -250,7 +254,8 @@ impl RelayConnection {
250 } 254 }
251 RelayMessage::Closed { message: msg, .. } => { 255 RelayMessage::Closed { message: msg, .. } => {
252 tracing::info!(relay = %url, message = %msg, "Relay closed subscription"); 256 tracing::info!(relay = %url, message = %msg, "Relay closed subscription");
253 let _ = event_sender.send(RelayEvent::Closed(msg.to_string())).await; 257 let _ =
258 event_sender.send(RelayEvent::Closed(msg.to_string())).await;
254 break; 259 break;
255 } 260 }
256 _ => {} 261 _ => {}
@@ -421,7 +426,10 @@ impl RelayConnection {
421 /// - Relay doesn't actually support NIP-77 (despite claiming to) 426 /// - Relay doesn't actually support NIP-77 (despite claiming to)
422 /// - Network errors during reconciliation 427 /// - Network errors during reconciliation
423 /// - Timeout during sync 428 /// - Timeout during sync
424 pub async fn negentropy_sync_filter(&self, filter: Filter) -> Result<NegentropySyncResult, String> { 429 pub async fn negentropy_sync_filter(
430 &self,
431 filter: Filter,
432 ) -> Result<NegentropySyncResult, String> {
425 // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange 433 // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange
426 let sync_opts = SyncOptions::default(); 434 let sync_opts = SyncOptions::default();
427 435
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index f83b081..e29e45b 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -49,7 +49,12 @@ impl PendingUpdates {
49 } 49 }
50 50
51 /// Add or update a repo with its relays and root events 51 /// Add or update a repo with its relays and root events
52 fn add_repo(&mut self, repo_id: String, relays: HashSet<String>, root_events: HashSet<EventId>) { 52 fn add_repo(
53 &mut self,
54 repo_id: String,
55 relays: HashSet<String>,
56 root_events: HashSet<EventId>,
57 ) {
53 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { 58 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
54 relays: HashSet::new(), 59 relays: HashSet::new(),
55 root_events: HashSet::new(), 60 root_events: HashSet::new(),
@@ -251,9 +256,9 @@ impl SelfSubscriber {
251 /// 256 ///
252 /// Returns true if any extracted relay URL contains our domain 257 /// Returns true if any extracted relay URL contains our domain
253 fn lists_our_relay(&self, event: &Event) -> bool { 258 fn lists_our_relay(&self, event: &Event) -> bool {
254 Self::extract_relay_urls(event).iter().any(|url| { 259 Self::extract_relay_urls(event)
255 url.contains(&self.relay_domain) || url == &self.own_relay_url 260 .iter()
256 }) 261 .any(|url| url.contains(&self.relay_domain) || url == &self.own_relay_url)
257 } 262 }
258 263
259 /// Main run loop 264 /// Main run loop
@@ -413,21 +418,21 @@ impl SelfSubscriber {
413 if let Some(repo_sync) = index.get_mut(&repo_ref) { 418 if let Some(repo_sync) = index.get_mut(&repo_ref) {
414 // Add event.id to root_events set in the index (immediate availability) 419 // Add event.id to root_events set in the index (immediate availability)
415 repo_sync.root_events.insert(event.id); 420 repo_sync.root_events.insert(event.id);
416 421
417 // Clone the relays before releasing the lock - Layer 3 filters need to be 422 // Clone the relays before releasing the lock - Layer 3 filters need to be
418 // sent to the same relays as Layer 2 filters for this repo 423 // sent to the same relays as Layer 2 filters for this repo
419 let relays = repo_sync.relays.clone(); 424 let relays = repo_sync.relays.clone();
420 425
421 // Release lock before modifying pending 426 // Release lock before modifying pending
422 drop(index); 427 drop(index);
423 428
424 // Also add root event to pending - this ensures batch processing runs 429 // Also add root event to pending - this ensures batch processing runs
425 // and creates Layer 3 filters for events referencing this root event. 430 // and creates Layer 3 filters for events referencing this root event.
426 // CRITICAL: Include relays so derive_relay_targets knows where to send filters! 431 // CRITICAL: Include relays so derive_relay_targets knows where to send filters!
427 let mut root_events = HashSet::new(); 432 let mut root_events = HashSet::new();
428 root_events.insert(event.id); 433 root_events.insert(event.id);
429 pending.add_repo(repo_ref.clone(), relays.clone(), root_events); 434 pending.add_repo(repo_ref.clone(), relays.clone(), root_events);
430 435
431 tracing::debug!( 436 tracing::debug!(
432 event_id = %event.id, 437 event_id = %event.id,
433 repo_ref = %repo_ref, 438 repo_ref = %repo_ref,
@@ -475,10 +480,12 @@ impl SelfSubscriber {
475 480
476 for (repo_id, needs) in updates { 481 for (repo_id, needs) in updates {
477 // Merge with existing entry or insert new 482 // Merge with existing entry or insert new
478 let entry = index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { 483 let entry = index
479 relays: HashSet::new(), 484 .entry(repo_id.clone())
480 root_events: HashSet::new(), 485 .or_insert_with(|| RepoSyncNeeds {
481 }); 486 relays: HashSet::new(),
487 root_events: HashSet::new(),
488 });
482 entry.relays.extend(needs.relays); 489 entry.relays.extend(needs.relays);
483 entry.root_events.extend(needs.root_events); 490 entry.root_events.extend(needs.root_events);
484 491
@@ -556,7 +563,7 @@ fn clone_url_to_relay_url(clone_url: &str) -> Option<String> {
556 } else { 563 } else {
557 return None; 564 return None;
558 }; 565 };
559 566
560 // Extract just the host:port part (everything before the first /) 567 // Extract just the host:port part (everything before the first /)
561 let host_port = rest.split('/').next()?; 568 let host_port = rest.split('/').next()?;
562 Some(format!("{}{}", ws_scheme, host_port)) 569 Some(format!("{}{}", ws_scheme, host_port))
@@ -581,7 +588,7 @@ mod tests {
581 Some("ws://localhost:3000".to_string()) 588 Some("ws://localhost:3000".to_string())
582 ); 589 );
583 } 590 }
584 591
585 #[test] 592 #[test]
586 fn test_clone_url_to_relay_url_with_port() { 593 fn test_clone_url_to_relay_url_with_port() {
587 assert_eq!( 594 assert_eq!(
@@ -593,6 +600,9 @@ mod tests {
593 #[test] 600 #[test]
594 fn test_clone_url_to_relay_url_unsupported() { 601 fn test_clone_url_to_relay_url_unsupported() {
595 assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None); 602 assert_eq!(clone_url_to_relay_url("git://example.com/repo.git"), None);
596 assert_eq!(clone_url_to_relay_url("ssh://git@example.com/repo.git"), None); 603 assert_eq!(
604 clone_url_to_relay_url("ssh://git@example.com/repo.git"),
605 None
606 );
597 } 607 }
598} \ No newline at end of file 608}