upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-18 10:12:11 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-18 10:12:11 +0000
commit03f074d0d0840b946a356badde75551d61c0f84c (patch)
tree97943bb692d40b3e572854bd30eec6bfdbcf8cb2 /src
parent7821b107190cc116a30a4c339f935bc16a1d5197 (diff)
sync removing dead code
Diffstat (limited to 'src')
-rw-r--r--src/sync/filters.rs21
-rw-r--r--src/sync/mod.rs812
-rw-r--r--src/sync/relay_connection.rs87
3 files changed, 237 insertions, 683 deletions
diff --git a/src/sync/filters.rs b/src/sync/filters.rs
index e508eb2..963fa02 100644
--- a/src/sync/filters.rs
+++ b/src/sync/filters.rs
@@ -103,10 +103,18 @@ pub fn tagged_one_of_our_root_event_filters(
103 return vec![]; 103 return vec![];
104 } 104 }
105 105
106 // DEBUG TRACING: Log the root events we're creating Layer 3 filters for
107 tracing::debug!(
108 root_event_count = root_events.len(),
109 root_event_ids = ?root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
110 since = ?since,
111 "Building Layer 3 filters for root events"
112 );
113
106 let mut filters = Vec::new(); 114 let mut filters = Vec::new();
107 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); 115 let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect();
108 116
109 for chunk in event_ids.chunks(100) { 117 for (chunk_idx, chunk) in event_ids.chunks(100).enumerate() {
110 // Lowercase 'e' tag - standard event reference 118 // Lowercase 'e' tag - standard event reference
111 let mut f1 = Filter::new(); 119 let mut f1 = Filter::new();
112 for event_id in chunk { 120 for event_id in chunk {
@@ -131,6 +139,17 @@ pub fn tagged_one_of_our_root_event_filters(
131 f3 = f3.since(ts); 139 f3 = f3.since(ts);
132 } 140 }
133 141
142 // DEBUG TRACING: Log the filters being created
143 tracing::debug!(
144 chunk_idx = chunk_idx,
145 chunk_size = chunk.len(),
146 event_ids_in_chunk = ?chunk,
147 filter_e = ?f1,
148 filter_E = ?f2,
149 filter_q = ?f3,
150 "Created Layer 3 filter chunk"
151 );
152
134 filters.push(f1); 153 filters.push(f1);
135 filters.push(f2); 154 filters.push(f2);
136 filters.push(f3); 155 filters.push(f3);
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 401cf21..3c50387 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -490,16 +490,19 @@ impl SyncManager {
490 // Move repos to confirmed 490 // Move repos to confirmed
491 state.repos.extend(batch.items.repos); 491 state.repos.extend(batch.items.repos);
492 // Move root_events to confirmed 492 // Move root_events to confirmed
493 state.root_events.extend(batch.items.root_events); 493 state.root_events.extend(batch.items.root_events.clone());
494 494
495 // DEBUG TRACING: Log the root events being confirmed
495 tracing::info!( 496 tracing::info!(
496 relay = %relay_url, 497 relay = %relay_url,
497 batch_id = batch_id, 498 batch_id = batch_id,
498 sync_method = ?sync_method, 499 sync_method = ?sync_method,
499 repos_confirmed = repos_count, 500 repos_confirmed = repos_count,
500 root_events_confirmed = events_count, 501 root_events_confirmed = events_count,
502 root_events_ids = ?batch.items.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
501 total_repos = state.repos.len(), 503 total_repos = state.repos.len(),
502 total_root_events = state.root_events.len(), 504 total_root_events = state.root_events.len(),
505 all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
503 "Batch confirmed - items moved from pending to confirmed" 506 "Batch confirmed - items moved from pending to confirmed"
504 ); 507 );
505 } else { 508 } else {
@@ -535,10 +538,6 @@ impl SyncManager {
535 } 538 }
536 }; 539 };
537 540
538 // Check if relay supports NIP-77 negentropy AND negentropy is not disabled
539 let use_negentropy =
540 !self.config.sync_disable_negentropy && connection.supports_negentropy().await;
541
542 // Unsubscribe all current subscriptions 541 // Unsubscribe all current subscriptions
543 connection.unsubscribe_all().await; 542 connection.unsubscribe_all().await;
544 543
@@ -564,62 +563,14 @@ impl SyncManager {
564 } 563 }
565 } 564 }
566 565
567 let now = Timestamp::now(); 566 // maybe we just run start fresh with a daily flag? make sture so start layer 1 filters
568 567 self.fresh_start(relay_url).await;
569 if use_negentropy {
570 // NIP-77 supported - use negentropy for efficient reconciliation
571 tracing::info!(
572 relay = %relay_url,
573 "Using NIP-77 negentropy for daily sync"
574 );
575
576 // Perform negentropy sync for Layer 1 (announcements)
577 let layer1_filter = filters::build_announcement_filter(None);
578 self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (daily)")
579 .await;
580
581 // After negentropy sync, set up live subscription for new events
582 let live_filter = filters::build_announcement_filter(Some(now));
583 if let Some(conn) = self.connections.get(relay_url) {
584 if let Err(e) = conn.subscribe_filter(live_filter).await {
585 tracing::error!(
586 relay = %relay_url,
587 error = %e,
588 "Failed to set up live Layer 1 subscription after negentropy daily sync"
589 );
590 }
591 }
592
593 // Recompute actions for Layer 2+3 based on synced events
594 self.recompute_new_sync_filters_for_relay(relay_url).await;
595 } else {
596 // NIP-77 not supported - fall back to REQ+EOSE
597 tracing::info!(
598 relay = %relay_url,
599 "NIP-77 not supported, using REQ+EOSE for daily sync"
600 );
601
602 // Re-subscribe to Layer 1 (announcements) without since filter for full discovery
603 let layer1_filter = filters::build_announcement_filter(None);
604 if let Some(conn) = self.connections.get(relay_url) {
605 if let Err(e) = conn.subscribe_filter(layer1_filter).await {
606 tracing::error!(
607 relay = %relay_url,
608 error = %e,
609 "Failed to re-subscribe to Layer 1 during daily sync"
610 );
611 }
612 }
613
614 // Recompute actions for Layer 2+3 - will discover all repos/events again
615 self.recompute_new_sync_filters_for_relay(relay_url).await;
616 }
617 568
618 if let Some(ref metrics) = self.metrics { 569 // if let Some(ref metrics) = self.metrics {
619 metrics.record_event(event_source::DAILY); 570 // metrics.record_event(event_source::DAILY);
620 } 571 // }
621 572
622 tracing::info!(relay = %relay_url, "Daily sync complete"); 573 // tracing::info!(relay = %relay_url, "Daily sync complete");
623 } 574 }
624 575
625 /// Run the sync manager 576 /// Run the sync manager
@@ -827,79 +778,19 @@ impl SyncManager {
827 // Step 2: Check if consolidation is needed BEFORE adding new filters 778 // Step 2: Check if consolidation is needed BEFORE adding new filters
828 self.maybe_consolidate(&action.relay_url, action.filters.len()) 779 self.maybe_consolidate(&action.relay_url, action.filters.len())
829 .await; 780 .await;
830 /// DELETE this bit
831 // Step 3: Get connection and subscribe to all filters
832 let connection = match self.connections.get(&action.relay_url) {
833 Some(conn) => conn,
834 None => {
835 tracing::warn!(
836 relay = %action.relay_url,
837 "No connection for relay, cannot subscribe"
838 );
839 return;
840 }
841 };
842 781
843 // Subscribe to each filter and collect subscription IDs 782 // Subscribe to each filter and collect subscription IDs
844 let mut subscription_ids = Vec::new(); 783 tracing::info!(
845 for filter in &action.filters {
846 match connection.subscribe_filter(filter.clone()).await {
847 Ok(sub_id) => {
848 subscription_ids.push(sub_id);
849 }
850 Err(e) => {
851 tracing::error!(
852 relay = %action.relay_url,
853 error = %e,
854 "Failed to subscribe to filter"
855 );
856 }
857 }
858 }
859
860 if subscription_ids.is_empty() && !action.filters.is_empty() {
861 tracing::warn!(
862 relay = %action.relay_url,
863 "All filter subscriptions failed, not creating batch"
864 );
865 return;
866 }
867
868 // Step 4: Create PendingBatch
869 let batch_id = self.next_batch_id();
870 let batch = PendingBatch {
871 batch_id,
872 items: PendingItems {
873 repos: action.items.repos.clone(),
874 root_events: action.items.root_events.clone(),
875 },
876 outstanding_subs: subscription_ids.into_iter().collect(),
877 sync_method: SyncMethod::ReqEose,
878 };
879
880 // Step 5: Add to pending_sync_index
881 {
882 let mut pending = self.pending_sync_index.write().await;
883 pending
884 .entry(action.relay_url.clone())
885 .or_insert_with(Vec::new)
886 .push(batch);
887 }
888
889 tracing::debug!(
890 relay = %action.relay_url, 784 relay = %action.relay_url,
891 batch_id = batch_id, 785 filter_count = action.filters.len(),
892 repos = action.items.repos.len(), 786 repo_count = action.items.repos.len(),
893 root_events = action.items.root_events.len(), 787 root_event_count = action.items.root_events.len(),
894 filters = action.filters.len(), 788 "handle_add_filters: calling sync_live and historic_sync"
895 "Created pending batch for filter subscriptions"
896 ); 789 );
897 // REPLACE WITH THIS: 790
898 // // Subscribe to each filter and collect subscription IDs 791 self.sync_live(&action.relay_url, &action.filters).await;
899 // self.sync_live(&action.relay_url, &action.filters).await; 792 self.historic_sync(&action.relay_url, action.filters, action.items, None)
900 // // TODO need to do actions.repos 793 .await;
901 // self.historic_sync(&action.relay_url, action.filters, action.items, None)
902 // .await;
903 } 794 }
904 795
905 /// Handle a connection success (called when a relay connects or reconnects) 796 /// Handle a connection success (called when a relay connects or reconnects)
@@ -909,209 +800,39 @@ impl SyncManager {
909 /// - `quick_reconnect()` if disconnected < 15 minutes 800 /// - `quick_reconnect()` if disconnected < 15 minutes
910 /// - `long_reconnect()` if disconnected > 15 minutes 801 /// - `long_reconnect()` if disconnected > 15 minutes
911 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { 802 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
912 let now = Timestamp::now(); 803 let last_connected = {
913
914 // // Get the relay state to determine reconnect type
915 // let (last_connected, disconnected_at) = {
916 // let index = self.relay_sync_index.read().await;
917 // if let Some(state) = index.get(relay_url) {
918 // (state.last_connected, state.disconnected_at)
919 // } else {
920 // (None, None) // No state found
921 // }
922 // };
923
924 // // Determine which reconnection strategy to use
925 // match (last_connected, disconnected_at) {
926 // (None, _) => {
927 // // Never connected before - fresh start
928 // tracing::info!(
929 // relay = %relay_url,
930 // "First connection - initiating fresh_start"
931 // );
932 // self.fresh_start(relay_url).await;
933 // }
934 // (Some(last), Some(disconnected)) => {
935 // // Was connected before, check how long disconnected
936 // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs());
937
938 // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS {
939 // // Disconnected < 15 minutes - quick reconnect
940 // // Use last_connected minus buffer as since timestamp
941 // let since =
942 // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS));
943 // tracing::info!(
944 // relay = %relay_url,
945 // disconnect_secs = disconnect_duration,
946 // since = %since,
947 // "Short disconnection - initiating quick_reconnect"
948 // );
949 // self.quick_reconnect(relay_url, since).await;
950 // } else {
951 // // Disconnected > 15 minutes - long reconnect
952 // tracing::info!(
953 // relay = %relay_url,
954 // disconnect_secs = disconnect_duration,
955 // "Long disconnection - initiating long_reconnect"
956 // );
957 // self.long_reconnect(relay_url).await;
958 // }
959 // }
960 // (Some(_last), None) => {
961 // // Was connected but no disconnected_at - shouldn't happen normally
962 // // Treat as long reconnect to be safe
963 // tracing::warn!(
964 // relay = %relay_url,
965 // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect"
966 // );
967 // self.long_reconnect(relay_url).await;
968 // }
969 // }
970 // Get the relay state to determine reconnect type
971 let (is_fresh_sync, last_connected, is_bootstrap) = {
972 let index = self.relay_sync_index.read().await; 804 let index = self.relay_sync_index.read().await;
973 if let Some(state) = index.get(relay_url) { 805 index.get(relay_url).and_then(|s| s.last_connected)
974 let last_conn = state.last_connected;
975 let is_fresh = match last_conn {
976 None => true, // Never connected before
977 Some(last) => {
978 let elapsed = now.as_secs().saturating_sub(last.as_secs());
979 elapsed > QUICK_RECONNECT_WINDOW_SECS // Stale if > 15 min
980 }
981 };
982 (is_fresh, last_conn, state.is_bootstrap)
983 } else {
984 (true, None, false) // No state found, treat as fresh
985 }
986 }; 806 };
987 807
988 // If stale reconnect, clear state 808 if let Some(last) = last_connected {
989 if is_fresh_sync && last_connected.is_some() { 809 let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs());
990 let mut index = self.relay_sync_index.write().await; 810 if elapsed < QUICK_RECONNECT_WINDOW_SECS {
991 if let Some(state) = index.get_mut(relay_url) { 811 // short disconnect
992 state.clear_sync_state();
993 tracing::info!( 812 tracing::info!(
994 relay = %relay_url, 813 relay = %relay_url,
995 "Cleared stale sync state (was disconnected > 15 min)" 814 disconnect_secs = elapsed,
815 "Short disconnection - initiating quick_reconnect"
996 ); 816 );
997 } 817 self.quick_reconnect(relay_url, Timestamp::from(elapsed))
998 }
999
1000 // Update connection state
1001 {
1002 let mut index = self.relay_sync_index.write().await;
1003 let state = index.entry(relay_url.to_string()).or_default();
1004 state.connection_status = ConnectionStatus::Connected;
1005 state.last_connected = Some(now);
1006 state.disconnected_at = None;
1007 }
1008
1009 // Record success in health tracker
1010 self.health_tracker.record_success(relay_url);
1011
1012 // Update metrics
1013 if let Some(ref metrics) = self.metrics {
1014 metrics.set_relay_connected(relay_url, true);
1015 metrics.inc_connected_count();
1016 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1017 }
1018
1019 // Subscribe based on reconnect type
1020 if is_fresh_sync {
1021 tracing::info!(
1022 relay = %relay_url,
1023 is_bootstrap = is_bootstrap,
1024 "Fresh sync - checking NIP-77 negentropy support"
1025 );
1026
1027 // Check if relay supports NIP-77 negentropy for efficient sync
1028 // Respect the sync_disable_negentropy config option
1029 let use_negentropy = if self.config.sync_disable_negentropy {
1030 tracing::debug!(relay = %relay_url, "Negentropy disabled via config");
1031 false
1032 } else if let Some(connection) = self.connections.get(relay_url) {
1033 connection.supports_negentropy().await
1034 } else {
1035 false
1036 };
1037
1038 if use_negentropy {
1039 // NIP-77 supported - use negentropy for historical sync
1040 tracing::info!(
1041 relay = %relay_url,
1042 "Using NIP-77 negentropy for fresh sync"
1043 );
1044
1045 // Perform negentropy sync for Layer 1 (announcements)
1046 let layer1_filter = filters::build_announcement_filter(None);
1047 self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1")
1048 .await; 818 .await;
1049
1050 // After negentropy sync, recompute Layer 2+3 actions
1051 // Layer 1 events are now in sync, so we can proceed with Layer 2+3
1052 self.recompute_new_sync_filters_for_relay(relay_url).await;
1053
1054 // Set up live subscription for new events (since=now)
1055 let live_filter = filters::build_announcement_filter(Some(now));
1056 if let Some(connection) = self.connections.get(relay_url) {
1057 if let Err(e) = connection.subscribe_filter(live_filter).await {
1058 tracing::error!(
1059 relay = %relay_url,
1060 error = %e,
1061 "Failed to set up live Layer 1 subscription after negentropy sync"
1062 );
1063 }
1064 }
1065 } else { 819 } else {
1066 // NIP-77 not supported - fall back to REQ+EOSE 820 // long disconnect
1067 tracing::info!( 821 tracing::info!(
1068 relay = %relay_url, 822 relay = %relay_url,
1069 "NIP-77 not supported, using REQ+EOSE for fresh sync" 823 disconnect_secs = elapsed,
824 "Long disconnection - initiating fresh_start"
1070 ); 825 );
1071 // Fresh sync: Layer 1 subscription (without since) was already established 826 self.fresh_start(relay_url).await;
1072 // during connect_and_subscribe() in handle_add_filters(). That call subscribes
1073 // to kinds 30617+30618 for the full history. Here we only need to recompute
1074 // Layer 2+3 actions based on the repos we're tracking.
1075 self.recompute_new_sync_filters_for_relay(relay_url).await;
1076 } 827 }
1077 } else { 828 } else {
1078 // Quick reconnect: use since filter (no negentropy needed) 829 // not successfully connected before (since launching binary)
1079 let since_ts = Timestamp::from(
1080 last_connected
1081 .unwrap()
1082 .as_secs()
1083 .saturating_sub(QUICK_RECONNECT_WINDOW_SECS),
1084 );
1085
1086 tracing::info!( 830 tracing::info!(
1087 relay = %relay_url, 831 relay = %relay_url,
1088 since = %since_ts, 832 "First connection - initiating fresh_start"
1089 "Quick reconnect - using since filter for incremental sync"
1090 ); 833 );
1091 834 self.fresh_start(relay_url).await;
1092 // Subscribe to Layer 1 (announcements) with since filter to catch new repos 835 };
1093 let layer1_filter = filters::build_announcement_filter(Some(since_ts));
1094 if let Some(connection) = self.connections.get(relay_url) {
1095 if let Err(e) = connection.subscribe_filter(layer1_filter).await {
1096 tracing::error!(
1097 relay = %relay_url,
1098 error = %e,
1099 "Failed to subscribe to Layer 1 filter on quick reconnect"
1100 );
1101 }
1102 }
1103
1104 // Rebuild Layer 2 and Layer 3 with since filter
1105 self.rebuild_layer2_and_layer3(relay_url, Some(since_ts))
1106 .await;
1107
1108 // Recompute actions for any new items discovered while disconnected
1109 self.recompute_new_sync_filters_for_relay(relay_url).await;
1110
1111 if let Some(ref metrics) = self.metrics {
1112 metrics.record_event(event_source::RECONNECT);
1113 }
1114 }
1115 } 836 }
1116 837
1117 /// Fresh start - clears state and does full sync 838 /// Fresh start - clears state and does full sync
@@ -1155,77 +876,16 @@ impl SyncManager {
1155 "Cleared sync state in fresh_start" 876 "Cleared sync state in fresh_start"
1156 ); 877 );
1157 } 878 }
1158 } 879 if state.connection_status == ConnectionStatus::Connected {
1159 } 880 // TODO start layer 1
1160 881 drop(index);
1161 // Step 3: Update connection state 882 self.recompute_new_sync_filters_for_relay(relay_url).await;
1162 {
1163 let mut index = self.relay_sync_index.write().await;
1164 let state = index.entry(relay_url.to_string()).or_default();
1165 state.connection_status = ConnectionStatus::Connected;
1166 state.last_connected = Some(now);
1167 state.disconnected_at = None;
1168 }
1169
1170 // Record success in health tracker
1171 self.health_tracker.record_success(relay_url);
1172
1173 // Update metrics
1174 if let Some(ref metrics) = self.metrics {
1175 metrics.set_relay_connected(relay_url, true);
1176 metrics.inc_connected_count();
1177 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1178 }
1179
1180 // Step 4: L1 sync - check negentropy support
1181 let use_negentropy = if self.config.sync_disable_negentropy {
1182 tracing::debug!(relay = %relay_url, "Negentropy disabled via config");
1183 false
1184 } else if let Some(connection) = self.connections.get(relay_url) {
1185 connection.supports_negentropy().await
1186 } else {
1187 false
1188 };
1189
1190 if use_negentropy {
1191 // NIP-77 supported - use negentropy for L1 historical sync
1192 tracing::info!(
1193 relay = %relay_url,
1194 "Using NIP-77 negentropy for L1 historical sync"
1195 );
1196
1197 // L1 historic sync (no since - full sync)
1198 let layer1_filter = filters::build_announcement_filter(None);
1199 self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)")
1200 .await;
1201
1202 // L1 live subscription (since=now for ongoing events)
1203 let live_filter = filters::build_announcement_filter(Some(now));
1204 if let Some(connection) = self.connections.get(relay_url) {
1205 if let Err(e) = connection.subscribe_filter(live_filter).await {
1206 tracing::error!(
1207 relay = %relay_url,
1208 error = %e,
1209 "Failed to set up L1 live subscription in fresh_start"
1210 );
1211 } 883 }
884 } else {
885 drop(index);
886 return self.spawn_relay_connection(relay_url.to_string()).await;
1212 } 887 }
1213 } else {
1214 // NIP-77 not supported - REQ+EOSE
1215 // Note: Layer 1 subscription (without since) was already established
1216 // during connect_and_subscribe() in spawn_relay_connection
1217 tracing::info!(
1218 relay = %relay_url,
1219 "Using REQ+EOSE for L1 sync (negentropy not available)"
1220 );
1221 } 888 }
1222
1223 // Step 5: compute_actions → AddFilters for L2+L3
1224 // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters
1225 // for ALL repos that should be synced from this relay
1226 self.recompute_new_sync_filters_for_relay(relay_url).await;
1227
1228 tracing::info!(relay = %relay_url, "fresh_start complete");
1229 } 889 }
1230 890
1231 /// Quick reconnect - for disconnections < 15 minutes 891 /// Quick reconnect - for disconnections < 15 minutes
@@ -1312,27 +972,6 @@ impl SyncManager {
1312 tracing::info!(relay = %relay_url, "quick_reconnect complete"); 972 tracing::info!(relay = %relay_url, "quick_reconnect complete");
1313 } 973 }
1314 974
1315 /// Long reconnect - for disconnections > 15 minutes
1316 ///
1317 /// Flow:
1318 /// 1. Record disconnect/reconnect metric
1319 /// 2. Delegate to fresh_start()
1320 async fn long_reconnect(&mut self, relay_url: &str) {
1321 tracing::info!(relay = %relay_url, "Starting long_reconnect");
1322
1323 // Step 1: Record disconnect/reconnect metric
1324 // This distinguishes intentional daily refresh from failure recovery
1325 if let Some(ref metrics) = self.metrics {
1326 metrics.record_event(event_source::RECONNECT);
1327 }
1328
1329 // Step 2: Delegate to fresh_start
1330 // State is too stale to trust, start fresh
1331 self.fresh_start(relay_url).await;
1332
1333 tracing::info!(relay = %relay_url, "long_reconnect complete");
1334 }
1335
1336 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay 975 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay
1337 /// 976 ///
1338 /// Uses the confirmed repos and root_events from RelayState to build filters. 977 /// Uses the confirmed repos and root_events from RelayState to build filters.
@@ -1367,11 +1006,15 @@ impl SyncManager {
1367 // Build Layer 2 and Layer 3 filters 1006 // Build Layer 2 and Layer 3 filters
1368 let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); 1007 let filters = build_layer2_and_layer3_filters(&repos, &root_events, since);
1369 1008
1009 // DEBUG TRACING: Log detailed filter information
1370 tracing::debug!( 1010 tracing::debug!(
1371 relay = %relay_url, 1011 relay = %relay_url,
1372 filter_count = filters.len(), 1012 filter_count = filters.len(),
1373 repos_count = repos.len(), 1013 repos_count = repos.len(),
1374 root_events_count = root_events.len(), 1014 root_events_count = root_events.len(),
1015 repos = ?repos,
1016 root_events = ?root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(),
1017 filters = ?filters,
1375 since = ?since, 1018 since = ?since,
1376 "Rebuilding Layer 2/3 filters" 1019 "Rebuilding Layer 2/3 filters"
1377 ); 1020 );
@@ -1829,197 +1472,6 @@ impl SyncManager {
1829 } 1472 }
1830 1473
1831 // ========================================================================= 1474 // =========================================================================
1832 // NIP-77 Negentropy Sync Helpers
1833 // =========================================================================
1834
1835 /// Perform negentropy sync for a filter and process received events
1836 ///
1837 /// This method uses the unified PendingBatch flow:
1838 /// 1. Creates a PendingBatch with targets for this relay
1839 /// 2. Performs negentropy reconciliation with the remote relay
1840 /// 3. On success, confirms the batch (moves items to RelayState)
1841 /// 4. On failure, removes the batch without confirming
1842 ///
1843 /// This ensures consistent state tracking across both sync paths
1844 /// (REQ+EOSE and Negentropy).
1845 ///
1846 /// # Arguments
1847 /// * `relay_url` - The relay URL to sync with
1848 /// * `filter` - The filter defining which events to sync
1849 /// * `layer_name` - Human-readable layer name for logging (e.g., "Layer 1")
1850 ///
1851 /// # Returns
1852 /// Number of events received from negentropy sync
1853 async fn negentropy_sync_and_process(
1854 &mut self,
1855 relay_url: &str,
1856 filter: Filter,
1857 layer_name: &str,
1858 ) -> usize {
1859 use crate::sync::algorithms::derive_relay_targets;
1860
1861 // Check connection exists first (borrow ends immediately)
1862 if !self.connections.contains_key(relay_url) {
1863 tracing::warn!(
1864 relay = %relay_url,
1865 layer = layer_name,
1866 "No connection found for negentropy sync"
1867 );
1868 return 0;
1869 }
1870
1871 // Step 1: Get targets for this relay and create PendingBatch
1872 // Get batch_id first (requires mutable borrow of self)
1873 let batch_id = self.next_batch_id();
1874
1875 let pending_items = {
1876 let repo_index = self.repo_sync_index.read().await;
1877 let targets = derive_relay_targets(&repo_index);
1878
1879 let relay_targets = match targets.get(relay_url) {
1880 Some(t) => t,
1881 None => {
1882 tracing::debug!(
1883 relay = %relay_url,
1884 layer = layer_name,
1885 "No targets found for relay, skipping negentropy sync"
1886 );
1887 return 0;
1888 }
1889 };
1890
1891 PendingItems {
1892 repos: relay_targets.repos.clone(),
1893 root_events: relay_targets.root_events.clone(),
1894 }
1895 };
1896
1897 // Create PendingBatch for negentropy sync (empty outstanding_subs)
1898 let batch = PendingBatch {
1899 batch_id,
1900 items: pending_items.clone(),
1901 outstanding_subs: HashSet::new(), // Negentropy doesn't use subscriptions
1902 sync_method: SyncMethod::Negentropy,
1903 };
1904
1905 // Add batch to pending_sync_index before starting sync
1906 {
1907 let mut pending = self.pending_sync_index.write().await;
1908 pending
1909 .entry(relay_url.to_string())
1910 .or_insert_with(Vec::new)
1911 .push(batch);
1912 }
1913
1914 tracing::debug!(
1915 relay = %relay_url,
1916 layer = layer_name,
1917 batch_id = batch_id,
1918 repos = pending_items.repos.len(),
1919 root_events = pending_items.root_events.len(),
1920 "Created pending batch for negentropy sync"
1921 );
1922
1923 // Step 2: Perform negentropy sync
1924 // Get connection reference here (borrows self.connections briefly)
1925 let Some(connection) = self.connections.get(relay_url) else {
1926 // Connection was removed between check and use (race condition)
1927 // Remove the pending batch we just added
1928 let mut pending = self.pending_sync_index.write().await;
1929 if let Some(batches) = pending.get_mut(relay_url) {
1930 batches.retain(|b| b.batch_id != batch_id);
1931 if batches.is_empty() {
1932 pending.remove(relay_url);
1933 }
1934 }
1935 tracing::warn!(
1936 relay = %relay_url,
1937 layer = layer_name,
1938 "Connection disappeared before negentropy sync could start"
1939 );
1940 return 0;
1941 };
1942
1943 match connection.negentropy_sync_filter(filter).await {
1944 Ok(result) => {
1945 let event_count = result.received.len();
1946
1947 tracing::info!(
1948 relay = %relay_url,
1949 layer = layer_name,
1950 received = event_count,
1951 remote_only = result.remote_only.len(),
1952 local_only = result.local_only.len(),
1953 "Negentropy sync completed for {}",
1954 layer_name
1955 );
1956
1957 tracing::debug!(
1958 relay = %relay_url,
1959 layer = layer_name,
1960 event_ids = ?result.received.iter().take(5).collect::<Vec<_>>(),
1961 "Received event IDs via negentropy (first 5 shown)"
1962 );
1963
1964 // Record metrics for negentropy events
1965 if let Some(ref metrics) = self.metrics {
1966 for _ in 0..event_count {
1967 metrics.record_event(event_source::STARTUP);
1968 }
1969 }
1970
1971 // Step 3: Remove batch from pending and confirm it
1972 let completed_batch = {
1973 let mut pending = self.pending_sync_index.write().await;
1974 if let Some(batches) = pending.get_mut(relay_url) {
1975 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id);
1976 if let Some(idx) = batch_idx {
1977 let batch = batches.remove(idx);
1978 if batches.is_empty() {
1979 pending.remove(relay_url);
1980 }
1981 Some(batch)
1982 } else {
1983 None
1984 }
1985 } else {
1986 None
1987 }
1988 };
1989
1990 // Confirm the batch using unified confirm_batch method
1991 if let Some(batch) = completed_batch {
1992 self.confirm_batch(relay_url, batch).await;
1993 }
1994
1995 event_count
1996 }
1997 Err(e) => {
1998 tracing::warn!(
1999 relay = %relay_url,
2000 layer = layer_name,
2001 error = %e,
2002 "Negentropy sync failed for {}",
2003 layer_name
2004 );
2005
2006 // Remove the batch without confirming on failure
2007 {
2008 let mut pending = self.pending_sync_index.write().await;
2009 if let Some(batches) = pending.get_mut(relay_url) {
2010 batches.retain(|b| b.batch_id != batch_id);
2011 if batches.is_empty() {
2012 pending.remove(relay_url);
2013 }
2014 }
2015 }
2016
2017 0
2018 }
2019 }
2020 }
2021
2022 // =========================================================================
2023 // Consolidation System 1475 // Consolidation System
2024 // ========================================================================= 1476 // =========================================================================
2025 1477
@@ -2410,7 +1862,7 @@ impl SyncManager {
2410 // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", 1862 // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit",
2411 // but omitting limit means "no limit" which is what we want for live. 1863 // but omitting limit means "no limit" which is what we want for live.
2412 // The filter passed in should already NOT have a limit set. 1864 // The filter passed in should already NOT have a limit set.
2413 match connection.subscribe_filter(filter.clone()).await { 1865 match connection.subscribe_filter(filter.clone().limit(1)).await {
2414 Ok(sub_id) => { 1866 Ok(sub_id) => {
2415 tracing::trace!( 1867 tracing::trace!(
2416 relay = %relay_url, 1868 relay = %relay_url,
@@ -2519,6 +1971,17 @@ impl SyncManager {
2519 items: PendingItems, 1971 items: PendingItems,
2520 since: Option<Timestamp>, 1972 since: Option<Timestamp>,
2521 ) -> Option<u64> { 1973 ) -> Option<u64> {
1974 // DEBUG TRACING: Log all filters being passed to historic_sync
1975 tracing::debug!(
1976 relay = %relay_url,
1977 filter_count = filters.len(),
1978 filters = ?filters,
1979 repos_count = items.repos.len(),
1980 root_events_count = items.root_events.len(),
1981 since = ?since,
1982 "historic_sync called"
1983 );
1984
2522 if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { 1985 if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() {
2523 tracing::debug!( 1986 tracing::debug!(
2524 relay = %relay_url, 1987 relay = %relay_url,
@@ -2527,9 +1990,9 @@ impl SyncManager {
2527 return None; 1990 return None;
2528 } 1991 }
2529 1992
2530 // Check connection exists 1993 // Check connection exists and clone for async usage
2531 let connection = match self.connections.get(relay_url) { 1994 let connection = match self.connections.get(relay_url) {
2532 Some(conn) => conn, 1995 Some(conn) => conn.clone(),
2533 None => { 1996 None => {
2534 tracing::warn!( 1997 tracing::warn!(
2535 relay = %relay_url, 1998 relay = %relay_url,
@@ -2581,38 +2044,63 @@ impl SyncManager {
2581 .push(batch); 2044 .push(batch);
2582 } 2045 }
2583 2046
2584 // Perform negentropy sync for each filter 2047 // Perform negentropy sync for all filters concurrently
2585 // Note: We sync each filter separately because negentropy works on a single filter 2048 // Note: We sync each filter separately because negentropy works on a single filter
2586 let mut total_received = 0; 2049 let diff_futures: Vec<_> = filters_with_since
2587 let mut any_success = false; 2050 .iter()
2051 .enumerate()
2052 .map(|(idx, filter)| {
2053 let filter = filter.clone();
2054 let conn = connection.clone();
2055 async move { (idx, conn.negentropy_sync_diff(filter).await) }
2056 })
2057 .collect();
2588 2058
2589 for filter in &filters_with_since { 2059 let diff_results = futures_util::future::join_all(diff_futures).await;
2590 if let Some(conn) = self.connections.get(relay_url) { 2060
2591 match conn.negentropy_sync_filter(filter.clone()).await { 2061 // Process results - collect all event IDs we need to fetch
2592 Ok(result) => { 2062 let mut all_remote_ids = Vec::new();
2593 total_received += result.received.len(); 2063 let mut failed_count = 0;
2594 any_success = true; 2064
2595 2065 for (idx, result) in diff_results {
2596 // Record metrics for received events 2066 match result {
2597 if let Some(ref metrics) = self.metrics { 2067 Ok(reconciliation) => {
2598 for _ in 0..result.received.len() { 2068 let remote_count = reconciliation.remote.len();
2599 metrics.record_event(event_source::STARTUP); 2069 if remote_count > 0 {
2600 } 2070 tracing::debug!(
2601 }
2602 }
2603 Err(e) => {
2604 tracing::warn!(
2605 relay = %relay_url, 2071 relay = %relay_url,
2606 error = %e, 2072 filter_idx = idx,
2607 "Negentropy sync failed for filter in historic_sync" 2073 remote_count = remote_count,
2074 "Negentropy diff identified missing events"
2608 ); 2075 );
2076 all_remote_ids.extend(reconciliation.remote.into_iter());
2609 } 2077 }
2610 } 2078 }
2079 Err(e) => {
2080 failed_count += 1;
2081 tracing::warn!(
2082 relay = %relay_url,
2083 filter_idx = idx,
2084 error = %e,
2085 "Negentropy diff failed for filter in historic_sync"
2086 );
2087 }
2611 } 2088 }
2612 } 2089 }
2613 2090
2614 if any_success { 2091 // Require ALL filters to succeed to confirm the batch
2615 // Remove batch from pending and confirm it 2092 if failed_count > 0 {
2093 // Leave pending batch so it doesnt appear as synced. we can try again later.
2094 tracing::warn!(
2095 relay = %relay_url,
2096 batch_id = batch_id,
2097 failed_count = failed_count,
2098 total_filters = filters_with_since.len(),
2099 "historic_sync (negentropy) failed - not all filters succeeded"
2100 );
2101 return None;
2102 } else if all_remote_ids.is_empty() {
2103 // Remove batch from pending and confirm it (no items to download)
2616 let completed_batch = { 2104 let completed_batch = {
2617 let mut pending = self.pending_sync_index.write().await; 2105 let mut pending = self.pending_sync_index.write().await;
2618 if let Some(batches) = pending.get_mut(relay_url) { 2106 if let Some(batches) = pending.get_mut(relay_url) {
@@ -2638,26 +2126,67 @@ impl SyncManager {
2638 tracing::info!( 2126 tracing::info!(
2639 relay = %relay_url, 2127 relay = %relay_url,
2640 batch_id = batch_id, 2128 batch_id = batch_id,
2641 total_received = total_received, 2129 total_received = 0,
2642 "historic_sync (negentropy) completed" 2130 "historic_sync (negentropy) completed - already up-to-date"
2643 ); 2131 );
2644 } else { 2132 }
2645 // All negentropy syncs failed - remove the pending batch 2133 // launch subscriptions to fetch missing events by id
2134 let ids_filters: Vec<_> = all_remote_ids
2135 .chunks(300)
2136 .map(|c| Filter::new().ids(c.iter().copied()))
2137 .collect();
2138
2139 // DEBUG TRACING: Log that we're requesting events by ID
2140 tracing::debug!(
2141 relay = %relay_url,
2142 batch_id = batch_id,
2143 total_event_ids = all_remote_ids.len(),
2144 filter_chunks = ids_filters.len(),
2145 event_ids = ?all_remote_ids,
2146 "Creating subscriptions to fetch missing events by ID (negentropy path)"
2147 );
2148
2149 let mut subscription_ids = HashSet::new();
2150 for (idx, filter) in ids_filters.iter().enumerate() {
2151 if let Some(conn) = self.connections.get(relay_url) {
2152 // DEBUG TRACING: Log each filter being subscribed
2153 tracing::debug!(
2154 relay = %relay_url,
2155 batch_id = batch_id,
2156 chunk_idx = idx,
2157 filter = ?filter,
2158 "Subscribing to ID filter chunk"
2159 );
2160
2161 match conn.subscribe_filter(filter.clone()).await {
2162 Ok(sub_id) => {
2163 subscription_ids.insert(sub_id);
2164 }
2165 Err(e) => {
2166 tracing::error!(
2167 relay = %relay_url,
2168 error = %e,
2169 "Failed to subscribe to filter in historic_sync"
2170 );
2171 }
2172 }
2173 }
2174 }
2175 {
2646 let mut pending = self.pending_sync_index.write().await; 2176 let mut pending = self.pending_sync_index.write().await;
2647 if let Some(batches) = pending.get_mut(relay_url) { 2177 if let Some(relay_batches) = pending.get_mut(relay_url) {
2648 batches.retain(|b| b.batch_id != batch_id); 2178 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) {
2649 if batches.is_empty() { 2179 batch.outstanding_subs.extend(subscription_ids.clone());
2650 pending.remove(relay_url);
2651 } 2180 }
2652 } 2181 }
2653
2654 tracing::warn!(
2655 relay = %relay_url,
2656 batch_id = batch_id,
2657 "historic_sync (negentropy) failed for all filters"
2658 );
2659 return None;
2660 } 2182 }
2183 tracing::debug!(
2184 relay = %relay_url,
2185 batch_id = batch_id,
2186 subscription_ids = subscription_ids.len(),
2187 events = all_remote_ids.len(),
2188 "historic_sync (Negentropy) created subscritions to fetch missing events by id, awaiting EOSE"
2189 );
2661 } else { 2190 } else {
2662 // Traditional REQ+EOSE path 2191 // Traditional REQ+EOSE path
2663 tracing::debug!( 2192 tracing::debug!(
@@ -2673,7 +2202,16 @@ impl SyncManager {
2673 // Subscribe to each filter and collect subscription IDs 2202 // Subscribe to each filter and collect subscription IDs
2674 let mut subscription_ids = HashSet::new(); 2203 let mut subscription_ids = HashSet::new();
2675 2204
2676 for filter in &filters_with_since { 2205 // DEBUG TRACING: Log each filter in REQ+EOSE path
2206 for (idx, filter) in filters_with_since.iter().enumerate() {
2207 tracing::debug!(
2208 relay = %relay_url,
2209 batch_id = batch_id,
2210 filter_idx = idx,
2211 filter = ?filter,
2212 "Subscribing to filter in REQ+EOSE path"
2213 );
2214
2677 if let Some(conn) = self.connections.get(relay_url) { 2215 if let Some(conn) = self.connections.get(relay_url) {
2678 match conn.subscribe_filter(filter.clone()).await { 2216 match conn.subscribe_filter(filter.clone()).await {
2679 Ok(sub_id) => { 2217 Ok(sub_id) => {
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 4167a0c..bc4b59e 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -325,11 +325,25 @@ impl RelayConnection {
325 /// * `Ok(SubscriptionId)` - The subscription ID on success 325 /// * `Ok(SubscriptionId)` - The subscription ID on success
326 /// * `Err(String)` - Error description on failure 326 /// * `Err(String)` - Error description on failure
327 pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> { 327 pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> {
328 // DEBUG TRACING: Log the filter being subscribed to
329 tracing::debug!(
330 relay = %self.url,
331 filter = ?filter,
332 "subscribe_filter called with filter"
333 );
334
328 let output = self 335 let output = self
329 .client 336 .client
330 .subscribe(filter, None) 337 .subscribe(filter, None)
331 .await 338 .await
332 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; 339 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
340
341 tracing::debug!(
342 relay = %self.url,
343 subscription_id = %output.val,
344 "subscribe_filter succeeded"
345 );
346
333 Ok(output.val) 347 Ok(output.val)
334 } 348 }
335 349
@@ -407,31 +421,36 @@ impl RelayConnection {
407 true 421 true
408 } 422 }
409 423
410 /// Perform negentropy synchronization for a filter 424 /// Perform a negentropy sync diff (dry run) to identify missing events
411 /// 425 ///
412 /// Uses NIP-77 negentropy protocol to efficiently reconcile events matching 426 /// This method performs NIP-77 negentropy reconciliation without downloading events.
413 /// the filter between local database and remote relay. This is much more 427 /// It returns the list of event IDs that need to be fetched. The caller should then
414 /// efficient than REQ+EOSE for relays with overlapping event sets. 428 /// manually fetch these events and pass them through the write policy for validation.
415 /// 429 ///
416 /// # Arguments 430 /// # Arguments
417 /// * `filter` - The filter defining which events to sync 431 /// * `filter` - The filter to sync
418 /// 432 ///
419 /// # Returns 433 /// # Returns
420 /// * `Ok(NegentropySyncResult)` - Sync completed successfully with reconciliation info 434 /// * `Ok(Reconciliation)` - Reconciliation result with remote/local/sent event IDs
421 /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error) 435 /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error)
422 /// 436 ///
423 /// # Fallback Behavior 437 /// # Usage Pattern
424 /// If this method fails, the caller should fall back to traditional REQ+EOSE sync. 438 /// ```ignore
425 /// Failure reasons include: 439 /// // 1. Get the diff
426 /// - Relay doesn't actually support NIP-77 (despite claiming to) 440 /// let reconciliation = conn.negentropy_sync_diff(filter).await?;
427 /// - Network errors during reconciliation 441 ///
428 /// - Timeout during sync 442 /// // 2. Fetch missing events by ID
429 pub async fn negentropy_sync_filter( 443 /// if !reconciliation.remote.is_empty() {
430 &self, 444 /// let ids: Vec<EventId> = reconciliation.remote.into_iter().collect();
431 filter: Filter, 445 /// let filter = Filter::new().ids(ids);
432 ) -> Result<NegentropySyncResult, String> { 446 /// conn.subscribe_filter(filter, tx).await?;
433 // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange 447 /// }
434 let sync_opts = SyncOptions::default(); 448 ///
449 /// // 3. Events come through normal flow and get validated via process_event_static
450 /// ```
451 pub async fn negentropy_sync_diff(&self, filter: Filter) -> Result<Reconciliation, String> {
452 // Use dry_run to only identify differences without downloading events
453 let sync_opts = SyncOptions::default().dry_run();
435 454
436 match self.client.sync(filter.clone(), &sync_opts).await { 455 match self.client.sync(filter.clone(), &sync_opts).await {
437 Ok(output) => { 456 Ok(output) => {
@@ -441,9 +460,7 @@ impl RelayConnection {
441 relay = %self.url, 460 relay = %self.url,
442 local_count = reconciliation.local.len(), 461 local_count = reconciliation.local.len(),
443 remote_count = reconciliation.remote.len(), 462 remote_count = reconciliation.remote.len(),
444 sent_count = reconciliation.sent.len(), 463 "Negentropy diff completed (dry run)"
445 received_count = reconciliation.received.len(),
446 "Negentropy sync completed"
447 ); 464 );
448 465
449 // Check for any failures 466 // Check for any failures
@@ -451,15 +468,11 @@ impl RelayConnection {
451 tracing::warn!( 468 tracing::warn!(
452 relay = %self.url, 469 relay = %self.url,
453 failures = ?output.failed, 470 failures = ?output.failed,
454 "Some relays failed during negentropy sync" 471 "Some relays failed during negentropy diff"
455 ); 472 );
456 } 473 }
457 474
458 Ok(NegentropySyncResult { 475 Ok(reconciliation)
459 remote_only: reconciliation.remote.into_iter().collect(),
460 local_only: reconciliation.local.into_iter().collect(),
461 received: reconciliation.received.into_iter().collect(),
462 })
463 } 476 }
464 Err(e) => { 477 Err(e) => {
465 // Log warning only once per relay to avoid spam 478 // Log warning only once per relay to avoid spam
@@ -470,30 +483,14 @@ impl RelayConnection {
470 tracing::warn!( 483 tracing::warn!(
471 relay = %self.url, 484 relay = %self.url,
472 error = %e, 485 error = %e,
473 "Negentropy sync failed, will fall back to REQ+EOSE" 486 "Negentropy diff failed, will fall back to REQ+EOSE"
474 ); 487 );
475 } 488 }
476 Err(format!("Negentropy sync failed: {}", e)) 489 Err(format!("Negentropy diff failed: {}", e))
477 } 490 }
478 } 491 }
479 } 492 }
480 493
481 /// Perform negentropy sync and return received event IDs
482 ///
483 /// Convenience method that performs negentropy sync and returns the event IDs
484 /// that were received (i.e., events that exist on remote but not locally).
485 ///
486 /// # Arguments
487 /// * `filter` - The filter defining which events to sync
488 ///
489 /// # Returns
490 /// * `Ok(Vec<EventId>)` - Event IDs received from remote relay
491 /// * `Err(String)` - Sync failed
492 pub async fn negentropy_sync_and_fetch(&self, filter: Filter) -> Result<Vec<EventId>, String> {
493 let result = self.negentropy_sync_filter(filter).await?;
494 Ok(result.received)
495 }
496
497 /// Check if this connection has a database configured for negentropy 494 /// Check if this connection has a database configured for negentropy
498 pub fn has_database(&self) -> bool { 495 pub fn has_database(&self) -> bool {
499 self.database.is_some() 496 self.database.is_some()