upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/algorithms.rs 2
-rw-r--r-- src/sync/mod.rs 133
-rw-r--r-- src/sync/relay_connection.rs 6
3 files changed, 135 insertions, 6 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index a6e0787..f4b1f5c 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -401,6 +401,7 @@ mod tests {
401 }, 401 },
402 outstanding_subs: HashSet::new(), 402 outstanding_subs: HashSet::new(),
403 sync_method: SyncMethod::ReqEose, 403 sync_method: SyncMethod::ReqEose,
404 pagination_state: HashMap::new(),
404 }], 405 }],
405 ); 406 );
406 407
@@ -512,6 +513,7 @@ mod tests {
512 }, 513 },
513 outstanding_subs: HashSet::new(), 514 outstanding_subs: HashSet::new(),
514 sync_method: SyncMethod::ReqEose, 515 sync_method: SyncMethod::ReqEose,
516 pagination_state: HashMap::new(),
515 }], 517 }],
516 ); 518 );
517 519
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 8581fb6..6f59b19 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -162,6 +162,17 @@ pub enum ProcessResult {
162 Rejected, 162 Rejected,
163} 163}
164 164
165/// Pagination state for a subscription in non-Negentropy historic sync
166#[derive(Debug, Clone)]
167pub struct PaginationState {
168 /// Number of events received for this subscription
169 pub event_count: usize,
170 /// Smallest created_at timestamp seen (for pagination with `until`)
171 pub min_created_at: Option<Timestamp>,
172 /// Original filter to reconstruct for next page
173 pub original_filter: Filter,
174}
175
165/// A batch of items pending confirmation 176/// A batch of items pending confirmation
166#[derive(Debug, Clone)] 177#[derive(Debug, Clone)]
167pub struct PendingBatch { 178pub struct PendingBatch {
@@ -174,6 +185,9 @@ pub struct PendingBatch {
174 pub outstanding_subs: HashSet<SubscriptionId>, 185 pub outstanding_subs: HashSet<SubscriptionId>,
175 /// The sync method used for this batch 186 /// The sync method used for this batch
176 pub sync_method: SyncMethod, 187 pub sync_method: SyncMethod,
188 /// Pagination tracking for REQ+EOSE subscriptions (empty for Negentropy)
189 /// Maps subscription ID to its pagination state
190 pub pagination_state: HashMap<SubscriptionId, PaginationState>,
177} 191}
178 192
179/// Items included in a pending batch 193/// Items included in a pending batch
@@ -221,6 +235,10 @@ const CONSOLIDATION_THRESHOLD: usize = 70;
221/// Maximum time to wait for pending batches (30 seconds) 235/// Maximum time to wait for pending batches (30 seconds)
222const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; 236const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30;
223 237
238/// Page size threshold for historic sync pagination (non-Negentropy).
239/// If a subscription receives at least this many events (75), we fetch the next page.
240const PAGINATION_THRESHOLD: usize = 75;
241
224// ============================================================================= 242// =============================================================================
225// Daily Timer 243// Daily Timer
226// ============================================================================= 244// =============================================================================
@@ -465,6 +483,80 @@ impl SyncManager {
465 "EOSE processed for subscription" 483 "EOSE processed for subscription"
466 ); 484 );
467 485
486 // Check for pagination: if this subscription hit the threshold, fetch next page
487 if let Some(pagination_state) = batch.pagination_state.remove(&sub_id) {
488 if pagination_state.event_count >= PAGINATION_THRESHOLD {
489 if let Some(min_created_at) = pagination_state.min_created_at {
490 tracing::info!(
491 relay = %relay_url,
492 sub_id = %sub_id,
493 batch_id = batch.batch_id,
494 event_count = pagination_state.event_count,
495 min_created_at = %min_created_at,
496 "Subscription hit pagination threshold, fetching next page"
497 );
498
499 // Create next page filter: same as original but with .until(min_created_at)
500 // Don't subtract 1 second from `until` just to avoid duplicate events at the
501 // boundary: that would miss other events sharing the same created_at timestamp
502 let until_timestamp = Timestamp::from(min_created_at.as_secs());
503 let mut next_filter = pagination_state.original_filter.clone();
504 next_filter = next_filter.until(until_timestamp);
505
506 // Store relay_url for spawning the subscription after releasing the lock
507 let relay_url_for_pagination = relay_url.to_string();
508 let batch_id = batch.batch_id;
509
510 // Drop the lock before async operations
511 drop(pending);
512
513 // Subscribe to next page and add to outstanding_subs
514 if let Some(conn) = self.connections.get(&relay_url_for_pagination) {
515 match conn.subscribe_filter(next_filter.clone(), true).await {
516 Ok(new_sub_id) => {
517 // Re-acquire lock to update the batch
518 let mut pending = self.pending_sync_index.write().await;
519 if let Some(batches) = pending.get_mut(&relay_url_for_pagination) {
520 if let Some(batch) =
521 batches.iter_mut().find(|b| b.batch_id == batch_id)
522 {
523 batch.outstanding_subs.insert(new_sub_id.clone());
524 // Initialize pagination state for new subscription
525 batch.pagination_state.insert(
526 new_sub_id.clone(),
527 PaginationState {
528 event_count: 0,
529 min_created_at: None,
530 original_filter: next_filter,
531 },
532 );
533 tracing::info!(
534 relay = %relay_url_for_pagination,
535 new_sub_id = %new_sub_id,
536 batch_id = batch_id,
537 until = %until_timestamp,
538 "Next page subscription created"
539 );
540 }
541 }
542 }
543 Err(e) => {
544 tracing::error!(
545 relay = %relay_url_for_pagination,
546 batch_id = batch_id,
547 error = %e,
548 "Failed to create pagination subscription, continuing without next page"
549 );
550 }
551 }
552 }
553
554 // Early return: the write lock was dropped above, so the batch-completion check below is skipped
555 return;
556 }
557 }
558 }
559
468 // Check if batch is complete 560 // Check if batch is complete
469 if !batch.outstanding_subs.is_empty() { 561 if !batch.outstanding_subs.is_empty() {
470 return; 562 return;
@@ -861,13 +953,14 @@ impl SyncManager {
861 let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone(); 953 let disconnect_tx = self.disconnect_tx.as_ref().unwrap().clone();
862 let eose_tx = self.eose_tx.as_ref().unwrap().clone(); 954 let eose_tx = self.eose_tx.as_ref().unwrap().clone();
863 let metrics_clone = self.metrics.clone(); 955 let metrics_clone = self.metrics.clone();
956 let pending_sync_index = Arc::clone(&self.pending_sync_index);
864 957
865 tokio::spawn(async move { 958 tokio::spawn(async move {
866 let mut disconnect_sent = false; 959 let mut disconnect_sent = false;
867 960
868 while let Some(relay_event) = event_rx.recv().await { 961 while let Some(relay_event) = event_rx.recv().await {
869 match relay_event { 962 match relay_event {
870 RelayEvent::Event(event) => { 963 RelayEvent::Event(event, subscription_id) => {
871 let result = Self::process_event_static( 964 let result = Self::process_event_static(
872 &event, 965 &event,
873 &relay_url_clone, 966 &relay_url_clone,
@@ -882,6 +975,28 @@ impl SyncManager {
882 metrics.record_synced_event(); 975 metrics.record_synced_event();
883 } 976 }
884 } 977 }
978
979 // Track pagination state for this subscription
980 if result == ProcessResult::Saved || result == ProcessResult::Duplicate {
981 let mut pending = pending_sync_index.write().await;
982 if let Some(batches) = pending.get_mut(&relay_url_clone) {
983 for batch in batches.iter_mut() {
984 if let Some(state) =
985 batch.pagination_state.get_mut(&subscription_id)
986 {
987 state.event_count += 1;
988 // Track minimum created_at timestamp
989 match state.min_created_at {
990 None => state.min_created_at = Some(event.created_at),
991 Some(min) if event.created_at < min => {
992 state.min_created_at = Some(event.created_at);
993 }
994 _ => {}
995 }
996 }
997 }
998 }
999 }
885 } 1000 }
886 RelayEvent::EndOfStoredEvents(sub_id) => { 1001 RelayEvent::EndOfStoredEvents(sub_id) => {
887 tracing::debug!( 1002 tracing::debug!(
@@ -1929,12 +2044,13 @@ impl SyncManager {
1929 "Starting historic_sync with negentropy" 2044 "Starting historic_sync with negentropy"
1930 ); 2045 );
1931 2046
1932 // Create PendingBatch for negentropy (empty outstanding_subs) 2047 // Create PendingBatch for negentropy (empty outstanding_subs and pagination_state)
1933 let batch = PendingBatch { 2048 let batch = PendingBatch {
1934 batch_id, 2049 batch_id,
1935 items: items.clone(), 2050 items: items.clone(),
1936 outstanding_subs: HashSet::new(), 2051 outstanding_subs: HashSet::new(),
1937 sync_method: SyncMethod::Negentropy, 2052 sync_method: SyncMethod::Negentropy,
2053 pagination_state: HashMap::new(), // Negentropy doesn't use pagination
1938 }; 2054 };
1939 2055
1940 // Add to pending_sync_index 2056 // Add to pending_sync_index
@@ -2105,6 +2221,7 @@ impl SyncManager {
2105 2221
2106 // Subscribe to each filter and collect subscription IDs 2222 // Subscribe to each filter and collect subscription IDs
2107 let mut subscription_ids = HashSet::new(); 2223 let mut subscription_ids = HashSet::new();
2224 let mut pagination_state = HashMap::new();
2108 2225
2109 // DEBUG TRACING: Log each filter in REQ+EOSE path 2226 // DEBUG TRACING: Log each filter in REQ+EOSE path
2110 for (idx, filter) in filters_with_since.iter().enumerate() { 2227 for (idx, filter) in filters_with_since.iter().enumerate() {
@@ -2119,7 +2236,16 @@ impl SyncManager {
2119 if let Some(conn) = self.connections.get(relay_url) { 2236 if let Some(conn) = self.connections.get(relay_url) {
2120 match conn.subscribe_filter(filter.clone(), true).await { 2237 match conn.subscribe_filter(filter.clone(), true).await {
2121 Ok(sub_id) => { 2238 Ok(sub_id) => {
2122 subscription_ids.insert(sub_id); 2239 subscription_ids.insert(sub_id.clone());
2240 // Initialize pagination state for this subscription
2241 pagination_state.insert(
2242 sub_id,
2243 PaginationState {
2244 event_count: 0,
2245 min_created_at: None,
2246 original_filter: filter.clone(),
2247 },
2248 );
2123 } 2249 }
2124 Err(e) => { 2250 Err(e) => {
2125 tracing::error!( 2251 tracing::error!(
@@ -2146,6 +2272,7 @@ impl SyncManager {
2146 items, 2272 items,
2147 outstanding_subs: subscription_ids, 2273 outstanding_subs: subscription_ids,
2148 sync_method: SyncMethod::ReqEose, 2274 sync_method: SyncMethod::ReqEose,
2275 pagination_state,
2149 }; 2276 };
2150 2277
2151 // Add to pending_sync_index 2278 // Add to pending_sync_index
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index 37094be..5a61777 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -23,8 +23,8 @@ use crate::nostr::builder::SharedDatabase;
23/// Events from a relay connection 23/// Events from a relay connection
24#[derive(Debug)] 24#[derive(Debug)]
25pub enum RelayEvent { 25pub enum RelayEvent {
26 /// A new event was received 26 /// A new event was received (event, subscription_id)
27 Event(Event), 27 Event(Event, SubscriptionId),
28 /// End of stored events for a subscription 28 /// End of stored events for a subscription
29 EndOfStoredEvents(SubscriptionId), 29 EndOfStoredEvents(SubscriptionId),
30 /// Connection was closed 30 /// Connection was closed
@@ -216,7 +216,7 @@ impl RelayConnection {
216 sub_id = %subscription_id, 216 sub_id = %subscription_id,
217 "Received event" 217 "Received event"
218 ); 218 );
219 if event_sender.send(RelayEvent::Event(*event)).await.is_err() { 219 if event_sender.send(RelayEvent::Event(*event, subscription_id.clone())).await.is_err() {
220 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 220 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
221 break; 221 break;
222 } 222 }