upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-16 15:26:55 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-16 15:26:55 +0000
commit7821b107190cc116a30a4c339f935bc16a1d5197 (patch)
treed9cc8f440304f383aa75689eb6c1f87cc75fd20d /src/sync
parent2164f075d441d7337b2b3d7ed85993fc69b8057e (diff)
proactive sync prep - some helper functions written but not enabled
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/algorithms.rs33
-rw-r--r--src/sync/mod.rs700
-rw-r--r--src/sync/self_subscriber.rs8
3 files changed, 688 insertions, 53 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 5b5b520..84248b1 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -11,7 +11,9 @@ use std::collections::{HashMap, HashSet};
11 11
12use nostr_sdk::prelude::*; 12use nostr_sdk::prelude::*;
13 13
14use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; 14use crate::sync::PendingItems;
15
16use super::{ConnectionStatus, PendingBatch, RelayState};
15 17
16// ============================================================================= 18// =============================================================================
17// Data Structures 19// Data Structures
@@ -36,10 +38,8 @@ pub struct RelaySyncNeeds {
36pub struct AddFilters { 38pub struct AddFilters {
37 /// The relay URL to add filters to 39 /// The relay URL to add filters to
38 pub relay_url: String, 40 pub relay_url: String,
39 /// Repos being synced in this action 41 /// pending items - repos and root events
40 pub repos: HashSet<String>, 42 pub items: PendingItems,
41 /// Root events being tracked in this action
42 pub root_events: HashSet<EventId>,
43 /// The actual filters to subscribe with 43 /// The actual filters to subscribe with
44 pub filters: Vec<Filter>, 44 pub filters: Vec<Filter>,
45} 45}
@@ -161,8 +161,10 @@ pub fn compute_actions(
161 161
162 actions.push(AddFilters { 162 actions.push(AddFilters {
163 relay_url: relay_url.clone(), 163 relay_url: relay_url.clone(),
164 repos: new_repos, 164 items: PendingItems {
165 root_events: new_events, 165 repos: new_repos,
166 root_events: new_events,
167 },
166 filters, 168 filters,
167 }); 169 });
168 } 170 }
@@ -175,6 +177,7 @@ pub fn compute_actions(
175mod tests { 177mod tests {
176 use super::*; 178 use super::*;
177 use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; 179 use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds;
180 use crate::sync::SyncMethod;
178 181
179 // ========================================================================= 182 // =========================================================================
180 // derive_relay_targets tests 183 // derive_relay_targets tests
@@ -371,7 +374,7 @@ mod tests {
371 assert_eq!(actions.len(), 1); 374 assert_eq!(actions.len(), 1);
372 let action = &actions[0]; 375 let action = &actions[0];
373 assert_eq!(action.relay_url, "wss://relay1.com"); 376 assert_eq!(action.relay_url, "wss://relay1.com");
374 assert!(action.repos.contains("repo1")); 377 assert!(action.items.repos.contains("repo1"));
375 assert!(!action.filters.is_empty()); 378 assert!(!action.filters.is_empty());
376 } 379 }
377 380
@@ -528,10 +531,10 @@ mod tests {
528 assert_eq!(actions.len(), 1); 531 assert_eq!(actions.len(), 1);
529 let action = &actions[0]; 532 let action = &actions[0];
530 // Only repo3 should be in the action (repo1 pending, repo2 confirmed) 533 // Only repo3 should be in the action (repo1 pending, repo2 confirmed)
531 assert_eq!(action.repos.len(), 1); 534 assert_eq!(action.items.repos.len(), 1);
532 assert!(action.repos.contains("repo3")); 535 assert!(action.items.repos.contains("repo3"));
533 assert!(!action.repos.contains("repo1")); 536 assert!(!action.items.repos.contains("repo1"));
534 assert!(!action.repos.contains("repo2")); 537 assert!(!action.items.repos.contains("repo2"));
535 } 538 }
536 539
537 #[test] 540 #[test]
@@ -554,9 +557,9 @@ mod tests {
554 557
555 assert_eq!(actions.len(), 1); 558 assert_eq!(actions.len(), 1);
556 let action = &actions[0]; 559 let action = &actions[0];
557 assert!(action.repos.is_empty()); 560 assert!(action.items.repos.is_empty());
558 assert_eq!(action.root_events.len(), 1); 561 assert_eq!(action.items.root_events.len(), 1);
559 assert!(action.root_events.contains(&event_id)); 562 assert!(action.items.root_events.contains(&event_id));
560 // Should have 3 filters for the root event (e, E, q tags) 563 // Should have 3 filters for the root event (e, E, q tags)
561 assert_eq!(action.filters.len(), 3); 564 assert_eq!(action.filters.len(), 3);
562 } 565 }
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 41586a4..401cf21 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -591,7 +591,7 @@ impl SyncManager {
591 } 591 }
592 592
593 // Recompute actions for Layer 2+3 based on synced events 593 // Recompute actions for Layer 2+3 based on synced events
594 self.recompute_actions_for_relay(relay_url).await; 594 self.recompute_new_sync_filters_for_relay(relay_url).await;
595 } else { 595 } else {
596 // NIP-77 not supported - fall back to REQ+EOSE 596 // NIP-77 not supported - fall back to REQ+EOSE
597 tracing::info!( 597 tracing::info!(
@@ -612,7 +612,7 @@ impl SyncManager {
612 } 612 }
613 613
614 // Recompute actions for Layer 2+3 - will discover all repos/events again 614 // Recompute actions for Layer 2+3 - will discover all repos/events again
615 self.recompute_actions_for_relay(relay_url).await; 615 self.recompute_new_sync_filters_for_relay(relay_url).await;
616 } 616 }
617 617
618 if let Some(ref metrics) = self.metrics { 618 if let Some(ref metrics) = self.metrics {
@@ -709,7 +709,7 @@ impl SyncManager {
709 Some(add_filters) => { 709 Some(add_filters) => {
710 // Process AddFilters action directly 710 // Process AddFilters action directly
711 let mut manager = sync_manager.lock().await; 711 let mut manager = sync_manager.lock().await;
712 manager.handle_add_filters(add_filters).await; 712 manager.handle_new_sync_filters(add_filters).await;
713 } 713 }
714 None => break, 714 None => break,
715 } 715 }
@@ -763,13 +763,13 @@ impl SyncManager {
763 /// - For new relays: creates entry with Connecting status, spawns connection 763 /// - For new relays: creates entry with Connecting status, spawns connection
764 /// - For existing connected relays: subscribes to filters, creates PendingBatch 764 /// - For existing connected relays: subscribes to filters, creates PendingBatch
765 /// - For disconnected/connecting relays: returns (will be handled on connection) 765 /// - For disconnected/connecting relays: returns (will be handled on connection)
766 async fn handle_add_filters(&mut self, action: AddFilters) { 766 async fn handle_new_sync_filters(&mut self, action: AddFilters) {
767 tracing::info!( 767 tracing::info!(
768 relay = %action.relay_url, 768 relay = %action.relay_url,
769 repo_count = action.repos.len(), 769 repo_count = action.items.repos.len(),
770 root_event_count = action.root_events.len(), 770 root_event_count = action.items.root_events.len(),
771 filter_count = action.filters.len(), 771 filter_count = action.filters.len(),
772 "[DIAG] handle_add_filters called" 772 "[DIAG] handle_new_sync_filters called"
773 ); 773 );
774 774
775 // Step 1: Check if relay exists in relay_sync_index 775 // Step 1: Check if relay exists in relay_sync_index
@@ -801,7 +801,7 @@ impl SyncManager {
801 801
802 tracing::info!( 802 tracing::info!(
803 relay = %action.relay_url, 803 relay = %action.relay_url,
804 repos = action.repos.len(), 804 repos = action.items.repos.len(),
805 "Spawning connection for new relay" 805 "Spawning connection for new relay"
806 ); 806 );
807 807
@@ -827,7 +827,7 @@ impl SyncManager {
827 // Step 2: Check if consolidation is needed BEFORE adding new filters 827 // Step 2: Check if consolidation is needed BEFORE adding new filters
828 self.maybe_consolidate(&action.relay_url, action.filters.len()) 828 self.maybe_consolidate(&action.relay_url, action.filters.len())
829 .await; 829 .await;
830 830 /// DELETE this bit
831 // Step 3: Get connection and subscribe to all filters 831 // Step 3: Get connection and subscribe to all filters
832 let connection = match self.connections.get(&action.relay_url) { 832 let connection = match self.connections.get(&action.relay_url) {
833 Some(conn) => conn, 833 Some(conn) => conn,
@@ -870,8 +870,8 @@ impl SyncManager {
870 let batch = PendingBatch { 870 let batch = PendingBatch {
871 batch_id, 871 batch_id,
872 items: PendingItems { 872 items: PendingItems {
873 repos: action.repos.clone(), 873 repos: action.items.repos.clone(),
874 root_events: action.root_events.clone(), 874 root_events: action.items.root_events.clone(),
875 }, 875 },
876 outstanding_subs: subscription_ids.into_iter().collect(), 876 outstanding_subs: subscription_ids.into_iter().collect(),
877 sync_method: SyncMethod::ReqEose, 877 sync_method: SyncMethod::ReqEose,
@@ -889,33 +889,84 @@ impl SyncManager {
889 tracing::debug!( 889 tracing::debug!(
890 relay = %action.relay_url, 890 relay = %action.relay_url,
891 batch_id = batch_id, 891 batch_id = batch_id,
892 repos = action.repos.len(), 892 repos = action.items.repos.len(),
893 root_events = action.root_events.len(), 893 root_events = action.items.root_events.len(),
894 filters = action.filters.len(), 894 filters = action.filters.len(),
895 "Created pending batch for filter subscriptions" 895 "Created pending batch for filter subscriptions"
896 ); 896 );
897 // REPLACE WITH THIS:
898 // // Subscribe to each filter and collect subscription IDs
899 // self.sync_live(&action.relay_url, &action.filters).await;
900 // // TODO need to do actions.repos
901 // self.historic_sync(&action.relay_url, action.filters, action.items, None)
902 // .await;
897 } 903 }
898 904
899 /// Handle a connection success (called when a relay connects or reconnects) 905 /// Handle a connection success (called when a relay connects or reconnects)
900 /// 906 ///
901 /// This method implements smart reconnection logic: 907 /// This method dispatches to the appropriate reconnection strategy:
902 /// - Fresh sync if never connected or >15 min since last connection 908 /// - `fresh_start()` if never connected before
903 /// - Quick reconnect with since filter if <15 min since last connection 909 /// - `quick_reconnect()` if disconnected < 15 minutes
904 /// 910 /// - `long_reconnect()` if disconnected > 15 minutes
905 /// For fresh sync (with NIP-77 negentropy if supported):
906 /// - Clears any stale state
907 /// - Uses negentropy sync for Layer 1 (if NIP-77 supported)
908 /// - Falls back to REQ+EOSE if NIP-77 not supported
909 /// - Recomputes actions for new items
910 ///
911 /// For quick reconnect:
912 /// - Preserves existing state
913 /// - Subscribes to Layer 1 with since filter
914 /// - Rebuilds Layer 2 and Layer 3 with since filter
915 /// - Recomputes actions for new items
916 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { 911 async fn handle_connect_or_reconnect(&mut self, relay_url: &str) {
917 let now = Timestamp::now(); 912 let now = Timestamp::now();
918 913
914 // // Get the relay state to determine reconnect type
915 // let (last_connected, disconnected_at) = {
916 // let index = self.relay_sync_index.read().await;
917 // if let Some(state) = index.get(relay_url) {
918 // (state.last_connected, state.disconnected_at)
919 // } else {
920 // (None, None) // No state found
921 // }
922 // };
923
924 // // Determine which reconnection strategy to use
925 // match (last_connected, disconnected_at) {
926 // (None, _) => {
927 // // Never connected before - fresh start
928 // tracing::info!(
929 // relay = %relay_url,
930 // "First connection - initiating fresh_start"
931 // );
932 // self.fresh_start(relay_url).await;
933 // }
934 // (Some(last), Some(disconnected)) => {
935 // // Was connected before, check how long disconnected
936 // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs());
937
938 // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS {
939 // // Disconnected < 15 minutes - quick reconnect
940 // // Use last_connected minus buffer as since timestamp
941 // let since =
942 // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS));
943 // tracing::info!(
944 // relay = %relay_url,
945 // disconnect_secs = disconnect_duration,
946 // since = %since,
947 // "Short disconnection - initiating quick_reconnect"
948 // );
949 // self.quick_reconnect(relay_url, since).await;
950 // } else {
951 // // Disconnected > 15 minutes - long reconnect
952 // tracing::info!(
953 // relay = %relay_url,
954 // disconnect_secs = disconnect_duration,
955 // "Long disconnection - initiating long_reconnect"
956 // );
957 // self.long_reconnect(relay_url).await;
958 // }
959 // }
960 // (Some(_last), None) => {
961 // // Was connected but no disconnected_at - shouldn't happen normally
962 // // Treat as long reconnect to be safe
963 // tracing::warn!(
964 // relay = %relay_url,
965 // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect"
966 // );
967 // self.long_reconnect(relay_url).await;
968 // }
969 // }
919 // Get the relay state to determine reconnect type 970 // Get the relay state to determine reconnect type
920 let (is_fresh_sync, last_connected, is_bootstrap) = { 971 let (is_fresh_sync, last_connected, is_bootstrap) = {
921 let index = self.relay_sync_index.read().await; 972 let index = self.relay_sync_index.read().await;
@@ -998,7 +1049,7 @@ impl SyncManager {
998 1049
999 // After negentropy sync, recompute Layer 2+3 actions 1050 // After negentropy sync, recompute Layer 2+3 actions
1000 // Layer 1 events are now in sync, so we can proceed with Layer 2+3 1051 // Layer 1 events are now in sync, so we can proceed with Layer 2+3
1001 self.recompute_actions_for_relay(relay_url).await; 1052 self.recompute_new_sync_filters_for_relay(relay_url).await;
1002 1053
1003 // Set up live subscription for new events (since=now) 1054 // Set up live subscription for new events (since=now)
1004 let live_filter = filters::build_announcement_filter(Some(now)); 1055 let live_filter = filters::build_announcement_filter(Some(now));
@@ -1021,7 +1072,7 @@ impl SyncManager {
1021 // during connect_and_subscribe() in handle_add_filters(). That call subscribes 1072 // during connect_and_subscribe() in handle_add_filters(). That call subscribes
1022 // to kinds 30617+30618 for the full history. Here we only need to recompute 1073 // to kinds 30617+30618 for the full history. Here we only need to recompute
1023 // Layer 2+3 actions based on the repos we're tracking. 1074 // Layer 2+3 actions based on the repos we're tracking.
1024 self.recompute_actions_for_relay(relay_url).await; 1075 self.recompute_new_sync_filters_for_relay(relay_url).await;
1025 } 1076 }
1026 } else { 1077 } else {
1027 // Quick reconnect: use since filter (no negentropy needed) 1078 // Quick reconnect: use since filter (no negentropy needed)
@@ -1055,7 +1106,7 @@ impl SyncManager {
1055 .await; 1106 .await;
1056 1107
1057 // Recompute actions for any new items discovered while disconnected 1108 // Recompute actions for any new items discovered while disconnected
1058 self.recompute_actions_for_relay(relay_url).await; 1109 self.recompute_new_sync_filters_for_relay(relay_url).await;
1059 1110
1060 if let Some(ref metrics) = self.metrics { 1111 if let Some(ref metrics) = self.metrics {
1061 metrics.record_event(event_source::RECONNECT); 1112 metrics.record_event(event_source::RECONNECT);
@@ -1063,6 +1114,225 @@ impl SyncManager {
1063 } 1114 }
1064 } 1115 }
1065 1116
1117 /// Fresh start - clears state and does full sync
1118 ///
1119 /// Called by: initial connect, long_reconnect, daily_sync
1120 ///
1121 /// Flow:
1122 /// 1. Clear PendingSyncIndex for this relay
1123 /// 2. Clear RelaySyncIndex sync state (repos/root_events)
1124 /// 3. Update connection state to Connected
1125 /// 4. L1 live + L1 historic (negentropy if available)
1126 /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3
1127 async fn fresh_start(&mut self, relay_url: &str) {
1128 let now = Timestamp::now();
1129
1130 tracing::info!(relay = %relay_url, "Starting fresh_start");
1131
1132 // Step 1: Clear PendingSyncIndex for this relay
1133 {
1134 let mut pending = self.pending_sync_index.write().await;
1135 if pending.remove(relay_url).is_some() {
1136 tracing::debug!(
1137 relay = %relay_url,
1138 "Cleared pending batches in fresh_start"
1139 );
1140 }
1141 }
1142
1143 // Step 2: Clear RelaySyncIndex sync state (but preserve connection metadata)
1144 {
1145 let mut index = self.relay_sync_index.write().await;
1146 if let Some(state) = index.get_mut(relay_url) {
1147 let repos_cleared = state.repos.len();
1148 let events_cleared = state.root_events.len();
1149 state.clear_sync_state();
1150 if repos_cleared > 0 || events_cleared > 0 {
1151 tracing::debug!(
1152 relay = %relay_url,
1153 repos_cleared = repos_cleared,
1154 events_cleared = events_cleared,
1155 "Cleared sync state in fresh_start"
1156 );
1157 }
1158 }
1159 }
1160
1161 // Step 3: Update connection state
1162 {
1163 let mut index = self.relay_sync_index.write().await;
1164 let state = index.entry(relay_url.to_string()).or_default();
1165 state.connection_status = ConnectionStatus::Connected;
1166 state.last_connected = Some(now);
1167 state.disconnected_at = None;
1168 }
1169
1170 // Record success in health tracker
1171 self.health_tracker.record_success(relay_url);
1172
1173 // Update metrics
1174 if let Some(ref metrics) = self.metrics {
1175 metrics.set_relay_connected(relay_url, true);
1176 metrics.inc_connected_count();
1177 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1178 }
1179
1180 // Step 4: L1 sync - check negentropy support
1181 let use_negentropy = if self.config.sync_disable_negentropy {
1182 tracing::debug!(relay = %relay_url, "Negentropy disabled via config");
1183 false
1184 } else if let Some(connection) = self.connections.get(relay_url) {
1185 connection.supports_negentropy().await
1186 } else {
1187 false
1188 };
1189
1190 if use_negentropy {
1191 // NIP-77 supported - use negentropy for L1 historical sync
1192 tracing::info!(
1193 relay = %relay_url,
1194 "Using NIP-77 negentropy for L1 historical sync"
1195 );
1196
1197 // L1 historic sync (no since - full sync)
1198 let layer1_filter = filters::build_announcement_filter(None);
1199 self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)")
1200 .await;
1201
1202 // L1 live subscription (since=now for ongoing events)
1203 let live_filter = filters::build_announcement_filter(Some(now));
1204 if let Some(connection) = self.connections.get(relay_url) {
1205 if let Err(e) = connection.subscribe_filter(live_filter).await {
1206 tracing::error!(
1207 relay = %relay_url,
1208 error = %e,
1209 "Failed to set up L1 live subscription in fresh_start"
1210 );
1211 }
1212 }
1213 } else {
1214 // NIP-77 not supported - REQ+EOSE
1215 // Note: Layer 1 subscription (without since) was already established
1216 // during connect_and_subscribe() in spawn_relay_connection
1217 tracing::info!(
1218 relay = %relay_url,
1219 "Using REQ+EOSE for L1 sync (negentropy not available)"
1220 );
1221 }
1222
1223 // Step 5: compute_actions → AddFilters for L2+L3
1224 // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters
1225 // for ALL repos that should be synced from this relay
1226 self.recompute_new_sync_filters_for_relay(relay_url).await;
1227
1228 tracing::info!(relay = %relay_url, "fresh_start complete");
1229 }
1230
1231 /// Quick reconnect - for disconnections < 15 minutes
1232 ///
1233 /// Flow:
1234 /// 1. Clear PendingSyncIndex for this relay
1235 /// 2. Update connection state to Connected
1236 /// 3. L1 live + L1 historic(since)
1237 /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since)
1238 /// 5. compute_actions for any new items discovered during catchup
1239 async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) {
1240 let now = Timestamp::now();
1241
1242 tracing::info!(
1243 relay = %relay_url,
1244 since = %since,
1245 "Starting quick_reconnect"
1246 );
1247
1248 // Step 1: Clear PendingSyncIndex for this relay
1249 // Old subscriptions are dead after disconnect
1250 {
1251 let mut pending = self.pending_sync_index.write().await;
1252 if pending.remove(relay_url).is_some() {
1253 tracing::debug!(
1254 relay = %relay_url,
1255 "Cleared pending batches in quick_reconnect"
1256 );
1257 }
1258 }
1259
1260 // Step 2: Update connection state (preserve repos/root_events - that's the point!)
1261 {
1262 let mut index = self.relay_sync_index.write().await;
1263 let state = index.entry(relay_url.to_string()).or_default();
1264 state.connection_status = ConnectionStatus::Connected;
1265 state.last_connected = Some(now);
1266 state.disconnected_at = None;
1267 }
1268
1269 // Record success in health tracker
1270 self.health_tracker.record_success(relay_url);
1271
1272 // Update metrics
1273 if let Some(ref metrics) = self.metrics {
1274 metrics.set_relay_connected(relay_url, true);
1275 metrics.inc_connected_count();
1276 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1277 metrics.record_event(event_source::RECONNECT);
1278 }
1279
1280 // Step 3: L1 live + L1 historic with since filter
1281 // L1 live subscription (since=now for ongoing events)
1282 let live_filter = filters::build_announcement_filter(Some(now));
1283 if let Some(connection) = self.connections.get(relay_url) {
1284 if let Err(e) = connection.subscribe_filter(live_filter).await {
1285 tracing::error!(
1286 relay = %relay_url,
1287 error = %e,
1288 "Failed to set up L1 live subscription in quick_reconnect"
1289 );
1290 }
1291 }
1292
1293 // L1 historic with since filter (catch up on missed announcements)
1294 let layer1_filter = filters::build_announcement_filter(Some(since));
1295 if let Some(connection) = self.connections.get(relay_url) {
1296 if let Err(e) = connection.subscribe_filter(layer1_filter).await {
1297 tracing::error!(
1298 relay = %relay_url,
1299 error = %e,
1300 "Failed to subscribe to L1 historic filter in quick_reconnect"
1301 );
1302 }
1303 }
1304
1305 // Step 4: Rebuild L2+L3 from confirmed state with since filter
1306 // This uses the preserved repos/root_events from RelaySyncIndex
1307 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
1308
1309 // Step 5: compute_actions for any NEW items discovered while disconnected
1310 self.recompute_new_sync_filters_for_relay(relay_url).await;
1311
1312 tracing::info!(relay = %relay_url, "quick_reconnect complete");
1313 }
1314
1315 /// Long reconnect - for disconnections > 15 minutes
1316 ///
1317 /// Flow:
1318 /// 1. Record disconnect/reconnect metric
1319 /// 2. Delegate to fresh_start()
1320 async fn long_reconnect(&mut self, relay_url: &str) {
1321 tracing::info!(relay = %relay_url, "Starting long_reconnect");
1322
1323 // Step 1: Record disconnect/reconnect metric
1324 // This distinguishes intentional daily refresh from failure recovery
1325 if let Some(ref metrics) = self.metrics {
1326 metrics.record_event(event_source::RECONNECT);
1327 }
1328
1329 // Step 2: Delegate to fresh_start
1330 // State is too stale to trust, start fresh
1331 self.fresh_start(relay_url).await;
1332
1333 tracing::info!(relay = %relay_url, "long_reconnect complete");
1334 }
1335
1066 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay 1336 /// Rebuild Layer 2 and Layer 3 subscriptions for a relay
1067 /// 1337 ///
1068 /// Uses the confirmed repos and root_events from RelayState to build filters. 1338 /// Uses the confirmed repos and root_events from RelayState to build filters.
@@ -1129,7 +1399,7 @@ impl SyncManager {
1129 /// 1399 ///
1130 /// Uses derive_relay_targets and compute_actions to find new items 1400 /// Uses derive_relay_targets and compute_actions to find new items
1131 /// that need to be synced. Processes AddFilters actions for new items. 1401 /// that need to be synced. Processes AddFilters actions for new items.
1132 async fn recompute_actions_for_relay(&mut self, relay_url: &str) { 1402 async fn recompute_new_sync_filters_for_relay(&mut self, relay_url: &str) {
1133 use crate::sync::algorithms::{compute_actions, derive_relay_targets}; 1403 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
1134 1404
1135 // Get current state from indexes (need to collect to avoid holding locks) 1405 // Get current state from indexes (need to collect to avoid holding locks)
@@ -1173,12 +1443,12 @@ impl SyncManager {
1173 for action in actions { 1443 for action in actions {
1174 tracing::info!( 1444 tracing::info!(
1175 relay = %action.relay_url, 1445 relay = %action.relay_url,
1176 new_repos = action.repos.len(), 1446 new_repos = action.items.repos.len(),
1177 new_root_events = action.root_events.len(), 1447 new_root_events = action.items.root_events.len(),
1178 filters = action.filters.len(), 1448 filters = action.filters.len(),
1179 "Processing AddFilters for new items" 1449 "Processing AddFilters for new items"
1180 ); 1450 );
1181 self.handle_add_filters(action).await; 1451 self.handle_new_sync_filters(action).await;
1182 } 1452 }
1183 } 1453 }
1184 1454
@@ -2095,6 +2365,366 @@ impl SyncManager {
2095 } 2365 }
2096 } 2366 }
2097 2367
2368 // =========================================================================
2369 // Sync Primitives (Phase 1 of GRASP-02 refactoring)
2370 // These methods are new primitives that will be used in subsequent phases.
2371 // =========================================================================
2372
2373 /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex
2374 #[allow(dead_code)] // Will be used in Phase 2+
2375 ///
2376 /// This method subscribes to filters with `limit: 0` for receiving ongoing events.
2377 /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have
2378 /// a definite "completion" - they stay open indefinitely.
2379 ///
2380 /// Used for:
2381 /// - Layer 1 live subscription (new announcements after initial sync)
2382 /// - Layer 2+3 live subscriptions (new events after initial sync)
2383 ///
2384 /// # Arguments
2385 /// * `relay_url` - The relay URL to subscribe on
2386 /// * `filters` - Filters to subscribe to (will have `limit: 0` applied)
2387 ///
2388 /// # Returns
2389 /// Vec of subscription IDs for the live subscriptions, or empty if connection not found
2390 async fn sync_live(&self, relay_url: &str, filters: &[Filter]) -> Vec<SubscriptionId> {
2391 if filters.is_empty() {
2392 return vec![];
2393 }
2394
2395 let connection = match self.connections.get(relay_url) {
2396 Some(conn) => conn,
2397 None => {
2398 tracing::warn!(
2399 relay = %relay_url,
2400 "No connection found for sync_live"
2401 );
2402 return vec![];
2403 }
2404 };
2405
2406 let mut sub_ids = Vec::new();
2407
2408 for filter in filters {
2409 // Apply limit: 0 to make this a live subscription
2410 // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit",
2411 // but omitting limit means "no limit" which is what we want for live.
2412 // The filter passed in should already NOT have a limit set.
2413 match connection.subscribe_filter(filter.clone()).await {
2414 Ok(sub_id) => {
2415 tracing::trace!(
2416 relay = %relay_url,
2417 sub_id = %sub_id,
2418 "Live subscription created"
2419 );
2420 sub_ids.push(sub_id);
2421 }
2422 Err(e) => {
2423 tracing::error!(
2424 relay = %relay_url,
2425 error = %e,
2426 "Failed to create live subscription"
2427 );
2428 }
2429 }
2430 }
2431
2432 tracing::debug!(
2433 relay = %relay_url,
2434 filter_count = filters.len(),
2435 sub_count = sub_ids.len(),
2436 "sync_live completed"
2437 );
2438
2439 sub_ids
2440 }
2441
2442 /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY)
2443 ///
2444 /// Returns raw Vec<Filter> for L1+L2+L3.
2445 /// Used by: quick_reconnect, consolidate
2446 /// Does NOT include pending items - those flow through AddFilters path.
2447 ///
2448 /// # Arguments
2449 /// * `relay_url` - The relay URL to reconstruct filters for
2450 ///
2451 /// # Returns
2452 /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags)
2453 #[allow(dead_code)] // Will be used in Phase 3+
2454 async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> {
2455 // Get confirmed state from relay_sync_index
2456 let (repos, root_events) = {
2457 let index = self.relay_sync_index.read().await;
2458 match index.get(relay_url) {
2459 Some(state) => (state.repos.clone(), state.root_events.clone()),
2460 None => {
2461 tracing::warn!(
2462 relay = %relay_url,
2463 "No RelayState found for reconstruct_filters"
2464 );
2465 return vec![];
2466 }
2467 }
2468 };
2469
2470 let mut all_filters = Vec::new();
2471
2472 // Layer 1: Announcements (always included)
2473 // Note: No `since` filter - this returns raw filters for live subscriptions
2474 all_filters.push(filters::build_announcement_filter(None));
2475
2476 // Layer 2 + Layer 3: Repo and root event tag filters
2477 if !repos.is_empty() || !root_events.is_empty() {
2478 let l2_l3_filters =
2479 filters::build_layer2_and_layer3_filters(&repos, &root_events, None);
2480 all_filters.extend(l2_l3_filters);
2481 }
2482
2483 tracing::debug!(
2484 relay = %relay_url,
2485 total_filters = all_filters.len(),
2486 repos_count = repos.len(),
2487 root_events_count = root_events.len(),
2488 "Reconstructed filters from confirmed state"
2489 );
2490
2491 all_filters
2492 }
2493
2494 /// Sync historical events and track in PendingSyncIndex
2495 #[allow(dead_code)] // Will be used in Phase 3+
2496 ///
2497 /// This method handles historical synchronization for a set of filters,
2498 /// creating a PendingBatch to track completion. It dispatches to either
2499 /// negentropy sync or traditional REQ+EOSE based on relay capability and config.
2500 ///
2501 /// Used for:
2502 /// - Initial sync (no since filter)
2503 /// - Reconnect sync (with since filter)
2504 /// - Daily sync (no since filter, full re-sync)
2505 ///
2506 /// # Arguments
2507 /// * `relay_url` - The relay URL to sync from
2508 /// * `filters` - Filters to sync (will have `since` applied if provided)
2509 /// * `items` - Items being synced (for tracking in PendingBatch)
2510 /// * `since` - Optional timestamp for incremental sync
2511 ///
2512 /// # Returns
2513 /// * `Some(batch_id)` - Batch was created and sync initiated
2514 /// * `None` - No connection or sync failed to start
2515 async fn historic_sync(
2516 &mut self,
2517 relay_url: &str,
2518 filters: Vec<Filter>,
2519 items: PendingItems,
2520 since: Option<Timestamp>,
2521 ) -> Option<u64> {
2522 if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() {
2523 tracing::debug!(
2524 relay = %relay_url,
2525 "historic_sync called with empty filters and items, skipping"
2526 );
2527 return None;
2528 }
2529
2530 // Check connection exists
2531 let connection = match self.connections.get(relay_url) {
2532 Some(conn) => conn,
2533 None => {
2534 tracing::warn!(
2535 relay = %relay_url,
2536 "No connection found for historic_sync"
2537 );
2538 return None;
2539 }
2540 };
2541
2542 // Apply since filter if provided
2543 let filters_with_since: Vec<Filter> = if let Some(ts) = since {
2544 filters.into_iter().map(|f| f.since(ts)).collect()
2545 } else {
2546 filters
2547 };
2548
2549 // Check if we should use negentropy
2550 let use_negentropy =
2551 !self.config.sync_disable_negentropy && connection.supports_negentropy().await;
2552
2553 // Generate batch ID
2554 let batch_id = self.next_batch_id();
2555
2556 if use_negentropy && !filters_with_since.is_empty() {
2557 // NIP-77 negentropy path
2558 tracing::debug!(
2559 relay = %relay_url,
2560 batch_id = batch_id,
2561 filter_count = filters_with_since.len(),
2562 repos = items.repos.len(),
2563 root_events = items.root_events.len(),
2564 "Starting historic_sync with negentropy"
2565 );
2566
2567 // Create PendingBatch for negentropy (empty outstanding_subs)
2568 let batch = PendingBatch {
2569 batch_id,
2570 items: items.clone(),
2571 outstanding_subs: HashSet::new(),
2572 sync_method: SyncMethod::Negentropy,
2573 };
2574
2575 // Add to pending_sync_index
2576 {
2577 let mut pending = self.pending_sync_index.write().await;
2578 pending
2579 .entry(relay_url.to_string())
2580 .or_insert_with(Vec::new)
2581 .push(batch);
2582 }
2583
2584 // Perform negentropy sync for each filter
2585 // Note: We sync each filter separately because negentropy works on a single filter
2586 let mut total_received = 0;
2587 let mut any_success = false;
2588
2589 for filter in &filters_with_since {
2590 if let Some(conn) = self.connections.get(relay_url) {
2591 match conn.negentropy_sync_filter(filter.clone()).await {
2592 Ok(result) => {
2593 total_received += result.received.len();
2594 any_success = true;
2595
2596 // Record metrics for received events
2597 if let Some(ref metrics) = self.metrics {
2598 for _ in 0..result.received.len() {
2599 metrics.record_event(event_source::STARTUP);
2600 }
2601 }
2602 }
2603 Err(e) => {
2604 tracing::warn!(
2605 relay = %relay_url,
2606 error = %e,
2607 "Negentropy sync failed for filter in historic_sync"
2608 );
2609 }
2610 }
2611 }
2612 }
2613
2614 if any_success {
2615 // Remove batch from pending and confirm it
2616 let completed_batch = {
2617 let mut pending = self.pending_sync_index.write().await;
2618 if let Some(batches) = pending.get_mut(relay_url) {
2619 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id);
2620 if let Some(idx) = batch_idx {
2621 let batch = batches.remove(idx);
2622 if batches.is_empty() {
2623 pending.remove(relay_url);
2624 }
2625 Some(batch)
2626 } else {
2627 None
2628 }
2629 } else {
2630 None
2631 }
2632 };
2633
2634 if let Some(batch) = completed_batch {
2635 self.confirm_batch(relay_url, batch).await;
2636 }
2637
2638 tracing::info!(
2639 relay = %relay_url,
2640 batch_id = batch_id,
2641 total_received = total_received,
2642 "historic_sync (negentropy) completed"
2643 );
2644 } else {
2645 // All negentropy syncs failed - remove the pending batch
2646 let mut pending = self.pending_sync_index.write().await;
2647 if let Some(batches) = pending.get_mut(relay_url) {
2648 batches.retain(|b| b.batch_id != batch_id);
2649 if batches.is_empty() {
2650 pending.remove(relay_url);
2651 }
2652 }
2653
2654 tracing::warn!(
2655 relay = %relay_url,
2656 batch_id = batch_id,
2657 "historic_sync (negentropy) failed for all filters"
2658 );
2659 return None;
2660 }
2661 } else {
2662 // Traditional REQ+EOSE path
2663 tracing::debug!(
2664 relay = %relay_url,
2665 batch_id = batch_id,
2666 filter_count = filters_with_since.len(),
2667 repos = items.repos.len(),
2668 root_events = items.root_events.len(),
2669 use_negentropy = use_negentropy,
2670 "Starting historic_sync with REQ+EOSE"
2671 );
2672
2673 // Subscribe to each filter and collect subscription IDs
2674 let mut subscription_ids = HashSet::new();
2675
2676 for filter in &filters_with_since {
2677 if let Some(conn) = self.connections.get(relay_url) {
2678 match conn.subscribe_filter(filter.clone()).await {
2679 Ok(sub_id) => {
2680 subscription_ids.insert(sub_id);
2681 }
2682 Err(e) => {
2683 tracing::error!(
2684 relay = %relay_url,
2685 error = %e,
2686 "Failed to subscribe to filter in historic_sync"
2687 );
2688 }
2689 }
2690 }
2691 }
2692
2693 if subscription_ids.is_empty() && !filters_with_since.is_empty() {
2694 tracing::warn!(
2695 relay = %relay_url,
2696 "All filter subscriptions failed in historic_sync"
2697 );
2698 return None;
2699 }
2700
2701 // Create PendingBatch for REQ+EOSE
2702 let batch = PendingBatch {
2703 batch_id,
2704 items,
2705 outstanding_subs: subscription_ids,
2706 sync_method: SyncMethod::ReqEose,
2707 };
2708
2709 // Add to pending_sync_index
2710 {
2711 let mut pending = self.pending_sync_index.write().await;
2712 pending
2713 .entry(relay_url.to_string())
2714 .or_insert_with(Vec::new)
2715 .push(batch);
2716 }
2717
2718 tracing::debug!(
2719 relay = %relay_url,
2720 batch_id = batch_id,
2721 "historic_sync (REQ+EOSE) batch created, awaiting EOSE"
2722 );
2723 }
2724
2725 Some(batch_id)
2726 }
2727
2098 /// Gracefully shutdown the SyncManager 2728 /// Gracefully shutdown the SyncManager
2099 /// 2729 ///
2100 /// This method: 2730 /// This method:
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 0379fe4..9643fc0 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -499,7 +499,7 @@ impl SelfSubscriber {
499 drop(index); // Release lock before async operations 499 drop(index); // Release lock before async operations
500 500
501 // For each relay, send AddFilters action directly 501 // For each relay, send AddFilters action directly
502 // SyncManager's handle_add_filters auto-spawns connection for unknown relays 502 // SyncManager's handle_new_sync_filters auto-spawns connection for unknown relays
503 for (relay_url, needs) in targets { 503 for (relay_url, needs) in targets {
504 // Skip our own relay URL (we're subscribed to ourselves via self-subscription) 504 // Skip our own relay URL (we're subscribed to ourselves via self-subscription)
505 if relay_url.contains(&self.relay_domain) { 505 if relay_url.contains(&self.relay_domain) {
@@ -519,8 +519,10 @@ impl SelfSubscriber {
519 519
520 let action = AddFilters { 520 let action = AddFilters {
521 relay_url: relay_url.clone(), 521 relay_url: relay_url.clone(),
522 repos: needs.repos, 522 items: crate::sync::PendingItems {
523 root_events: needs.root_events, 523 repos: needs.repos,
524 root_events: needs.root_events,
525 },
524 filters, 526 filters,
525 }; 527 };
526 528