diff options
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 133 |
1 files changed, 130 insertions, 3 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8581fb6..6f59b19 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -162,6 +162,17 @@ pub enum ProcessResult { | |||
| 162 | Rejected, | 162 | Rejected, |
| 163 | } | 163 | } |
| 164 | 164 | ||
| 165 | /// Pagination state for a subscription in non-Negentropy historic sync | ||
| 166 | #[derive(Debug, Clone)] | ||
| 167 | pub struct PaginationState { | ||
| 168 | /// Number of events received for this subscription | ||
| 169 | pub event_count: usize, | ||
| 170 | /// Smallest created_at timestamp seen (for pagination with `until`) | ||
| 171 | pub min_created_at: Option<Timestamp>, | ||
| 172 | /// Original filter to reconstruct for next page | ||
| 173 | pub original_filter: Filter, | ||
| 174 | } | ||
| 175 | |||
| 165 | /// A batch of items pending confirmation | 176 | /// A batch of items pending confirmation |
| 166 | #[derive(Debug, Clone)] | 177 | #[derive(Debug, Clone)] |
| 167 | pub struct PendingBatch { | 178 | pub struct PendingBatch { |
| @@ -174,6 +185,9 @@ pub struct PendingBatch { | |||
| 174 | pub outstanding_subs: HashSet<SubscriptionId>, | 185 | pub outstanding_subs: HashSet<SubscriptionId>, |
| 175 | /// The sync method used for this batch | 186 | /// The sync method used for this batch |
| 176 | pub sync_method: SyncMethod, | 187 | pub sync_method: SyncMethod, |
| 188 | /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) | ||
| 189 | /// Maps subscription ID to its pagination state | ||
| 190 | pub pagination_state: HashMap<SubscriptionId, PaginationState>, | ||
| 177 | } | 191 | } |
| 178 | 192 | ||
| 179 | /// Items included in a pending batch | 193 | /// Items included in a pending batch |
| @@ -221,6 +235,10 @@ const CONSOLIDATION_THRESHOLD: usize = 70; | |||
| 221 | /// Maximum time to wait for pending batches (30 seconds) | 235 | /// Maximum time to wait for pending batches (30 seconds) |
| 222 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; | 236 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; |
| 223 | 237 | ||
| 238 | /// Page size threshold for historic sync pagination (non-negentropy) | ||
| 239 | /// If a subscription receives >= 75 events, we fetch the next page | ||
| 240 | const PAGINATION_THRESHOLD: usize = 75; | ||
| 241 | |||
| 224 | // ============================================================================= | 242 | // ============================================================================= |
| 225 | // Daily Timer | 243 | // Daily Timer |
| 226 | // ============================================================================= | 244 | // ============================================================================= |
| @@ -465,6 +483,80 @@ impl SyncManager { | |||
| 465 | "EOSE processed for subscription" | 483 | "EOSE processed for subscription" |
| 466 | ); | 484 | ); |
| 467 | 485 | ||
| 486 | // Check for pagination: if this subscription hit the threshold, fetch next page | ||
| 487 | if let Some(pagination_state) = batch.pagination_state.remove(&sub_id) { | ||
| 488 | if pagination_state.event_count >= PAGINATION_THRESHOLD { | ||
| 489 | if let Some(min_created_at) = pagination_state.min_created_at { | ||
| 490 | tracing::info!( | ||
| 491 | relay = %relay_url, | ||
| 492 | sub_id = %sub_id, | ||
| 493 | batch_id = batch.batch_id, | ||
| 494 | event_count = pagination_state.event_count, | ||
| 495 | min_created_at = %min_created_at, | ||
| 496 | "Subscription hit pagination threshold, fetching next page" | ||
| 497 | ); | ||
| 498 | |||
| 499 | // Create next page filter: same as original but with .until(min_created_at) | ||
| 500 | // dont subtract 1 second to avoid duplicate events at the boundary | ||
| 501 | // as this would lead to missed events with the same created_at timestamp | ||
| 502 | let until_timestamp = Timestamp::from(min_created_at.as_secs()); | ||
| 503 | let mut next_filter = pagination_state.original_filter.clone(); | ||
| 504 | next_filter = next_filter.until(until_timestamp); | ||
| 505 | |||
| 506 | // Store relay_url for spawning the subscription after releasing the lock | ||
| 507 | let relay_url_for_pagination = relay_url.to_string(); | ||
| 508 | let batch_id = batch.batch_id; | ||
| 509 | |||
| 510 | // Drop the lock before async operations | ||
| 511 | drop(pending); | ||
| 512 | |||
| 513 | // Subscribe to next page and add to outstanding_subs | ||
| 514 | if let Some(conn) = self.connections.get(&relay_url_for_pagination) { | ||
| 515 | match conn.subscribe_filter(next_filter.clone(), true).await { | ||
| 516 | Ok(new_sub_id) => { | ||
| 517 | // Re-acquire lock to update the batch | ||
| 518 | let mut pending = self.pending_sync_index.write().await; | ||
| 519 | if let Some(batches) = pending.get_mut(&relay_url_for_pagination) { | ||
| 520 | if let Some(batch) = | ||
| 521 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 522 | { | ||
| 523 | batch.outstanding_subs.insert(new_sub_id.clone()); | ||
| 524 | // Initialize pagination state for new subscription | ||
| 525 | batch.pagination_state.insert( | ||
| 526 | new_sub_id.clone(), | ||
| 527 | PaginationState { | ||
| 528 | event_count: 0, | ||
| 529 | min_created_at: None, | ||
| 530 | original_filter: next_filter, | ||
| 531 | }, | ||
| 532 | ); | ||
| 533 | tracing::info!( | ||
| 534 | relay = %relay_url_for_pagination, | ||
| 535 | new_sub_id = %new_sub_id, | ||
| 536 | batch_id = batch_id, | ||
| 537 | until = %until_timestamp, | ||
| 538 | "Next page subscription created" | ||
| 539 | ); | ||
| 540 | } | ||
| 541 | } | ||
| 542 | } | ||
| 543 | Err(e) => { | ||
| 544 | tracing::error!( | ||
| 545 | relay = %relay_url_for_pagination, | ||
| 546 | batch_id = batch_id, | ||
| 547 | error = %e, | ||
| 548 | "Failed to create pagination subscription, continuing without next page" | ||
| 549 | ); | ||
| 550 | } | ||
| 551 | } | ||
| 552 | } | ||
| 553 | |||
| 554 | // Early return since we've released and re-acquired locks | ||
| 555 | return; | ||
| 556 | } | ||
| 557 | } | ||
| 558 | } | ||
| 559 | |||
| 468 | // Check if batch is complete | 560 | // Check if batch is complete |
| 469 | if !batch.outstanding_subs.is_empty() { | 561 | if !batch.outstanding_subs.is_empty() { |
| 470 | return; | 562 | return; |
| @@ -861,13 +953,14 @@ impl SyncManager { | |||
| 861 | let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); | 953 | let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); |
| 862 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | 954 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); |
| 863 | let metrics_clone = self.metrics.clone(); | 955 | let metrics_clone = self.metrics.clone(); |
| 956 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | ||
| 864 | 957 | ||
| 865 | tokio::spawn(async move { | 958 | tokio::spawn(async move { |
| 866 | let mut disconnect_sent = false; | 959 | let mut disconnect_sent = false; |
| 867 | 960 | ||
| 868 | while let Some(relay_event) = event_rx.recv().await { | 961 | while let Some(relay_event) = event_rx.recv().await { |
| 869 | match relay_event { | 962 | match relay_event { |
| 870 | RelayEvent::Event(event) => { | 963 | RelayEvent::Event(event, subscription_id) => { |
| 871 | let result = Self::process_event_static( | 964 | let result = Self::process_event_static( |
| 872 | &event, | 965 | &event, |
| 873 | &relay_url_clone, | 966 | &relay_url_clone, |
| @@ -882,6 +975,28 @@ impl SyncManager { | |||
| 882 | metrics.record_synced_event(); | 975 | metrics.record_synced_event(); |
| 883 | } | 976 | } |
| 884 | } | 977 | } |
| 978 | |||
| 979 | // Track pagination state for this subscription | ||
| 980 | if result == ProcessResult::Saved || result == ProcessResult::Duplicate { | ||
| 981 | let mut pending = pending_sync_index.write().await; | ||
| 982 | if let Some(batches) = pending.get_mut(&relay_url_clone) { | ||
| 983 | for batch in batches.iter_mut() { | ||
| 984 | if let Some(state) = | ||
| 985 | batch.pagination_state.get_mut(&subscription_id) | ||
| 986 | { | ||
| 987 | state.event_count += 1; | ||
| 988 | // Track minimum created_at timestamp | ||
| 989 | match state.min_created_at { | ||
| 990 | None => state.min_created_at = Some(event.created_at), | ||
| 991 | Some(min) if event.created_at < min => { | ||
| 992 | state.min_created_at = Some(event.created_at); | ||
| 993 | } | ||
| 994 | _ => {} | ||
| 995 | } | ||
| 996 | } | ||
| 997 | } | ||
| 998 | } | ||
| 999 | } | ||
| 885 | } | 1000 | } |
| 886 | RelayEvent::EndOfStoredEvents(sub_id) => { | 1001 | RelayEvent::EndOfStoredEvents(sub_id) => { |
| 887 | tracing::debug!( | 1002 | tracing::debug!( |
| @@ -1929,12 +2044,13 @@ impl SyncManager { | |||
| 1929 | "Starting historic_sync with negentropy" | 2044 | "Starting historic_sync with negentropy" |
| 1930 | ); | 2045 | ); |
| 1931 | 2046 | ||
| 1932 | // Create PendingBatch for negentropy (empty outstanding_subs) | 2047 | // Create PendingBatch for negentropy (empty outstanding_subs and pagination_state) |
| 1933 | let batch = PendingBatch { | 2048 | let batch = PendingBatch { |
| 1934 | batch_id, | 2049 | batch_id, |
| 1935 | items: items.clone(), | 2050 | items: items.clone(), |
| 1936 | outstanding_subs: HashSet::new(), | 2051 | outstanding_subs: HashSet::new(), |
| 1937 | sync_method: SyncMethod::Negentropy, | 2052 | sync_method: SyncMethod::Negentropy, |
| 2053 | pagination_state: HashMap::new(), // Negentropy doesn't use pagination | ||
| 1938 | }; | 2054 | }; |
| 1939 | 2055 | ||
| 1940 | // Add to pending_sync_index | 2056 | // Add to pending_sync_index |
| @@ -2105,6 +2221,7 @@ impl SyncManager { | |||
| 2105 | 2221 | ||
| 2106 | // Subscribe to each filter and collect subscription IDs | 2222 | // Subscribe to each filter and collect subscription IDs |
| 2107 | let mut subscription_ids = HashSet::new(); | 2223 | let mut subscription_ids = HashSet::new(); |
| 2224 | let mut pagination_state = HashMap::new(); | ||
| 2108 | 2225 | ||
| 2109 | // DEBUG TRACING: Log each filter in REQ+EOSE path | 2226 | // DEBUG TRACING: Log each filter in REQ+EOSE path |
| 2110 | for (idx, filter) in filters_with_since.iter().enumerate() { | 2227 | for (idx, filter) in filters_with_since.iter().enumerate() { |
| @@ -2119,7 +2236,16 @@ impl SyncManager { | |||
| 2119 | if let Some(conn) = self.connections.get(relay_url) { | 2236 | if let Some(conn) = self.connections.get(relay_url) { |
| 2120 | match conn.subscribe_filter(filter.clone(), true).await { | 2237 | match conn.subscribe_filter(filter.clone(), true).await { |
| 2121 | Ok(sub_id) => { | 2238 | Ok(sub_id) => { |
| 2122 | subscription_ids.insert(sub_id); | 2239 | subscription_ids.insert(sub_id.clone()); |
| 2240 | // Initialize pagination state for this subscription | ||
| 2241 | pagination_state.insert( | ||
| 2242 | sub_id, | ||
| 2243 | PaginationState { | ||
| 2244 | event_count: 0, | ||
| 2245 | min_created_at: None, | ||
| 2246 | original_filter: filter.clone(), | ||
| 2247 | }, | ||
| 2248 | ); | ||
| 2123 | } | 2249 | } |
| 2124 | Err(e) => { | 2250 | Err(e) => { |
| 2125 | tracing::error!( | 2251 | tracing::error!( |
| @@ -2146,6 +2272,7 @@ impl SyncManager { | |||
| 2146 | items, | 2272 | items, |
| 2147 | outstanding_subs: subscription_ids, | 2273 | outstanding_subs: subscription_ids, |
| 2148 | sync_method: SyncMethod::ReqEose, | 2274 | sync_method: SyncMethod::ReqEose, |
| 2275 | pagination_state, | ||
| 2149 | }; | 2276 | }; |
| 2150 | 2277 | ||
| 2151 | // Add to pending_sync_index | 2278 | // Add to pending_sync_index |