diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-12 10:09:47 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-12 10:20:42 +0000 |
| commit | de07e31fad60f9c68a08807cde17ff81d8371a65 (patch) | |
| tree | 1c670ff38f6702b6e437877049038fbbf3281f97 /src/sync | |
| parent | 307c380a91a3575ab2266ed08427d24c7b2d016e (diff) | |
fix: unify sync state tracking for negentropy and REQ+EOSE paths
When negentropy (NIP-77) sync was enabled, the RelaySyncIndex was never
updated to reflect historical sync completion. This caused the three-way
diff algorithm in compute_actions() to malfunction, leading to:
- Repeated sync attempts for the same items
- Incorrect filter counting for consolidation
- Potential premature relay disconnection
This fix unifies both sync paths (REQ+EOSE and Negentropy) through a
consistent PendingBatch flow:
1. Added SyncMethod enum to distinguish between sync types
2. Updated PendingBatch struct to include sync_method field
3. Extracted confirm_batch() method for unified batch confirmation
4. Modified negentropy_sync_and_process() to:
- Create a PendingBatch before sync
- Add batch to pending_sync_index
- On success: Remove batch and call confirm_batch()
- On failure: Remove batch without confirming
The confirm_batch() method moves repos and root_events from the batch
to the RelayState.repos and RelayState.root_events, ensuring the
three-way diff works correctly regardless of sync method.
Closes: negentropy-sync-state-tracking.md
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/algorithms.rs | 4 | ||||
| -rw-r--r-- | src/sync/mod.rs | 242 |
2 files changed, 188 insertions, 58 deletions
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 3063516..5b5b520 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -11,7 +11,7 @@ use std::collections::{HashMap, HashSet}; | |||
| 11 | 11 | ||
| 12 | use nostr_sdk::prelude::*; | 12 | use nostr_sdk::prelude::*; |
| 13 | 13 | ||
| 14 | use super::{ConnectionStatus, PendingBatch, RelayState}; | 14 | use super::{ConnectionStatus, PendingBatch, RelayState, SyncMethod}; |
| 15 | 15 | ||
| 16 | // ============================================================================= | 16 | // ============================================================================= |
| 17 | // Data Structures | 17 | // Data Structures |
| @@ -396,6 +396,7 @@ mod tests { | |||
| 396 | root_events: HashSet::new(), | 396 | root_events: HashSet::new(), |
| 397 | }, | 397 | }, |
| 398 | outstanding_subs: HashSet::new(), | 398 | outstanding_subs: HashSet::new(), |
| 399 | sync_method: SyncMethod::ReqEose, | ||
| 399 | }], | 400 | }], |
| 400 | ); | 401 | ); |
| 401 | 402 | ||
| @@ -504,6 +505,7 @@ mod tests { | |||
| 504 | root_events: HashSet::new(), | 505 | root_events: HashSet::new(), |
| 505 | }, | 506 | }, |
| 506 | outstanding_subs: HashSet::new(), | 507 | outstanding_subs: HashSet::new(), |
| 508 | sync_method: SyncMethod::ReqEose, | ||
| 507 | }], | 509 | }], |
| 508 | ); | 510 | ); |
| 509 | 511 | ||
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 50d4ae5..b69d627 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -137,15 +137,27 @@ impl RelayState { | |||
| 137 | } | 137 | } |
| 138 | } | 138 | } |
| 139 | 139 | ||
| 140 | /// A batch of items pending EOSE confirmation | 140 | /// Method used for synchronization |
| 141 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 142 | pub enum SyncMethod { | ||
| 143 | /// Traditional REQ+EOSE flow - waits for EOSE on subscriptions | ||
| 144 | ReqEose, | ||
| 145 | /// NIP-77 negentropy sync - confirms immediately after sync completes | ||
| 146 | Negentropy, | ||
| 147 | } | ||
| 148 | |||
| 149 | /// A batch of items pending confirmation | ||
| 141 | #[derive(Debug, Clone)] | 150 | #[derive(Debug, Clone)] |
| 142 | pub struct PendingBatch { | 151 | pub struct PendingBatch { |
| 143 | /// Unique ID for this batch - for debugging/logging | 152 | /// Unique ID for this batch - for debugging/logging |
| 144 | pub batch_id: u64, | 153 | pub batch_id: u64, |
| 145 | /// The items this batch is syncing | 154 | /// The items this batch is syncing |
| 146 | pub items: PendingItems, | 155 | pub items: PendingItems, |
| 147 | /// Subscription IDs that must ALL receive EOSE before confirming | 156 | /// Subscription IDs that must ALL receive EOSE before confirming (for ReqEose) |
| 157 | /// Empty for Negentropy sync method | ||
| 148 | pub outstanding_subs: HashSet<SubscriptionId>, | 158 | pub outstanding_subs: HashSet<SubscriptionId>, |
| 159 | /// The sync method used for this batch | ||
| 160 | pub sync_method: SyncMethod, | ||
| 149 | } | 161 | } |
| 150 | 162 | ||
| 151 | /// Items included in a pending batch | 163 | /// Items included in a pending batch |
| @@ -397,9 +409,7 @@ impl SyncManager { | |||
| 397 | /// - Finds the PendingBatch containing this subscription ID | 409 | /// - Finds the PendingBatch containing this subscription ID |
| 398 | /// - Removes the subscription from outstanding_subs | 410 | /// - Removes the subscription from outstanding_subs |
| 399 | /// - When all subscriptions complete (outstanding_subs empty): | 411 | /// - When all subscriptions complete (outstanding_subs empty): |
| 400 | /// - Moves repos from pending to confirmed in RelayState | 412 | /// - Calls confirm_batch to move items to confirmed state |
| 401 | /// - Moves root_events from pending to confirmed | ||
| 402 | /// - Removes the batch from pending_sync_index | ||
| 403 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { | 413 | async fn handle_eose(&mut self, relay_url: &str, sub_id: SubscriptionId) { |
| 404 | // 1. Find and update the pending batch | 414 | // 1. Find and update the pending batch |
| 405 | let mut pending = self.pending_sync_index.write().await; | 415 | let mut pending = self.pending_sync_index.write().await; |
| @@ -444,46 +454,60 @@ impl SyncManager { | |||
| 444 | return; | 454 | return; |
| 445 | } | 455 | } |
| 446 | 456 | ||
| 447 | // 2. Batch complete - extract items and remove batch | 457 | // 2. Batch complete - extract and remove |
| 448 | let completed_batch = batches.remove(batch_idx); | 458 | let completed_batch = batches.remove(batch_idx); |
| 449 | let batch_id = completed_batch.batch_id; | ||
| 450 | let repos_count = completed_batch.items.repos.len(); | ||
| 451 | let events_count = completed_batch.items.root_events.len(); | ||
| 452 | 459 | ||
| 453 | // Clean up empty relay entry | 460 | // Clean up empty relay entry |
| 454 | if batches.is_empty() { | 461 | if batches.is_empty() { |
| 455 | pending.remove(relay_url); | 462 | pending.remove(relay_url); |
| 456 | } | 463 | } |
| 457 | 464 | ||
| 458 | // Drop the pending lock before acquiring relay_sync_index lock | 465 | // Drop the pending lock before confirm_batch |
| 459 | drop(pending); | 466 | drop(pending); |
| 460 | 467 | ||
| 461 | // 3. Move items to confirmed state in RelayState | 468 | // 3. Confirm the batch (moves items to RelayState) |
| 462 | { | 469 | self.confirm_batch(relay_url, completed_batch).await; |
| 463 | let mut relay_index = self.relay_sync_index.write().await; | 470 | } |
| 464 | 471 | ||
| 465 | if let Some(state) = relay_index.get_mut(relay_url) { | 472 | /// Confirm a completed batch by moving items to RelayState |
| 466 | // Move repos to confirmed | 473 | /// |
| 467 | state.repos.extend(completed_batch.items.repos); | 474 | /// This method is used by both sync paths (REQ+EOSE and Negentropy) to |
| 468 | // Move root_events to confirmed | 475 | /// move repos and root_events from pending to confirmed state. This unified |
| 469 | state.root_events.extend(completed_batch.items.root_events); | 476 | /// flow ensures consistent state tracking regardless of sync method. |
| 477 | /// | ||
| 478 | /// # Arguments | ||
| 479 | /// * `relay_url` - The relay URL the batch belongs to | ||
| 480 | /// * `batch` - The completed batch to confirm | ||
| 481 | async fn confirm_batch(&self, relay_url: &str, batch: PendingBatch) { | ||
| 482 | let batch_id = batch.batch_id; | ||
| 483 | let repos_count = batch.items.repos.len(); | ||
| 484 | let events_count = batch.items.root_events.len(); | ||
| 485 | let sync_method = batch.sync_method; | ||
| 486 | |||
| 487 | let mut relay_index = self.relay_sync_index.write().await; | ||
| 488 | |||
| 489 | if let Some(state) = relay_index.get_mut(relay_url) { | ||
| 490 | // Move repos to confirmed | ||
| 491 | state.repos.extend(batch.items.repos); | ||
| 492 | // Move root_events to confirmed | ||
| 493 | state.root_events.extend(batch.items.root_events); | ||
| 470 | 494 | ||
| 471 | tracing::info!( | 495 | tracing::info!( |
| 472 | relay = %relay_url, | 496 | relay = %relay_url, |
| 473 | batch_id = batch_id, | 497 | batch_id = batch_id, |
| 474 | repos_confirmed = repos_count, | 498 | sync_method = ?sync_method, |
| 475 | root_events_confirmed = events_count, | 499 | repos_confirmed = repos_count, |
| 476 | total_repos = state.repos.len(), | 500 | root_events_confirmed = events_count, |
| 477 | total_root_events = state.root_events.len(), | 501 | total_repos = state.repos.len(), |
| 478 | "Batch confirmed - items moved from pending to confirmed" | 502 | total_root_events = state.root_events.len(), |
| 479 | ); | 503 | "Batch confirmed - items moved from pending to confirmed" |
| 480 | } else { | 504 | ); |
| 481 | tracing::warn!( | 505 | } else { |
| 482 | relay = %relay_url, | 506 | tracing::warn!( |
| 483 | batch_id = batch_id, | 507 | relay = %relay_url, |
| 484 | "Batch completed but no RelayState found for relay" | 508 | batch_id = batch_id, |
| 485 | ); | 509 | "Batch completed but no RelayState found for relay" |
| 486 | } | 510 | ); |
| 487 | } | 511 | } |
| 488 | } | 512 | } |
| 489 | 513 | ||
| @@ -850,6 +874,7 @@ impl SyncManager { | |||
| 850 | root_events: action.root_events.clone(), | 874 | root_events: action.root_events.clone(), |
| 851 | }, | 875 | }, |
| 852 | outstanding_subs: subscription_ids.into_iter().collect(), | 876 | outstanding_subs: subscription_ids.into_iter().collect(), |
| 877 | sync_method: SyncMethod::ReqEose, | ||
| 853 | }; | 878 | }; |
| 854 | 879 | ||
| 855 | // Step 5: Add to pending_sync_index | 880 | // Step 5: Add to pending_sync_index |
| @@ -1539,10 +1564,14 @@ impl SyncManager { | |||
| 1539 | 1564 | ||
| 1540 | /// Perform negentropy sync for a filter and process received events | 1565 | /// Perform negentropy sync for a filter and process received events |
| 1541 | /// | 1566 | /// |
| 1542 | /// This method: | 1567 | /// This method uses the unified PendingBatch flow: |
| 1543 | /// 1. Performs negentropy reconciliation with the remote relay | 1568 | /// 1. Creates a PendingBatch with targets for this relay |
| 1544 | /// 2. Processes all received events (dedup, policy check, save, broadcast) | 1569 | /// 2. Performs negentropy reconciliation with the remote relay |
| 1545 | /// 3. Returns the number of events received and processed | 1570 | /// 3. On success, confirms the batch (moves items to RelayState) |
| 1571 | /// 4. On failure, removes the batch without confirming | ||
| 1572 | /// | ||
| 1573 | /// This ensures consistent state tracking across both sync paths | ||
| 1574 | /// (REQ+EOSE and Negentropy). | ||
| 1546 | /// | 1575 | /// |
| 1547 | /// # Arguments | 1576 | /// # Arguments |
| 1548 | /// * `relay_url` - The relay URL to sync with | 1577 | /// * `relay_url` - The relay URL to sync with |
| @@ -1552,24 +1581,95 @@ impl SyncManager { | |||
| 1552 | /// # Returns | 1581 | /// # Returns |
| 1553 | /// Number of events received from negentropy sync | 1582 | /// Number of events received from negentropy sync |
| 1554 | async fn negentropy_sync_and_process( | 1583 | async fn negentropy_sync_and_process( |
| 1555 | &self, | 1584 | &mut self, |
| 1556 | relay_url: &str, | 1585 | relay_url: &str, |
| 1557 | filter: Filter, | 1586 | filter: Filter, |
| 1558 | layer_name: &str, | 1587 | layer_name: &str, |
| 1559 | ) -> usize { | 1588 | ) -> usize { |
| 1560 | let connection = match self.connections.get(relay_url) { | 1589 | use crate::sync::algorithms::derive_relay_targets; |
| 1561 | Some(conn) => conn, | 1590 | |
| 1562 | None => { | 1591 | // Check connection exists first (borrow ends immediately) |
| 1563 | tracing::warn!( | 1592 | if !self.connections.contains_key(relay_url) { |
| 1564 | relay = %relay_url, | 1593 | tracing::warn!( |
| 1565 | layer = layer_name, | 1594 | relay = %relay_url, |
| 1566 | "No connection found for negentropy sync" | 1595 | layer = layer_name, |
| 1567 | ); | 1596 | "No connection found for negentropy sync" |
| 1568 | return 0; | 1597 | ); |
| 1598 | return 0; | ||
| 1599 | } | ||
| 1600 | |||
| 1601 | // Step 1: Get targets for this relay and create PendingBatch | ||
| 1602 | // Get batch_id first (requires mutable borrow of self) | ||
| 1603 | let batch_id = self.next_batch_id(); | ||
| 1604 | |||
| 1605 | let pending_items = { | ||
| 1606 | let repo_index = self.repo_sync_index.read().await; | ||
| 1607 | let targets = derive_relay_targets(&repo_index); | ||
| 1608 | |||
| 1609 | let relay_targets = match targets.get(relay_url) { | ||
| 1610 | Some(t) => t, | ||
| 1611 | None => { | ||
| 1612 | tracing::debug!( | ||
| 1613 | relay = %relay_url, | ||
| 1614 | layer = layer_name, | ||
| 1615 | "No targets found for relay, skipping negentropy sync" | ||
| 1616 | ); | ||
| 1617 | return 0; | ||
| 1618 | } | ||
| 1619 | }; | ||
| 1620 | |||
| 1621 | PendingItems { | ||
| 1622 | repos: relay_targets.repos.clone(), | ||
| 1623 | root_events: relay_targets.root_events.clone(), | ||
| 1569 | } | 1624 | } |
| 1570 | }; | 1625 | }; |
| 1571 | 1626 | ||
| 1572 | // Perform negentropy sync | 1627 | // Create PendingBatch for negentropy sync (empty outstanding_subs) |
| 1628 | let batch = PendingBatch { | ||
| 1629 | batch_id, | ||
| 1630 | items: pending_items.clone(), | ||
| 1631 | outstanding_subs: HashSet::new(), // Negentropy doesn't use subscriptions | ||
| 1632 | sync_method: SyncMethod::Negentropy, | ||
| 1633 | }; | ||
| 1634 | |||
| 1635 | // Add batch to pending_sync_index before starting sync | ||
| 1636 | { | ||
| 1637 | let mut pending = self.pending_sync_index.write().await; | ||
| 1638 | pending | ||
| 1639 | .entry(relay_url.to_string()) | ||
| 1640 | .or_insert_with(Vec::new) | ||
| 1641 | .push(batch); | ||
| 1642 | } | ||
| 1643 | |||
| 1644 | tracing::debug!( | ||
| 1645 | relay = %relay_url, | ||
| 1646 | layer = layer_name, | ||
| 1647 | batch_id = batch_id, | ||
| 1648 | repos = pending_items.repos.len(), | ||
| 1649 | root_events = pending_items.root_events.len(), | ||
| 1650 | "Created pending batch for negentropy sync" | ||
| 1651 | ); | ||
| 1652 | |||
| 1653 | // Step 2: Perform negentropy sync | ||
| 1654 | // Get connection reference here (borrows self.connections briefly) | ||
| 1655 | let Some(connection) = self.connections.get(relay_url) else { | ||
| 1656 | // Connection was removed between check and use (race condition) | ||
| 1657 | // Remove the pending batch we just added | ||
| 1658 | let mut pending = self.pending_sync_index.write().await; | ||
| 1659 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 1660 | batches.retain(|b| b.batch_id != batch_id); | ||
| 1661 | if batches.is_empty() { | ||
| 1662 | pending.remove(relay_url); | ||
| 1663 | } | ||
| 1664 | } | ||
| 1665 | tracing::warn!( | ||
| 1666 | relay = %relay_url, | ||
| 1667 | layer = layer_name, | ||
| 1668 | "Connection disappeared before negentropy sync could start" | ||
| 1669 | ); | ||
| 1670 | return 0; | ||
| 1671 | }; | ||
| 1672 | |||
| 1573 | match connection.negentropy_sync_filter(filter).await { | 1673 | match connection.negentropy_sync_filter(filter).await { |
| 1574 | Ok(result) => { | 1674 | Ok(result) => { |
| 1575 | let event_count = result.received.len(); | 1675 | let event_count = result.received.len(); |
| @@ -1584,14 +1684,6 @@ impl SyncManager { | |||
| 1584 | layer_name | 1684 | layer_name |
| 1585 | ); | 1685 | ); |
| 1586 | 1686 | ||
| 1587 | // Note: nostr-sdk's sync() handles fetching events automatically. | ||
| 1588 | // The result.received contains EventIds that were fetched during sync. | ||
| 1589 | // Events are stored in nostr-sdk's client database. | ||
| 1590 | // For production use, we would need to either: | ||
| 1591 | // 1. Configure nostr-sdk client to use our SharedDatabase | ||
| 1592 | // 2. Fetch events by ID from nostr-sdk's database to process them | ||
| 1593 | // For now, we just log the count - the sync operation itself ensures | ||
| 1594 | // the relay has the events available. | ||
| 1595 | tracing::debug!( | 1687 | tracing::debug!( |
| 1596 | relay = %relay_url, | 1688 | relay = %relay_url, |
| 1597 | layer = layer_name, | 1689 | layer = layer_name, |
| @@ -1606,6 +1698,30 @@ impl SyncManager { | |||
| 1606 | } | 1698 | } |
| 1607 | } | 1699 | } |
| 1608 | 1700 | ||
| 1701 | // Step 3: Remove batch from pending and confirm it | ||
| 1702 | let completed_batch = { | ||
| 1703 | let mut pending = self.pending_sync_index.write().await; | ||
| 1704 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 1705 | let batch_idx = batches.iter().position(|b| b.batch_id == batch_id); | ||
| 1706 | if let Some(idx) = batch_idx { | ||
| 1707 | let batch = batches.remove(idx); | ||
| 1708 | if batches.is_empty() { | ||
| 1709 | pending.remove(relay_url); | ||
| 1710 | } | ||
| 1711 | Some(batch) | ||
| 1712 | } else { | ||
| 1713 | None | ||
| 1714 | } | ||
| 1715 | } else { | ||
| 1716 | None | ||
| 1717 | } | ||
| 1718 | }; | ||
| 1719 | |||
| 1720 | // Confirm the batch using unified confirm_batch method | ||
| 1721 | if let Some(batch) = completed_batch { | ||
| 1722 | self.confirm_batch(relay_url, batch).await; | ||
| 1723 | } | ||
| 1724 | |||
| 1609 | event_count | 1725 | event_count |
| 1610 | } | 1726 | } |
| 1611 | Err(e) => { | 1727 | Err(e) => { |
| @@ -1616,6 +1732,18 @@ impl SyncManager { | |||
| 1616 | "Negentropy sync failed for {}, will fall back to REQ+EOSE", | 1732 | "Negentropy sync failed for {}, will fall back to REQ+EOSE", |
| 1617 | layer_name | 1733 | layer_name |
| 1618 | ); | 1734 | ); |
| 1735 | |||
| 1736 | // Remove the batch without confirming on failure | ||
| 1737 | { | ||
| 1738 | let mut pending = self.pending_sync_index.write().await; | ||
| 1739 | if let Some(batches) = pending.get_mut(relay_url) { | ||
| 1740 | batches.retain(|b| b.batch_id != batch_id); | ||
| 1741 | if batches.is_empty() { | ||
| 1742 | pending.remove(relay_url); | ||
| 1743 | } | ||
| 1744 | } | ||
| 1745 | } | ||
| 1746 | |||
| 1619 | 0 | 1747 | 0 |
| 1620 | } | 1748 | } |
| 1621 | } | 1749 | } |