upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:18:05 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-11 14:18:05 +0000
commit4e5a937a4ef5288e702ba2bae3daf2a78398b690 (patch)
treef05bbd0cc51325e7a18db27271bfc63e6f93a0d4 /src
parentfb8928f626e81f78e13e642009de9a86ea100487 (diff)
fix docs
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs64
1 files changed, 37 insertions, 27 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 16c8924..e17565b 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -103,8 +103,6 @@ pub struct RelayState {
103 pub last_connected: Option<Timestamp>, 103 pub last_connected: Option<Timestamp>,
104 /// When we disconnected - for 15-minute state retention rule 104 /// When we disconnected - for 15-minute state retention rule
105 pub disconnected_at: Option<Timestamp>, 105 pub disconnected_at: Option<Timestamp>,
106 // The active connection - will be added in Phase 4
107 // pub connection: Option<RelayConnection>,
108} 106}
109 107
110impl Default for RelayState { 108impl Default for RelayState {
@@ -283,7 +281,7 @@ async fn run_disconnect_checker(
283 interval_secs = check_interval_secs, 281 interval_secs = check_interval_secs,
284 "Disconnect checker started with configured interval" 282 "Disconnect checker started with configured interval"
285 ); 283 );
286 284
287 loop { 285 loop {
288 tokio::select! { 286 tokio::select! {
289 _ = tokio::time::sleep(interval) => { 287 _ = tokio::time::sleep(interval) => {
@@ -406,7 +404,7 @@ impl SyncManager {
406 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { 404 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
407 // 1. Find and update the pending batch 405 // 1. Find and update the pending batch
408 let mut pending = self.pending_sync_index.write().await; 406 let mut pending = self.pending_sync_index.write().await;
409 407
410 let Some(batches) = pending.get_mut(relay_url) else { 408 let Some(batches) = pending.get_mut(relay_url) else {
411 tracing::warn!( 409 tracing::warn!(
412 relay = %relay_url, 410 relay = %relay_url,
@@ -417,8 +415,10 @@ impl SyncManager {
417 }; 415 };
418 416
419 // Find the batch containing this subscription 417 // Find the batch containing this subscription
420 let batch_index = batches.iter().position(|b| b.outstanding_subs.contains(&sub_id)); 418 let batch_index = batches
421 419 .iter()
420 .position(|b| b.outstanding_subs.contains(&sub_id));
421
422 let Some(batch_idx) = batch_index else { 422 let Some(batch_idx) = batch_index else {
423 tracing::warn!( 423 tracing::warn!(
424 relay = %relay_url, 424 relay = %relay_url,
@@ -431,7 +431,7 @@ impl SyncManager {
431 // Remove the subscription from outstanding_subs 431 // Remove the subscription from outstanding_subs
432 let batch = &mut batches[batch_idx]; 432 let batch = &mut batches[batch_idx];
433 batch.outstanding_subs.remove(&sub_id); 433 batch.outstanding_subs.remove(&sub_id);
434 434
435 tracing::debug!( 435 tracing::debug!(
436 relay = %relay_url, 436 relay = %relay_url,
437 sub_id = %sub_id, 437 sub_id = %sub_id,
@@ -450,25 +450,25 @@ impl SyncManager {
450 let batch_id = completed_batch.batch_id; 450 let batch_id = completed_batch.batch_id;
451 let repos_count = completed_batch.items.repos.len(); 451 let repos_count = completed_batch.items.repos.len();
452 let events_count = completed_batch.items.root_events.len(); 452 let events_count = completed_batch.items.root_events.len();
453 453
454 // Clean up empty relay entry 454 // Clean up empty relay entry
455 if batches.is_empty() { 455 if batches.is_empty() {
456 pending.remove(relay_url); 456 pending.remove(relay_url);
457 } 457 }
458 458
459 // Drop the pending lock before acquiring relay_sync_index lock 459 // Drop the pending lock before acquiring relay_sync_index lock
460 drop(pending); 460 drop(pending);
461 461
462 // 3. Move items to confirmed state in RelayState 462 // 3. Move items to confirmed state in RelayState
463 { 463 {
464 let mut relay_index = self.relay_sync_index.write().await; 464 let mut relay_index = self.relay_sync_index.write().await;
465 465
466 if let Some(state) = relay_index.get_mut(relay_url) { 466 if let Some(state) = relay_index.get_mut(relay_url) {
467 // Move repos to confirmed 467 // Move repos to confirmed
468 state.repos.extend(completed_batch.items.repos); 468 state.repos.extend(completed_batch.items.repos);
469 // Move root_events to confirmed 469 // Move root_events to confirmed
470 state.root_events.extend(completed_batch.items.root_events); 470 state.root_events.extend(completed_batch.items.root_events);
471 471
472 tracing::info!( 472 tracing::info!(
473 relay = %relay_url, 473 relay = %relay_url,
474 batch_id = batch_id, 474 batch_id = batch_id,
@@ -628,7 +628,12 @@ impl SyncManager {
628 let checker_manager = Arc::clone(&sync_manager); 628 let checker_manager = Arc::clone(&sync_manager);
629 let checker_shutdown = shutdown_tx.subscribe(); 629 let checker_shutdown = shutdown_tx.subscribe();
630 tokio::spawn(async move { 630 tokio::spawn(async move {
631 run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await; 631 run_disconnect_checker(
632 checker_manager,
633 checker_shutdown,
634 disconnect_check_interval_secs,
635 )
636 .await;
632 }); 637 });
633 638
634 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 639 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
@@ -702,7 +707,7 @@ impl SyncManager {
702 filter_count = action.filters.len(), 707 filter_count = action.filters.len(),
703 "[DIAG] handle_add_filters called" 708 "[DIAG] handle_add_filters called"
704 ); 709 );
705 710
706 // Step 1: Check if relay exists in relay_sync_index 711 // Step 1: Check if relay exists in relay_sync_index
707 let connection_status = { 712 let connection_status = {
708 let index = self.relay_sync_index.read().await; 713 let index = self.relay_sync_index.read().await;
@@ -756,7 +761,8 @@ impl SyncManager {
756 } 761 }
757 762
758 // Step 2: Check if consolidation is needed BEFORE adding new filters 763 // Step 2: Check if consolidation is needed BEFORE adding new filters
759 self.maybe_consolidate(&action.relay_url, action.filters.len()).await; 764 self.maybe_consolidate(&action.relay_url, action.filters.len())
765 .await;
760 766
761 // Step 3: Get connection and subscribe to all filters 767 // Step 3: Get connection and subscribe to all filters
762 let connection = match self.connections.get(&action.relay_url) { 768 let connection = match self.connections.get(&action.relay_url) {
@@ -1185,7 +1191,10 @@ impl SyncManager {
1185 let connection_timeout_secs = self.health_tracker.base_backoff_secs(); 1191 let connection_timeout_secs = self.health_tracker.base_backoff_secs();
1186 1192
1187 // Connect and subscribe to Layer 1 1193 // Connect and subscribe to Layer 1
1188 match connection.connect_and_subscribe(None, connection_timeout_secs).await { 1194 match connection
1195 .connect_and_subscribe(None, connection_timeout_secs)
1196 .await
1197 {
1189 Ok(_) => { 1198 Ok(_) => {
1190 // Record successful connection attempt 1199 // Record successful connection attempt
1191 if let Some(ref metrics) = self.metrics { 1200 if let Some(ref metrics) = self.metrics {
@@ -1194,20 +1203,21 @@ impl SyncManager {
1194 } 1203 }
1195 Err(e) => { 1204 Err(e) => {
1196 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); 1205 tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay");
1197 1206
1198 // Record failed connection attempt 1207 // Record failed connection attempt
1199 if let Some(ref metrics) = self.metrics { 1208 if let Some(ref metrics) = self.metrics {
1200 metrics.record_connection_attempt(&relay_url, false); 1209 metrics.record_connection_attempt(&relay_url, false);
1201 } 1210 }
1202 1211
1203 // Record failure in health tracker 1212 // Record failure in health tracker
1204 self.health_tracker.record_failure(&relay_url); 1213 self.health_tracker.record_failure(&relay_url);
1205 1214
1206 // Record health state in metrics 1215 // Record health state in metrics
1207 if let Some(ref metrics) = self.metrics { 1216 if let Some(ref metrics) = self.metrics {
1208 metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); 1217 metrics
1218 .record_health_state(&relay_url, self.health_tracker.get_state(&relay_url));
1209 } 1219 }
1210 1220
1211 // Update state to disconnected on failure 1221 // Update state to disconnected on failure
1212 { 1222 {
1213 let mut index = relay_sync_index.write().await; 1223 let mut index = relay_sync_index.write().await;
@@ -1276,7 +1286,7 @@ impl SyncManager {
1276 let mut disconnect_sent = false; 1286 let mut disconnect_sent = false;
1277 // Track whether EOSE has been received - events before EOSE are "startup", after are "live" 1287 // Track whether EOSE has been received - events before EOSE are "startup", after are "live"
1278 let mut eose_received = false; 1288 let mut eose_received = false;
1279 1289
1280 while let Some(relay_event) = event_rx.recv().await { 1290 while let Some(relay_event) = event_rx.recv().await {
1281 match relay_event { 1291 match relay_event {
1282 RelayEvent::Event(event) => { 1292 RelayEvent::Event(event) => {
@@ -1342,7 +1352,7 @@ impl SyncManager {
1342 } 1352 }
1343 } 1353 }
1344 } 1354 }
1345 1355
1346 // If the event channel closed without a Closed/Shutdown event 1356 // If the event channel closed without a Closed/Shutdown event
1347 // (e.g., connection dropped unexpectedly), still notify SyncManager 1357 // (e.g., connection dropped unexpectedly), still notify SyncManager
1348 if !disconnect_sent { 1358 if !disconnect_sent {
@@ -1468,7 +1478,7 @@ impl SyncManager {
1468 // Layer 3: 3 filters per 100-event chunk (ceiling division) 1478 // Layer 3: 3 filters per 100-event chunk (ceiling division)
1469 let repo_count = state.repos.len(); 1479 let repo_count = state.repos.len();
1470 let event_count = state.root_events.len(); 1480 let event_count = state.root_events.len();
1471 1481
1472 let layer1_filters = 1; 1482 let layer1_filters = 1;
1473 let layer2_filters = if repo_count > 0 { 1483 let layer2_filters = if repo_count > 0 {
1474 ((repo_count + 99) / 100) * 3 1484 ((repo_count + 99) / 100) * 3
@@ -1480,7 +1490,7 @@ impl SyncManager {
1480 } else { 1490 } else {
1481 0 1491 0
1482 }; 1492 };
1483 1493
1484 layer1_filters + layer2_filters + layer3_filters 1494 layer1_filters + layer2_filters + layer3_filters
1485 } else { 1495 } else {
1486 0 1496 0
@@ -1564,7 +1574,7 @@ impl SyncManager {
1564 threshold = CONSOLIDATION_THRESHOLD, 1574 threshold = CONSOLIDATION_THRESHOLD,
1565 "Filter count exceeds threshold, consolidating" 1575 "Filter count exceeds threshold, consolidating"
1566 ); 1576 );
1567 1577
1568 if let Err(e) = self.consolidate(relay_url).await { 1578 if let Err(e) = self.consolidate(relay_url).await {
1569 tracing::error!( 1579 tracing::error!(
1570 relay = %relay_url, 1580 relay = %relay_url,
@@ -1603,7 +1613,7 @@ impl SyncManager {
1603 return Ok(()); // No connection, nothing to consolidate 1613 return Ok(()); // No connection, nothing to consolidate
1604 } 1614 }
1605 }; 1615 };
1606 1616
1607 connection.unsubscribe_all().await; 1617 connection.unsubscribe_all().await;
1608 1618
1609 // Step 3: Rebuild all subscriptions with since filter 1619 // Step 3: Rebuild all subscriptions with since filter
@@ -1824,4 +1834,4 @@ impl SyncManager {
1824 1834
1825 tracing::info!("SyncManager shutdown complete"); 1835 tracing::info!("SyncManager shutdown complete");
1826 } 1836 }
1827} \ No newline at end of file 1837}