diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:38:16 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:38:16 +0000 |
| commit | f7546879af9a692fd466772c9af772ada8aca68e (patch) | |
| tree | 049de66b90c91cb65cf68d4fa620f788140280eb /src/sync/mod.rs | |
| parent | 541f34a207047b26547154e7d631005d456f12fd (diff) | |
fix: sync consoldate subscription count
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 67 |
1 files changed, 5 insertions, 62 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1f95ff7..2475eb6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1661,67 +1661,6 @@ impl SyncManager { | |||
| 1661 | // Consolidation System | 1661 | // Consolidation System |
| 1662 | // ========================================================================= | 1662 | // ========================================================================= |
| 1663 | 1663 | ||
| 1664 | /// Get the current filter count for a relay | ||
| 1665 | /// | ||
| 1666 | /// Counts both pending subscriptions (outstanding_subs in batches) and | ||
| 1667 | /// confirmed subscriptions (active Layer 2/3 filters based on RelayState). | ||
| 1668 | /// This is used to determine if consolidation is needed. | ||
| 1669 | /// | ||
| 1670 | /// Confirmed filter counts: | ||
| 1671 | /// - Layer 1: 1 filter (announcement subscription) | ||
| 1672 | /// - Layer 2: 3 filters per 100-repo chunk (for kinds 1617/1618/1621) | ||
| 1673 | /// - Layer 3: 3 filters per 100-event chunk (for replies/reactions/etc) | ||
| 1674 | async fn get_filter_count(&self, relay_url: &str) -> usize { | ||
| 1675 | // Count pending subscriptions | ||
| 1676 | let pending_count = { | ||
| 1677 | let pending = self.pending_sync_index.read().await; | ||
| 1678 | match pending.get(relay_url) { | ||
| 1679 | Some(batches) => batches.iter().map(|b| b.outstanding_subs.len()).sum(), | ||
| 1680 | None => 0, | ||
| 1681 | } | ||
| 1682 | }; | ||
| 1683 | |||
| 1684 | // Count confirmed subscriptions from relay state | ||
| 1685 | let confirmed_count = { | ||
| 1686 | let relay_index = self.relay_sync_index.read().await; | ||
| 1687 | if let Some(state) = relay_index.get(relay_url) { | ||
| 1688 | // Layer 1: 1 filter for announcements | ||
| 1689 | // Layer 2: 3 filters per 100-repo chunk (ceiling division) | ||
| 1690 | // Layer 3: 3 filters per 100-event chunk (ceiling division) | ||
| 1691 | let repo_count = state.repos.len(); | ||
| 1692 | let event_count = state.root_events.len(); | ||
| 1693 | |||
| 1694 | let layer1_filters = 1; | ||
| 1695 | let layer2_filters = if repo_count > 0 { | ||
| 1696 | repo_count.div_ceil(100) * 3 | ||
| 1697 | } else { | ||
| 1698 | 0 | ||
| 1699 | }; | ||
| 1700 | let layer3_filters = if event_count > 0 { | ||
| 1701 | event_count.div_ceil(100) * 3 | ||
| 1702 | } else { | ||
| 1703 | 0 | ||
| 1704 | }; | ||
| 1705 | |||
| 1706 | layer1_filters + layer2_filters + layer3_filters | ||
| 1707 | } else { | ||
| 1708 | 0 | ||
| 1709 | } | ||
| 1710 | }; | ||
| 1711 | |||
| 1712 | let total_count = pending_count + confirmed_count; | ||
| 1713 | |||
| 1714 | tracing::debug!( | ||
| 1715 | relay = %relay_url, | ||
| 1716 | pending_count = pending_count, | ||
| 1717 | confirmed_count = confirmed_count, | ||
| 1718 | total_count = total_count, | ||
| 1719 | "Counted active filters for relay" | ||
| 1720 | ); | ||
| 1721 | |||
| 1722 | total_count | ||
| 1723 | } | ||
| 1724 | |||
| 1725 | /// Wait until all pending batches for a relay are complete | 1664 | /// Wait until all pending batches for a relay are complete |
| 1726 | /// | 1665 | /// |
| 1727 | /// Polls the pending_sync_index until the relay has no pending batches. | 1666 | /// Polls the pending_sync_index until the relay has no pending batches. |
| @@ -1776,7 +1715,11 @@ impl SyncManager { | |||
| 1776 | /// Compares current filter count + new filter count against the threshold. | 1715 | /// Compares current filter count + new filter count against the threshold. |
| 1777 | /// If exceeded, triggers consolidation before adding new filters. | 1716 | /// If exceeded, triggers consolidation before adding new filters. |
| 1778 | async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { | 1717 | async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { |
| 1779 | let current_count = self.get_filter_count(relay_url).await; | 1718 | let current_count = if let Some(connection) = self.connections.get(relay_url) { |
| 1719 | connection.subscription_count().await | ||
| 1720 | } else { | ||
| 1721 | 0 | ||
| 1722 | }; | ||
| 1780 | 1723 | ||
| 1781 | if current_count + new_count > CONSOLIDATION_THRESHOLD { | 1724 | if current_count + new_count > CONSOLIDATION_THRESHOLD { |
| 1782 | tracing::info!( | 1725 | tracing::info!( |