diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-16 15:26:55 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-16 15:26:55 +0000 |
| commit | 7821b107190cc116a30a4c339f935bc16a1d5197 (patch) | |
| tree | d9cc8f440304f383aa75689eb6c1f87cc75fd20d /src | |
| parent | 2164f075d441d7337b2b3d7ed85993fc69b8057e (diff) | |
proactive sync prep - some helper functions written but not enabled
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/algorithms.rs | 33 | ||||
| -rw-r--r-- | src/sync/mod.rs | 700 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 8 |
3 files changed, 688 insertions, 53 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 5b5b520..84248b1 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -11,7 +11,9 @@ use std::collections::{HashMap, HashSet}; | |||
| 11 | 11 | ||
| 12 | use nostr_sdk::prelude::*; | 12 | use nostr_sdk::prelude::*; |
| 13 | 13 | ||
| 14 | use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; | 14 | use crate::sync::PendingItems; |
| 15 | |||
| 16 | use super::{ConnectionStatus, PendingBatch, RelayState}; | ||
| 15 | 17 | ||
| 16 | // ============================================================================= | 18 | // ============================================================================= |
| 17 | // Data Structures | 19 | // Data Structures |
| @@ -36,10 +38,8 @@ pub struct RelaySyncNeeds { | |||
| 36 | pub struct AddFilters { | 38 | pub struct AddFilters { |
| 37 | /// The relay URL to add filters to | 39 | /// The relay URL to add filters to |
| 38 | pub relay_url: String, | 40 | pub relay_url: String, |
| 39 | /// Repos being synced in this action | 41 | /// pending items - repos and root events |
| 40 | pub repos: HashSet<String>, | 42 | pub items: PendingItems, |
| 41 | /// Root events being tracked in this action | ||
| 42 | pub root_events: HashSet<EventId>, | ||
| 43 | /// The actual filters to subscribe with | 43 | /// The actual filters to subscribe with |
| 44 | pub filters: Vec<Filter>, | 44 | pub filters: Vec<Filter>, |
| 45 | } | 45 | } |
| @@ -161,8 +161,10 @@ pub fn compute_actions( | |||
| 161 | 161 | ||
| 162 | actions.push(AddFilters { | 162 | actions.push(AddFilters { |
| 163 | relay_url: relay_url.clone(), | 163 | relay_url: relay_url.clone(), |
| 164 | repos: new_repos, | 164 | items: PendingItems { |
| 165 | root_events: new_events, | 165 | repos: new_repos, |
| 166 | root_events: new_events, | ||
| 167 | }, | ||
| 166 | filters, | 168 | filters, |
| 167 | }); | 169 | }); |
| 168 | } | 170 | } |
| @@ -175,6 +177,7 @@ pub fn compute_actions( | |||
| 175 | mod tests { | 177 | mod tests { |
| 176 | use super::*; | 178 | use super::*; |
| 177 | use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; | 179 | use crate::sync::RepoSyncNeeds as ModRepoSyncNeeds; |
| 180 | use crate::sync::SyncMethod; | ||
| 178 | 181 | ||
| 179 | // ========================================================================= | 182 | // ========================================================================= |
| 180 | // derive_relay_targets tests | 183 | // derive_relay_targets tests |
| @@ -371,7 +374,7 @@ mod tests { | |||
| 371 | assert_eq!(actions.len(), 1); | 374 | assert_eq!(actions.len(), 1); |
| 372 | let action = &actions[0]; | 375 | let action = &actions[0]; |
| 373 | assert_eq!(action.relay_url, "wss://relay1.com"); | 376 | assert_eq!(action.relay_url, "wss://relay1.com"); |
| 374 | assert!(action.repos.contains("repo1")); | 377 | assert!(action.items.repos.contains("repo1")); |
| 375 | assert!(!action.filters.is_empty()); | 378 | assert!(!action.filters.is_empty()); |
| 376 | } | 379 | } |
| 377 | 380 | ||
| @@ -528,10 +531,10 @@ mod tests { | |||
| 528 | assert_eq!(actions.len(), 1); | 531 | assert_eq!(actions.len(), 1); |
| 529 | let action = &actions[0]; | 532 | let action = &actions[0]; |
| 530 | // Only repo3 should be in the action (repo1 pending, repo2 confirmed) | 533 | // Only repo3 should be in the action (repo1 pending, repo2 confirmed) |
| 531 | assert_eq!(action.repos.len(), 1); | 534 | assert_eq!(action.items.repos.len(), 1); |
| 532 | assert!(action.repos.contains("repo3")); | 535 | assert!(action.items.repos.contains("repo3")); |
| 533 | assert!(!action.repos.contains("repo1")); | 536 | assert!(!action.items.repos.contains("repo1")); |
| 534 | assert!(!action.repos.contains("repo2")); | 537 | assert!(!action.items.repos.contains("repo2")); |
| 535 | } | 538 | } |
| 536 | 539 | ||
| 537 | #[test] | 540 | #[test] |
| @@ -554,9 +557,9 @@ mod tests { | |||
| 554 | 557 | ||
| 555 | assert_eq!(actions.len(), 1); | 558 | assert_eq!(actions.len(), 1); |
| 556 | let action = &actions[0]; | 559 | let action = &actions[0]; |
| 557 | assert!(action.repos.is_empty()); | 560 | assert!(action.items.repos.is_empty()); |
| 558 | assert_eq!(action.root_events.len(), 1); | 561 | assert_eq!(action.items.root_events.len(), 1); |
| 559 | assert!(action.root_events.contains(&event_id)); | 562 | assert!(action.items.root_events.contains(&event_id)); |
| 560 | // Should have 3 filters for the root event (e, E, q tags) | 563 | // Should have 3 filters for the root event (e, E, q tags) |
| 561 | assert_eq!(action.filters.len(), 3); | 564 | assert_eq!(action.filters.len(), 3); |
| 562 | } | 565 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 41586a4..401cf21 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -591,7 +591,7 @@ impl SyncManager { | |||
| 591 | } | 591 | } |
| 592 | 592 | ||
| 593 | // Recompute actions for Layer 2+3 based on synced events | 593 | // Recompute actions for Layer 2+3 based on synced events |
| 594 | self.recompute_actions_for_relay(relay_url).await; | 594 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 595 | } else { | 595 | } else { |
| 596 | // NIP-77 not supported - fall back to REQ+EOSE | 596 | // NIP-77 not supported - fall back to REQ+EOSE |
| 597 | tracing::info!( | 597 | tracing::info!( |
| @@ -612,7 +612,7 @@ impl SyncManager { | |||
| 612 | } | 612 | } |
| 613 | 613 | ||
| 614 | // Recompute actions for Layer 2+3 - will discover all repos/events again | 614 | // Recompute actions for Layer 2+3 - will discover all repos/events again |
| 615 | self.recompute_actions_for_relay(relay_url).await; | 615 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 616 | } | 616 | } |
| 617 | 617 | ||
| 618 | if let Some(ref metrics) = self.metrics { | 618 | if let Some(ref metrics) = self.metrics { |
| @@ -709,7 +709,7 @@ impl SyncManager { | |||
| 709 | Some(add_filters) => { | 709 | Some(add_filters) => { |
| 710 | // Process AddFilters action directly | 710 | // Process AddFilters action directly |
| 711 | let mut manager = sync_manager.lock().await; | 711 | let mut manager = sync_manager.lock().await; |
| 712 | manager.handle_add_filters(add_filters).await; | 712 | manager.handle_new_sync_filters(add_filters).await; |
| 713 | } | 713 | } |
| 714 | None => break, | 714 | None => break, |
| 715 | } | 715 | } |
| @@ -763,13 +763,13 @@ impl SyncManager { | |||
| 763 | /// - For new relays: creates entry with Connecting status, spawns connection | 763 | /// - For new relays: creates entry with Connecting status, spawns connection |
| 764 | /// - For existing connected relays: subscribes to filters, creates PendingBatch | 764 | /// - For existing connected relays: subscribes to filters, creates PendingBatch |
| 765 | /// - For disconnected/connecting relays: returns (will be handled on connection) | 765 | /// - For disconnected/connecting relays: returns (will be handled on connection) |
| 766 | async fn handle_add_filters(&mut self, action: AddFilters) { | 766 | async fn handle_new_sync_filters(&mut self, action: AddFilters) { |
| 767 | tracing::info!( | 767 | tracing::info!( |
| 768 | relay = %action.relay_url, | 768 | relay = %action.relay_url, |
| 769 | repo_count = action.repos.len(), | 769 | repo_count = action.items.repos.len(), |
| 770 | root_event_count = action.root_events.len(), | 770 | root_event_count = action.items.root_events.len(), |
| 771 | filter_count = action.filters.len(), | 771 | filter_count = action.filters.len(), |
| 772 | "[DIAG] handle_add_filters called" | 772 | "[DIAG] handle_new_sync_filters called" |
| 773 | ); | 773 | ); |
| 774 | 774 | ||
| 775 | // Step 1: Check if relay exists in relay_sync_index | 775 | // Step 1: Check if relay exists in relay_sync_index |
| @@ -801,7 +801,7 @@ impl SyncManager { | |||
| 801 | 801 | ||
| 802 | tracing::info!( | 802 | tracing::info!( |
| 803 | relay = %action.relay_url, | 803 | relay = %action.relay_url, |
| 804 | repos = action.repos.len(), | 804 | repos = action.items.repos.len(), |
| 805 | "Spawning connection for new relay" | 805 | "Spawning connection for new relay" |
| 806 | ); | 806 | ); |
| 807 | 807 | ||
| @@ -827,7 +827,7 @@ impl SyncManager { | |||
| 827 | // Step 2: Check if consolidation is needed BEFORE adding new filters | 827 | // Step 2: Check if consolidation is needed BEFORE adding new filters |
| 828 | self.maybe_consolidate(&action.relay_url, action.filters.len()) | 828 | self.maybe_consolidate(&action.relay_url, action.filters.len()) |
| 829 | .await; | 829 | .await; |
| 830 | 830 | /// DELETE this bit | |
| 831 | // Step 3: Get connection and subscribe to all filters | 831 | // Step 3: Get connection and subscribe to all filters |
| 832 | let connection = match self.connections.get(&action.relay_url) { | 832 | let connection = match self.connections.get(&action.relay_url) { |
| 833 | Some(conn) => conn, | 833 | Some(conn) => conn, |
| @@ -870,8 +870,8 @@ impl SyncManager { | |||
| 870 | let batch = PendingBatch { | 870 | let batch = PendingBatch { |
| 871 | batch_id, | 871 | batch_id, |
| 872 | items: PendingItems { | 872 | items: PendingItems { |
| 873 | repos: action.repos.clone(), | 873 | repos: action.items.repos.clone(), |
| 874 | root_events: action.root_events.clone(), | 874 | root_events: action.items.root_events.clone(), |
| 875 | }, | 875 | }, |
| 876 | outstanding_subs: subscription_ids.into_iter().collect(), | 876 | outstanding_subs: subscription_ids.into_iter().collect(), |
| 877 | sync_method: SyncMethod::ReqEose, | 877 | sync_method: SyncMethod::ReqEose, |
| @@ -889,33 +889,84 @@ impl SyncManager { | |||
| 889 | tracing::debug!( | 889 | tracing::debug!( |
| 890 | relay = %action.relay_url, | 890 | relay = %action.relay_url, |
| 891 | batch_id = batch_id, | 891 | batch_id = batch_id, |
| 892 | repos = action.repos.len(), | 892 | repos = action.items.repos.len(), |
| 893 | root_events = action.root_events.len(), | 893 | root_events = action.items.root_events.len(), |
| 894 | filters = action.filters.len(), | 894 | filters = action.filters.len(), |
| 895 | "Created pending batch for filter subscriptions" | 895 | "Created pending batch for filter subscriptions" |
| 896 | ); | 896 | ); |
| 897 | // REPLACE WITH THIS: | ||
| 898 | // // Subscribe to each filter and collect subscription IDs | ||
| 899 | // self.sync_live(&action.relay_url, &action.filters).await; | ||
| 900 | // // TODO need to do actions.repos | ||
| 901 | // self.historic_sync(&action.relay_url, action.filters, action.items, None) | ||
| 902 | // .await; | ||
| 897 | } | 903 | } |
| 898 | 904 | ||
| 899 | /// Handle a connection success (called when a relay connects or reconnects) | 905 | /// Handle a connection success (called when a relay connects or reconnects) |
| 900 | /// | 906 | /// |
| 901 | /// This method implements smart reconnection logic: | 907 | /// This method dispatches to the appropriate reconnection strategy: |
| 902 | /// - Fresh sync if never connected or >15 min since last connection | 908 | /// - `fresh_start()` if never connected before |
| 903 | /// - Quick reconnect with since filter if <15 min since last connection | 909 | /// - `quick_reconnect()` if disconnected < 15 minutes |
| 904 | /// | 910 | /// - `long_reconnect()` if disconnected > 15 minutes |
| 905 | /// For fresh sync (with NIP-77 negentropy if supported): | ||
| 906 | /// - Clears any stale state | ||
| 907 | /// - Uses negentropy sync for Layer 1 (if NIP-77 supported) | ||
| 908 | /// - Falls back to REQ+EOSE if NIP-77 not supported | ||
| 909 | /// - Recomputes actions for new items | ||
| 910 | /// | ||
| 911 | /// For quick reconnect: | ||
| 912 | /// - Preserves existing state | ||
| 913 | /// - Subscribes to Layer 1 with since filter | ||
| 914 | /// - Rebuilds Layer 2 and Layer 3 with since filter | ||
| 915 | /// - Recomputes actions for new items | ||
| 916 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | 911 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { |
| 917 | let now = Timestamp::now(); | 912 | let now = Timestamp::now(); |
| 918 | 913 | ||
| 914 | // // Get the relay state to determine reconnect type | ||
| 915 | // let (last_connected, disconnected_at) = { | ||
| 916 | // let index = self.relay_sync_index.read().await; | ||
| 917 | // if let Some(state) = index.get(relay_url) { | ||
| 918 | // (state.last_connected, state.disconnected_at) | ||
| 919 | // } else { | ||
| 920 | // (None, None) // No state found | ||
| 921 | // } | ||
| 922 | // }; | ||
| 923 | |||
| 924 | // // Determine which reconnection strategy to use | ||
| 925 | // match (last_connected, disconnected_at) { | ||
| 926 | // (None, _) => { | ||
| 927 | // // Never connected before - fresh start | ||
| 928 | // tracing::info!( | ||
| 929 | // relay = %relay_url, | ||
| 930 | // "First connection - initiating fresh_start" | ||
| 931 | // ); | ||
| 932 | // self.fresh_start(relay_url).await; | ||
| 933 | // } | ||
| 934 | // (Some(last), Some(disconnected)) => { | ||
| 935 | // // Was connected before, check how long disconnected | ||
| 936 | // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs()); | ||
| 937 | |||
| 938 | // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS { | ||
| 939 | // // Disconnected < 15 minutes - quick reconnect | ||
| 940 | // // Use last_connected minus buffer as since timestamp | ||
| 941 | // let since = | ||
| 942 | // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); | ||
| 943 | // tracing::info!( | ||
| 944 | // relay = %relay_url, | ||
| 945 | // disconnect_secs = disconnect_duration, | ||
| 946 | // since = %since, | ||
| 947 | // "Short disconnection - initiating quick_reconnect" | ||
| 948 | // ); | ||
| 949 | // self.quick_reconnect(relay_url, since).await; | ||
| 950 | // } else { | ||
| 951 | // // Disconnected > 15 minutes - long reconnect | ||
| 952 | // tracing::info!( | ||
| 953 | // relay = %relay_url, | ||
| 954 | // disconnect_secs = disconnect_duration, | ||
| 955 | // "Long disconnection - initiating long_reconnect" | ||
| 956 | // ); | ||
| 957 | // self.long_reconnect(relay_url).await; | ||
| 958 | // } | ||
| 959 | // } | ||
| 960 | // (Some(_last), None) => { | ||
| 961 | // // Was connected but no disconnected_at - shouldn't happen normally | ||
| 962 | // // Treat as long reconnect to be safe | ||
| 963 | // tracing::warn!( | ||
| 964 | // relay = %relay_url, | ||
| 965 | // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect" | ||
| 966 | // ); | ||
| 967 | // self.long_reconnect(relay_url).await; | ||
| 968 | // } | ||
| 969 | // } | ||
| 919 | // Get the relay state to determine reconnect type | 970 | // Get the relay state to determine reconnect type |
| 920 | let (is_fresh_sync, last_connected, is_bootstrap) = { | 971 | let (is_fresh_sync, last_connected, is_bootstrap) = { |
| 921 | let index = self.relay_sync_index.read().await; | 972 | let index = self.relay_sync_index.read().await; |
| @@ -998,7 +1049,7 @@ impl SyncManager { | |||
| 998 | 1049 | ||
| 999 | // After negentropy sync, recompute Layer 2+3 actions | 1050 | // After negentropy sync, recompute Layer 2+3 actions |
| 1000 | // Layer 1 events are now in sync, so we can proceed with Layer 2+3 | 1051 | // Layer 1 events are now in sync, so we can proceed with Layer 2+3 |
| 1001 | self.recompute_actions_for_relay(relay_url).await; | 1052 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1002 | 1053 | ||
| 1003 | // Set up live subscription for new events (since=now) | 1054 | // Set up live subscription for new events (since=now) |
| 1004 | let live_filter = filters::build_announcement_filter(Some(now)); | 1055 | let live_filter = filters::build_announcement_filter(Some(now)); |
| @@ -1021,7 +1072,7 @@ impl SyncManager { | |||
| 1021 | // during connect_and_subscribe() in handle_add_filters(). That call subscribes | 1072 | // during connect_and_subscribe() in handle_add_filters(). That call subscribes |
| 1022 | // to kinds 30617+30618 for the full history. Here we only need to recompute | 1073 | // to kinds 30617+30618 for the full history. Here we only need to recompute |
| 1023 | // Layer 2+3 actions based on the repos we're tracking. | 1074 | // Layer 2+3 actions based on the repos we're tracking. |
| 1024 | self.recompute_actions_for_relay(relay_url).await; | 1075 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1025 | } | 1076 | } |
| 1026 | } else { | 1077 | } else { |
| 1027 | // Quick reconnect: use since filter (no negentropy needed) | 1078 | // Quick reconnect: use since filter (no negentropy needed) |
| @@ -1055,7 +1106,7 @@ impl SyncManager { | |||
| 1055 | .await; | 1106 | .await; |
| 1056 | 1107 | ||
| 1057 | // Recompute actions for any new items discovered while disconnected | 1108 | // Recompute actions for any new items discovered while disconnected |
| 1058 | self.recompute_actions_for_relay(relay_url).await; | 1109 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1059 | 1110 | ||
| 1060 | if let Some(ref metrics) = self.metrics { | 1111 | if let Some(ref metrics) = self.metrics { |
| 1061 | metrics.record_event(event_source::RECONNECT); | 1112 | metrics.record_event(event_source::RECONNECT); |
| @@ -1063,6 +1114,225 @@ impl SyncManager { | |||
| 1063 | } | 1114 | } |
| 1064 | } | 1115 | } |
| 1065 | 1116 | ||
| 1117 | /// Fresh start - clears state and does full sync | ||
| 1118 | /// | ||
| 1119 | /// Called by: initial connect, long_reconnect, daily_sync | ||
| 1120 | /// | ||
| 1121 | /// Flow: | ||
| 1122 | /// 1. Clear PendingSyncIndex for this relay | ||
| 1123 | /// 2. Clear RelaySyncIndex sync state (repos/root_events) | ||
| 1124 | /// 3. Update connection state to Connected | ||
| 1125 | /// 4. L1 live + L1 historic (negentropy if available) | ||
| 1126 | /// 5. compute_actions → AddFilters → sync_computed_filters for L2+L3 | ||
| 1127 | async fn fresh_start(&mut self, relay_url: &str) { | ||
| 1128 | let now = Timestamp::now(); | ||
| 1129 | |||
| 1130 | tracing::info!(relay = %relay_url, "Starting fresh_start"); | ||
| 1131 | |||
| 1132 | // Step 1: Clear PendingSyncIndex for this relay | ||
| 1133 | { | ||
| 1134 | let mut pending = self.pending_sync_index.write().await; | ||
| 1135 | if pending.remove(relay_url).is_some() { | ||
| 1136 | tracing::debug!( | ||
| 1137 | relay = %relay_url, | ||
| 1138 | "Cleared pending batches in fresh_start" | ||
| 1139 | ); | ||
| 1140 | } | ||
| 1141 | } | ||
| 1142 | |||
| 1143 | // Step 2: Clear RelaySyncIndex sync state (but preserve connection metadata) | ||
| 1144 | { | ||
| 1145 | let mut index = self.relay_sync_index.write().await; | ||
| 1146 | if let Some(state) = index.get_mut(relay_url) { | ||
| 1147 | let repos_cleared = state.repos.len(); | ||
| 1148 | let events_cleared = state.root_events.len(); | ||
| 1149 | state.clear_sync_state(); | ||
| 1150 | if repos_cleared > 0 || events_cleared > 0 { | ||
| 1151 | tracing::debug!( | ||
| 1152 | relay = %relay_url, | ||
| 1153 | repos_cleared = repos_cleared, | ||
| 1154 | events_cleared = events_cleared, | ||
| 1155 | "Cleared sync state in fresh_start" | ||
| 1156 | ); | ||
| 1157 | } | ||
| 1158 | } | ||
| 1159 | } | ||
| 1160 | |||
| 1161 | // Step 3: Update connection state | ||
| 1162 | { | ||
| 1163 | let mut index = self.relay_sync_index.write().await; | ||
| 1164 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 1165 | state.connection_status = ConnectionStatus::Connected; | ||
| 1166 | state.last_connected = Some(now); | ||
| 1167 | state.disconnected_at = None; | ||
| 1168 | } | ||
| 1169 | |||
| 1170 | // Record success in health tracker | ||
| 1171 | self.health_tracker.record_success(relay_url); | ||
| 1172 | |||
| 1173 | // Update metrics | ||
| 1174 | if let Some(ref metrics) = self.metrics { | ||
| 1175 | metrics.set_relay_connected(relay_url, true); | ||
| 1176 | metrics.inc_connected_count(); | ||
| 1177 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1178 | } | ||
| 1179 | |||
| 1180 | // Step 4: L1 sync - check negentropy support | ||
| 1181 | let use_negentropy = if self.config.sync_disable_negentropy { | ||
| 1182 | tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); | ||
| 1183 | false | ||
| 1184 | } else if let Some(connection) = self.connections.get(relay_url) { | ||
| 1185 | connection.supports_negentropy().await | ||
| 1186 | } else { | ||
| 1187 | false | ||
| 1188 | }; | ||
| 1189 | |||
| 1190 | if use_negentropy { | ||
| 1191 | // NIP-77 supported - use negentropy for L1 historical sync | ||
| 1192 | tracing::info!( | ||
| 1193 | relay = %relay_url, | ||
| 1194 | "Using NIP-77 negentropy for L1 historical sync" | ||
| 1195 | ); | ||
| 1196 | |||
| 1197 | // L1 historic sync (no since - full sync) | ||
| 1198 | let layer1_filter = filters::build_announcement_filter(None); | ||
| 1199 | self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)") | ||
| 1200 | .await; | ||
| 1201 | |||
| 1202 | // L1 live subscription (since=now for ongoing events) | ||
| 1203 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 1204 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1205 | if let Err(e) = connection.subscribe_filter(live_filter).await { | ||
| 1206 | tracing::error!( | ||
| 1207 | relay = %relay_url, | ||
| 1208 | error = %e, | ||
| 1209 | "Failed to set up L1 live subscription in fresh_start" | ||
| 1210 | ); | ||
| 1211 | } | ||
| 1212 | } | ||
| 1213 | } else { | ||
| 1214 | // NIP-77 not supported - REQ+EOSE | ||
| 1215 | // Note: Layer 1 subscription (without since) was already established | ||
| 1216 | // during connect_and_subscribe() in spawn_relay_connection | ||
| 1217 | tracing::info!( | ||
| 1218 | relay = %relay_url, | ||
| 1219 | "Using REQ+EOSE for L1 sync (negentropy not available)" | ||
| 1220 | ); | ||
| 1221 | } | ||
| 1222 | |||
| 1223 | // Step 5: compute_actions → AddFilters for L2+L3 | ||
| 1224 | // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters | ||
| 1225 | // for ALL repos that should be synced from this relay | ||
| 1226 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1227 | |||
| 1228 | tracing::info!(relay = %relay_url, "fresh_start complete"); | ||
| 1229 | } | ||
| 1230 | |||
| 1231 | /// Quick reconnect - for disconnections < 15 minutes | ||
| 1232 | /// | ||
| 1233 | /// Flow: | ||
| 1234 | /// 1. Clear PendingSyncIndex for this relay | ||
| 1235 | /// 2. Update connection state to Connected | ||
| 1236 | /// 3. L1 live + L1 historic(since) | ||
| 1237 | /// 4. reconstruct_filters → L2+L3 live + L2+L3 historic(since) | ||
| 1238 | /// 5. compute_actions for any new items discovered during catchup | ||
| 1239 | async fn quick_reconnect(&mut self, relay_url: &str, since: Timestamp) { | ||
| 1240 | let now = Timestamp::now(); | ||
| 1241 | |||
| 1242 | tracing::info!( | ||
| 1243 | relay = %relay_url, | ||
| 1244 | since = %since, | ||
| 1245 | "Starting quick_reconnect" | ||
| 1246 | ); | ||
| 1247 | |||
| 1248 | // Step 1: Clear PendingSyncIndex for this relay | ||
| 1249 | // Old subscriptions are dead after disconnect | ||
| 1250 | { | ||
| 1251 | let mut pending = self.pending_sync_index.write().await; | ||
| 1252 | if pending.remove(relay_url).is_some() { | ||
| 1253 | tracing::debug!( | ||
| 1254 | relay = %relay_url, | ||
| 1255 | "Cleared pending batches in quick_reconnect" | ||
| 1256 | ); | ||
| 1257 | } | ||
| 1258 | } | ||
| 1259 | |||
| 1260 | // Step 2: Update connection state (preserve repos/root_events - that's the point!) | ||
| 1261 | { | ||
| 1262 | let mut index = self.relay_sync_index.write().await; | ||
| 1263 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 1264 | state.connection_status = ConnectionStatus::Connected; | ||
| 1265 | state.last_connected = Some(now); | ||
| 1266 | state.disconnected_at = None; | ||
| 1267 | } | ||
| 1268 | |||
| 1269 | // Record success in health tracker | ||
| 1270 | self.health_tracker.record_success(relay_url); | ||
| 1271 | |||
| 1272 | // Update metrics | ||
| 1273 | if let Some(ref metrics) = self.metrics { | ||
| 1274 | metrics.set_relay_connected(relay_url, true); | ||
| 1275 | metrics.inc_connected_count(); | ||
| 1276 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1277 | metrics.record_event(event_source::RECONNECT); | ||
| 1278 | } | ||
| 1279 | |||
| 1280 | // Step 3: L1 live + L1 historic with since filter | ||
| 1281 | // L1 live subscription (since=now for ongoing events) | ||
| 1282 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 1283 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1284 | if let Err(e) = connection.subscribe_filter(live_filter).await { | ||
| 1285 | tracing::error!( | ||
| 1286 | relay = %relay_url, | ||
| 1287 | error = %e, | ||
| 1288 | "Failed to set up L1 live subscription in quick_reconnect" | ||
| 1289 | ); | ||
| 1290 | } | ||
| 1291 | } | ||
| 1292 | |||
| 1293 | // L1 historic with since filter (catch up on missed announcements) | ||
| 1294 | let layer1_filter = filters::build_announcement_filter(Some(since)); | ||
| 1295 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1296 | if let Err(e) = connection.subscribe_filter(layer1_filter).await { | ||
| 1297 | tracing::error!( | ||
| 1298 | relay = %relay_url, | ||
| 1299 | error = %e, | ||
| 1300 | "Failed to subscribe to L1 historic filter in quick_reconnect" | ||
| 1301 | ); | ||
| 1302 | } | ||
| 1303 | } | ||
| 1304 | |||
| 1305 | // Step 4: Rebuild L2+L3 from confirmed state with since filter | ||
| 1306 | // This uses the preserved repos/root_events from RelaySyncIndex | ||
| 1307 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | ||
| 1308 | |||
| 1309 | // Step 5: compute_actions for any NEW items discovered while disconnected | ||
| 1310 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1311 | |||
| 1312 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); | ||
| 1313 | } | ||
| 1314 | |||
| 1315 | /// Long reconnect - for disconnections > 15 minutes | ||
| 1316 | /// | ||
| 1317 | /// Flow: | ||
| 1318 | /// 1. Record disconnect/reconnect metric | ||
| 1319 | /// 2. Delegate to fresh_start() | ||
| 1320 | async fn long_reconnect(&mut self, relay_url: &str) { | ||
| 1321 | tracing::info!(relay = %relay_url, "Starting long_reconnect"); | ||
| 1322 | |||
| 1323 | // Step 1: Record disconnect/reconnect metric | ||
| 1324 | // This distinguishes intentional daily refresh from failure recovery | ||
| 1325 | if let Some(ref metrics) = self.metrics { | ||
| 1326 | metrics.record_event(event_source::RECONNECT); | ||
| 1327 | } | ||
| 1328 | |||
| 1329 | // Step 2: Delegate to fresh_start | ||
| 1330 | // State is too stale to trust, start fresh | ||
| 1331 | self.fresh_start(relay_url).await; | ||
| 1332 | |||
| 1333 | tracing::info!(relay = %relay_url, "long_reconnect complete"); | ||
| 1334 | } | ||
| 1335 | |||
| 1066 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay | 1336 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay |
| 1067 | /// | 1337 | /// |
| 1068 | /// Uses the confirmed repos and root_events from RelayState to build filters. | 1338 | /// Uses the confirmed repos and root_events from RelayState to build filters. |
| @@ -1129,7 +1399,7 @@ impl SyncManager { | |||
| 1129 | /// | 1399 | /// |
| 1130 | /// Uses derive_relay_targets and compute_actions to find new items | 1400 | /// Uses derive_relay_targets and compute_actions to find new items |
| 1131 | /// that need to be synced. Processes AddFilters actions for new items. | 1401 | /// that need to be synced. Processes AddFilters actions for new items. |
| 1132 | async fn recompute_actions_for_relay(&mut self, relay_url: &str) { | 1402 | async fn recompute_new_sync_filters_for_relay(&mut self, relay_url: &str) { |
| 1133 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | 1403 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; |
| 1134 | 1404 | ||
| 1135 | // Get current state from indexes (need to collect to avoid holding locks) | 1405 | // Get current state from indexes (need to collect to avoid holding locks) |
| @@ -1173,12 +1443,12 @@ impl SyncManager { | |||
| 1173 | for action in actions { | 1443 | for action in actions { |
| 1174 | tracing::info!( | 1444 | tracing::info!( |
| 1175 | relay = %action.relay_url, | 1445 | relay = %action.relay_url, |
| 1176 | new_repos = action.repos.len(), | 1446 | new_repos = action.items.repos.len(), |
| 1177 | new_root_events = action.root_events.len(), | 1447 | new_root_events = action.items.root_events.len(), |
| 1178 | filters = action.filters.len(), | 1448 | filters = action.filters.len(), |
| 1179 | "Processing AddFilters for new items" | 1449 | "Processing AddFilters for new items" |
| 1180 | ); | 1450 | ); |
| 1181 | self.handle_add_filters(action).await; | 1451 | self.handle_new_sync_filters(action).await; |
| 1182 | } | 1452 | } |
| 1183 | } | 1453 | } |
| 1184 | 1454 | ||
| @@ -2095,6 +2365,366 @@ impl SyncManager { | |||
| 2095 | } | 2365 | } |
| 2096 | } | 2366 | } |
| 2097 | 2367 | ||
| 2368 | // ========================================================================= | ||
| 2369 | // Sync Primitives (Phase 1 of GRASP-02 refactoring) | ||
| 2370 | // These methods are new primitives that will be used in subsequent phases. | ||
| 2371 | // ========================================================================= | ||
| 2372 | |||
| 2373 | /// Subscribe to filters for live (ongoing) events - NOT tracked in PendingSyncIndex | ||
| 2374 | #[allow(dead_code)] // Will be used in Phase 2+ | ||
| 2375 | /// | ||
| 2376 | /// This method subscribes to filters with `limit: 0` for receiving ongoing events. | ||
| 2377 | /// Live subscriptions are NOT tracked in PendingSyncIndex because they don't have | ||
| 2378 | /// a definite "completion" - they stay open indefinitely. | ||
| 2379 | /// | ||
| 2380 | /// Used for: | ||
| 2381 | /// - Layer 1 live subscription (new announcements after initial sync) | ||
| 2382 | /// - Layer 2+3 live subscriptions (new events after initial sync) | ||
| 2383 | /// | ||
| 2384 | /// # Arguments | ||
| 2385 | /// * `relay_url` - The relay URL to subscribe on | ||
| 2386 | /// * `filters` - Filters to subscribe to (will have `limit: 0` applied) | ||
| 2387 | /// | ||
| 2388 | /// # Returns | ||
| 2389 | /// Vec of subscription IDs for the live subscriptions, or empty if connection not found | ||
| 2390 | async fn sync_live(&self, relay_url: &str, filters: &[Filter]) -> Vec<SubscriptionId> { | ||
| 2391 | if filters.is_empty() { | ||
| 2392 | return vec![]; | ||
| 2393 | } | ||
| 2394 | |||
| 2395 | let connection = match self.connections.get(relay_url) { | ||
| 2396 | Some(conn) => conn, | ||
| 2397 | None => { | ||
| 2398 | tracing::warn!( | ||
| 2399 | relay = %relay_url, | ||
| 2400 | "No connection found for sync_live" | ||
| 2401 | ); | ||
| 2402 | return vec![]; | ||
| 2403 | } | ||
| 2404 | }; | ||
| 2405 | |||
| 2406 | let mut sub_ids = Vec::new(); | ||
| 2407 | |||
| 2408 | for filter in filters { | ||
| 2409 | // Apply limit: 0 to make this a live subscription | ||
| 2410 | // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", | ||
| 2411 | // but omitting limit means "no limit" which is what we want for live. | ||
| 2412 | // The filter passed in should already NOT have a limit set. | ||
| 2413 | match connection.subscribe_filter(filter.clone()).await { | ||
| 2414 | Ok(sub_id) => { | ||
| 2415 | tracing::trace!( | ||
| 2416 | relay = %relay_url, | ||
| 2417 | sub_id = %sub_id, | ||
| 2418 | "Live subscription created" | ||
| 2419 | ); | ||
| 2420 | sub_ids.push(sub_id); | ||
| 2421 | } | ||
| 2422 | Err(e) => { | ||
| 2423 | tracing::error!( | ||
| 2424 | relay = %relay_url, | ||
| 2425 | error = %e, | ||
| 2426 | "Failed to create live subscription" | ||
| 2427 | ); | ||
| 2428 | } | ||
| 2429 | } | ||
| 2430 | } | ||
| 2431 | |||
| 2432 | tracing::debug!( | ||
| 2433 | relay = %relay_url, | ||
| 2434 | filter_count = filters.len(), | ||
| 2435 | sub_count = sub_ids.len(), | ||
| 2436 | "sync_live completed" | ||
| 2437 | ); | ||
| 2438 | |||
| 2439 | sub_ids | ||
| 2440 | } | ||
| 2441 | |||
| 2442 | /// Reconstruct filters from RelaySyncIndex (confirmed state ONLY) | ||
| 2443 | /// | ||
| 2444 | /// Returns raw Vec<Filter> for L1+L2+L3. | ||
| 2445 | /// Used by: quick_reconnect, consolidate | ||
| 2446 | /// Does NOT include pending items - those flow through AddFilters path. | ||
| 2447 | /// | ||
| 2448 | /// # Arguments | ||
| 2449 | /// * `relay_url` - The relay URL to reconstruct filters for | ||
| 2450 | /// | ||
| 2451 | /// # Returns | ||
| 2452 | /// Vec of filters for L1 (announcements) + L2 (repo tags) + L3 (event tags) | ||
| 2453 | #[allow(dead_code)] // Will be used in Phase 3+ | ||
| 2454 | async fn reconstruct_filters(&self, relay_url: &str) -> Vec<Filter> { | ||
| 2455 | // Get confirmed state from relay_sync_index | ||
| 2456 | let (repos, root_events) = { | ||
| 2457 | let index = self.relay_sync_index.read().await; | ||
| 2458 | match index.get(relay_url) { | ||
| 2459 | Some(state) => (state.repos.clone(), state.root_events.clone()), | ||
| 2460 | None => { | ||
| 2461 | tracing::warn!( | ||
| 2462 | relay = %relay_url, | ||
| 2463 | "No RelayState found for reconstruct_filters" | ||
| 2464 | ); | ||
| 2465 | return vec![]; | ||
| 2466 | } | ||
| 2467 | } | ||
| 2468 | }; | ||
| 2469 | |||
| 2470 | let mut all_filters = Vec::new(); | ||
| 2471 | |||
| 2472 | // Layer 1: Announcements (always included) | ||
| 2473 | // Note: No `since` filter - this returns raw filters for live subscriptions | ||
| 2474 | all_filters.push(filters::build_announcement_filter(None)); | ||
| 2475 | |||
| 2476 | // Layer 2 + Layer 3: Repo and root event tag filters | ||
| 2477 | if !repos.is_empty() || !root_events.is_empty() { | ||
| 2478 | let l2_l3_filters = | ||
| 2479 | filters::build_layer2_and_layer3_filters(&repos, &root_events, None); | ||
| 2480 | all_filters.extend(l2_l3_filters); | ||
| 2481 | } | ||
| 2482 | |||
| 2483 | tracing::debug!( | ||
| 2484 | relay = %relay_url, | ||
| 2485 | total_filters = all_filters.len(), | ||
| 2486 | repos_count = repos.len(), | ||
| 2487 | root_events_count = root_events.len(), | ||
| 2488 | "Reconstructed filters from confirmed state" | ||
| 2489 | ); | ||
| 2490 | |||
| 2491 | all_filters | ||
| 2492 | } | ||
| 2493 | |||
| 2494 | /// Sync historical events and track in PendingSyncIndex | ||
| 2495 | #[allow(dead_code)] // Will be used in Phase 3+ | ||
| 2496 | /// | ||
| 2497 | /// This method handles historical synchronization for a set of filters, | ||
| 2498 | /// creating a PendingBatch to track completion. It dispatches to either | ||
| 2499 | /// negentropy sync or traditional REQ+EOSE based on relay capability and config. | ||
| 2500 | /// | ||
| 2501 | /// Used for: | ||
| 2502 | /// - Initial sync (no since filter) | ||
| 2503 | /// - Reconnect sync (with since filter) | ||
| 2504 | /// - Daily sync (no since filter, full re-sync) | ||
| 2505 | /// | ||
| 2506 | /// # Arguments | ||
| 2507 | /// * `relay_url` - The relay URL to sync from | ||
| 2508 | /// * `filters` - Filters to sync (will have `since` applied if provided) | ||
| 2509 | /// * `items` - Items being synced (for tracking in PendingBatch) | ||
| 2510 | /// * `since` - Optional timestamp for incremental sync | ||
| 2511 | /// | ||
| 2512 | /// # Returns | ||
| 2513 | /// * `Some(batch_id)` - Batch was created and sync initiated | ||
| 2514 | /// * `None` - No connection or sync failed to start | ||
| 2515 | async fn historic_sync( | ||
| 2516 | &mut self, | ||
| 2517 | relay_url: &str, | ||
| 2518 | filters: Vec<Filter>, | ||
| 2519 | items: PendingItems, | ||
| 2520 | since: Option<Timestamp>, | ||
| 2521 | ) -> Option<u64> { | ||
| 2522 | if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { | ||
| 2523 | tracing::debug!( | ||
| 2524 | relay = %relay_url, | ||
| 2525 | "historic_sync called with empty filters and items, skipping" | ||
| 2526 | ); | ||
| 2527 | return None; | ||
| 2528 | } | ||
| 2529 | |||
| 2530 | // Check connection exists | ||
| 2531 | let connection = match self.connections.get(relay_url) { | ||
| 2532 | Some(conn) => conn, | ||
| 2533 | None => { | ||
| 2534 | tracing::warn!( | ||
| 2535 | relay = %relay_url, | ||
| 2536 | "No connection found for historic_sync" | ||
| 2537 | ); | ||
| 2538 | return None; | ||
| 2539 | } | ||
| 2540 | }; | ||
| 2541 | |||
| 2542 | // Apply since filter if provided | ||
| 2543 | let filters_with_since: Vec<Filter> = if let Some(ts) = since { | ||
| 2544 | filters.into_iter().map(|f| f.since(ts)).collect() | ||
| 2545 | } else { | ||
| 2546 | filters | ||
| 2547 | }; | ||
| 2548 | |||
| 2549 | // Check if we should use negentropy | ||
| 2550 | let use_negentropy = | ||
| 2551 | !self.config.sync_disable_negentropy && connection.supports_negentropy().await; | ||
| 2552 | |||
| 2553 | // Generate batch ID | ||
| 2554 | let batch_id = self.next_batch_id(); | ||
| 2555 | |||
| 2556 | if use_negentropy && !filters_with_since.is_empty() { | ||
| 2557 | // NIP-77 negentropy path | ||
| 2558 | tracing::debug!( | ||
| 2559 | relay = %relay_url, | ||
| 2560 | batch_id = batch_id, | ||
| 2561 | filter_count = filters_with_since.len(), | ||
| 2562 | repos = items.repos.len(), | ||
| 2563 | root_events = items.root_events.len(), | ||
| 2564 | "Starting historic_sync with negentropy" | ||
| 2565 | ); | ||
| 2566 | |||
| 2567 | // Create PendingBatch for negentropy (empty outstanding_subs) | ||
| 2568 | let batch = PendingBatch { | ||
| 2569 | batch_id, | ||
| 2570 | items: items.clone(), | ||
| 2571 | outstanding_subs: HashSet::new(), | ||
| 2572 | sync_method: SyncMethod::Negentropy, | ||
| 2573 | }; | ||
| 2574 | |||
| 2575 | // Add to pending_sync_index | ||
| 2576 | { | ||
| 2577 | let mut pending = self.pending_sync_index.write().await; | ||
| 2578 | pending | ||
| 2579 | .entry(relay_url.to_string()) | ||
| 2580 | .or_insert_with(Vec::new) | ||
| 2581 | .push(batch); | ||
| 2582 | } | ||
| 2583 | |||
| 2584 | // Perform negentropy sync for each filter | ||
| 2585 | // Note: We sync each filter separately because negentropy works on a single filter | ||
| 2586 | let mut total_received = 0; | ||
| 2587 | let mut any_success = false; | ||
| 2588 | |||
| 2589 | for filter in &filters_with_since { | ||
| 2590 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 2591 | match conn.negentropy_sync_filter(filter.clone()).await { | ||
| 2592 | Ok(result) => { | ||
| 2593 | total_received += result.received.len(); | ||
| 2594 | any_success = true; | ||
| 2595 | |||
| 2596 | // Record metrics for received events | ||
| 2597 | if let Some(ref metrics) = self.metrics { | ||
| 2598 | for _ in 0..result.received.len() { | ||
| 2599 | metrics.record_event(event_source::STARTUP); | ||
| 2600 | } | ||
| 2601 | } | ||
| 2602 | } | ||
| 2603 | Err(e) => { | ||
| 2604 | tracing::warn!( | ||
| 2605 | relay = %relay_url, | ||
| 2606 | error = %e, | ||
| 2607 | "Negentropy sync failed for filter in historic_sync" | ||
| 2608 | ); | ||
| 2609 | } | ||
| 2610 | } | ||
| 2611 | } | ||
| 2612 | } | ||
| 2613 | |||
| 2614 | if any_success { | ||
| 2615 | // Remove batch from pending and confirm it | ||
| 2616 | let completed_batch = { | ||
| 2617 | let mut pending = self.pending_sync_index.write().await; | ||
| 2618 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 2619 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); | ||
| 2620 | if let Some(idx) = batch_idx { | ||
| 2621 | let batch = batches.remove(idx); | ||
| 2622 | if batches.is_empty() { | ||
| 2623 | pending.remove(relay_url); | ||
| 2624 | } | ||
| 2625 | Some(batch) | ||
| 2626 | } else { | ||
| 2627 | None | ||
| 2628 | } | ||
| 2629 | } else { | ||
| 2630 | None | ||
| 2631 | } | ||
| 2632 | }; | ||
| 2633 | |||
| 2634 | if let Some(batch) = completed_batch { | ||
| 2635 | self.confirm_batch(relay_url, batch).await; | ||
| 2636 | } | ||
| 2637 | |||
| 2638 | tracing::info!( | ||
| 2639 | relay = %relay_url, | ||
| 2640 | batch_id = batch_id, | ||
| 2641 | total_received = total_received, | ||
| 2642 | "historic_sync (negentropy) completed" | ||
| 2643 | ); | ||
| 2644 | } else { | ||
| 2645 | // All negentropy syncs failed - remove the pending batch | ||
| 2646 | let mut pending = self.pending_sync_index.write().await; | ||
| 2647 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 2648 | batches.retain(|b| b.batch_id != batch_id); | ||
| 2649 | if batches.is_empty() { | ||
| 2650 | pending.remove(relay_url); | ||
| 2651 | } | ||
| 2652 | } | ||
| 2653 | |||
| 2654 | tracing::warn!( | ||
| 2655 | relay = %relay_url, | ||
| 2656 | batch_id = batch_id, | ||
| 2657 | "historic_sync (negentropy) failed for all filters" | ||
| 2658 | ); | ||
| 2659 | return None; | ||
| 2660 | } | ||
| 2661 | } else { | ||
| 2662 | // Traditional REQ+EOSE path | ||
| 2663 | tracing::debug!( | ||
| 2664 | relay = %relay_url, | ||
| 2665 | batch_id = batch_id, | ||
| 2666 | filter_count = filters_with_since.len(), | ||
| 2667 | repos = items.repos.len(), | ||
| 2668 | root_events = items.root_events.len(), | ||
| 2669 | use_negentropy = use_negentropy, | ||
| 2670 | "Starting historic_sync with REQ+EOSE" | ||
| 2671 | ); | ||
| 2672 | |||
| 2673 | // Subscribe to each filter and collect subscription IDs | ||
| 2674 | let mut subscription_ids = HashSet::new(); | ||
| 2675 | |||
| 2676 | for filter in &filters_with_since { | ||
| 2677 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 2678 | match conn.subscribe_filter(filter.clone()).await { | ||
| 2679 | Ok(sub_id) => { | ||
| 2680 | subscription_ids.insert(sub_id); | ||
| 2681 | } | ||
| 2682 | Err(e) => { | ||
| 2683 | tracing::error!( | ||
| 2684 | relay = %relay_url, | ||
| 2685 | error = %e, | ||
| 2686 | "Failed to subscribe to filter in historic_sync" | ||
| 2687 | ); | ||
| 2688 | } | ||
| 2689 | } | ||
| 2690 | } | ||
| 2691 | } | ||
| 2692 | |||
| 2693 | if subscription_ids.is_empty() && !filters_with_since.is_empty() { | ||
| 2694 | tracing::warn!( | ||
| 2695 | relay = %relay_url, | ||
| 2696 | "All filter subscriptions failed in historic_sync" | ||
| 2697 | ); | ||
| 2698 | return None; | ||
| 2699 | } | ||
| 2700 | |||
| 2701 | // Create PendingBatch for REQ+EOSE | ||
| 2702 | let batch = PendingBatch { | ||
| 2703 | batch_id, | ||
| 2704 | items, | ||
| 2705 | outstanding_subs: subscription_ids, | ||
| 2706 | sync_method: SyncMethod::ReqEose, | ||
| 2707 | }; | ||
| 2708 | |||
| 2709 | // Add to pending_sync_index | ||
| 2710 | { | ||
| 2711 | let mut pending = self.pending_sync_index.write().await; | ||
| 2712 | pending | ||
| 2713 | .entry(relay_url.to_string()) | ||
| 2714 | .or_insert_with(Vec::new) | ||
| 2715 | .push(batch); | ||
| 2716 | } | ||
| 2717 | |||
| 2718 | tracing::debug!( | ||
| 2719 | relay = %relay_url, | ||
| 2720 | batch_id = batch_id, | ||
| 2721 | "historic_sync (REQ+EOSE) batch created, awaiting EOSE" | ||
| 2722 | ); | ||
| 2723 | } | ||
| 2724 | |||
| 2725 | Some(batch_id) | ||
| 2726 | } | ||
| 2727 | |||
| 2098 | /// Gracefully shutdown the SyncManager | 2728 | /// Gracefully shutdown the SyncManager |
| 2099 | /// | 2729 | /// |
| 2100 | /// This method: | 2730 | /// This method: |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 0379fe4..9643fc0 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -499,7 +499,7 @@ impl SelfSubscriber { | |||
| 499 | drop(index); // Release lock before async operations | 499 | drop(index); // Release lock before async operations |
| 500 | 500 | ||
| 501 | // For each relay, send AddFilters action directly | 501 | // For each relay, send AddFilters action directly |
| 502 | // SyncManager's handle_add_filters auto-spawns connection for unknown relays | 502 | // SyncManager's handle_new_sync_filters auto-spawns connection for unknown relays |
| 503 | for (relay_url, needs) in targets { | 503 | for (relay_url, needs) in targets { |
| 504 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) | 504 | // Skip our own relay URL (we're subscribed to ourselves via self-subscription) |
| 505 | if relay_url.contains(&self.relay_domain) { | 505 | if relay_url.contains(&self.relay_domain) { |
| @@ -519,8 +519,10 @@ impl SelfSubscriber { | |||
| 519 | 519 | ||
| 520 | let action = AddFilters { | 520 | let action = AddFilters { |
| 521 | relay_url: relay_url.clone(), | 521 | relay_url: relay_url.clone(), |
| 522 | repos: needs.repos, | 522 | items: crate::sync::PendingItems { |
| 523 | root_events: needs.root_events, | 523 | repos: needs.repos, |
| 524 | root_events: needs.root_events, | ||
| 525 | }, | ||
| 524 | filters, | 526 | filters, |
| 525 | }; | 527 | }; |
| 526 | 528 | ||