upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sync/connection.rs41
-rw-r--r--src/sync/subscription.rs71
2 files changed, 30 insertions, 82 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index e921185..61a33f8 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -70,10 +70,8 @@ impl SyncConnection {
70 tracing::info!("Sync connection established to {}", url); 70 tracing::info!("Sync connection established to {}", url);
71 71
72 // Create subscription manager for this connection 72 // Create subscription manager for this connection
73 let subscription_manager = SubscriptionManager::new( 73 let subscription_manager =
74 filter_service.clone(), 74 SubscriptionManager::new(filter_service.clone(), remote_domain.to_string());
75 remote_domain.to_string(),
76 );
77 75
78 Ok(Self { 76 Ok(Self {
79 url: url.to_string(), 77 url: url.to_string(),
@@ -208,10 +206,8 @@ impl SyncConnection {
208 /// - kind 30617/30618: New announcement → add Layer 2 subscription 206 /// - kind 30617/30618: New announcement → add Layer 2 subscription
209 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription 207 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
210 async fn handle_dynamic_subscription(&mut self, event: &Event) { 208 async fn handle_dynamic_subscription(&mut self, event: &Event) {
211 let kind = event.kind.as_u16();
212
213 // Check if this is an announcement kind (triggers Layer 2 subscription) 209 // Check if this is an announcement kind (triggers Layer 2 subscription)
214 if SubscriptionManager::is_announcement_kind(kind) { 210 if matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
215 if let Some(new_filters) = self.subscription_manager.add_announcement(event) { 211 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
216 tracing::info!( 212 tracing::info!(
217 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", 213 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
@@ -224,8 +220,11 @@ impl SyncConnection {
224 } 220 }
225 } 221 }
226 222
227 // Check if this is a PR/Issue kind (triggers Layer 3 subscription) 223 // Check if this is a Patch/PR/Issue kind (triggers Layer 3 subscription)
228 if SubscriptionManager::is_pr_issue_kind(kind) { 224 if matches!(
225 event.kind,
226 Kind::GitPatch | Kind::GitIssue | Kind::Custom(1618)
227 ) {
229 if let Some(new_filters) = self.subscription_manager.add_event(event) { 228 if let Some(new_filters) = self.subscription_manager.add_event(event) {
230 tracing::info!( 229 tracing::info!(
231 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", 230 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
@@ -366,11 +365,13 @@ pub async fn connect_with_retry(
366 ); 365 );
367 } 366 }
368 367
369 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone()).await { 368 match SyncConnection::new(url, filter_service.clone(), &remote_domain, metrics.clone())
369 .await
370 {
370 Ok(conn) => { 371 Ok(conn) => {
371 // Record successful connection 372 // Record successful connection
372 health_tracker.record_success(url); 373 health_tracker.record_success(url);
373 374
374 // Record metrics 375 // Record metrics
375 if let Some(ref m) = metrics { 376 if let Some(ref m) = metrics {
376 m.record_connection_attempt(url, true); 377 m.record_connection_attempt(url, true);
@@ -379,7 +380,7 @@ pub async fn connect_with_retry(
379 m.record_health_state(url, health_tracker.get_state(url)); 380 m.record_health_state(url, health_tracker.get_state(url));
380 m.record_failure_count(url, 0); 381 m.record_failure_count(url, 0);
381 } 382 }
382 383
383 tracing::info!("Sync connection established to {}", url); 384 tracing::info!("Sync connection established to {}", url);
384 385
385 // Run the connection (this blocks until disconnection) 386 // Run the connection (this blocks until disconnection)
@@ -388,7 +389,7 @@ pub async fn connect_with_retry(
388 // Connection ended - record as failure for reconnection backoff 389 // Connection ended - record as failure for reconnection backoff
389 // (The connection ending is considered a failure even if it worked for a while) 390 // (The connection ending is considered a failure even if it worked for a while)
390 health_tracker.record_failure(url); 391 health_tracker.record_failure(url);
391 392
392 // Update metrics for disconnection 393 // Update metrics for disconnection
393 if let Some(ref m) = metrics { 394 if let Some(ref m) = metrics {
394 m.set_relay_connected(url, false); 395 m.set_relay_connected(url, false);
@@ -396,7 +397,7 @@ pub async fn connect_with_retry(
396 m.record_health_state(url, health_tracker.get_state(url)); 397 m.record_health_state(url, health_tracker.get_state(url));
397 m.record_failure_count(url, health_tracker.get_failure_count(url)); 398 m.record_failure_count(url, health_tracker.get_failure_count(url));
398 } 399 }
399 400
400 tracing::warn!("Sync connection to {} ended, will reconnect", url); 401 tracing::warn!("Sync connection to {} ended, will reconnect", url);
401 } 402 }
402 Err(e) => { 403 Err(e) => {
@@ -405,14 +406,14 @@ pub async fn connect_with_retry(
405 406
406 let failure_count = health_tracker.get_failure_count(url); 407 let failure_count = health_tracker.get_failure_count(url);
407 let state = health_tracker.get_state(url); 408 let state = health_tracker.get_state(url);
408 409
409 // Record metrics 410 // Record metrics
410 if let Some(ref m) = metrics { 411 if let Some(ref m) = metrics {
411 m.record_connection_attempt(url, false); 412 m.record_connection_attempt(url, false);
412 m.set_relay_connected(url, false); 413 m.set_relay_connected(url, false);
413 m.record_health_state(url, state); 414 m.record_health_state(url, state);
414 m.record_failure_count(url, failure_count); 415 m.record_failure_count(url, failure_count);
415 416
416 // Track dead relays 417 // Track dead relays
417 if state == super::health::HealthState::Dead { 418 if state == super::health::HealthState::Dead {
418 m.inc_dead_count(); 419 m.inc_dead_count();
@@ -435,11 +436,7 @@ pub async fn connect_with_retry(
435 .get_remaining_backoff(url) 436 .get_remaining_backoff(url)
436 .unwrap_or(Duration::from_secs(5)); 437 .unwrap_or(Duration::from_secs(5));
437 438
438 tracing::debug!( 439 tracing::debug!("Waiting {:?} before reconnecting to {}", wait_duration, url);
439 "Waiting {:?} before reconnecting to {}",
440 wait_duration,
441 url
442 );
443 tokio::time::sleep(wait_duration).await; 440 tokio::time::sleep(wait_duration).await;
444 } 441 }
445} 442}
@@ -473,4 +470,4 @@ mod tests {
473 Some("relay.example.com".to_string()) 470 Some("relay.example.com".to_string())
474 ); 471 );
475 } 472 }
476} \ No newline at end of file 473}
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs
index c37404f..bbeaa2a 100644
--- a/src/sync/subscription.rs
+++ b/src/sync/subscription.rs
@@ -26,12 +26,6 @@ use super::filter::FilterService;
26/// Maximum number of filters before consolidation is triggered 26/// Maximum number of filters before consolidation is triggered
27const CONSOLIDATION_THRESHOLD: usize = 150; 27const CONSOLIDATION_THRESHOLD: usize = 150;
28 28
29/// Kind 30617 - Repository Announcement (NIP-34)
30const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617;
31
32/// Kind 30618 - Maintainer List (NIP-34)
33const KIND_MAINTAINER_LIST: u16 = 30618;
34
35/// Manages subscriptions for a single relay connection 29/// Manages subscriptions for a single relay connection
36/// 30///
37/// Tracks which announcements and events have been subscribed to, 31/// Tracks which announcements and events have been subscribed to,
@@ -113,10 +107,7 @@ impl SubscriptionManager {
113 107
114 // Build Layer 3 filter for this event 108 // Build Layer 3 filter for this event
115 // Layer 3 filters target events with 'e' tags pointing to this event 109 // Layer 3 filters target events with 'e' tags pointing to this event
116 let filter = Filter::new().custom_tag( 110 let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id);
117 SingleLetterTag::lowercase(Alphabet::E),
118 event_id,
119 );
120 111
121 Some(vec![filter]) 112 Some(vec![filter])
122 } 113 }
@@ -212,67 +203,27 @@ impl SubscriptionManager {
212 } 203 }
213 }; 204 };
214 205
215 // Determine the kind for the coordinate 206 // Verify this is an announcement kind
216 let kind = event.kind.as_u16(); 207 if !matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
217 if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST {
218 tracing::warn!( 208 tracing::warn!(
219 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", 209 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
220 event.id.to_hex(), 210 event.id.to_hex(),
221 kind 211 event.kind
222 ); 212 );
223 return Vec::new(); 213 return Vec::new();
224 } 214 }
225 215
226 // Build the addressable coordinate: kind:pubkey:identifier 216 // Build the addressable coordinate: kind:pubkey:identifier
227 let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier); 217 let coord = format!(
218 "{}:{}:{}",
219 event.kind.as_u16(),
220 event.pubkey.to_hex(),
221 identifier
222 );
228 223
229 // Create filter with 'a' tag for this coordinate 224 // Create filter with 'a' tag for this coordinate
230 let filter = Filter::new().custom_tag( 225 let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord);
231 SingleLetterTag::lowercase(Alphabet::A),
232 coord,
233 );
234 226
235 vec![filter] 227 vec![filter]
236 } 228 }
237
238 /// Check if an event kind is an announcement kind
239 pub fn is_announcement_kind(kind: u16) -> bool {
240 kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST
241 }
242
243 /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3
244 pub fn is_pr_issue_kind(kind: u16) -> bool {
245 matches!(
246 kind,
247 1617 | // Patch proposal (NIP-34)
248 1618 | // PR
249 1619 | // PR Update
250 1621 | // Issue
251 1622 // Reply
252 )
253 }
254} 229}
255
256#[cfg(test)]
257mod tests {
258 use super::SubscriptionManager;
259
260 #[test]
261 fn test_is_announcement_kind() {
262 assert!(SubscriptionManager::is_announcement_kind(30617));
263 assert!(SubscriptionManager::is_announcement_kind(30618));
264 assert!(!SubscriptionManager::is_announcement_kind(1));
265 assert!(!SubscriptionManager::is_announcement_kind(1617));
266 }
267
268 #[test]
269 fn test_is_pr_issue_kind() {
270 assert!(SubscriptionManager::is_pr_issue_kind(1617));
271 assert!(SubscriptionManager::is_pr_issue_kind(1618));
272 assert!(SubscriptionManager::is_pr_issue_kind(1619));
273 assert!(SubscriptionManager::is_pr_issue_kind(1621));
274 assert!(SubscriptionManager::is_pr_issue_kind(1622));
275 assert!(!SubscriptionManager::is_pr_issue_kind(30617));
276 assert!(!SubscriptionManager::is_pr_issue_kind(1));
277 }
278} \ No newline at end of file