upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-12 10:09:47 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-12 10:20:42 +0000
commitde07e31fad60f9c68a08807cde17ff81d8371a65 (patch)
tree1c670ff38f6702b6e437877049038fbbf3281f97 /src/sync/mod.rs
parent307c380a91a3575ab2266ed08427d24c7b2d016e (diff)
fix: unify sync state tracking for negentropy and REQ+EOSE paths
When negentropy (NIP-77) sync was enabled, the RelaySyncIndex was never updated to reflect historical sync completion. This caused the three-way diff algorithm in compute_actions() to malfunction, leading to: - Repeated sync attempts for the same items - Incorrect filter counting for consolidation - Potential premature relay disconnection This fix unifies both sync paths (REQ+EOSE and Negentropy) through a consistent PendingBatch flow: 1. Added SyncMethod enum to distinguish between sync types 2. Updated PendingBatch struct to include sync_method field 3. Extracted confirm_batch() method for unified batch confirmation 4. Modified negentropy_sync_and_process() to: - Create a PendingBatch before sync - Add batch to pending_sync_index - On success: Remove batch and call confirm_batch() - On failure: Remove batch without confirming The confirm_batch() method moves repos and root_events from the batch to the RelayState.repos and RelayState.root_events, ensuring the three-way diff works correctly regardless of sync method. Closes: negentropy-sync-state-tracking.md
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs242
1 files changed, 185 insertions, 57 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 50d4ae5..b69d627 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -137,15 +137,27 @@ impl RelayState {
137 } 137 }
138} 138}
139 139
140/// A batch of items pending EOSE confirmation 140/// Method used for synchronization
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub enum SyncMethod {
143 /// Traditional REQ+EOSE flow - waits for EOSE on subscriptions
144 ReqEose,
145 /// NIP-77 negentropy sync - confirms immediately after sync completes
146 Negentropy,
147}
148
149/// A batch of items pending confirmation
141#[derive(Debug, Clone)] 150#[derive(Debug, Clone)]
142pub struct PendingBatch { 151pub struct PendingBatch {
143 /// Unique ID for this batch - for debugging/logging 152 /// Unique ID for this batch - for debugging/logging
144 pub batch_id: u64, 153 pub batch_id: u64,
145 /// The items this batch is syncing 154 /// The items this batch is syncing
146 pub items: PendingItems, 155 pub items: PendingItems,
147 /// Subscription IDs that must ALL receive EOSE before confirming 156 /// Subscription IDs that must ALL receive EOSE before confirming (for ReqEose)
157 /// Empty for Negentropy sync method
148 pub outstanding_subs: HashSet<SubscriptionId>, 158 pub outstanding_subs: HashSet<SubscriptionId>,
159 /// The sync method used for this batch
160 pub sync_method: SyncMethod,
149} 161}
150 162
151/// Items included in a pending batch 163/// Items included in a pending batch
@@ -397,9 +409,7 @@ impl SyncManager {
397 /// - Finds the PendingBatch containing this subscription ID 409 /// - Finds the PendingBatch containing this subscription ID
398 /// - Removes the subscription from outstanding_subs 410 /// - Removes the subscription from outstanding_subs
399 /// - When all subscriptions complete (outstanding_subs empty): 411 /// - When all subscriptions complete (outstanding_subs empty):
400 /// - Moves repos from pending to confirmed in RelayState 412 /// - Calls confirm_batch to move items to confirmed state
401 /// - Moves root_events from pending to confirmed
402 /// - Removes the batch from pending_sync_index
403 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { 413 async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) {
404 // 1. Find and update the pending batch 414 // 1. Find and update the pending batch
405 let mut pending = self.pending_sync_index.write().await; 415 let mut pending = self.pending_sync_index.write().await;
@@ -444,46 +454,60 @@ impl SyncManager {
444 return; 454 return;
445 } 455 }
446 456
447 // 2. Batch complete - extract items and remove batch 457 // 2. Batch complete - extract and remove
448 let completed_batch = batches.remove(batch_idx); 458 let completed_batch = batches.remove(batch_idx);
449 let batch_id = completed_batch.batch_id;
450 let repos_count = completed_batch.items.repos.len();
451 let events_count = completed_batch.items.root_events.len();
452 459
453 // Clean up empty relay entry 460 // Clean up empty relay entry
454 if batches.is_empty() { 461 if batches.is_empty() {
455 pending.remove(relay_url); 462 pending.remove(relay_url);
456 } 463 }
457 464
458 // Drop the pending lock before acquiring relay_sync_index lock 465 // Drop the pending lock before confirm_batch
459 drop(pending); 466 drop(pending);
460 467
461 // 3. Move items to confirmed state in RelayState 468 // 3. Confirm the batch (moves items to RelayState)
462 { 469 self.confirm_batch(relay_url, completed_batch).await;
463 let mut relay_index = self.relay_sync_index.write().await; 470 }
464 471
465 if let Some(state) = relay_index.get_mut(relay_url) { 472 /// Confirm a completed batch by moving items to RelayState
466 // Move repos to confirmed 473 ///
467 state.repos.extend(completed_batch.items.repos); 474 /// This method is used by both sync paths (REQ+EOSE and Negentropy) to
468 // Move root_events to confirmed 475 /// move repos and root_events from pending to confirmed state. This unified
469 state.root_events.extend(completed_batch.items.root_events); 476 /// flow ensures consistent state tracking regardless of sync method.
477 ///
478 /// # Arguments
479 /// * `relay_url` - The relay URL the batch belongs to
480 /// * `batch` - The completed batch to confirm
481 async fn confirm_batch(&self, relay_url: &str, batch: PendingBatch) {
482 let batch_id = batch.batch_id;
483 let repos_count = batch.items.repos.len();
484 let events_count = batch.items.root_events.len();
485 let sync_method = batch.sync_method;
486
487 let mut relay_index = self.relay_sync_index.write().await;
488
489 if let Some(state) = relay_index.get_mut(relay_url) {
490 // Move repos to confirmed
491 state.repos.extend(batch.items.repos);
492 // Move root_events to confirmed
493 state.root_events.extend(batch.items.root_events);
470 494
471 tracing::info!( 495 tracing::info!(
472 relay = %relay_url, 496 relay = %relay_url,
473 batch_id = batch_id, 497 batch_id = batch_id,
474 repos_confirmed = repos_count, 498 sync_method = ?sync_method,
475 root_events_confirmed = events_count, 499 repos_confirmed = repos_count,
476 total_repos = state.repos.len(), 500 root_events_confirmed = events_count,
477 total_root_events = state.root_events.len(), 501 total_repos = state.repos.len(),
478 "Batch confirmed - items moved from pending to confirmed" 502 total_root_events = state.root_events.len(),
479 ); 503 "Batch confirmed - items moved from pending to confirmed"
480 } else { 504 );
481 tracing::warn!( 505 } else {
482 relay = %relay_url, 506 tracing::warn!(
483 batch_id = batch_id, 507 relay = %relay_url,
484 "Batch completed but no RelayState found for relay" 508 batch_id = batch_id,
485 ); 509 "Batch completed but no RelayState found for relay"
486 } 510 );
487 } 511 }
488 } 512 }
489 513
@@ -850,6 +874,7 @@ impl SyncManager {
850 root_events: action.root_events.clone(), 874 root_events: action.root_events.clone(),
851 }, 875 },
852 outstanding_subs: subscription_ids.into_iter().collect(), 876 outstanding_subs: subscription_ids.into_iter().collect(),
877 sync_method: SyncMethod::ReqEose,
853 }; 878 };
854 879
855 // Step 5: Add to pending_sync_index 880 // Step 5: Add to pending_sync_index
@@ -1539,10 +1564,14 @@ impl SyncManager {
1539 1564
1540 /// Perform negentropy sync for a filter and process received events 1565 /// Perform negentropy sync for a filter and process received events
1541 /// 1566 ///
1542 /// This method: 1567 /// This method uses the unified PendingBatch flow:
1543 /// 1. Performs negentropy reconciliation with the remote relay 1568 /// 1. Creates a PendingBatch with targets for this relay
1544 /// 2. Processes all received events (dedup, policy check, save, broadcast) 1569 /// 2. Performs negentropy reconciliation with the remote relay
1545 /// 3. Returns the number of events received and processed 1570 /// 3. On success, confirms the batch (moves items to RelayState)
1571 /// 4. On failure, removes the batch without confirming
1572 ///
1573 /// This ensures consistent state tracking across both sync paths
1574 /// (REQ+EOSE and Negentropy).
1546 /// 1575 ///
1547 /// # Arguments 1576 /// # Arguments
1548 /// * `relay_url` - The relay URL to sync with 1577 /// * `relay_url` - The relay URL to sync with
@@ -1552,24 +1581,95 @@ impl SyncManager {
1552 /// # Returns 1581 /// # Returns
1553 /// Number of events received from negentropy sync 1582 /// Number of events received from negentropy sync
1554 async fn negentropy_sync_and_process( 1583 async fn negentropy_sync_and_process(
1555 &self, 1584 &mut self,
1556 relay_url: &str, 1585 relay_url: &str,
1557 filter: Filter, 1586 filter: Filter,
1558 layer_name: &str, 1587 layer_name: &str,
1559 ) -> usize { 1588 ) -> usize {
1560 let connection = match self.connections.get(relay_url) { 1589 use crate::sync::algorithms::derive_relay_targets;
1561 Some(conn) => conn, 1590
1562 None => { 1591 // Check connection exists first (borrow ends immediately)
1563 tracing::warn!( 1592 if !self.connections.contains_key(relay_url) {
1564 relay = %relay_url, 1593 tracing::warn!(
1565 layer = layer_name, 1594 relay = %relay_url,
1566 "No connection found for negentropy sync" 1595 layer = layer_name,
1567 ); 1596 "No connection found for negentropy sync"
1568 return 0; 1597 );
1598 return 0;
1599 }
1600
1601 // Step 1: Get targets for this relay and create PendingBatch
1602 // Get batch_id first (requires mutable borrow of self)
1603 let batch_id = self.next_batch_id();
1604
1605 let pending_items = {
1606 let repo_index = self.repo_sync_index.read().await;
1607 let targets = derive_relay_targets(&repo_index);
1608
1609 let relay_targets = match targets.get(relay_url) {
1610 Some(t) => t,
1611 None => {
1612 tracing::debug!(
1613 relay = %relay_url,
1614 layer = layer_name,
1615 "No targets found for relay, skipping negentropy sync"
1616 );
1617 return 0;
1618 }
1619 };
1620
1621 PendingItems {
1622 repos: relay_targets.repos.clone(),
1623 root_events: relay_targets.root_events.clone(),
1569 } 1624 }
1570 }; 1625 };
1571 1626
1572 // Perform negentropy sync 1627 // Create PendingBatch for negentropy sync (empty outstanding_subs)
1628 let batch = PendingBatch {
1629 batch_id,
1630 items: pending_items.clone(),
1631 outstanding_subs: HashSet::new(), // Negentropy doesn't use subscriptions
1632 sync_method: SyncMethod::Negentropy,
1633 };
1634
1635 // Add batch to pending_sync_index before starting sync
1636 {
1637 let mut pending = self.pending_sync_index.write().await;
1638 pending
1639 .entry(relay_url.to_string())
1640 .or_insert_with(Vec::new)
1641 .push(batch);
1642 }
1643
1644 tracing::debug!(
1645 relay = %relay_url,
1646 layer = layer_name,
1647 batch_id = batch_id,
1648 repos = pending_items.repos.len(),
1649 root_events = pending_items.root_events.len(),
1650 "Created pending batch for negentropy sync"
1651 );
1652
1653 // Step 2: Perform negentropy sync
1654 // Get connection reference here (borrows self.connections briefly)
1655 let Some(connection) = self.connections.get(relay_url) else {
1656 // Connection was removed between check and use (race condition)
1657 // Remove the pending batch we just added
1658 let mut pending = self.pending_sync_index.write().await;
1659 if let Some(batches) = pending.get_mut(relay_url) {
1660 batches.retain(|b| b.batch_id != batch_id);
1661 if batches.is_empty() {
1662 pending.remove(relay_url);
1663 }
1664 }
1665 tracing::warn!(
1666 relay = %relay_url,
1667 layer = layer_name,
1668 "Connection disappeared before negentropy sync could start"
1669 );
1670 return 0;
1671 };
1672
1573 match connection.negentropy_sync_filter(filter).await { 1673 match connection.negentropy_sync_filter(filter).await {
1574 Ok(result) => { 1674 Ok(result) => {
1575 let event_count = result.received.len(); 1675 let event_count = result.received.len();
@@ -1584,14 +1684,6 @@ impl SyncManager {
1584 layer_name 1684 layer_name
1585 ); 1685 );
1586 1686
1587 // Note: nostr-sdk's sync() handles fetching events automatically.
1588 // The result.received contains EventIds that were fetched during sync.
1589 // Events are stored in nostr-sdk's client database.
1590 // For production use, we would need to either:
1591 // 1. Configure nostr-sdk client to use our SharedDatabase
1592 // 2. Fetch events by ID from nostr-sdk's database to process them
1593 // For now, we just log the count - the sync operation itself ensures
1594 // the relay has the events available.
1595 tracing::debug!( 1687 tracing::debug!(
1596 relay = %relay_url, 1688 relay = %relay_url,
1597 layer = layer_name, 1689 layer = layer_name,
@@ -1606,6 +1698,30 @@ impl SyncManager {
1606 } 1698 }
1607 } 1699 }
1608 1700
1701 // Step 3: Remove batch from pending and confirm it
1702 let completed_batch = {
1703 let mut pending = self.pending_sync_index.write().await;
1704 if let Some(batches) = pending.get_mut(relay_url) {
1705 let batch_idx = batches.iter().position(|b| b.batch_id == batch_id);
1706 if let Some(idx) = batch_idx {
1707 let batch = batches.remove(idx);
1708 if batches.is_empty() {
1709 pending.remove(relay_url);
1710 }
1711 Some(batch)
1712 } else {
1713 None
1714 }
1715 } else {
1716 None
1717 }
1718 };
1719
1720 // Confirm the batch using unified confirm_batch method
1721 if let Some(batch) = completed_batch {
1722 self.confirm_batch(relay_url, batch).await;
1723 }
1724
1609 event_count 1725 event_count
1610 } 1726 }
1611 Err(e) => { 1727 Err(e) => {
@@ -1616,6 +1732,18 @@ impl SyncManager {
1616 "Negentropy sync failed for {}, will fall back to REQ+EOSE", 1732 "Negentropy sync failed for {}, will fall back to REQ+EOSE",
1617 layer_name 1733 layer_name
1618 ); 1734 );
1735
1736 // Remove the batch without confirming on failure
1737 {
1738 let mut pending = self.pending_sync_index.write().await;
1739 if let Some(batches) = pending.get_mut(relay_url) {
1740 batches.retain(|b| b.batch_id != batch_id);
1741 if batches.is_empty() {
1742 pending.remove(relay_url);
1743 }
1744 }
1745 }
1746
1619 0 1747 0
1620 } 1748 }
1621 } 1749 }