upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-22 14:38:16 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-22 14:38:16 +0000
commitf7546879af9a692fd466772c9af772ada8aca68e (patch)
tree049de66b90c91cb65cf68d4fa620f788140280eb /src
parent541f34a207047b26547154e7d631005d456f12fd (diff)
fix: sync consoldate subscription count
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs67
-rw-r--r--src/sync/relay_connection.rs24
2 files changed, 27 insertions, 64 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 1f95ff7..2475eb6 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1661,67 +1661,6 @@ impl SyncManager {
1661 // Consolidation System 1661 // Consolidation System
1662 // ========================================================================= 1662 // =========================================================================
1663 1663
1664 /// Get the current filter count for a relay
1665 ///
1666 /// Counts both pending subscriptions (outstanding_subs in batches) and
1667 /// confirmed subscriptions (active Layer 2/3 filters based on RelayState).
1668 /// This is used to determine if consolidation is needed.
1669 ///
1670 /// Confirmed filter counts:
1671 /// - Layer 1: 1 filter (announcement subscription)
1672 /// - Layer 2: 3 filters per 100-repo chunk (for kinds 1617/1618/1621)
1673 /// - Layer 3: 3 filters per 100-event chunk (for replies/reactions/etc)
1674 async fn get_filter_count(&self, relay_url: &str) -> usize {
1675 // Count pending subscriptions
1676 let pending_count = {
1677 let pending = self.pending_sync_index.read().await;
1678 match pending.get(relay_url) {
1679 Some(batches) => batches.iter().map(|b| b.outstanding_subs.len()).sum(),
1680 None => 0,
1681 }
1682 };
1683
1684 // Count confirmed subscriptions from relay state
1685 let confirmed_count = {
1686 let relay_index = self.relay_sync_index.read().await;
1687 if let Some(state) = relay_index.get(relay_url) {
1688 // Layer 1: 1 filter for announcements
1689 // Layer 2: 3 filters per 100-repo chunk (ceiling division)
1690 // Layer 3: 3 filters per 100-event chunk (ceiling division)
1691 let repo_count = state.repos.len();
1692 let event_count = state.root_events.len();
1693
1694 let layer1_filters = 1;
1695 let layer2_filters = if repo_count > 0 {
1696 repo_count.div_ceil(100) * 3
1697 } else {
1698 0
1699 };
1700 let layer3_filters = if event_count > 0 {
1701 event_count.div_ceil(100) * 3
1702 } else {
1703 0
1704 };
1705
1706 layer1_filters + layer2_filters + layer3_filters
1707 } else {
1708 0
1709 }
1710 };
1711
1712 let total_count = pending_count + confirmed_count;
1713
1714 tracing::debug!(
1715 relay = %relay_url,
1716 pending_count = pending_count,
1717 confirmed_count = confirmed_count,
1718 total_count = total_count,
1719 "Counted active filters for relay"
1720 );
1721
1722 total_count
1723 }
1724
1725 /// Wait until all pending batches for a relay are complete 1664 /// Wait until all pending batches for a relay are complete
1726 /// 1665 ///
1727 /// Polls the pending_sync_index until the relay has no pending batches. 1666 /// Polls the pending_sync_index until the relay has no pending batches.
@@ -1776,7 +1715,11 @@ impl SyncManager {
1776 /// Compares current filter count + new filter count against the threshold. 1715 /// Compares current filter count + new filter count against the threshold.
1777 /// If exceeded, triggers consolidation before adding new filters. 1716 /// If exceeded, triggers consolidation before adding new filters.
1778 async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { 1717 async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) {
1779 let current_count = self.get_filter_count(relay_url).await; 1718 let current_count = if let Some(connection) = self.connections.get(relay_url) {
1719 connection.subscription_count().await
1720 } else {
1721 0
1722 };
1780 1723
1781 if current_count + new_count > CONSOLIDATION_THRESHOLD { 1724 if current_count + new_count > CONSOLIDATION_THRESHOLD {
1782 tracing::info!( 1725 tracing::info!(
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index d69e1ce..de20e0f 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -218,7 +218,11 @@ impl RelayConnection {
218 sub_id = %subscription_id, 218 sub_id = %subscription_id,
219 "Received event" 219 "Received event"
220 ); 220 );
221 if event_sender.send(RelayEvent::Event(*event, subscription_id.clone())).await.is_err() { 221 if event_sender
222 .send(RelayEvent::Event(*event, subscription_id.clone()))
223 .await
224 .is_err()
225 {
222 tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); 226 tracing::debug!(relay = %url, "Event sender closed, stopping event loop");
223 break; 227 break;
224 } 228 }
@@ -242,7 +246,8 @@ impl RelayConnection {
242 } 246 }
243 RelayMessage::Notice(msg) => { 247 RelayMessage::Notice(msg) => {
244 tracing::debug!(relay = %url, message = %msg, "Received NOTICE"); 248 tracing::debug!(relay = %url, message = %msg, "Received NOTICE");
245 let _ = event_sender.send(RelayEvent::Notice(msg.to_string())).await; 249 let _ =
250 event_sender.send(RelayEvent::Notice(msg.to_string())).await;
246 // Don't break - continue processing events 251 // Don't break - continue processing events
247 } 252 }
248 RelayMessage::Closed { message: msg, .. } => { 253 RelayMessage::Closed { message: msg, .. } => {
@@ -358,6 +363,21 @@ impl RelayConnection {
358 &self.url 363 &self.url
359 } 364 }
360 365
366 /// Get the number of active subscriptions on this connection
367 ///
368 /// Returns the count of subscriptions tracked by the underlying nostr-sdk client.
369 /// This reflects all active REQ subscriptions on the relay, including:
370 /// - Layer 1 announcement subscriptions
371 /// - Layer 2 repo-tagging subscriptions
372 /// - Layer 3 root-event subscriptions
373 /// - Both historic (auto-close) and live subscriptions
374 ///
375 /// # Returns
376 /// The number of active subscriptions
377 pub async fn subscription_count(&self) -> usize {
378 self.client.subscriptions().await.len()
379 }
380
361 /// Disconnect from the relay 381 /// Disconnect from the relay
362 pub async fn disconnect(&self) { 382 pub async fn disconnect(&self) {
363 self.client.disconnect().await; 383 self.client.disconnect().await;