diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 13:52:43 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 13:52:43 +0000 |
| commit | b1aefcadf4fbe2ed2b71248208d4f6662b4bee98 (patch) | |
| tree | 5690f23fd2a56c274b7bc46f397a9a66e07f24b6 /src/sync | |
| parent | aabde32812366d314c3baf9b40798a3e420ba677 (diff) | |
feat: add automatic reconnection with exponential backoff (IMPROVE-2)
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index b967d4f..3c69fb9 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -442,6 +442,7 @@ async fn run_disconnect_checker( | |||
| 442 | 442 | ||
| 443 | let mut manager = sync_manager.lock().await; | 443 | let mut manager = sync_manager.lock().await; |
| 444 | manager.check_disconnects().await; | 444 | manager.check_disconnects().await; |
| 445 | manager.check_reconnects().await; | ||
| 445 | } | 446 | } |
| 446 | _ = shutdown_rx.recv() => { | 447 | _ = shutdown_rx.recv() => { |
| 447 | tracing::info!("Disconnect checker received shutdown signal"); | 448 | tracing::info!("Disconnect checker received shutdown signal"); |
| @@ -858,6 +859,14 @@ impl SyncManager { | |||
| 858 | /// - For existing connected relays: subscribes to filters, creates PendingBatch | 859 | /// - For existing connected relays: subscribes to filters, creates PendingBatch |
| 859 | /// - For disconnected/connecting relays: returns (will be handled on connection) | 860 | /// - For disconnected/connecting relays: returns (will be handled on connection) |
| 860 | async fn handle_add_filters(&mut self, action: AddFilters) { | 861 | async fn handle_add_filters(&mut self, action: AddFilters) { |
| 862 | tracing::info!( | ||
| 863 | relay = %action.relay_url, | ||
| 864 | repo_count = action.repos.len(), | ||
| 865 | root_event_count = action.root_events.len(), | ||
| 866 | filter_count = action.filters.len(), | ||
| 867 | "[DIAG] handle_add_filters called" | ||
| 868 | ); | ||
| 869 | |||
| 861 | // Step 1: Check if relay exists in relay_sync_index | 870 | // Step 1: Check if relay exists in relay_sync_index |
| 862 | let connection_status = { | 871 | let connection_status = { |
| 863 | let index = self.relay_sync_index.read().await; | 872 | let index = self.relay_sync_index.read().await; |
| @@ -1730,6 +1739,64 @@ impl SyncManager { | |||
| 1730 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); | 1739 | tracing::info!(relay = %relay_url, "Relay disconnected and cleaned up"); |
| 1731 | } | 1740 | } |
| 1732 | 1741 | ||
| 1742 | /// Check for disconnected relays that should be reconnected | ||
| 1743 | /// | ||
| 1744 | /// This method is called periodically by run_disconnect_checker. | ||
| 1745 | /// It identifies relays that: | ||
| 1746 | /// - Are currently disconnected | ||
| 1747 | /// - Have repos or root events to sync (not empty) | ||
| 1748 | /// - Have passed the exponential backoff period (respects health tracker) | ||
| 1749 | /// | ||
| 1750 | /// For each eligible relay, a reconnection is attempted via spawn_relay_connection. | ||
| 1751 | async fn check_reconnects(&mut self) { | ||
| 1752 | // Collect relays to reconnect | ||
| 1753 | let to_reconnect: Vec<String> = { | ||
| 1754 | let index = self.relay_sync_index.read().await; | ||
| 1755 | index | ||
| 1756 | .iter() | ||
| 1757 | .filter_map(|(relay_url, state)| { | ||
| 1758 | // Only consider disconnected relays | ||
| 1759 | if state.connection_status != ConnectionStatus::Disconnected { | ||
| 1760 | return None; | ||
| 1761 | } | ||
| 1762 | |||
| 1763 | // Skip empty relays - they'll be cleaned up by check_disconnects | ||
| 1764 | if state.repos.is_empty() && state.root_events.is_empty() { | ||
| 1765 | return None; | ||
| 1766 | } | ||
| 1767 | |||
| 1768 | // Check if backoff period has elapsed | ||
| 1769 | if self.health_tracker.should_attempt_connection(relay_url) { | ||
| 1770 | Some(relay_url.clone()) | ||
| 1771 | } else { | ||
| 1772 | None | ||
| 1773 | } | ||
| 1774 | }) | ||
| 1775 | .collect() | ||
| 1776 | }; | ||
| 1777 | |||
| 1778 | if to_reconnect.is_empty() { | ||
| 1779 | tracing::trace!("No disconnected relays ready for reconnection"); | ||
| 1780 | return; | ||
| 1781 | } | ||
| 1782 | |||
| 1783 | tracing::info!( | ||
| 1784 | count = to_reconnect.len(), | ||
| 1785 | relays = ?to_reconnect, | ||
| 1786 | "Attempting reconnection for disconnected relays" | ||
| 1787 | ); | ||
| 1788 | |||
| 1789 | // Reconnect eligible relays | ||
| 1790 | for relay_url in to_reconnect { | ||
| 1791 | tracing::info!( | ||
| 1792 | relay = %relay_url, | ||
| 1793 | health_state = %self.health_tracker.get_state(&relay_url), | ||
| 1794 | "Attempting reconnection" | ||
| 1795 | ); | ||
| 1796 | self.spawn_relay_connection(relay_url).await; | ||
| 1797 | } | ||
| 1798 | } | ||
| 1799 | |||
| 1733 | /// Gracefully shutdown the SyncManager | 1800 | /// Gracefully shutdown the SyncManager |
| 1734 | /// | 1801 | /// |
| 1735 | /// This method: | 1802 | /// This method: |