diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 16:37:28 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 16:40:13 +0000 |
| commit | 39242bfec6f6592c478c651f2e89e88e3e66ff2a (patch) | |
| tree | ef358d70e7d33bc9edd086e64d9b4f64ee8bcd3a /src | |
| parent | 1adbd93e5bb8e14403ba64a76d5dc93209227514 (diff) | |
feat(sync): implement pagination for historic_sync REQ+EOSE flow
Add automatic pagination support for non-Negentropy historic sync to handle
large result sets efficiently. When a subscription receives >= 75 events,
the system automatically fetches the next page using the 'until' parameter.
Changes:
- Add PaginationState struct to track event counts and min timestamps
- Add pagination_state HashMap to PendingBatch for per-subscription tracking
- Add PAGINATION_THRESHOLD constant (75 events)
- Pass pending_sync_index to event processor for state updates
- Track events and timestamps as they arrive
- Check threshold on EOSE and launch follow-up subscriptions
- Initialize pagination state when creating historic sync subscriptions
- Update test fixtures in algorithms.rs
The pagination continues recursively until a page returns fewer than 75 events,
ensuring complete historic data retrieval without overwhelming relay limits.
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/algorithms.rs | 2 | ||||
| -rw-r--r-- | src/sync/mod.rs | 133 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 6 |
3 files changed, 135 insertions, 6 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index a6e0787..f4b1f5c 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -401,6 +401,7 @@ mod tests { | |||
| 401 | }, | 401 | }, |
| 402 | outstanding_subs: HashSet::new(), | 402 | outstanding_subs: HashSet::new(), |
| 403 | sync_method: SyncMethod::ReqEose, | 403 | sync_method: SyncMethod::ReqEose, |
| 404 | pagination_state: HashMap::new(), | ||
| 404 | }], | 405 | }], |
| 405 | ); | 406 | ); |
| 406 | 407 | ||
| @@ -512,6 +513,7 @@ mod tests { | |||
| 512 | }, | 513 | }, |
| 513 | outstanding_subs: HashSet::new(), | 514 | outstanding_subs: HashSet::new(), |
| 514 | sync_method: SyncMethod::ReqEose, | 515 | sync_method: SyncMethod::ReqEose, |
| 516 | pagination_state: HashMap::new(), | ||
| 515 | }], | 517 | }], |
| 516 | ); | 518 | ); |
| 517 | 519 | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8581fb6..6f59b19 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -162,6 +162,17 @@ pub enum ProcessResult { | |||
| 162 | Rejected, | 162 | Rejected, |
| 163 | } | 163 | } |
| 164 | 164 | ||
| 165 | /// Pagination state for a subscription in non-Negentropy historic sync | ||
| 166 | #[derive(Debug, Clone)] | ||
| 167 | pub struct PaginationState { | ||
| 168 | /// Number of events received for this subscription | ||
| 169 | pub event_count: usize, | ||
| 170 | /// Smallest created_at timestamp seen (for pagination with `until`) | ||
| 171 | pub min_created_at: Option<Timestamp>, | ||
| 172 | /// Original filter to reconstruct for next page | ||
| 173 | pub original_filter: Filter, | ||
| 174 | } | ||
| 175 | |||
| 165 | /// A batch of items pending confirmation | 176 | /// A batch of items pending confirmation |
| 166 | #[derive(Debug, Clone)] | 177 | #[derive(Debug, Clone)] |
| 167 | pub struct PendingBatch { | 178 | pub struct PendingBatch { |
| @@ -174,6 +185,9 @@ pub struct PendingBatch { | |||
| 174 | pub outstanding_subs: HashSet<SubscriptionId>, | 185 | pub outstanding_subs: HashSet<SubscriptionId>, |
| 175 | /// The sync method used for this batch | 186 | /// The sync method used for this batch |
| 176 | pub sync_method: SyncMethod, | 187 | pub sync_method: SyncMethod, |
| 188 | /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) | ||
| 189 | /// Maps subscription ID to its pagination state | ||
| 190 | pub pagination_state: HashMap<SubscriptionId, PaginationState>, | ||
| 177 | } | 191 | } |
| 178 | 192 | ||
| 179 | /// Items included in a pending batch | 193 | /// Items included in a pending batch |
| @@ -221,6 +235,10 @@ const CONSOLIDATION_THRESHOLD: usize = 70; | |||
| 221 | /// Maximum time to wait for pending batches (30 seconds) | 235 | /// Maximum time to wait for pending batches (30 seconds) |
| 222 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; | 236 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; |
| 223 | 237 | ||
| 238 | /// Page size threshold for historic sync pagination (non-negentropy) | ||
| 239 | /// If a subscription receives >= 75 events, we fetch the next page | ||
| 240 | const PAGINATION_THRESHOLD: usize = 75; | ||
| 241 | |||
| 224 | // ============================================================================= | 242 | // ============================================================================= |
| 225 | // Daily Timer | 243 | // Daily Timer |
| 226 | // ============================================================================= | 244 | // ============================================================================= |
| @@ -465,6 +483,80 @@ impl SyncManager { | |||
| 465 | "EOSE processed for subscription" | 483 | "EOSE processed for subscription" |
| 466 | ); | 484 | ); |
| 467 | 485 | ||
| 486 | // Check for pagination: if this subscription hit the threshold, fetch next page | ||
| 487 | if let Some(pagination_state) = batch.pagination_state.remove(&sub_id) { | ||
| 488 | if pagination_state.event_count >= PAGINATION_THRESHOLD { | ||
| 489 | if let Some(min_created_at) = pagination_state.min_created_at { | ||
| 490 | tracing::info!( | ||
| 491 | relay = %relay_url, | ||
| 492 | sub_id = %sub_id, | ||
| 493 | batch_id = batch.batch_id, | ||
| 494 | event_count = pagination_state.event_count, | ||
| 495 | min_created_at = %min_created_at, | ||
| 496 | "Subscription hit pagination threshold, fetching next page" | ||
| 497 | ); | ||
| 498 | |||
| 499 | // Create next page filter: same as original but with .until(min_created_at) | ||
| 500 | // dont subtract 1 second to avoid duplicate events at the boundary | ||
| 501 | // as this would lead to missed events with the same created_at timestamp | ||
| 502 | let until_timestamp = Timestamp::from(min_created_at.as_secs()); | ||
| 503 | let mut next_filter = pagination_state.original_filter.clone(); | ||
| 504 | next_filter = next_filter.until(until_timestamp); | ||
| 505 | |||
| 506 | // Store relay_url for spawning the subscription after releasing the lock | ||
| 507 | let relay_url_for_pagination = relay_url.to_string(); | ||
| 508 | let batch_id = batch.batch_id; | ||
| 509 | |||
| 510 | // Drop the lock before async operations | ||
| 511 | drop(pending); | ||
| 512 | |||
| 513 | // Subscribe to next page and add to outstanding_subs | ||
| 514 | if let Some(conn) = self.connections.get(&relay_url_for_pagination) { | ||
| 515 | match conn.subscribe_filter(next_filter.clone(), true).await { | ||
| 516 | Ok(new_sub_id) => { | ||
| 517 | // Re-acquire lock to update the batch | ||
| 518 | let mut pending = self.pending_sync_index.write().await; | ||
| 519 | if let Some(batches) = pending.get_mut(&relay_url_for_pagination) { | ||
| 520 | if let Some(batch) = | ||
| 521 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 522 | { | ||
| 523 | batch.outstanding_subs.insert(new_sub_id.clone()); | ||
| 524 | // Initialize pagination state for new subscription | ||
| 525 | batch.pagination_state.insert( | ||
| 526 | new_sub_id.clone(), | ||
| 527 | PaginationState { | ||
| 528 | event_count: 0, | ||
| 529 | min_created_at: None, | ||
| 530 | original_filter: next_filter, | ||
| 531 | }, | ||
| 532 | ); | ||
| 533 | tracing::info!( | ||
| 534 | relay = %relay_url_for_pagination, | ||
| 535 | new_sub_id = %new_sub_id, | ||
| 536 | batch_id = batch_id, | ||
| 537 | until = %until_timestamp, | ||
| 538 | "Next page subscription created" | ||
| 539 | ); | ||
| 540 | } | ||
| 541 | } | ||
| 542 | } | ||
| 543 | Err(e) => { | ||
| 544 | tracing::error!( | ||
| 545 | relay = %relay_url_for_pagination, | ||
| 546 | batch_id = batch_id, | ||
| 547 | error = %e, | ||
| 548 | "Failed to create pagination subscription, continuing without next page" | ||
| 549 | ); | ||
| 550 | } | ||
| 551 | } | ||
| 552 | } | ||
| 553 | |||
| 554 | // Early return since we've released and re-acquired locks | ||
| 555 | return; | ||
| 556 | } | ||
| 557 | } | ||
| 558 | } | ||
| 559 | |||
| 468 | // Check if batch is complete | 560 | // Check if batch is complete |
| 469 | if !batch.outstanding_subs.is_empty() { | 561 | if !batch.outstanding_subs.is_empty() { |
| 470 | return; | 562 | return; |
| @@ -861,13 +953,14 @@ impl SyncManager { | |||
| 861 | let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); | 953 | let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); |
| 862 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); | 954 | let eose_tx = self.eose_tx.as_ref().unwrap().clone(); |
| 863 | let metrics_clone = self.metrics.clone(); | 955 | let metrics_clone = self.metrics.clone(); |
| 956 | let pending_sync_index = Arc::clone(&self.pending_sync_index); | ||
| 864 | 957 | ||
| 865 | tokio::spawn(async move { | 958 | tokio::spawn(async move { |
| 866 | let mut disconnect_sent = false; | 959 | let mut disconnect_sent = false; |
| 867 | 960 | ||
| 868 | while let Some(relay_event) = event_rx.recv().await { | 961 | while let Some(relay_event) = event_rx.recv().await { |
| 869 | match relay_event { | 962 | match relay_event { |
| 870 | RelayEvent::Event(event) => { | 963 | RelayEvent::Event(event, subscription_id) => { |
| 871 | let result = Self::process_event_static( | 964 | let result = Self::process_event_static( |
| 872 | &event, | 965 | &event, |
| 873 | &relay_url_clone, | 966 | &relay_url_clone, |
| @@ -882,6 +975,28 @@ impl SyncManager { | |||
| 882 | metrics.record_synced_event(); | 975 | metrics.record_synced_event(); |
| 883 | } | 976 | } |
| 884 | } | 977 | } |
| 978 | |||
| 979 | // Track pagination state for this subscription | ||
| 980 | if result == ProcessResult::Saved || result == ProcessResult::Duplicate { | ||
| 981 | let mut pending = pending_sync_index.write().await; | ||
| 982 | if let Some(batches) = pending.get_mut(&relay_url_clone) { | ||
| 983 | for batch in batches.iter_mut() { | ||
| 984 | if let Some(state) = | ||
| 985 | batch.pagination_state.get_mut(&subscription_id) | ||
| 986 | { | ||
| 987 | state.event_count += 1; | ||
| 988 | // Track minimum created_at timestamp | ||
| 989 | match state.min_created_at { | ||
| 990 | None => state.min_created_at = Some(event.created_at), | ||
| 991 | Some(min) if event.created_at < min => { | ||
| 992 | state.min_created_at = Some(event.created_at); | ||
| 993 | } | ||
| 994 | _ => {} | ||
| 995 | } | ||
| 996 | } | ||
| 997 | } | ||
| 998 | } | ||
| 999 | } | ||
| 885 | } | 1000 | } |
| 886 | RelayEvent::EndOfStoredEvents(sub_id) => { | 1001 | RelayEvent::EndOfStoredEvents(sub_id) => { |
| 887 | tracing::debug!( | 1002 | tracing::debug!( |
| @@ -1929,12 +2044,13 @@ impl SyncManager { | |||
| 1929 | "Starting historic_sync with negentropy" | 2044 | "Starting historic_sync with negentropy" |
| 1930 | ); | 2045 | ); |
| 1931 | 2046 | ||
| 1932 | // Create PendingBatch for negentropy (empty outstanding_subs) | 2047 | // Create PendingBatch for negentropy (empty outstanding_subs and pagination_state) |
| 1933 | let batch = PendingBatch { | 2048 | let batch = PendingBatch { |
| 1934 | batch_id, | 2049 | batch_id, |
| 1935 | items: items.clone(), | 2050 | items: items.clone(), |
| 1936 | outstanding_subs: HashSet::new(), | 2051 | outstanding_subs: HashSet::new(), |
| 1937 | sync_method: SyncMethod::Negentropy, | 2052 | sync_method: SyncMethod::Negentropy, |
| 2053 | pagination_state: HashMap::new(), // Negentropy doesn't use pagination | ||
| 1938 | }; | 2054 | }; |
| 1939 | 2055 | ||
| 1940 | // Add to pending_sync_index | 2056 | // Add to pending_sync_index |
| @@ -2105,6 +2221,7 @@ impl SyncManager { | |||
| 2105 | 2221 | ||
| 2106 | // Subscribe to each filter and collect subscription IDs | 2222 | // Subscribe to each filter and collect subscription IDs |
| 2107 | let mut subscription_ids = HashSet::new(); | 2223 | let mut subscription_ids = HashSet::new(); |
| 2224 | let mut pagination_state = HashMap::new(); | ||
| 2108 | 2225 | ||
| 2109 | // DEBUG TRACING: Log each filter in REQ+EOSE path | 2226 | // DEBUG TRACING: Log each filter in REQ+EOSE path |
| 2110 | for (idx, filter) in filters_with_since.iter().enumerate() { | 2227 | for (idx, filter) in filters_with_since.iter().enumerate() { |
| @@ -2119,7 +2236,16 @@ impl SyncManager { | |||
| 2119 | if let Some(conn) = self.connections.get(relay_url) { | 2236 | if let Some(conn) = self.connections.get(relay_url) { |
| 2120 | match conn.subscribe_filter(filter.clone(), true).await { | 2237 | match conn.subscribe_filter(filter.clone(), true).await { |
| 2121 | Ok(sub_id) => { | 2238 | Ok(sub_id) => { |
| 2122 | subscription_ids.insert(sub_id); | 2239 | subscription_ids.insert(sub_id.clone()); |
| 2240 | // Initialize pagination state for this subscription | ||
| 2241 | pagination_state.insert( | ||
| 2242 | sub_id, | ||
| 2243 | PaginationState { | ||
| 2244 | event_count: 0, | ||
| 2245 | min_created_at: None, | ||
| 2246 | original_filter: filter.clone(), | ||
| 2247 | }, | ||
| 2248 | ); | ||
| 2123 | } | 2249 | } |
| 2124 | Err(e) => { | 2250 | Err(e) => { |
| 2125 | tracing::error!( | 2251 | tracing::error!( |
| @@ -2146,6 +2272,7 @@ impl SyncManager { | |||
| 2146 | items, | 2272 | items, |
| 2147 | outstanding_subs: subscription_ids, | 2273 | outstanding_subs: subscription_ids, |
| 2148 | sync_method: SyncMethod::ReqEose, | 2274 | sync_method: SyncMethod::ReqEose, |
| 2275 | pagination_state, | ||
| 2149 | }; | 2276 | }; |
| 2150 | 2277 | ||
| 2151 | // Add to pending_sync_index | 2278 | // Add to pending_sync_index |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 37094be..5a61777 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -23,8 +23,8 @@ use crate::nostr::builder::SharedDatabase; | |||
| 23 | /// Events from a relay connection | 23 | /// Events from a relay connection |
| 24 | #[derive(Debug)] | 24 | #[derive(Debug)] |
| 25 | pub enum RelayEvent { | 25 | pub enum RelayEvent { |
| 26 | /// A new event was received | 26 | /// A new event was received (event, subscription_id) |
| 27 | Event(Event), | 27 | Event(Event, SubscriptionId), |
| 28 | /// End of stored events for a subscription | 28 | /// End of stored events for a subscription |
| 29 | EndOfStoredEvents(SubscriptionId), | 29 | EndOfStoredEvents(SubscriptionId), |
| 30 | /// Connection was closed | 30 | /// Connection was closed |
| @@ -216,7 +216,7 @@ impl RelayConnection { | |||
| 216 | sub_id = %subscription_id, | 216 | sub_id = %subscription_id, |
| 217 | "Received event" | 217 | "Received event" |
| 218 | ); | 218 | ); |
| 219 | if event_sender.send(RelayEvent::Event(*event)).await.is_err() { | 219 | if event_sender.send(RelayEvent::Event(*event, subscription_id.clone())).await.is_err() { |
| 220 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 220 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); |
| 221 | break; | 221 | break; |
| 222 | } | 222 | } |