From 565715adf14cafd0f0155d553f583581334a8dac Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 19 Dec 2025 13:56:25 +0000 Subject: sync: fix autoclose on EOSE for historic filters --- src/sync/mod.rs | 20 ++++++++----------- src/sync/relay_connection.rs | 47 +++++++++++++------------------------------- 2 files changed, 22 insertions(+), 45 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3f8e503..88608b1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1614,15 +1614,7 @@ impl SyncManager { let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); // Re-subscribe to Layer 1 with since filter - let layer1_filter = filters::build_announcement_filter(Some(since)); - if let Err(e) = connection.subscribe_filter(layer1_filter).await { - tracing::error!( - relay = %relay_url, - error = %e, - "Failed to re-subscribe to Layer 1 during consolidation" - ); - } - + self.sync_generic_filters(relay_url, Some(since)).await; // Rebuild Layer 2 and Layer 3 with since filter self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; @@ -1817,7 +1809,11 @@ impl SyncManager { for filter in filters.iter() { // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) - match connection.subscribe_filter(filter.clone().limit(1)).await { + // Live subscriptions do NOT auto-close - we want them to stay open for new events + match connection + .subscribe_filter(filter.clone().limit(1), false) + .await + { Ok(sub_id) => { sub_ids.push(sub_id); } @@ -2045,7 +2041,7 @@ impl SyncManager { let mut subscription_ids = HashSet::new(); for (idx, filter) in ids_filters.iter().enumerate() { if let Some(conn) = self.connections.get(relay_url) { - match conn.subscribe_filter(filter.clone()).await { + match conn.subscribe_filter(filter.clone(), true).await { Ok(sub_id) => { subscription_ids.insert(sub_id); } @@ -2102,7 +2098,7 @@ impl SyncManager { ); if let Some(conn) = self.connections.get(relay_url) { - match conn.subscribe_filter(filter.clone()).await { + match conn.subscribe_filter(filter.clone(), true).await { Ok(sub_id) => { subscription_ids.insert(sub_id); } diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index fa229c4..37094be 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs @@ -307,21 +307,33 @@ impl RelayConnection { /// /// # Arguments /// * `filter` - The filter to subscribe to + /// * `auto_close` - If true, subscription automatically closes after EOSE (for historic sync). If false, stays open for new events (for live sync). /// /// # Returns /// * `Ok(SubscriptionId)` - The subscription ID on success /// * `Err(String)` - Error description on failure - pub async fn subscribe_filter(&self, filter: Filter) -> Result { + pub async fn subscribe_filter( + &self, + filter: Filter, + auto_close: bool, + ) -> Result { // DEBUG TRACING: Log the filter being subscribed to tracing::debug!( relay = %self.url, filter = ?filter, + auto_close = auto_close, "subscribe_filter called with filter" ); + let opts = if auto_close { + Some(SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE)) + } else { + None + }; + let output = self .client - .subscribe(filter, None) + .subscribe(filter, opts) .await .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; @@ -334,37 +346,6 @@ impl RelayConnection { Ok(output.val) } - /// Subscribe to multiple filters at once - /// - /// Each filter creates its own subscription. Returns when all subscriptions - /// are established. This is useful for Layer 2 + 3 filters together. - /// - /// # Arguments - /// * `filters` - Vec of filters to subscribe to - /// - /// # Returns - /// * `Ok(Vec)` - The subscription IDs on success - /// * `Err(String)` - Error description on failure - pub async fn subscribe_filters( - &self, - filters: Vec, - ) -> Result, String> { - if filters.is_empty() { - return Ok(vec![]); - } - - let mut sub_ids = Vec::with_capacity(filters.len()); - for filter in filters { - let output = self - .client - .subscribe(filter, None) - .await - .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; - sub_ids.push(output.val); - } - Ok(sub_ids) - } - /// Get the relay URL pub fn url(&self) -> &str { &self.url -- cgit v1.2.3