diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:38:16 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-22 14:38:16 +0000 |
| commit | f7546879af9a692fd466772c9af772ada8aca68e (patch) | |
| tree | 049de66b90c91cb65cf68d4fa620f788140280eb /src/sync | |
| parent | 541f34a207047b26547154e7d631005d456f12fd (diff) | |
fix: sync consoldate subscription count
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 67 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 24 |
2 files changed, 27 insertions, 64 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1f95ff7..2475eb6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1661,67 +1661,6 @@ impl SyncManager { | |||
| 1661 | // Consolidation System | 1661 | // Consolidation System |
| 1662 | // ========================================================================= | 1662 | // ========================================================================= |
| 1663 | 1663 | ||
| 1664 | /// Get the current filter count for a relay | ||
| 1665 | /// | ||
| 1666 | /// Counts both pending subscriptions (outstanding_subs in batches) and | ||
| 1667 | /// confirmed subscriptions (active Layer 2/3 filters based on RelayState). | ||
| 1668 | /// This is used to determine if consolidation is needed. | ||
| 1669 | /// | ||
| 1670 | /// Confirmed filter counts: | ||
| 1671 | /// - Layer 1: 1 filter (announcement subscription) | ||
| 1672 | /// - Layer 2: 3 filters per 100-repo chunk (for kinds 1617/1618/1621) | ||
| 1673 | /// - Layer 3: 3 filters per 100-event chunk (for replies/reactions/etc) | ||
| 1674 | async fn get_filter_count(&self, relay_url: &str) -> usize { | ||
| 1675 | // Count pending subscriptions | ||
| 1676 | let pending_count = { | ||
| 1677 | let pending = self.pending_sync_index.read().await; | ||
| 1678 | match pending.get(relay_url) { | ||
| 1679 | Some(batches) => batches.iter().map(|b| b.outstanding_subs.len()).sum(), | ||
| 1680 | None => 0, | ||
| 1681 | } | ||
| 1682 | }; | ||
| 1683 | |||
| 1684 | // Count confirmed subscriptions from relay state | ||
| 1685 | let confirmed_count = { | ||
| 1686 | let relay_index = self.relay_sync_index.read().await; | ||
| 1687 | if let Some(state) = relay_index.get(relay_url) { | ||
| 1688 | // Layer 1: 1 filter for announcements | ||
| 1689 | // Layer 2: 3 filters per 100-repo chunk (ceiling division) | ||
| 1690 | // Layer 3: 3 filters per 100-event chunk (ceiling division) | ||
| 1691 | let repo_count = state.repos.len(); | ||
| 1692 | let event_count = state.root_events.len(); | ||
| 1693 | |||
| 1694 | let layer1_filters = 1; | ||
| 1695 | let layer2_filters = if repo_count > 0 { | ||
| 1696 | repo_count.div_ceil(100) * 3 | ||
| 1697 | } else { | ||
| 1698 | 0 | ||
| 1699 | }; | ||
| 1700 | let layer3_filters = if event_count > 0 { | ||
| 1701 | event_count.div_ceil(100) * 3 | ||
| 1702 | } else { | ||
| 1703 | 0 | ||
| 1704 | }; | ||
| 1705 | |||
| 1706 | layer1_filters + layer2_filters + layer3_filters | ||
| 1707 | } else { | ||
| 1708 | 0 | ||
| 1709 | } | ||
| 1710 | }; | ||
| 1711 | |||
| 1712 | let total_count = pending_count + confirmed_count; | ||
| 1713 | |||
| 1714 | tracing::debug!( | ||
| 1715 | relay = %relay_url, | ||
| 1716 | pending_count = pending_count, | ||
| 1717 | confirmed_count = confirmed_count, | ||
| 1718 | total_count = total_count, | ||
| 1719 | "Counted active filters for relay" | ||
| 1720 | ); | ||
| 1721 | |||
| 1722 | total_count | ||
| 1723 | } | ||
| 1724 | |||
| 1725 | /// Wait until all pending batches for a relay are complete | 1664 | /// Wait until all pending batches for a relay are complete |
| 1726 | /// | 1665 | /// |
| 1727 | /// Polls the pending_sync_index until the relay has no pending batches. | 1666 | /// Polls the pending_sync_index until the relay has no pending batches. |
| @@ -1776,7 +1715,11 @@ impl SyncManager { | |||
| 1776 | /// Compares current filter count + new filter count against the threshold. | 1715 | /// Compares current filter count + new filter count against the threshold. |
| 1777 | /// If exceeded, triggers consolidation before adding new filters. | 1716 | /// If exceeded, triggers consolidation before adding new filters. |
| 1778 | async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { | 1717 | async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { |
| 1779 | let current_count = self.get_filter_count(relay_url).await; | 1718 | let current_count = if let Some(connection) = self.connections.get(relay_url) { |
| 1719 | connection.subscription_count().await | ||
| 1720 | } else { | ||
| 1721 | 0 | ||
| 1722 | }; | ||
| 1780 | 1723 | ||
| 1781 | if current_count + new_count > CONSOLIDATION_THRESHOLD { | 1724 | if current_count + new_count > CONSOLIDATION_THRESHOLD { |
| 1782 | tracing::info!( | 1725 | tracing::info!( |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index d69e1ce..de20e0f 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -218,7 +218,11 @@ impl RelayConnection { | |||
| 218 | sub_id = %subscription_id, | 218 | sub_id = %subscription_id, |
| 219 | "Received event" | 219 | "Received event" |
| 220 | ); | 220 | ); |
| 221 | if event_sender.send(RelayEvent::Event(*event, subscription_id.clone())).await.is_err() { | 221 | if event_sender |
| 222 | .send(RelayEvent::Event(*event, subscription_id.clone())) | ||
| 223 | .await | ||
| 224 | .is_err() | ||
| 225 | { | ||
| 222 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); | 226 | tracing::debug!(relay = %url, "Event sender closed, stopping event loop"); |
| 223 | break; | 227 | break; |
| 224 | } | 228 | } |
| @@ -242,7 +246,8 @@ impl RelayConnection { | |||
| 242 | } | 246 | } |
| 243 | RelayMessage::Notice(msg) => { | 247 | RelayMessage::Notice(msg) => { |
| 244 | tracing::debug!(relay = %url, message = %msg, "Received NOTICE"); | 248 | tracing::debug!(relay = %url, message = %msg, "Received NOTICE"); |
| 245 | let _ = event_sender.send(RelayEvent::Notice(msg.to_string())).await; | 249 | let _ = |
| 250 | event_sender.send(RelayEvent::Notice(msg.to_string())).await; | ||
| 246 | // Don't break - continue processing events | 251 | // Don't break - continue processing events |
| 247 | } | 252 | } |
| 248 | RelayMessage::Closed { message: msg, .. } => { | 253 | RelayMessage::Closed { message: msg, .. } => { |
| @@ -358,6 +363,21 @@ impl RelayConnection { | |||
| 358 | &self.url | 363 | &self.url |
| 359 | } | 364 | } |
| 360 | 365 | ||
| 366 | /// Get the number of active subscriptions on this connection | ||
| 367 | /// | ||
| 368 | /// Returns the count of subscriptions tracked by the underlying nostr-sdk client. | ||
| 369 | /// This reflects all active REQ subscriptions on the relay, including: | ||
| 370 | /// - Layer 1 announcement subscriptions | ||
| 371 | /// - Layer 2 repo-tagging subscriptions | ||
| 372 | /// - Layer 3 root-event subscriptions | ||
| 373 | /// - Both historic (auto-close) and live subscriptions | ||
| 374 | /// | ||
| 375 | /// # Returns | ||
| 376 | /// The number of active subscriptions | ||
| 377 | pub async fn subscription_count(&self) -> usize { | ||
| 378 | self.client.subscriptions().await.len() | ||
| 379 | } | ||
| 380 | |||
| 361 | /// Disconnect from the relay | 381 | /// Disconnect from the relay |
| 362 | pub async fn disconnect(&self) { | 382 | pub async fn disconnect(&self) { |
| 363 | self.client.disconnect().await; | 383 | self.client.disconnect().await; |