upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-22 14:38:16 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-22 14:38:16 +0000
commitf7546879af9a692fd466772c9af772ada8aca68e (patch)
tree049de66b90c91cb65cf68d4fa620f788140280eb /src/sync/mod.rs
parent541f34a207047b26547154e7d631005d456f12fd (diff)
fix: sync consoldate subscription count
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs67
1 files changed, 5 insertions, 62 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 1f95ff7..2475eb6 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1661,67 +1661,6 @@ impl SyncManager {
1661 // Consolidation System 1661 // Consolidation System
1662 // ========================================================================= 1662 // =========================================================================
1663 1663
1664 /// Get the current filter count for a relay
1665 ///
1666 /// Counts both pending subscriptions (outstanding_subs in batches) and
1667 /// confirmed subscriptions (active Layer 2/3 filters based on RelayState).
1668 /// This is used to determine if consolidation is needed.
1669 ///
1670 /// Confirmed filter counts:
1671 /// - Layer 1: 1 filter (announcement subscription)
1672 /// - Layer 2: 3 filters per 100-repo chunk (for kinds 1617/1618/1621)
1673 /// - Layer 3: 3 filters per 100-event chunk (for replies/reactions/etc)
1674 async fn get_filter_count(&self, relay_url: &str) -> usize {
1675 // Count pending subscriptions
1676 let pending_count = {
1677 let pending = self.pending_sync_index.read().await;
1678 match pending.get(relay_url) {
1679 Some(batches) => batches.iter().map(|b| b.outstanding_subs.len()).sum(),
1680 None => 0,
1681 }
1682 };
1683
1684 // Count confirmed subscriptions from relay state
1685 let confirmed_count = {
1686 let relay_index = self.relay_sync_index.read().await;
1687 if let Some(state) = relay_index.get(relay_url) {
1688 // Layer 1: 1 filter for announcements
1689 // Layer 2: 3 filters per 100-repo chunk (ceiling division)
1690 // Layer 3: 3 filters per 100-event chunk (ceiling division)
1691 let repo_count = state.repos.len();
1692 let event_count = state.root_events.len();
1693
1694 let layer1_filters = 1;
1695 let layer2_filters = if repo_count > 0 {
1696 repo_count.div_ceil(100) * 3
1697 } else {
1698 0
1699 };
1700 let layer3_filters = if event_count > 0 {
1701 event_count.div_ceil(100) * 3
1702 } else {
1703 0
1704 };
1705
1706 layer1_filters + layer2_filters + layer3_filters
1707 } else {
1708 0
1709 }
1710 };
1711
1712 let total_count = pending_count + confirmed_count;
1713
1714 tracing::debug!(
1715 relay = %relay_url,
1716 pending_count = pending_count,
1717 confirmed_count = confirmed_count,
1718 total_count = total_count,
1719 "Counted active filters for relay"
1720 );
1721
1722 total_count
1723 }
1724
1725 /// Wait until all pending batches for a relay are complete 1664 /// Wait until all pending batches for a relay are complete
1726 /// 1665 ///
1727 /// Polls the pending_sync_index until the relay has no pending batches. 1666 /// Polls the pending_sync_index until the relay has no pending batches.
@@ -1776,7 +1715,11 @@ impl SyncManager {
1776 /// Compares current filter count + new filter count against the threshold. 1715 /// Compares current filter count + new filter count against the threshold.
1777 /// If exceeded, triggers consolidation before adding new filters. 1716 /// If exceeded, triggers consolidation before adding new filters.
1778 async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { 1717 async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) {
1779 let current_count = self.get_filter_count(relay_url).await; 1718 let current_count = if let Some(connection) = self.connections.get(relay_url) {
1719 connection.subscription_count().await
1720 } else {
1721 0
1722 };
1780 1723
1781 if current_count + new_count > CONSOLIDATION_THRESHOLD { 1724 if current_count + new_count > CONSOLIDATION_THRESHOLD {
1782 tracing::info!( 1725 tracing::info!(