upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs47
1 files changed, 36 insertions, 11 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 0e5b9bb..8b1da0e 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -908,8 +908,22 @@ impl SyncManager {
908 // Release lock before checking if historic sync is complete 908 // Release lock before checking if historic sync is complete
909 drop(relay_index); 909 drop(relay_index);
910 910
911 // Check if all historic sync is complete (no more pending batches) 911 // Spawn background task to check if historic sync is complete
912 self.check_and_complete_historic_sync(relay_url).await; 912 // This avoids blocking the confirm_batch flow for 6 seconds
913 let relay_url = relay_url.to_string();
914 let pending_index = self.pending_sync_index.clone();
915 let relay_index = self.relay_sync_index.clone();
916 let metrics = self.metrics.clone();
917
918 tokio::spawn(async move {
919 Self::check_and_complete_historic_sync_impl(
920 &relay_url,
921 pending_index,
922 relay_index,
923 metrics,
924 )
925 .await;
926 });
913 } 927 }
914 928
915 /// Check if historic sync is complete and transition to Connected status 929 /// Check if historic sync is complete and transition to Connected status
@@ -930,11 +944,17 @@ impl SyncManager {
930 /// - Buffer for processing: 1 second 944 /// - Buffer for processing: 1 second
931 /// 945 ///
932 /// Called after each batch is confirmed to detect completion. 946 /// Called after each batch is confirmed to detect completion.
933 async fn check_and_complete_historic_sync(&self, relay_url: &str) { 947 /// Spawned as a background task to avoid blocking the confirm_batch flow.
948 async fn check_and_complete_historic_sync_impl(
949 relay_url: &str,
950 pending_index: PendingSyncIndex,
951 relay_index: RelaySyncIndex,
952 metrics: Option<SyncMetrics>,
953 ) {
934 // First check: Are there any pending batches? 954 // First check: Are there any pending batches?
935 let has_pending = { 955 let has_pending = {
936 let pending = self.pending_sync_index.read().await; 956 let pending = pending_index.read().await;
937 pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) 957 pending.get(relay_url).is_some_and(|batches| !batches.is_empty())
938 }; 958 };
939 959
940 if has_pending { 960 if has_pending {
@@ -948,8 +968,8 @@ impl SyncManager {
948 968
949 // Second check: Are there still no pending batches? 969 // Second check: Are there still no pending batches?
950 let has_pending = { 970 let has_pending = {
951 let pending = self.pending_sync_index.read().await; 971 let pending = pending_index.read().await;
952 pending.get(relay_url).map_or(false, |batches| !batches.is_empty()) 972 pending.get(relay_url).is_some_and(|batches| !batches.is_empty())
953 }; 973 };
954 974
955 if has_pending { 975 if has_pending {
@@ -958,8 +978,8 @@ impl SyncManager {
958 } 978 }
959 979
960 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded 980 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded
961 let mut relay_index = self.relay_sync_index.write().await; 981 let mut relay_index_guard = relay_index.write().await;
962 if let Some(state) = relay_index.get_mut(relay_url) { 982 if let Some(state) = relay_index_guard.get_mut(relay_url) {
963 if state.connection_status == ConnectionStatus::Syncing { 983 if state.connection_status == ConnectionStatus::Syncing {
964 // Check if any batches failed during historic sync 984 // Check if any batches failed during historic sync
965 let new_status = if state.historic_sync_had_failures { 985 let new_status = if state.historic_sync_had_failures {
@@ -983,7 +1003,7 @@ impl SyncManager {
983 ); 1003 );
984 1004
985 // Update metrics 1005 // Update metrics
986 if let Some(ref metrics) = self.metrics { 1006 if let Some(ref metrics) = metrics {
987 metrics.record_connection_status(relay_url, new_status); 1007 metrics.record_connection_status(relay_url, new_status);
988 } 1008 }
989 } 1009 }
@@ -1215,7 +1235,9 @@ impl SyncManager {
1215 ); 1235 );
1216 return; 1236 return;
1217 } 1237 }
1218 Some(status) if status.is_live_sync_active() => { 1238 Some(ConnectionStatus::Syncing)
1239 | Some(ConnectionStatus::Connected)
1240 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => {
1219 // Continue to subscribe - live sync is active, can accept new filters 1241 // Continue to subscribe - live sync is active, can accept new filters
1220 } 1242 }
1221 } 1243 }
@@ -1736,6 +1758,9 @@ impl SyncManager {
1736 repos: HashSet::new(), 1758 repos: HashSet::new(),
1737 root_events: HashSet::new(), 1759 root_events: HashSet::new(),
1738 announcements_synced: false, 1760 announcements_synced: false,
1761 historic_sync_completed: false,
1762 historic_sync_completed_at: None,
1763 historic_sync_had_failures: false,
1739 }; 1764 };
1740 index.insert(relay_url.clone(), new_state); 1765 index.insert(relay_url.clone(), new_state);
1741 true 1766 true