diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-18 10:12:11 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-18 10:12:11 +0000 |
| commit | 03f074d0d0840b946a356badde75551d61c0f84c (patch) | |
| tree | 97943bb692d40b3e572854bd30eec6bfdbcf8cb2 /src | |
| parent | 7821b107190cc116a30a4c339f935bc16a1d5197 (diff) | |
sync removing dead code
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/filters.rs | 21 | ||||
| -rw-r--r-- | src/sync/mod.rs | 812 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 87 |
3 files changed, 237 insertions, 683 deletions
diff --git a/src/sync/filters.rs b/src/sync/filters.rs index e508eb2..963fa02 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs | |||
| @@ -103,10 +103,18 @@ pub fn tagged_one_of_our_root_event_filters( | |||
| 103 | return vec![]; | 103 | return vec![]; |
| 104 | } | 104 | } |
| 105 | 105 | ||
| 106 | // DEBUG TRACING: Log the root events we're creating Layer 3 filters for | ||
| 107 | tracing::debug!( | ||
| 108 | root_event_count = root_events.len(), | ||
| 109 | root_event_ids = ?root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 110 | since = ?since, | ||
| 111 | "Building Layer 3 filters for root events" | ||
| 112 | ); | ||
| 113 | |||
| 106 | let mut filters = Vec::new(); | 114 | let mut filters = Vec::new(); |
| 107 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); | 115 | let event_ids: Vec<String> = root_events.iter().map(|id| id.to_hex()).collect(); |
| 108 | 116 | ||
| 109 | for chunk in event_ids.chunks(100) { | 117 | for (chunk_idx, chunk) in event_ids.chunks(100).enumerate() { |
| 110 | // Lowercase 'e' tag - standard event reference | 118 | // Lowercase 'e' tag - standard event reference |
| 111 | let mut f1 = Filter::new(); | 119 | let mut f1 = Filter::new(); |
| 112 | for event_id in chunk { | 120 | for event_id in chunk { |
| @@ -131,6 +139,17 @@ pub fn tagged_one_of_our_root_event_filters( | |||
| 131 | f3 = f3.since(ts); | 139 | f3 = f3.since(ts); |
| 132 | } | 140 | } |
| 133 | 141 | ||
| 142 | // DEBUG TRACING: Log the filters being created | ||
| 143 | tracing::debug!( | ||
| 144 | chunk_idx = chunk_idx, | ||
| 145 | chunk_size = chunk.len(), | ||
| 146 | event_ids_in_chunk = ?chunk, | ||
| 147 | filter_e = ?f1, | ||
| 148 | filter_E = ?f2, | ||
| 149 | filter_q = ?f3, | ||
| 150 | "Created Layer 3 filter chunk" | ||
| 151 | ); | ||
| 152 | |||
| 134 | filters.push(f1); | 153 | filters.push(f1); |
| 135 | filters.push(f2); | 154 | filters.push(f2); |
| 136 | filters.push(f3); | 155 | filters.push(f3); |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 401cf21..3c50387 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -490,16 +490,19 @@ impl SyncManager { | |||
| 490 | // Move repos to confirmed | 490 | // Move repos to confirmed |
| 491 | state.repos.extend(batch.items.repos); | 491 | state.repos.extend(batch.items.repos); |
| 492 | // Move root_events to confirmed | 492 | // Move root_events to confirmed |
| 493 | state.root_events.extend(batch.items.root_events); | 493 | state.root_events.extend(batch.items.root_events.clone()); |
| 494 | 494 | ||
| 495 | // DEBUG TRACING: Log the root events being confirmed | ||
| 495 | tracing::info!( | 496 | tracing::info!( |
| 496 | relay = %relay_url, | 497 | relay = %relay_url, |
| 497 | batch_id = batch_id, | 498 | batch_id = batch_id, |
| 498 | sync_method = ?sync_method, | 499 | sync_method = ?sync_method, |
| 499 | repos_confirmed = repos_count, | 500 | repos_confirmed = repos_count, |
| 500 | root_events_confirmed = events_count, | 501 | root_events_confirmed = events_count, |
| 502 | root_events_ids = ?batch.items.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 501 | total_repos = state.repos.len(), | 503 | total_repos = state.repos.len(), |
| 502 | total_root_events = state.root_events.len(), | 504 | total_root_events = state.root_events.len(), |
| 505 | all_root_events = ?state.root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 503 | "Batch confirmed - items moved from pending to confirmed" | 506 | "Batch confirmed - items moved from pending to confirmed" |
| 504 | ); | 507 | ); |
| 505 | } else { | 508 | } else { |
| @@ -535,10 +538,6 @@ impl SyncManager { | |||
| 535 | } | 538 | } |
| 536 | }; | 539 | }; |
| 537 | 540 | ||
| 538 | // Check if relay supports NIP-77 negentropy AND negentropy is not disabled | ||
| 539 | let use_negentropy = | ||
| 540 | !self.config.sync_disable_negentropy && connection.supports_negentropy().await; | ||
| 541 | |||
| 542 | // Unsubscribe all current subscriptions | 541 | // Unsubscribe all current subscriptions |
| 543 | connection.unsubscribe_all().await; | 542 | connection.unsubscribe_all().await; |
| 544 | 543 | ||
| @@ -564,62 +563,14 @@ impl SyncManager { | |||
| 564 | } | 563 | } |
| 565 | } | 564 | } |
| 566 | 565 | ||
| 567 | let now = Timestamp::now(); | 566 | // maybe we just run start fresh with a daily flag? make sure to start layer 1 filters |
| 568 | 567 | self.fresh_start(relay_url).await; | |
| 569 | if use_negentropy { | ||
| 570 | // NIP-77 supported - use negentropy for efficient reconciliation | ||
| 571 | tracing::info!( | ||
| 572 | relay = %relay_url, | ||
| 573 | "Using NIP-77 negentropy for daily sync" | ||
| 574 | ); | ||
| 575 | |||
| 576 | // Perform negentropy sync for Layer 1 (announcements) | ||
| 577 | let layer1_filter = filters::build_announcement_filter(None); | ||
| 578 | self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (daily)") | ||
| 579 | .await; | ||
| 580 | |||
| 581 | // After negentropy sync, set up live subscription for new events | ||
| 582 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 583 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 584 | if let Err(e) = conn.subscribe_filter(live_filter).await { | ||
| 585 | tracing::error!( | ||
| 586 | relay = %relay_url, | ||
| 587 | error = %e, | ||
| 588 | "Failed to set up live Layer 1 subscription after negentropy daily sync" | ||
| 589 | ); | ||
| 590 | } | ||
| 591 | } | ||
| 592 | |||
| 593 | // Recompute actions for Layer 2+3 based on synced events | ||
| 594 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 595 | } else { | ||
| 596 | // NIP-77 not supported - fall back to REQ+EOSE | ||
| 597 | tracing::info!( | ||
| 598 | relay = %relay_url, | ||
| 599 | "NIP-77 not supported, using REQ+EOSE for daily sync" | ||
| 600 | ); | ||
| 601 | |||
| 602 | // Re-subscribe to Layer 1 (announcements) without since filter for full discovery | ||
| 603 | let layer1_filter = filters::build_announcement_filter(None); | ||
| 604 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 605 | if let Err(e) = conn.subscribe_filter(layer1_filter).await { | ||
| 606 | tracing::error!( | ||
| 607 | relay = %relay_url, | ||
| 608 | error = %e, | ||
| 609 | "Failed to re-subscribe to Layer 1 during daily sync" | ||
| 610 | ); | ||
| 611 | } | ||
| 612 | } | ||
| 613 | |||
| 614 | // Recompute actions for Layer 2+3 - will discover all repos/events again | ||
| 615 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 616 | } | ||
| 617 | 568 | ||
| 618 | if let Some(ref metrics) = self.metrics { | 569 | // if let Some(ref metrics) = self.metrics { |
| 619 | metrics.record_event(event_source::DAILY); | 570 | // metrics.record_event(event_source::DAILY); |
| 620 | } | 571 | // } |
| 621 | 572 | ||
| 622 | tracing::info!(relay = %relay_url, "Daily sync complete"); | 573 | // tracing::info!(relay = %relay_url, "Daily sync complete"); |
| 623 | } | 574 | } |
| 624 | 575 | ||
| 625 | /// Run the sync manager | 576 | /// Run the sync manager |
| @@ -827,79 +778,19 @@ impl SyncManager { | |||
| 827 | // Step 2: Check if consolidation is needed BEFORE adding new filters | 778 | // Step 2: Check if consolidation is needed BEFORE adding new filters |
| 828 | self.maybe_consolidate(&action.relay_url, action.filters.len()) | 779 | self.maybe_consolidate(&action.relay_url, action.filters.len()) |
| 829 | .await; | 780 | .await; |
| 830 | /// DELETE this bit | ||
| 831 | // Step 3: Get connection and subscribe to all filters | ||
| 832 | let connection = match self.connections.get(&action.relay_url) { | ||
| 833 | Some(conn) => conn, | ||
| 834 | None => { | ||
| 835 | tracing::warn!( | ||
| 836 | relay = %action.relay_url, | ||
| 837 | "No connection for relay, cannot subscribe" | ||
| 838 | ); | ||
| 839 | return; | ||
| 840 | } | ||
| 841 | }; | ||
| 842 | 781 | ||
| 843 | // Subscribe to each filter and collect subscription IDs | 782 | // Subscribe to each filter and collect subscription IDs |
| 844 | let mut subscription_ids = Vec::new(); | 783 | tracing::info!( |
| 845 | for filter in &action.filters { | ||
| 846 | match connection.subscribe_filter(filter.clone()).await { | ||
| 847 | Ok(sub_id) => { | ||
| 848 | subscription_ids.push(sub_id); | ||
| 849 | } | ||
| 850 | Err(e) => { | ||
| 851 | tracing::error!( | ||
| 852 | relay = %action.relay_url, | ||
| 853 | error = %e, | ||
| 854 | "Failed to subscribe to filter" | ||
| 855 | ); | ||
| 856 | } | ||
| 857 | } | ||
| 858 | } | ||
| 859 | |||
| 860 | if subscription_ids.is_empty() && !action.filters.is_empty() { | ||
| 861 | tracing::warn!( | ||
| 862 | relay = %action.relay_url, | ||
| 863 | "All filter subscriptions failed, not creating batch" | ||
| 864 | ); | ||
| 865 | return; | ||
| 866 | } | ||
| 867 | |||
| 868 | // Step 4: Create PendingBatch | ||
| 869 | let batch_id = self.next_batch_id(); | ||
| 870 | let batch = PendingBatch { | ||
| 871 | batch_id, | ||
| 872 | items: PendingItems { | ||
| 873 | repos: action.items.repos.clone(), | ||
| 874 | root_events: action.items.root_events.clone(), | ||
| 875 | }, | ||
| 876 | outstanding_subs: subscription_ids.into_iter().collect(), | ||
| 877 | sync_method: SyncMethod::ReqEose, | ||
| 878 | }; | ||
| 879 | |||
| 880 | // Step 5: Add to pending_sync_index | ||
| 881 | { | ||
| 882 | let mut pending = self.pending_sync_index.write().await; | ||
| 883 | pending | ||
| 884 | .entry(action.relay_url.clone()) | ||
| 885 | .or_insert_with(Vec::new) | ||
| 886 | .push(batch); | ||
| 887 | } | ||
| 888 | |||
| 889 | tracing::debug!( | ||
| 890 | relay = %action.relay_url, | 784 | relay = %action.relay_url, |
| 891 | batch_id = batch_id, | 785 | filter_count = action.filters.len(), |
| 892 | repos = action.items.repos.len(), | 786 | repo_count = action.items.repos.len(), |
| 893 | root_events = action.items.root_events.len(), | 787 | root_event_count = action.items.root_events.len(), |
| 894 | filters = action.filters.len(), | 788 | "handle_add_filters: calling sync_live and historic_sync" |
| 895 | "Created pending batch for filter subscriptions" | ||
| 896 | ); | 789 | ); |
| 897 | // REPLACE WITH THIS: | 790 | |
| 898 | // // Subscribe to each filter and collect subscription IDs | 791 | self.sync_live(&action.relay_url, &action.filters).await; |
| 899 | // self.sync_live(&action.relay_url, &action.filters).await; | 792 | self.historic_sync(&action.relay_url, action.filters, action.items, None) |
| 900 | // // TODO need to do actions.repos | 793 | .await; |
| 901 | // self.historic_sync(&action.relay_url, action.filters, action.items, None) | ||
| 902 | // .await; | ||
| 903 | } | 794 | } |
| 904 | 795 | ||
| 905 | /// Handle a connection success (called when a relay connects or reconnects) | 796 | /// Handle a connection success (called when a relay connects or reconnects) |
| @@ -909,209 +800,39 @@ impl SyncManager { | |||
| 909 | /// - `quick_reconnect()` if disconnected < 15 minutes | 800 | /// - `quick_reconnect()` if disconnected < 15 minutes |
| 910 | /// - `long_reconnect()` if disconnected > 15 minutes | 801 | /// - `long_reconnect()` if disconnected > 15 minutes |
| 911 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { | 802 | async fn handle_connect_or_reconnect(&mut self, relay_url: &str) { |
| 912 | let now = Timestamp::now(); | 803 | let last_connected = { |
| 913 | |||
| 914 | // // Get the relay state to determine reconnect type | ||
| 915 | // let (last_connected, disconnected_at) = { | ||
| 916 | // let index = self.relay_sync_index.read().await; | ||
| 917 | // if let Some(state) = index.get(relay_url) { | ||
| 918 | // (state.last_connected, state.disconnected_at) | ||
| 919 | // } else { | ||
| 920 | // (None, None) // No state found | ||
| 921 | // } | ||
| 922 | // }; | ||
| 923 | |||
| 924 | // // Determine which reconnection strategy to use | ||
| 925 | // match (last_connected, disconnected_at) { | ||
| 926 | // (None, _) => { | ||
| 927 | // // Never connected before - fresh start | ||
| 928 | // tracing::info!( | ||
| 929 | // relay = %relay_url, | ||
| 930 | // "First connection - initiating fresh_start" | ||
| 931 | // ); | ||
| 932 | // self.fresh_start(relay_url).await; | ||
| 933 | // } | ||
| 934 | // (Some(last), Some(disconnected)) => { | ||
| 935 | // // Was connected before, check how long disconnected | ||
| 936 | // let disconnect_duration = now.as_secs().saturating_sub(disconnected.as_secs()); | ||
| 937 | |||
| 938 | // if disconnect_duration <= QUICK_RECONNECT_WINDOW_SECS { | ||
| 939 | // // Disconnected < 15 minutes - quick reconnect | ||
| 940 | // // Use last_connected minus buffer as since timestamp | ||
| 941 | // let since = | ||
| 942 | // Timestamp::from(last.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); | ||
| 943 | // tracing::info!( | ||
| 944 | // relay = %relay_url, | ||
| 945 | // disconnect_secs = disconnect_duration, | ||
| 946 | // since = %since, | ||
| 947 | // "Short disconnection - initiating quick_reconnect" | ||
| 948 | // ); | ||
| 949 | // self.quick_reconnect(relay_url, since).await; | ||
| 950 | // } else { | ||
| 951 | // // Disconnected > 15 minutes - long reconnect | ||
| 952 | // tracing::info!( | ||
| 953 | // relay = %relay_url, | ||
| 954 | // disconnect_secs = disconnect_duration, | ||
| 955 | // "Long disconnection - initiating long_reconnect" | ||
| 956 | // ); | ||
| 957 | // self.long_reconnect(relay_url).await; | ||
| 958 | // } | ||
| 959 | // } | ||
| 960 | // (Some(_last), None) => { | ||
| 961 | // // Was connected but no disconnected_at - shouldn't happen normally | ||
| 962 | // // Treat as long reconnect to be safe | ||
| 963 | // tracing::warn!( | ||
| 964 | // relay = %relay_url, | ||
| 965 | // "Unexpected state: last_connected set but no disconnected_at - using long_reconnect" | ||
| 966 | // ); | ||
| 967 | // self.long_reconnect(relay_url).await; | ||
| 968 | // } | ||
| 969 | // } | ||
| 970 | // Get the relay state to determine reconnect type | ||
| 971 | let (is_fresh_sync, last_connected, is_bootstrap) = { | ||
| 972 | let index = self.relay_sync_index.read().await; | 804 | let index = self.relay_sync_index.read().await; |
| 973 | if let Some(state) = index.get(relay_url) { | 805 | index.get(relay_url).and_then(|s| s.last_connected) |
| 974 | let last_conn = state.last_connected; | ||
| 975 | let is_fresh = match last_conn { | ||
| 976 | None => true, // Never connected before | ||
| 977 | Some(last) => { | ||
| 978 | let elapsed = now.as_secs().saturating_sub(last.as_secs()); | ||
| 979 | elapsed > QUICK_RECONNECT_WINDOW_SECS // Stale if > 15 min | ||
| 980 | } | ||
| 981 | }; | ||
| 982 | (is_fresh, last_conn, state.is_bootstrap) | ||
| 983 | } else { | ||
| 984 | (true, None, false) // No state found, treat as fresh | ||
| 985 | } | ||
| 986 | }; | 806 | }; |
| 987 | 807 | ||
| 988 | // If stale reconnect, clear state | 808 | if let Some(last) = last_connected { |
| 989 | if is_fresh_sync && last_connected.is_some() { | 809 | let elapsed = Timestamp::now().as_secs().saturating_sub(last.as_secs()); |
| 990 | let mut index = self.relay_sync_index.write().await; | 810 | if elapsed < QUICK_RECONNECT_WINDOW_SECS { |
| 991 | if let Some(state) = index.get_mut(relay_url) { | 811 | // short disconnect |
| 992 | state.clear_sync_state(); | ||
| 993 | tracing::info!( | 812 | tracing::info!( |
| 994 | relay = %relay_url, | 813 | relay = %relay_url, |
| 995 | "Cleared stale sync state (was disconnected > 15 min)" | 814 | disconnect_secs = elapsed, |
| 815 | "Short disconnection - initiating quick_reconnect" | ||
| 996 | ); | 816 | ); |
| 997 | } | 817 | self.quick_reconnect(relay_url, Timestamp::from(elapsed)) |
| 998 | } | ||
| 999 | |||
| 1000 | // Update connection state | ||
| 1001 | { | ||
| 1002 | let mut index = self.relay_sync_index.write().await; | ||
| 1003 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 1004 | state.connection_status = ConnectionStatus::Connected; | ||
| 1005 | state.last_connected = Some(now); | ||
| 1006 | state.disconnected_at = None; | ||
| 1007 | } | ||
| 1008 | |||
| 1009 | // Record success in health tracker | ||
| 1010 | self.health_tracker.record_success(relay_url); | ||
| 1011 | |||
| 1012 | // Update metrics | ||
| 1013 | if let Some(ref metrics) = self.metrics { | ||
| 1014 | metrics.set_relay_connected(relay_url, true); | ||
| 1015 | metrics.inc_connected_count(); | ||
| 1016 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1017 | } | ||
| 1018 | |||
| 1019 | // Subscribe based on reconnect type | ||
| 1020 | if is_fresh_sync { | ||
| 1021 | tracing::info!( | ||
| 1022 | relay = %relay_url, | ||
| 1023 | is_bootstrap = is_bootstrap, | ||
| 1024 | "Fresh sync - checking NIP-77 negentropy support" | ||
| 1025 | ); | ||
| 1026 | |||
| 1027 | // Check if relay supports NIP-77 negentropy for efficient sync | ||
| 1028 | // Respect the sync_disable_negentropy config option | ||
| 1029 | let use_negentropy = if self.config.sync_disable_negentropy { | ||
| 1030 | tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); | ||
| 1031 | false | ||
| 1032 | } else if let Some(connection) = self.connections.get(relay_url) { | ||
| 1033 | connection.supports_negentropy().await | ||
| 1034 | } else { | ||
| 1035 | false | ||
| 1036 | }; | ||
| 1037 | |||
| 1038 | if use_negentropy { | ||
| 1039 | // NIP-77 supported - use negentropy for historical sync | ||
| 1040 | tracing::info!( | ||
| 1041 | relay = %relay_url, | ||
| 1042 | "Using NIP-77 negentropy for fresh sync" | ||
| 1043 | ); | ||
| 1044 | |||
| 1045 | // Perform negentropy sync for Layer 1 (announcements) | ||
| 1046 | let layer1_filter = filters::build_announcement_filter(None); | ||
| 1047 | self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1") | ||
| 1048 | .await; | 818 | .await; |
| 1049 | |||
| 1050 | // After negentropy sync, recompute Layer 2+3 actions | ||
| 1051 | // Layer 1 events are now in sync, so we can proceed with Layer 2+3 | ||
| 1052 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1053 | |||
| 1054 | // Set up live subscription for new events (since=now) | ||
| 1055 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 1056 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1057 | if let Err(e) = connection.subscribe_filter(live_filter).await { | ||
| 1058 | tracing::error!( | ||
| 1059 | relay = %relay_url, | ||
| 1060 | error = %e, | ||
| 1061 | "Failed to set up live Layer 1 subscription after negentropy sync" | ||
| 1062 | ); | ||
| 1063 | } | ||
| 1064 | } | ||
| 1065 | } else { | 819 | } else { |
| 1066 | // NIP-77 not supported - fall back to REQ+EOSE | 820 | // long disconnect |
| 1067 | tracing::info!( | 821 | tracing::info!( |
| 1068 | relay = %relay_url, | 822 | relay = %relay_url, |
| 1069 | "NIP-77 not supported, using REQ+EOSE for fresh sync" | 823 | disconnect_secs = elapsed, |
| 824 | "Long disconnection - initiating fresh_start" | ||
| 1070 | ); | 825 | ); |
| 1071 | // Fresh sync: Layer 1 subscription (without since) was already established | 826 | self.fresh_start(relay_url).await; |
| 1072 | // during connect_and_subscribe() in handle_add_filters(). That call subscribes | ||
| 1073 | // to kinds 30617+30618 for the full history. Here we only need to recompute | ||
| 1074 | // Layer 2+3 actions based on the repos we're tracking. | ||
| 1075 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1076 | } | 827 | } |
| 1077 | } else { | 828 | } else { |
| 1078 | // Quick reconnect: use since filter (no negentropy needed) | 829 | // not successfully connected before (since launching binary) |
| 1079 | let since_ts = Timestamp::from( | ||
| 1080 | last_connected | ||
| 1081 | .unwrap() | ||
| 1082 | .as_secs() | ||
| 1083 | .saturating_sub(QUICK_RECONNECT_WINDOW_SECS), | ||
| 1084 | ); | ||
| 1085 | |||
| 1086 | tracing::info!( | 830 | tracing::info!( |
| 1087 | relay = %relay_url, | 831 | relay = %relay_url, |
| 1088 | since = %since_ts, | 832 | "First connection - initiating fresh_start" |
| 1089 | "Quick reconnect - using since filter for incremental sync" | ||
| 1090 | ); | 833 | ); |
| 1091 | 834 | self.fresh_start(relay_url).await; | |
| 1092 | // Subscribe to Layer 1 (announcements) with since filter to catch new repos | 835 | }; |
| 1093 | let layer1_filter = filters::build_announcement_filter(Some(since_ts)); | ||
| 1094 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1095 | if let Err(e) = connection.subscribe_filter(layer1_filter).await { | ||
| 1096 | tracing::error!( | ||
| 1097 | relay = %relay_url, | ||
| 1098 | error = %e, | ||
| 1099 | "Failed to subscribe to Layer 1 filter on quick reconnect" | ||
| 1100 | ); | ||
| 1101 | } | ||
| 1102 | } | ||
| 1103 | |||
| 1104 | // Rebuild Layer 2 and Layer 3 with since filter | ||
| 1105 | self.rebuild_layer2_and_layer3(relay_url, Some(since_ts)) | ||
| 1106 | .await; | ||
| 1107 | |||
| 1108 | // Recompute actions for any new items discovered while disconnected | ||
| 1109 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1110 | |||
| 1111 | if let Some(ref metrics) = self.metrics { | ||
| 1112 | metrics.record_event(event_source::RECONNECT); | ||
| 1113 | } | ||
| 1114 | } | ||
| 1115 | } | 836 | } |
| 1116 | 837 | ||
| 1117 | /// Fresh start - clears state and does full sync | 838 | /// Fresh start - clears state and does full sync |
| @@ -1155,77 +876,16 @@ impl SyncManager { | |||
| 1155 | "Cleared sync state in fresh_start" | 876 | "Cleared sync state in fresh_start" |
| 1156 | ); | 877 | ); |
| 1157 | } | 878 | } |
| 1158 | } | 879 | if state.connection_status == ConnectionStatus::Connected { |
| 1159 | } | 880 | // TODO start layer 1 |
| 1160 | 881 | drop(index); | |
| 1161 | // Step 3: Update connection state | 882 | self.recompute_new_sync_filters_for_relay(relay_url).await; |
| 1162 | { | ||
| 1163 | let mut index = self.relay_sync_index.write().await; | ||
| 1164 | let state = index.entry(relay_url.to_string()).or_default(); | ||
| 1165 | state.connection_status = ConnectionStatus::Connected; | ||
| 1166 | state.last_connected = Some(now); | ||
| 1167 | state.disconnected_at = None; | ||
| 1168 | } | ||
| 1169 | |||
| 1170 | // Record success in health tracker | ||
| 1171 | self.health_tracker.record_success(relay_url); | ||
| 1172 | |||
| 1173 | // Update metrics | ||
| 1174 | if let Some(ref metrics) = self.metrics { | ||
| 1175 | metrics.set_relay_connected(relay_url, true); | ||
| 1176 | metrics.inc_connected_count(); | ||
| 1177 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1178 | } | ||
| 1179 | |||
| 1180 | // Step 4: L1 sync - check negentropy support | ||
| 1181 | let use_negentropy = if self.config.sync_disable_negentropy { | ||
| 1182 | tracing::debug!(relay = %relay_url, "Negentropy disabled via config"); | ||
| 1183 | false | ||
| 1184 | } else if let Some(connection) = self.connections.get(relay_url) { | ||
| 1185 | connection.supports_negentropy().await | ||
| 1186 | } else { | ||
| 1187 | false | ||
| 1188 | }; | ||
| 1189 | |||
| 1190 | if use_negentropy { | ||
| 1191 | // NIP-77 supported - use negentropy for L1 historical sync | ||
| 1192 | tracing::info!( | ||
| 1193 | relay = %relay_url, | ||
| 1194 | "Using NIP-77 negentropy for L1 historical sync" | ||
| 1195 | ); | ||
| 1196 | |||
| 1197 | // L1 historic sync (no since - full sync) | ||
| 1198 | let layer1_filter = filters::build_announcement_filter(None); | ||
| 1199 | self.negentropy_sync_and_process(relay_url, layer1_filter, "Layer 1 (fresh_start)") | ||
| 1200 | .await; | ||
| 1201 | |||
| 1202 | // L1 live subscription (since=now for ongoing events) | ||
| 1203 | let live_filter = filters::build_announcement_filter(Some(now)); | ||
| 1204 | if let Some(connection) = self.connections.get(relay_url) { | ||
| 1205 | if let Err(e) = connection.subscribe_filter(live_filter).await { | ||
| 1206 | tracing::error!( | ||
| 1207 | relay = %relay_url, | ||
| 1208 | error = %e, | ||
| 1209 | "Failed to set up L1 live subscription in fresh_start" | ||
| 1210 | ); | ||
| 1211 | } | 883 | } |
| 884 | } else { | ||
| 885 | drop(index); | ||
| 886 | return self.spawn_relay_connection(relay_url.to_string()).await; | ||
| 1212 | } | 887 | } |
| 1213 | } else { | ||
| 1214 | // NIP-77 not supported - REQ+EOSE | ||
| 1215 | // Note: Layer 1 subscription (without since) was already established | ||
| 1216 | // during connect_and_subscribe() in spawn_relay_connection | ||
| 1217 | tracing::info!( | ||
| 1218 | relay = %relay_url, | ||
| 1219 | "Using REQ+EOSE for L1 sync (negentropy not available)" | ||
| 1220 | ); | ||
| 1221 | } | 888 | } |
| 1222 | |||
| 1223 | // Step 5: compute_actions → AddFilters for L2+L3 | ||
| 1224 | // Since RelaySyncIndex is now empty, compute_actions will produce AddFilters | ||
| 1225 | // for ALL repos that should be synced from this relay | ||
| 1226 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1227 | |||
| 1228 | tracing::info!(relay = %relay_url, "fresh_start complete"); | ||
| 1229 | } | 889 | } |
| 1230 | 890 | ||
| 1231 | /// Quick reconnect - for disconnections < 15 minutes | 891 | /// Quick reconnect - for disconnections < 15 minutes |
| @@ -1312,27 +972,6 @@ impl SyncManager { | |||
| 1312 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); | 972 | tracing::info!(relay = %relay_url, "quick_reconnect complete"); |
| 1313 | } | 973 | } |
| 1314 | 974 | ||
| 1315 | /// Long reconnect - for disconnections > 15 minutes | ||
| 1316 | /// | ||
| 1317 | /// Flow: | ||
| 1318 | /// 1. Record disconnect/reconnect metric | ||
| 1319 | /// 2. Delegate to fresh_start() | ||
| 1320 | async fn long_reconnect(&mut self, relay_url: &str) { | ||
| 1321 | tracing::info!(relay = %relay_url, "Starting long_reconnect"); | ||
| 1322 | |||
| 1323 | // Step 1: Record disconnect/reconnect metric | ||
| 1324 | // This distinguishes intentional daily refresh from failure recovery | ||
| 1325 | if let Some(ref metrics) = self.metrics { | ||
| 1326 | metrics.record_event(event_source::RECONNECT); | ||
| 1327 | } | ||
| 1328 | |||
| 1329 | // Step 2: Delegate to fresh_start | ||
| 1330 | // State is too stale to trust, start fresh | ||
| 1331 | self.fresh_start(relay_url).await; | ||
| 1332 | |||
| 1333 | tracing::info!(relay = %relay_url, "long_reconnect complete"); | ||
| 1334 | } | ||
| 1335 | |||
| 1336 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay | 975 | /// Rebuild Layer 2 and Layer 3 subscriptions for a relay |
| 1337 | /// | 976 | /// |
| 1338 | /// Uses the confirmed repos and root_events from RelayState to build filters. | 977 | /// Uses the confirmed repos and root_events from RelayState to build filters. |
| @@ -1367,11 +1006,15 @@ impl SyncManager { | |||
| 1367 | // Build Layer 2 and Layer 3 filters | 1006 | // Build Layer 2 and Layer 3 filters |
| 1368 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); | 1007 | let filters = build_layer2_and_layer3_filters(&repos, &root_events, since); |
| 1369 | 1008 | ||
| 1009 | // DEBUG TRACING: Log detailed filter information | ||
| 1370 | tracing::debug!( | 1010 | tracing::debug!( |
| 1371 | relay = %relay_url, | 1011 | relay = %relay_url, |
| 1372 | filter_count = filters.len(), | 1012 | filter_count = filters.len(), |
| 1373 | repos_count = repos.len(), | 1013 | repos_count = repos.len(), |
| 1374 | root_events_count = root_events.len(), | 1014 | root_events_count = root_events.len(), |
| 1015 | repos = ?repos, | ||
| 1016 | root_events = ?root_events.iter().map(|id| id.to_hex()).collect::<Vec<_>>(), | ||
| 1017 | filters = ?filters, | ||
| 1375 | since = ?since, | 1018 | since = ?since, |
| 1376 | "Rebuilding Layer 2/3 filters" | 1019 | "Rebuilding Layer 2/3 filters" |
| 1377 | ); | 1020 | ); |
| @@ -1829,197 +1472,6 @@ impl SyncManager { | |||
| 1829 | } | 1472 | } |
| 1830 | 1473 | ||
| 1831 | // ========================================================================= | 1474 | // ========================================================================= |
| 1832 | // NIP-77 Negentropy Sync Helpers | ||
| 1833 | // ========================================================================= | ||
| 1834 | |||
| 1835 | /// Perform negentropy sync for a filter and process received events | ||
| 1836 | /// | ||
| 1837 | /// This method uses the unified PendingBatch flow: | ||
| 1838 | /// 1. Creates a PendingBatch with targets for this relay | ||
| 1839 | /// 2. Performs negentropy reconciliation with the remote relay | ||
| 1840 | /// 3. On success, confirms the batch (moves items to RelayState) | ||
| 1841 | /// 4. On failure, removes the batch without confirming | ||
| 1842 | /// | ||
| 1843 | /// This ensures consistent state tracking across both sync paths | ||
| 1844 | /// (REQ+EOSE and Negentropy). | ||
| 1845 | /// | ||
| 1846 | /// # Arguments | ||
| 1847 | /// * `relay_url` - The relay URL to sync with | ||
| 1848 | /// * `filter` - The filter defining which events to sync | ||
| 1849 | /// * `layer_name` - Human-readable layer name for logging (e.g., "Layer 1") | ||
| 1850 | /// | ||
| 1851 | /// # Returns | ||
| 1852 | /// Number of events received from negentropy sync | ||
| 1853 | async fn negentropy_sync_and_process( | ||
| 1854 | &mut self, | ||
| 1855 | relay_url: &str, | ||
| 1856 | filter: Filter, | ||
| 1857 | layer_name: &str, | ||
| 1858 | ) -> usize { | ||
| 1859 | use crate::sync::algorithms::derive_relay_targets; | ||
| 1860 | |||
| 1861 | // Check connection exists first (borrow ends immediately) | ||
| 1862 | if !self.connections.contains_key(relay_url) { | ||
| 1863 | tracing::warn!( | ||
| 1864 | relay = %relay_url, | ||
| 1865 | layer = layer_name, | ||
| 1866 | "No connection found for negentropy sync" | ||
| 1867 | ); | ||
| 1868 | return 0; | ||
| 1869 | } | ||
| 1870 | |||
| 1871 | // Step 1: Get targets for this relay and create PendingBatch | ||
| 1872 | // Get batch_id first (requires mutable borrow of self) | ||
| 1873 | let batch_id = self.next_batch_id(); | ||
| 1874 | |||
| 1875 | let pending_items = { | ||
| 1876 | let repo_index = self.repo_sync_index.read().await; | ||
| 1877 | let targets = derive_relay_targets(&repo_index); | ||
| 1878 | |||
| 1879 | let relay_targets = match targets.get(relay_url) { | ||
| 1880 | Some(t) => t, | ||
| 1881 | None => { | ||
| 1882 | tracing::debug!( | ||
| 1883 | relay = %relay_url, | ||
| 1884 | layer = layer_name, | ||
| 1885 | "No targets found for relay, skipping negentropy sync" | ||
| 1886 | ); | ||
| 1887 | return 0; | ||
| 1888 | } | ||
| 1889 | }; | ||
| 1890 | |||
| 1891 | PendingItems { | ||
| 1892 | repos: relay_targets.repos.clone(), | ||
| 1893 | root_events: relay_targets.root_events.clone(), | ||
| 1894 | } | ||
| 1895 | }; | ||
| 1896 | |||
| 1897 | // Create PendingBatch for negentropy sync (empty outstanding_subs) | ||
| 1898 | let batch = PendingBatch { | ||
| 1899 | batch_id, | ||
| 1900 | items: pending_items.clone(), | ||
| 1901 | outstanding_subs: HashSet::new(), // Negentropy doesn't use subscriptions | ||
| 1902 | sync_method: SyncMethod::Negentropy, | ||
| 1903 | }; | ||
| 1904 | |||
| 1905 | // Add batch to pending_sync_index before starting sync | ||
| 1906 | { | ||
| 1907 | let mut pending = self.pending_sync_index.write().await; | ||
| 1908 | pending | ||
| 1909 | .entry(relay_url.to_string()) | ||
| 1910 | .or_insert_with(Vec::new) | ||
| 1911 | .push(batch); | ||
| 1912 | } | ||
| 1913 | |||
| 1914 | tracing::debug!( | ||
| 1915 | relay = %relay_url, | ||
| 1916 | layer = layer_name, | ||
| 1917 | batch_id = batch_id, | ||
| 1918 | repos = pending_items.repos.len(), | ||
| 1919 | root_events = pending_items.root_events.len(), | ||
| 1920 | "Created pending batch for negentropy sync" | ||
| 1921 | ); | ||
| 1922 | |||
| 1923 | // Step 2: Perform negentropy sync | ||
| 1924 | // Get connection reference here (borrows self.connections briefly) | ||
| 1925 | let Some(connection) = self.connections.get(relay_url) else { | ||
| 1926 | // Connection was removed between check and use (race condition) | ||
| 1927 | // Remove the pending batch we just added | ||
| 1928 | let mut pending = self.pending_sync_index.write().await; | ||
| 1929 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 1930 | batches.retain(|b| b.batch_id != batch_id); | ||
| 1931 | if batches.is_empty() { | ||
| 1932 | pending.remove(relay_url); | ||
| 1933 | } | ||
| 1934 | } | ||
| 1935 | tracing::warn!( | ||
| 1936 | relay = %relay_url, | ||
| 1937 | layer = layer_name, | ||
| 1938 | "Connection disappeared before negentropy sync could start" | ||
| 1939 | ); | ||
| 1940 | return 0; | ||
| 1941 | }; | ||
| 1942 | |||
| 1943 | match connection.negentropy_sync_filter(filter).await { | ||
| 1944 | Ok(result) => { | ||
| 1945 | let event_count = result.received.len(); | ||
| 1946 | |||
| 1947 | tracing::info!( | ||
| 1948 | relay = %relay_url, | ||
| 1949 | layer = layer_name, | ||
| 1950 | received = event_count, | ||
| 1951 | remote_only = result.remote_only.len(), | ||
| 1952 | local_only = result.local_only.len(), | ||
| 1953 | "Negentropy sync completed for {}", | ||
| 1954 | layer_name | ||
| 1955 | ); | ||
| 1956 | |||
| 1957 | tracing::debug!( | ||
| 1958 | relay = %relay_url, | ||
| 1959 | layer = layer_name, | ||
| 1960 | event_ids = ?result.received.iter().take(5).collect::<Vec<_>>(), | ||
| 1961 | "Received event IDs via negentropy (first 5 shown)" | ||
| 1962 | ); | ||
| 1963 | |||
| 1964 | // Record metrics for negentropy events | ||
| 1965 | if let Some(ref metrics) = self.metrics { | ||
| 1966 | for _ in 0..event_count { | ||
| 1967 | metrics.record_event(event_source::STARTUP); | ||
| 1968 | } | ||
| 1969 | } | ||
| 1970 | |||
| 1971 | // Step 3: Remove batch from pending and confirm it | ||
| 1972 | let completed_batch = { | ||
| 1973 | let mut pending = self.pending_sync_index.write().await; | ||
| 1974 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 1975 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); | ||
| 1976 | if let Some(idx) = batch_idx { | ||
| 1977 | let batch = batches.remove(idx); | ||
| 1978 | if batches.is_empty() { | ||
| 1979 | pending.remove(relay_url); | ||
| 1980 | } | ||
| 1981 | Some(batch) | ||
| 1982 | } else { | ||
| 1983 | None | ||
| 1984 | } | ||
| 1985 | } else { | ||
| 1986 | None | ||
| 1987 | } | ||
| 1988 | }; | ||
| 1989 | |||
| 1990 | // Confirm the batch using unified confirm_batch method | ||
| 1991 | if let Some(batch) = completed_batch { | ||
| 1992 | self.confirm_batch(relay_url, batch).await; | ||
| 1993 | } | ||
| 1994 | |||
| 1995 | event_count | ||
| 1996 | } | ||
| 1997 | Err(e) => { | ||
| 1998 | tracing::warn!( | ||
| 1999 | relay = %relay_url, | ||
| 2000 | layer = layer_name, | ||
| 2001 | error = %e, | ||
| 2002 | "Negentropy sync failed for {}", | ||
| 2003 | layer_name | ||
| 2004 | ); | ||
| 2005 | |||
| 2006 | // Remove the batch without confirming on failure | ||
| 2007 | { | ||
| 2008 | let mut pending = self.pending_sync_index.write().await; | ||
| 2009 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 2010 | batches.retain(|b| b.batch_id != batch_id); | ||
| 2011 | if batches.is_empty() { | ||
| 2012 | pending.remove(relay_url); | ||
| 2013 | } | ||
| 2014 | } | ||
| 2015 | } | ||
| 2016 | |||
| 2017 | 0 | ||
| 2018 | } | ||
| 2019 | } | ||
| 2020 | } | ||
| 2021 | |||
| 2022 | // ========================================================================= | ||
| 2023 | // Consolidation System | 1475 | // Consolidation System |
| 2024 | // ========================================================================= | 1476 | // ========================================================================= |
| 2025 | 1477 | ||
| @@ -2410,7 +1862,7 @@ impl SyncManager { | |||
| 2410 | // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", | 1862 | // Note: nostr-sdk Filter doesn't have a limit(0) that means "no limit", |
| 2411 | // but omitting limit means "no limit" which is what we want for live. | 1863 | // but omitting limit means "no limit" which is what we want for live. |
| 2412 | // The filter passed in should already NOT have a limit set. | 1864 | // The filter passed in should already NOT have a limit set. |
| 2413 | match connection.subscribe_filter(filter.clone()).await { | 1865 | match connection.subscribe_filter(filter.clone().limit(1)).await { |
| 2414 | Ok(sub_id) => { | 1866 | Ok(sub_id) => { |
| 2415 | tracing::trace!( | 1867 | tracing::trace!( |
| 2416 | relay = %relay_url, | 1868 | relay = %relay_url, |
| @@ -2519,6 +1971,17 @@ impl SyncManager { | |||
| 2519 | items: PendingItems, | 1971 | items: PendingItems, |
| 2520 | since: Option<Timestamp>, | 1972 | since: Option<Timestamp>, |
| 2521 | ) -> Option<u64> { | 1973 | ) -> Option<u64> { |
| 1974 | // DEBUG TRACING: Log all filters being passed to historic_sync | ||
| 1975 | tracing::debug!( | ||
| 1976 | relay = %relay_url, | ||
| 1977 | filter_count = filters.len(), | ||
| 1978 | filters = ?filters, | ||
| 1979 | repos_count = items.repos.len(), | ||
| 1980 | root_events_count = items.root_events.len(), | ||
| 1981 | since = ?since, | ||
| 1982 | "historic_sync called" | ||
| 1983 | ); | ||
| 1984 | |||
| 2522 | if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { | 1985 | if filters.is_empty() && items.repos.is_empty() && items.root_events.is_empty() { |
| 2523 | tracing::debug!( | 1986 | tracing::debug!( |
| 2524 | relay = %relay_url, | 1987 | relay = %relay_url, |
| @@ -2527,9 +1990,9 @@ impl SyncManager { | |||
| 2527 | return None; | 1990 | return None; |
| 2528 | } | 1991 | } |
| 2529 | 1992 | ||
| 2530 | // Check connection exists | 1993 | // Check connection exists and clone for async usage |
| 2531 | let connection = match self.connections.get(relay_url) { | 1994 | let connection = match self.connections.get(relay_url) { |
| 2532 | Some(conn) => conn, | 1995 | Some(conn) => conn.clone(), |
| 2533 | None => { | 1996 | None => { |
| 2534 | tracing::warn!( | 1997 | tracing::warn!( |
| 2535 | relay = %relay_url, | 1998 | relay = %relay_url, |
| @@ -2581,38 +2044,63 @@ impl SyncManager { | |||
| 2581 | .push(batch); | 2044 | .push(batch); |
| 2582 | } | 2045 | } |
| 2583 | 2046 | ||
| 2584 | // Perform negentropy sync for each filter | 2047 | // Perform negentropy sync for all filters concurrently |
| 2585 | // Note: We sync each filter separately because negentropy works on a single filter | 2048 | // Note: We sync each filter separately because negentropy works on a single filter |
| 2586 | let mut total_received = 0; | 2049 | let diff_futures: Vec<_> = filters_with_since |
| 2587 | let mut any_success = false; | 2050 | .iter() |
| 2051 | .enumerate() | ||
| 2052 | .map(|(idx, filter)| { | ||
| 2053 | let filter = filter.clone(); | ||
| 2054 | let conn = connection.clone(); | ||
| 2055 | async move { (idx, conn.negentropy_sync_diff(filter).await) } | ||
| 2056 | }) | ||
| 2057 | .collect(); | ||
| 2588 | 2058 | ||
| 2589 | for filter in &filters_with_since { | 2059 | let diff_results = futures_util::future::join_all(diff_futures).await; |
| 2590 | if let Some(conn) = self.connections.get(relay_url) { | 2060 | |
| 2591 | match conn.negentropy_sync_filter(filter.clone()).await { | 2061 | // Process results - collect all event IDs we need to fetch |
| 2592 | Ok(result) => { | 2062 | let mut all_remote_ids = Vec::new(); |
| 2593 | total_received += result.received.len(); | 2063 | let mut failed_count = 0; |
| 2594 | any_success = true; | 2064 | |
| 2595 | 2065 | for (idx, result) in diff_results { | |
| 2596 | // Record metrics for received events | 2066 | match result { |
| 2597 | if let Some(ref metrics) = self.metrics { | 2067 | Ok(reconciliation) => { |
| 2598 | for _ in 0..result.received.len() { | 2068 | let remote_count = reconciliation.remote.len(); |
| 2599 | metrics.record_event(event_source::STARTUP); | 2069 | if remote_count > 0 { |
| 2600 | } | 2070 | tracing::debug!( |
| 2601 | } | ||
| 2602 | } | ||
| 2603 | Err(e) => { | ||
| 2604 | tracing::warn!( | ||
| 2605 | relay = %relay_url, | 2071 | relay = %relay_url, |
| 2606 | error = %e, | 2072 | filter_idx = idx, |
| 2607 | "Negentropy sync failed for filter in historic_sync" | 2073 | remote_count = remote_count, |
| 2074 | "Negentropy diff identified missing events" | ||
| 2608 | ); | 2075 | ); |
| 2076 | all_remote_ids.extend(reconciliation.remote.into_iter()); | ||
| 2609 | } | 2077 | } |
| 2610 | } | 2078 | } |
| 2079 | Err(e) => { | ||
| 2080 | failed_count += 1; | ||
| 2081 | tracing::warn!( | ||
| 2082 | relay = %relay_url, | ||
| 2083 | filter_idx = idx, | ||
| 2084 | error = %e, | ||
| 2085 | "Negentropy diff failed for filter in historic_sync" | ||
| 2086 | ); | ||
| 2087 | } | ||
| 2611 | } | 2088 | } |
| 2612 | } | 2089 | } |
| 2613 | 2090 | ||
| 2614 | if any_success { | 2091 | // Require ALL filters to succeed to confirm the batch |
| 2615 | // Remove batch from pending and confirm it | 2092 | if failed_count > 0 { |
| 2093 | // Leave pending batch so it doesnt appear as synced. we can try again later. | ||
| 2094 | tracing::warn!( | ||
| 2095 | relay = %relay_url, | ||
| 2096 | batch_id = batch_id, | ||
| 2097 | failed_count = failed_count, | ||
| 2098 | total_filters = filters_with_since.len(), | ||
| 2099 | "historic_sync (negentropy) failed - not all filters succeeded" | ||
| 2100 | ); | ||
| 2101 | return None; | ||
| 2102 | } else if all_remote_ids.is_empty() { | ||
| 2103 | // Remove batch from pending and confirm it (no items to download) | ||
| 2616 | let completed_batch = { | 2104 | let completed_batch = { |
| 2617 | let mut pending = self.pending_sync_index.write().await; | 2105 | let mut pending = self.pending_sync_index.write().await; |
| 2618 | if let Some(batches) = pending.get_mut(relay_url) { | 2106 | if let Some(batches) = pending.get_mut(relay_url) { |
| @@ -2638,26 +2126,67 @@ impl SyncManager { | |||
| 2638 | tracing::info!( | 2126 | tracing::info!( |
| 2639 | relay = %relay_url, | 2127 | relay = %relay_url, |
| 2640 | batch_id = batch_id, | 2128 | batch_id = batch_id, |
| 2641 | total_received = total_received, | 2129 | total_received = 0, |
| 2642 | "historic_sync (negentropy) completed" | 2130 | "historic_sync (negentropy) completed - already up-to-date" |
| 2643 | ); | 2131 | ); |
| 2644 | } else { | 2132 | } |
| 2645 | // All negentropy syncs failed - remove the pending batch | 2133 | // launch subscriptions to fetch missing events by id |
| 2134 | let ids_filters: Vec<_> = all_remote_ids | ||
| 2135 | .chunks(300) | ||
| 2136 | .map(|c| Filter::new().ids(c.iter().copied())) | ||
| 2137 | .collect(); | ||
| 2138 | |||
| 2139 | // DEBUG TRACING: Log that we're requesting events by ID | ||
| 2140 | tracing::debug!( | ||
| 2141 | relay = %relay_url, | ||
| 2142 | batch_id = batch_id, | ||
| 2143 | total_event_ids = all_remote_ids.len(), | ||
| 2144 | filter_chunks = ids_filters.len(), | ||
| 2145 | event_ids = ?all_remote_ids, | ||
| 2146 | "Creating subscriptions to fetch missing events by ID (negentropy path)" | ||
| 2147 | ); | ||
| 2148 | |||
| 2149 | let mut subscription_ids = HashSet::new(); | ||
| 2150 | for (idx, filter) in ids_filters.iter().enumerate() { | ||
| 2151 | if let Some(conn) = self.connections.get(relay_url) { | ||
| 2152 | // DEBUG TRACING: Log each filter being subscribed | ||
| 2153 | tracing::debug!( | ||
| 2154 | relay = %relay_url, | ||
| 2155 | batch_id = batch_id, | ||
| 2156 | chunk_idx = idx, | ||
| 2157 | filter = ?filter, | ||
| 2158 | "Subscribing to ID filter chunk" | ||
| 2159 | ); | ||
| 2160 | |||
| 2161 | match conn.subscribe_filter(filter.clone()).await { | ||
| 2162 | Ok(sub_id) => { | ||
| 2163 | subscription_ids.insert(sub_id); | ||
| 2164 | } | ||
| 2165 | Err(e) => { | ||
| 2166 | tracing::error!( | ||
| 2167 | relay = %relay_url, | ||
| 2168 | error = %e, | ||
| 2169 | "Failed to subscribe to filter in historic_sync" | ||
| 2170 | ); | ||
| 2171 | } | ||
| 2172 | } | ||
| 2173 | } | ||
| 2174 | } | ||
| 2175 | { | ||
| 2646 | let mut pending = self.pending_sync_index.write().await; | 2176 | let mut pending = self.pending_sync_index.write().await; |
| 2647 | if let Some(batches) = pending.get_mut(relay_url) { | 2177 | if let Some(relay_batches) = pending.get_mut(relay_url) { |
| 2648 | batches.retain(|b| b.batch_id != batch_id); | 2178 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { |
| 2649 | if batches.is_empty() { | 2179 | batch.outstanding_subs.extend(subscription_ids.clone()); |
| 2650 | pending.remove(relay_url); | ||
| 2651 | } | 2180 | } |
| 2652 | } | 2181 | } |
| 2653 | |||
| 2654 | tracing::warn!( | ||
| 2655 | relay = %relay_url, | ||
| 2656 | batch_id = batch_id, | ||
| 2657 | "historic_sync (negentropy) failed for all filters" | ||
| 2658 | ); | ||
| 2659 | return None; | ||
| 2660 | } | 2182 | } |
| 2183 | tracing::debug!( | ||
| 2184 | relay = %relay_url, | ||
| 2185 | batch_id = batch_id, | ||
| 2186 | subscription_ids = subscription_ids.len(), | ||
| 2187 | events = all_remote_ids.len(), | ||
| 2188 | "historic_sync (Negentropy) created subscritions to fetch missing events by id, awaiting EOSE" | ||
| 2189 | ); | ||
| 2661 | } else { | 2190 | } else { |
| 2662 | // Traditional REQ+EOSE path | 2191 | // Traditional REQ+EOSE path |
| 2663 | tracing::debug!( | 2192 | tracing::debug!( |
| @@ -2673,7 +2202,16 @@ impl SyncManager { | |||
| 2673 | // Subscribe to each filter and collect subscription IDs | 2202 | // Subscribe to each filter and collect subscription IDs |
| 2674 | let mut subscription_ids = HashSet::new(); | 2203 | let mut subscription_ids = HashSet::new(); |
| 2675 | 2204 | ||
| 2676 | for filter in &filters_with_since { | 2205 | // DEBUG TRACING: Log each filter in REQ+EOSE path |
| 2206 | for (idx, filter) in filters_with_since.iter().enumerate() { | ||
| 2207 | tracing::debug!( | ||
| 2208 | relay = %relay_url, | ||
| 2209 | batch_id = batch_id, | ||
| 2210 | filter_idx = idx, | ||
| 2211 | filter = ?filter, | ||
| 2212 | "Subscribing to filter in REQ+EOSE path" | ||
| 2213 | ); | ||
| 2214 | |||
| 2677 | if let Some(conn) = self.connections.get(relay_url) { | 2215 | if let Some(conn) = self.connections.get(relay_url) { |
| 2678 | match conn.subscribe_filter(filter.clone()).await { | 2216 | match conn.subscribe_filter(filter.clone()).await { |
| 2679 | Ok(sub_id) => { | 2217 | Ok(sub_id) => { |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 4167a0c..bc4b59e 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -325,11 +325,25 @@ impl RelayConnection { | |||
| 325 | /// * `Ok(SubscriptionId)` - The subscription ID on success | 325 | /// * `Ok(SubscriptionId)` - The subscription ID on success |
| 326 | /// * `Err(String)` - Error description on failure | 326 | /// * `Err(String)` - Error description on failure |
| 327 | pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> { | 327 | pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> { |
| 328 | // DEBUG TRACING: Log the filter being subscribed to | ||
| 329 | tracing::debug!( | ||
| 330 | relay = %self.url, | ||
| 331 | filter = ?filter, | ||
| 332 | "subscribe_filter called with filter" | ||
| 333 | ); | ||
| 334 | |||
| 328 | let output = self | 335 | let output = self |
| 329 | .client | 336 | .client |
| 330 | .subscribe(filter, None) | 337 | .subscribe(filter, None) |
| 331 | .await | 338 | .await |
| 332 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; | 339 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; |
| 340 | |||
| 341 | tracing::debug!( | ||
| 342 | relay = %self.url, | ||
| 343 | subscription_id = %output.val, | ||
| 344 | "subscribe_filter succeeded" | ||
| 345 | ); | ||
| 346 | |||
| 333 | Ok(output.val) | 347 | Ok(output.val) |
| 334 | } | 348 | } |
| 335 | 349 | ||
| @@ -407,31 +421,36 @@ impl RelayConnection { | |||
| 407 | true | 421 | true |
| 408 | } | 422 | } |
| 409 | 423 | ||
| 410 | /// Perform negentropy synchronization for a filter | 424 | /// Perform a negentropy sync diff (dry run) to identify missing events |
| 411 | /// | 425 | /// |
| 412 | /// Uses NIP-77 negentropy protocol to efficiently reconcile events matching | 426 | /// This method performs NIP-77 negentropy reconciliation without downloading events. |
| 413 | /// the filter between local database and remote relay. This is much more | 427 | /// It returns the list of event IDs that need to be fetched. The caller should then |
| 414 | /// efficient than REQ+EOSE for relays with overlapping event sets. | 428 | /// manually fetch these events and pass them through the write policy for validation. |
| 415 | /// | 429 | /// |
| 416 | /// # Arguments | 430 | /// # Arguments |
| 417 | /// * `filter` - The filter defining which events to sync | 431 | /// * `filter` - The filter to sync |
| 418 | /// | 432 | /// |
| 419 | /// # Returns | 433 | /// # Returns |
| 420 | /// * `Ok(NegentropySyncResult)` - Sync completed successfully with reconciliation info | 434 | /// * `Ok(Reconciliation)` - Reconciliation result with remote/local/sent event IDs |
| 421 | /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error) | 435 | /// * `Err(String)` - Sync failed (relay may not support NIP-77, or other error) |
| 422 | /// | 436 | /// |
| 423 | /// # Fallback Behavior | 437 | /// # Usage Pattern |
| 424 | /// If this method fails, the caller should fall back to traditional REQ+EOSE sync. | 438 | /// ```ignore |
| 425 | /// Failure reasons include: | 439 | /// // 1. Get the diff |
| 426 | /// - Relay doesn't actually support NIP-77 (despite claiming to) | 440 | /// let reconciliation = conn.negentropy_sync_diff(filter).await?; |
| 427 | /// - Network errors during reconciliation | 441 | /// |
| 428 | /// - Timeout during sync | 442 | /// // 2. Fetch missing events by ID |
| 429 | pub async fn negentropy_sync_filter( | 443 | /// if !reconciliation.remote.is_empty() { |
| 430 | &self, | 444 | /// let ids: Vec<EventId> = reconciliation.remote.into_iter().collect(); |
| 431 | filter: Filter, | 445 | /// let filter = Filter::new().ids(ids); |
| 432 | ) -> Result<NegentropySyncResult, String> { | 446 | /// conn.subscribe_filter(filter).await?; |
| 433 | // Use nostr-sdk's sync method which handles the NEG-OPEN/NEG-MSG exchange | 447 | /// } |
| 434 | let sync_opts = SyncOptions::default(); | 448 | /// |
| 449 | /// // 3. Events come through normal flow and get validated via process_event_static | ||
| 450 | /// ``` | ||
| 451 | pub async fn negentropy_sync_diff(&self, filter: Filter) -> Result<Reconciliation, String> { | ||
| 452 | // Use dry_run to only identify differences without downloading events | ||
| 453 | let sync_opts = SyncOptions::default().dry_run(); | ||
| 435 | 454 | ||
| 436 | match self.client.sync(filter.clone(), &sync_opts).await { | 455 | match self.client.sync(filter.clone(), &sync_opts).await { |
| 437 | Ok(output) => { | 456 | Ok(output) => { |
| @@ -441,9 +460,7 @@ impl RelayConnection { | |||
| 441 | relay = %self.url, | 460 | relay = %self.url, |
| 442 | local_count = reconciliation.local.len(), | 461 | local_count = reconciliation.local.len(), |
| 443 | remote_count = reconciliation.remote.len(), | 462 | remote_count = reconciliation.remote.len(), |
| 444 | sent_count = reconciliation.sent.len(), | 463 | "Negentropy diff completed (dry run)" |
| 445 | received_count = reconciliation.received.len(), | ||
| 446 | "Negentropy sync completed" | ||
| 447 | ); | 464 | ); |
| 448 | 465 | ||
| 449 | // Check for any failures | 466 | // Check for any failures |
| @@ -451,15 +468,11 @@ impl RelayConnection { | |||
| 451 | tracing::warn!( | 468 | tracing::warn!( |
| 452 | relay = %self.url, | 469 | relay = %self.url, |
| 453 | failures = ?output.failed, | 470 | failures = ?output.failed, |
| 454 | "Some relays failed during negentropy sync" | 471 | "Some relays failed during negentropy diff" |
| 455 | ); | 472 | ); |
| 456 | } | 473 | } |
| 457 | 474 | ||
| 458 | Ok(NegentropySyncResult { | 475 | Ok(reconciliation) |
| 459 | remote_only: reconciliation.remote.into_iter().collect(), | ||
| 460 | local_only: reconciliation.local.into_iter().collect(), | ||
| 461 | received: reconciliation.received.into_iter().collect(), | ||
| 462 | }) | ||
| 463 | } | 476 | } |
| 464 | Err(e) => { | 477 | Err(e) => { |
| 465 | // Log warning only once per relay to avoid spam | 478 | // Log warning only once per relay to avoid spam |
| @@ -470,30 +483,14 @@ impl RelayConnection { | |||
| 470 | tracing::warn!( | 483 | tracing::warn!( |
| 471 | relay = %self.url, | 484 | relay = %self.url, |
| 472 | error = %e, | 485 | error = %e, |
| 473 | "Negentropy sync failed, will fall back to REQ+EOSE" | 486 | "Negentropy diff failed, will fall back to REQ+EOSE" |
| 474 | ); | 487 | ); |
| 475 | } | 488 | } |
| 476 | Err(format!("Negentropy sync failed: {}", e)) | 489 | Err(format!("Negentropy diff failed: {}", e)) |
| 477 | } | 490 | } |
| 478 | } | 491 | } |
| 479 | } | 492 | } |
| 480 | 493 | ||
| 481 | /// Perform negentropy sync and return received event IDs | ||
| 482 | /// | ||
| 483 | /// Convenience method that performs negentropy sync and returns the event IDs | ||
| 484 | /// that were received (i.e., events that exist on remote but not locally). | ||
| 485 | /// | ||
| 486 | /// # Arguments | ||
| 487 | /// * `filter` - The filter defining which events to sync | ||
| 488 | /// | ||
| 489 | /// # Returns | ||
| 490 | /// * `Ok(Vec<EventId>)` - Event IDs received from remote relay | ||
| 491 | /// * `Err(String)` - Sync failed | ||
| 492 | pub async fn negentropy_sync_and_fetch(&self, filter: Filter) -> Result<Vec<EventId>, String> { | ||
| 493 | let result = self.negentropy_sync_filter(filter).await?; | ||
| 494 | Ok(result.received) | ||
| 495 | } | ||
| 496 | |||
| 497 | /// Check if this connection has a database configured for negentropy | 494 | /// Check if this connection has a database configured for negentropy |
| 498 | pub fn has_database(&self) -> bool { | 495 | pub fn has_database(&self) -> bool { |
| 499 | self.database.is_some() | 496 | self.database.is_some() |