From 39242bfec6f6592c478c651f2e89e88e3e66ff2a Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 16:37:28 +0000 Subject: feat(sync): implement pagination for historic_sync REQ+EOSE flow Add automatic pagination support for non-Negentropy historic sync to handle large result sets efficiently. When a subscription receives >= 75 events, the system automatically fetches the next page using the 'until' parameter. Changes: - Add PaginationState struct to track event counts and min timestamps - Add pagination_state HashMap to PendingBatch for per-subscription tracking - Add PAGINATION_THRESHOLD constant (75 events) - Pass pending_sync_index to event processor for state updates - Track events and timestamps as they arrive - Check threshold on EOSE and launch follow-up subscriptions - Initialize pagination state when creating historic sync subscriptions - Update test fixtures in algorithms.rs The pagination continues recursively until a page returns fewer than 75 events, ensuring complete historic data retrieval without overwhelming relay limits. 
--- src/sync/algorithms.rs | 2 + src/sync/mod.rs | 133 ++++++++++++++++++++++++++++++++++++++++++- src/sync/relay_connection.rs | 6 +- 3 files changed, 135 insertions(+), 6 deletions(-) (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index a6e0787..f4b1f5c 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -401,6 +401,7 @@ mod tests { }, outstanding_subs: HashSet::new(), sync_method: SyncMethod::ReqEose, + pagination_state: HashMap::new(), }], ); @@ -512,6 +513,7 @@ mod tests { }, outstanding_subs: HashSet::new(), sync_method: SyncMethod::ReqEose, + pagination_state: HashMap::new(), }], ); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8581fb6..6f59b19 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -162,6 +162,17 @@ pub enum ProcessResult { Rejected, } +/// Pagination state for a subscription in non-Negentropy historic sync +#[derive(Debug, Clone)] +pub struct PaginationState { + /// Number of events received for this subscription + pub event_count: usize, + /// Smallest created_at timestamp seen (for pagination with `until`) + pub min_created_at: Option<Timestamp>, + /// Original filter to reconstruct for next page + pub original_filter: Filter, +} + /// A batch of items pending confirmation #[derive(Debug, Clone)] pub struct PendingBatch { @@ -174,6 +185,9 @@ pub struct PendingBatch { pub outstanding_subs: HashSet<SubscriptionId>, /// The sync method used for this batch pub sync_method: SyncMethod, + /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy) + /// Maps subscription ID to its pagination state + pub pagination_state: HashMap<SubscriptionId, PaginationState>, } /// Items included in a pending batch @@ -221,6 +235,10 @@ const CONSOLIDATION_THRESHOLD: usize = 70; /// Maximum time to wait for pending batches (30 seconds) const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; +/// Page size threshold for historic sync pagination (non-negentropy) +/// If a subscription receives >= 75 events, we fetch the next page +const 
PAGINATION_THRESHOLD: usize = 75; + // ============================================================================= // Daily Timer // ============================================================================= @@ -465,6 +483,80 @@ impl SyncManager { "EOSE processed for subscription" ); + // Check for pagination: if this subscription hit the threshold, fetch next page + if let Some(pagination_state) = batch.pagination_state.remove(&sub_id) { + if pagination_state.event_count >= PAGINATION_THRESHOLD { + if let Some(min_created_at) = pagination_state.min_created_at { + tracing::info!( + relay = %relay_url, + sub_id = %sub_id, + batch_id = batch.batch_id, + event_count = pagination_state.event_count, + min_created_at = %min_created_at, + "Subscription hit pagination threshold, fetching next page" + ); + + // Create next page filter: same as original but with .until(min_created_at) + // Don't subtract 1 second from min_created_at: that would avoid duplicate events at the + // boundary, but it would also miss other events with the same created_at timestamp + let until_timestamp = Timestamp::from(min_created_at.as_secs()); + let mut next_filter = pagination_state.original_filter.clone(); + next_filter = next_filter.until(until_timestamp); + + // Store relay_url for spawning the subscription after releasing the lock + let relay_url_for_pagination = relay_url.to_string(); + let batch_id = batch.batch_id; + + // Drop the lock before async operations + drop(pending); + + // Subscribe to next page and add to outstanding_subs + if let Some(conn) = self.connections.get(&relay_url_for_pagination) { + match conn.subscribe_filter(next_filter.clone(), true).await { + Ok(new_sub_id) => { + // Re-acquire lock to update the batch + let mut pending = self.pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(&relay_url_for_pagination) { + if let Some(batch) = + batches.iter_mut().find(|b| b.batch_id == batch_id) + { + batch.outstanding_subs.insert(new_sub_id.clone()); + // Initialize 
pagination state for new subscription + batch.pagination_state.insert( + new_sub_id.clone(), + PaginationState { + event_count: 0, + min_created_at: None, + original_filter: next_filter, + }, + ); + tracing::info!( + relay = %relay_url_for_pagination, + new_sub_id = %new_sub_id, + batch_id = batch_id, + until = %until_timestamp, + "Next page subscription created" + ); + } + } + } + Err(e) => { + tracing::error!( + relay = %relay_url_for_pagination, + batch_id = batch_id, + error = %e, + "Failed to create pagination subscription, continuing without next page" + ); + } + } + } + + // Early return since we've released and re-acquired locks + return; + } + } + } + // Check if batch is complete if !batch.outstanding_subs.is_empty() { return; @@ -861,13 +953,14 @@ impl SyncManager { let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); + let pending_sync_index = Arc::clone(&self.pending_sync_index); tokio::spawn(async move { let mut disconnect_sent = false; while let Some(relay_event) = event_rx.recv().await { match relay_event { - RelayEvent::Event(event) => { + RelayEvent::Event(event, subscription_id) => { let result = Self::process_event_static( &event, &relay_url_clone, @@ -882,6 +975,28 @@ impl SyncManager { metrics.record_synced_event(); } } + + // Track pagination state for this subscription + if result == ProcessResult::Saved || result == ProcessResult::Duplicate { + let mut pending = pending_sync_index.write().await; + if let Some(batches) = pending.get_mut(&relay_url_clone) { + for batch in batches.iter_mut() { + if let Some(state) = + batch.pagination_state.get_mut(&subscription_id) + { + state.event_count += 1; + // Track minimum created_at timestamp + match state.min_created_at { + None => state.min_created_at = Some(event.created_at), + Some(min) if event.created_at < min => { + state.min_created_at = Some(event.created_at); + } + _ => {} + } + } + } 
+ } + } } RelayEvent::EndOfStoredEvents(sub_id) => { tracing::debug!( @@ -1929,12 +2044,13 @@ impl SyncManager { "Starting historic_sync with negentropy" ); - // Create PendingBatch for negentropy (empty outstanding_subs) + // Create PendingBatch for negentropy (empty outstanding_subs and pagination_state) let batch = PendingBatch { batch_id, items: items.clone(), outstanding_subs: HashSet::new(), sync_method: SyncMethod::Negentropy, + pagination_state: HashMap::new(), // Negentropy doesn't use pagination }; // Add to pending_sync_index @@ -2105,6 +2221,7 @@ impl SyncManager { // Subscribe to each filter and collect subscription IDs let mut subscription_ids = HashSet::new(); + let mut pagination_state = HashMap::new(); // DEBUG TRACING: Log each filter in REQ+EOSE path for (idx, filter) in filters_with_since.iter().enumerate() { @@ -2119,7 +2236,16 @@ impl SyncManager { if let Some(conn) = self.connections.get(relay_url) { match conn.subscribe_filter(filter.clone(), true).await { Ok(sub_id) => { - subscription_ids.insert(sub_id); + subscription_ids.insert(sub_id.clone()); + // Initialize pagination state for this subscription + pagination_state.insert( + sub_id, + PaginationState { + event_count: 0, + min_created_at: None, + original_filter: filter.clone(), + }, + ); } Err(e) => { tracing::error!( @@ -2146,6 +2272,7 @@ impl SyncManager { items, outstanding_subs: subscription_ids, sync_method: SyncMethod::ReqEose, + pagination_state, }; // Add to pending_sync_index diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index 37094be..5a61777 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -23,8 +23,8 @@ use crate::nostr::builder::SharedDatabase; /// Events from a relay connection #[derive(Debug)] pub enum RelayEvent { - /// A new event was received - Event(Event), + /// A new event was received (event, subscription_id) + Event(Event, SubscriptionId), /// End of stored events for a subscription 
EndOfStoredEvents(SubscriptionId), /// Connection was closed @@ -216,7 +216,7 @@ impl RelayConnection { sub_id = %subscription_id, "Received event" ); - if event_sender.send(RelayEvent::Event(*event)).await.is_err() { + if event_sender.send(RelayEvent::Event(*event, subscription_id.clone())).await.is_err() { tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); break; } -- cgit v1.2.3