diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 14:18:05 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-11 14:18:05 +0000 |
| commit | 4e5a937a4ef5288e702ba2bae3daf2a78398b690 (patch) | |
| tree | f05bbd0cc51325e7a18db27271bfc63e6f93a0d4 /src/sync/mod.rs | |
| parent | fb8928f626e81f78e13e642009de9a86ea100487 (diff) | |
fix docs
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 64 |
1 files changed, 37 insertions, 27 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 16c8924..e17565b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -103,8 +103,6 @@ pub struct RelayState { | |||
| 103 | pub last_connected: Option<Timestamp>, | 103 | pub last_connected: Option<Timestamp>, |
| 104 | /// When we disconnected - for 15-minute state retention rule | 104 | /// When we disconnected - for 15-minute state retention rule |
| 105 | pub disconnected_at: Option<Timestamp>, | 105 | pub disconnected_at: Option<Timestamp>, |
| 106 | // The active connection - will be added in Phase 4 | ||
| 107 | // pub connection: Option<RelayConnection>, | ||
| 108 | } | 106 | } |
| 109 | 107 | ||
| 110 | impl Default for RelayState { | 108 | impl Default for RelayState { |
| @@ -283,7 +281,7 @@ async fn run_disconnect_checker( | |||
| 283 | interval_secs = check_interval_secs, | 281 | interval_secs = check_interval_secs, |
| 284 | "Disconnect checker started with configured interval" | 282 | "Disconnect checker started with configured interval" |
| 285 | ); | 283 | ); |
| 286 | 284 | ||
| 287 | loop { | 285 | loop { |
| 288 | tokio::select! { | 286 | tokio::select! { |
| 289 | _ = tokio::time::sleep(interval) => { | 287 | _ = tokio::time::sleep(interval) => { |
| @@ -406,7 +404,7 @@ impl SyncManager { | |||
| 406 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { | 404 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { |
| 407 | // 1. Find and update the pending batch | 405 | // 1. Find and update the pending batch |
| 408 | let mut pending = self.pending_sync_index.write().await; | 406 | let mut pending = self.pending_sync_index.write().await; |
| 409 | 407 | ||
| 410 | let Some(batches) = pending.get_mut(relay_url) else { | 408 | let Some(batches) = pending.get_mut(relay_url) else { |
| 411 | tracing::warn!( | 409 | tracing::warn!( |
| 412 | relay = %relay_url, | 410 | relay = %relay_url, |
| @@ -417,8 +415,10 @@ impl SyncManager { | |||
| 417 | }; | 415 | }; |
| 418 | 416 | ||
| 419 | // Find the batch containing this subscription | 417 | // Find the batch containing this subscription |
| 420 | let batch_index = batches.iter().position(|b| b.outstanding_subs.contains(&sub_id)); | 418 | let batch_index = batches |
| 421 | 419 | .iter() | |
| 420 | .position(|b| b.outstanding_subs.contains(&sub_id)); | ||
| 421 | |||
| 422 | let Some(batch_idx) = batch_index else { | 422 | let Some(batch_idx) = batch_index else { |
| 423 | tracing::warn!( | 423 | tracing::warn!( |
| 424 | relay = %relay_url, | 424 | relay = %relay_url, |
| @@ -431,7 +431,7 @@ impl SyncManager { | |||
| 431 | // Remove the subscription from outstanding_subs | 431 | // Remove the subscription from outstanding_subs |
| 432 | let batch = &mut batches[batch_idx]; | 432 | let batch = &mut batches[batch_idx]; |
| 433 | batch.outstanding_subs.remove(&sub_id); | 433 | batch.outstanding_subs.remove(&sub_id); |
| 434 | 434 | ||
| 435 | tracing::debug!( | 435 | tracing::debug!( |
| 436 | relay = %relay_url, | 436 | relay = %relay_url, |
| 437 | sub_id = %sub_id, | 437 | sub_id = %sub_id, |
| @@ -450,25 +450,25 @@ impl SyncManager { | |||
| 450 | let batch_id = completed_batch.batch_id; | 450 | let batch_id = completed_batch.batch_id; |
| 451 | let repos_count = completed_batch.items.repos.len(); | 451 | let repos_count = completed_batch.items.repos.len(); |
| 452 | let events_count = completed_batch.items.root_events.len(); | 452 | let events_count = completed_batch.items.root_events.len(); |
| 453 | 453 | ||
| 454 | // Clean up empty relay entry | 454 | // Clean up empty relay entry |
| 455 | if batches.is_empty() { | 455 | if batches.is_empty() { |
| 456 | pending.remove(relay_url); | 456 | pending.remove(relay_url); |
| 457 | } | 457 | } |
| 458 | 458 | ||
| 459 | // Drop the pending lock before acquiring relay_sync_index lock | 459 | // Drop the pending lock before acquiring relay_sync_index lock |
| 460 | drop(pending); | 460 | drop(pending); |
| 461 | 461 | ||
| 462 | // 3. Move items to confirmed state in RelayState | 462 | // 3. Move items to confirmed state in RelayState |
| 463 | { | 463 | { |
| 464 | let mut relay_index = self.relay_sync_index.write().await; | 464 | let mut relay_index = self.relay_sync_index.write().await; |
| 465 | 465 | ||
| 466 | if let Some(state) = relay_index.get_mut(relay_url) { | 466 | if let Some(state) = relay_index.get_mut(relay_url) { |
| 467 | // Move repos to confirmed | 467 | // Move repos to confirmed |
| 468 | state.repos.extend(completed_batch.items.repos); | 468 | state.repos.extend(completed_batch.items.repos); |
| 469 | // Move root_events to confirmed | 469 | // Move root_events to confirmed |
| 470 | state.root_events.extend(completed_batch.items.root_events); | 470 | state.root_events.extend(completed_batch.items.root_events); |
| 471 | 471 | ||
| 472 | tracing::info!( | 472 | tracing::info!( |
| 473 | relay = %relay_url, | 473 | relay = %relay_url, |
| 474 | batch_id = batch_id, | 474 | batch_id = batch_id, |
| @@ -628,7 +628,12 @@ impl SyncManager { | |||
| 628 | let checker_manager = Arc::clone(&sync_manager); | 628 | let checker_manager = Arc::clone(&sync_manager); |
| 629 | let checker_shutdown = shutdown_tx.subscribe(); | 629 | let checker_shutdown = shutdown_tx.subscribe(); |
| 630 | tokio::spawn(async move { | 630 | tokio::spawn(async move { |
| 631 | run_disconnect_checker(checker_manager, checker_shutdown, disconnect_check_interval_secs).await; | 631 | run_disconnect_checker( |
| 632 | checker_manager, | ||
| 633 | checker_shutdown, | ||
| 634 | disconnect_check_interval_secs, | ||
| 635 | ) | ||
| 636 | .await; | ||
| 632 | }); | 637 | }); |
| 633 | 638 | ||
| 634 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 639 | // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications |
| @@ -702,7 +707,7 @@ impl SyncManager { | |||
| 702 | filter_count = action.filters.len(), | 707 | filter_count = action.filters.len(), |
| 703 | "[DIAG] handle_add_filters called" | 708 | "[DIAG] handle_add_filters called" |
| 704 | ); | 709 | ); |
| 705 | 710 | ||
| 706 | // Step 1: Check if relay exists in relay_sync_index | 711 | // Step 1: Check if relay exists in relay_sync_index |
| 707 | let connection_status = { | 712 | let connection_status = { |
| 708 | let index = self.relay_sync_index.read().await; | 713 | let index = self.relay_sync_index.read().await; |
| @@ -756,7 +761,8 @@ impl SyncManager { | |||
| 756 | } | 761 | } |
| 757 | 762 | ||
| 758 | // Step 2: Check if consolidation is needed BEFORE adding new filters | 763 | // Step 2: Check if consolidation is needed BEFORE adding new filters |
| 759 | self.maybe_consolidate(&action.relay_url, action.filters.len()).await; | 764 | self.maybe_consolidate(&action.relay_url, action.filters.len()) |
| 765 | .await; | ||
| 760 | 766 | ||
| 761 | // Step 3: Get connection and subscribe to all filters | 767 | // Step 3: Get connection and subscribe to all filters |
| 762 | let connection = match self.connections.get(&action.relay_url) { | 768 | let connection = match self.connections.get(&action.relay_url) { |
| @@ -1185,7 +1191,10 @@ impl SyncManager { | |||
| 1185 | let connection_timeout_secs = self.health_tracker.base_backoff_secs(); | 1191 | let connection_timeout_secs = self.health_tracker.base_backoff_secs(); |
| 1186 | 1192 | ||
| 1187 | // Connect and subscribe to Layer 1 | 1193 | // Connect and subscribe to Layer 1 |
| 1188 | match connection.connect_and_subscribe(None, connection_timeout_secs).await { | 1194 | match connection |
| 1195 | .connect_and_subscribe(None, connection_timeout_secs) | ||
| 1196 | .await | ||
| 1197 | { | ||
| 1189 | Ok(_) => { | 1198 | Ok(_) => { |
| 1190 | // Record successful connection attempt | 1199 | // Record successful connection attempt |
| 1191 | if let Some(ref metrics) = self.metrics { | 1200 | if let Some(ref metrics) = self.metrics { |
| @@ -1194,20 +1203,21 @@ impl SyncManager { | |||
| 1194 | } | 1203 | } |
| 1195 | Err(e) => { | 1204 | Err(e) => { |
| 1196 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); | 1205 | tracing::error!(relay = %relay_url, error = %e, "Failed to connect to relay"); |
| 1197 | 1206 | ||
| 1198 | // Record failed connection attempt | 1207 | // Record failed connection attempt |
| 1199 | if let Some(ref metrics) = self.metrics { | 1208 | if let Some(ref metrics) = self.metrics { |
| 1200 | metrics.record_connection_attempt(&relay_url, false); | 1209 | metrics.record_connection_attempt(&relay_url, false); |
| 1201 | } | 1210 | } |
| 1202 | 1211 | ||
| 1203 | // Record failure in health tracker | 1212 | // Record failure in health tracker |
| 1204 | self.health_tracker.record_failure(&relay_url); | 1213 | self.health_tracker.record_failure(&relay_url); |
| 1205 | 1214 | ||
| 1206 | // Record health state in metrics | 1215 | // Record health state in metrics |
| 1207 | if let Some(ref metrics) = self.metrics { | 1216 | if let Some(ref metrics) = self.metrics { |
| 1208 | metrics.record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); | 1217 | metrics |
| 1218 | .record_health_state(&relay_url, self.health_tracker.get_state(&relay_url)); | ||
| 1209 | } | 1219 | } |
| 1210 | 1220 | ||
| 1211 | // Update state to disconnected on failure | 1221 | // Update state to disconnected on failure |
| 1212 | { | 1222 | { |
| 1213 | let mut index = relay_sync_index.write().await; | 1223 | let mut index = relay_sync_index.write().await; |
| @@ -1276,7 +1286,7 @@ impl SyncManager { | |||
| 1276 | let mut disconnect_sent = false; | 1286 | let mut disconnect_sent = false; |
| 1277 | // Track whether EOSE has been received - events before EOSE are "startup", after are "live" | 1287 | // Track whether EOSE has been received - events before EOSE are "startup", after are "live" |
| 1278 | let mut eose_received = false; | 1288 | let mut eose_received = false; |
| 1279 | 1289 | ||
| 1280 | while let Some(relay_event) = event_rx.recv().await { | 1290 | while let Some(relay_event) = event_rx.recv().await { |
| 1281 | match relay_event { | 1291 | match relay_event { |
| 1282 | RelayEvent::Event(event) => { | 1292 | RelayEvent::Event(event) => { |
| @@ -1342,7 +1352,7 @@ impl SyncManager { | |||
| 1342 | } | 1352 | } |
| 1343 | } | 1353 | } |
| 1344 | } | 1354 | } |
| 1345 | 1355 | ||
| 1346 | // If the event channel closed without a Closed/Shutdown event | 1356 | // If the event channel closed without a Closed/Shutdown event |
| 1347 | // (e.g., connection dropped unexpectedly), still notify SyncManager | 1357 | // (e.g., connection dropped unexpectedly), still notify SyncManager |
| 1348 | if !disconnect_sent { | 1358 | if !disconnect_sent { |
| @@ -1468,7 +1478,7 @@ impl SyncManager { | |||
| 1468 | // Layer 3: 3 filters per 100-event chunk (ceiling division) | 1478 | // Layer 3: 3 filters per 100-event chunk (ceiling division) |
| 1469 | let repo_count = state.repos.len(); | 1479 | let repo_count = state.repos.len(); |
| 1470 | let event_count = state.root_events.len(); | 1480 | let event_count = state.root_events.len(); |
| 1471 | 1481 | ||
| 1472 | let layer1_filters = 1; | 1482 | let layer1_filters = 1; |
| 1473 | let layer2_filters = if repo_count > 0 { | 1483 | let layer2_filters = if repo_count > 0 { |
| 1474 | ((repo_count + 99) / 100) * 3 | 1484 | ((repo_count + 99) / 100) * 3 |
| @@ -1480,7 +1490,7 @@ impl SyncManager { | |||
| 1480 | } else { | 1490 | } else { |
| 1481 | 0 | 1491 | 0 |
| 1482 | }; | 1492 | }; |
| 1483 | 1493 | ||
| 1484 | layer1_filters + layer2_filters + layer3_filters | 1494 | layer1_filters + layer2_filters + layer3_filters |
| 1485 | } else { | 1495 | } else { |
| 1486 | 0 | 1496 | 0 |
| @@ -1564,7 +1574,7 @@ impl SyncManager { | |||
| 1564 | threshold = CONSOLIDATION_THRESHOLD, | 1574 | threshold = CONSOLIDATION_THRESHOLD, |
| 1565 | "Filter count exceeds threshold, consolidating" | 1575 | "Filter count exceeds threshold, consolidating" |
| 1566 | ); | 1576 | ); |
| 1567 | 1577 | ||
| 1568 | if let Err(e) = self.consolidate(relay_url).await { | 1578 | if let Err(e) = self.consolidate(relay_url).await { |
| 1569 | tracing::error!( | 1579 | tracing::error!( |
| 1570 | relay = %relay_url, | 1580 | relay = %relay_url, |
| @@ -1603,7 +1613,7 @@ impl SyncManager { | |||
| 1603 | return Ok(()); // No connection, nothing to consolidate | 1613 | return Ok(()); // No connection, nothing to consolidate |
| 1604 | } | 1614 | } |
| 1605 | }; | 1615 | }; |
| 1606 | 1616 | ||
| 1607 | connection.unsubscribe_all().await; | 1617 | connection.unsubscribe_all().await; |
| 1608 | 1618 | ||
| 1609 | // Step 3: Rebuild all subscriptions with since filter | 1619 | // Step 3: Rebuild all subscriptions with since filter |
| @@ -1824,4 +1834,4 @@ impl SyncManager { | |||
| 1824 | 1834 | ||
| 1825 | tracing::info!("SyncManager shutdown complete"); | 1835 | tracing::info!("SyncManager shutdown complete"); |
| 1826 | } | 1836 | } |
| 1827 | } \ No newline at end of file | 1837 | } |