diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 13:56:25 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-19 13:56:25 +0000 |
| commit | 565715adf14cafd0f0155d553f583581334a8dac (patch) | |
| tree | 8f692254da5fd91a8944acf4d815602672dd0ef4 /src/sync | |
| parent | f41bd47bf95dabfa3d0e6cb110e751e7cd43f138 (diff) | |
sync: fix autoclose on EOSE for historic filters
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 20 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 47 |
2 files changed, 22 insertions, 45 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3f8e503..88608b1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1614,15 +1614,7 @@ impl SyncManager { | |||
| 1614 | let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); | 1614 | let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); |
| 1615 | 1615 | ||
| 1616 | // Re-subscribe to Layer 1 with since filter | 1616 | // Re-subscribe to Layer 1 with since filter |
| 1617 | let layer1_filter = filters::build_announcement_filter(Some(since)); | 1617 | self.sync_generic_filters(relay_url, Some(since)).await; |
| 1618 | if let Err(e) = connection.subscribe_filter(layer1_filter).await { | ||
| 1619 | tracing::error!( | ||
| 1620 | relay = %relay_url, | ||
| 1621 | error = %e, | ||
| 1622 | "Failed to re-subscribe to Layer 1 during consolidation" | ||
| 1623 | ); | ||
| 1624 | } | ||
| 1625 | |||
| 1626 | // Rebuild Layer 2 and Layer 3 with since filter | 1618 | // Rebuild Layer 2 and Layer 3 with since filter |
| 1627 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | 1619 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; |
| 1628 | 1620 | ||
| @@ -1817,7 +1809,11 @@ impl SyncManager { | |||
| 1817 | 1809 | ||
| 1818 | for filter in filters.iter() { | 1810 | for filter in filters.iter() { |
| 1819 | // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) | 1811 | // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) |
| 1820 | match connection.subscribe_filter(filter.clone().limit(1)).await { | 1812 | // Live subscriptions do NOT auto-close - we want them to stay open for new events |
| 1813 | match connection | ||
| 1814 | .subscribe_filter(filter.clone().limit(1), false) | ||
| 1815 | .await | ||
| 1816 | { | ||
| 1821 | Ok(sub_id) => { | 1817 | Ok(sub_id) => { |
| 1822 | sub_ids.push(sub_id); | 1818 | sub_ids.push(sub_id); |
| 1823 | } | 1819 | } |
| @@ -2045,7 +2041,7 @@ impl SyncManager { | |||
| 2045 | let mut subscription_ids = HashSet::new(); | 2041 | let mut subscription_ids = HashSet::new(); |
| 2046 | for (idx, filter) in ids_filters.iter().enumerate() { | 2042 | for (idx, filter) in ids_filters.iter().enumerate() { |
| 2047 | if let Some(conn) = self.connections.get(relay_url) { | 2043 | if let Some(conn) = self.connections.get(relay_url) { |
| 2048 | match conn.subscribe_filter(filter.clone()).await { | 2044 | match conn.subscribe_filter(filter.clone(), true).await { |
| 2049 | Ok(sub_id) => { | 2045 | Ok(sub_id) => { |
| 2050 | subscription_ids.insert(sub_id); | 2046 | subscription_ids.insert(sub_id); |
| 2051 | } | 2047 | } |
| @@ -2102,7 +2098,7 @@ impl SyncManager { | |||
| 2102 | ); | 2098 | ); |
| 2103 | 2099 | ||
| 2104 | if let Some(conn) = self.connections.get(relay_url) { | 2100 | if let Some(conn) = self.connections.get(relay_url) { |
| 2105 | match conn.subscribe_filter(filter.clone()).await { | 2101 | match conn.subscribe_filter(filter.clone(), true).await { |
| 2106 | Ok(sub_id) => { | 2102 | Ok(sub_id) => { |
| 2107 | subscription_ids.insert(sub_id); | 2103 | subscription_ids.insert(sub_id); |
| 2108 | } | 2104 | } |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index fa229c4..37094be 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -307,21 +307,33 @@ impl RelayConnection { | |||
| 307 | /// | 307 | /// |
| 308 | /// # Arguments | 308 | /// # Arguments |
| 309 | /// * `filter` - The filter to subscribe to | 309 | /// * `filter` - The filter to subscribe to |
| 310 | /// * `auto_close` - If true, subscription automatically closes after EOSE (for historic sync). If false, stays open for new events (for live sync). | ||
| 310 | /// | 311 | /// |
| 311 | /// # Returns | 312 | /// # Returns |
| 312 | /// * `Ok(SubscriptionId)` - The subscription ID on success | 313 | /// * `Ok(SubscriptionId)` - The subscription ID on success |
| 313 | /// * `Err(String)` - Error description on failure | 314 | /// * `Err(String)` - Error description on failure |
| 314 | pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> { | 315 | pub async fn subscribe_filter( |
| 316 | &self, | ||
| 317 | filter: Filter, | ||
| 318 | auto_close: bool, | ||
| 319 | ) -> Result<SubscriptionId, String> { | ||
| 315 | // DEBUG TRACING: Log the filter being subscribed to | 320 | // DEBUG TRACING: Log the filter being subscribed to |
| 316 | tracing::debug!( | 321 | tracing::debug!( |
| 317 | relay = %self.url, | 322 | relay = %self.url, |
| 318 | filter = ?filter, | 323 | filter = ?filter, |
| 324 | auto_close = auto_close, | ||
| 319 | "subscribe_filter called with filter" | 325 | "subscribe_filter called with filter" |
| 320 | ); | 326 | ); |
| 321 | 327 | ||
| 328 | let opts = if auto_close { | ||
| 329 | Some(SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE)) | ||
| 330 | } else { | ||
| 331 | None | ||
| 332 | }; | ||
| 333 | |||
| 322 | let output = self | 334 | let output = self |
| 323 | .client | 335 | .client |
| 324 | .subscribe(filter, None) | 336 | .subscribe(filter, opts) |
| 325 | .await | 337 | .await |
| 326 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; | 338 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; |
| 327 | 339 | ||
| @@ -334,37 +346,6 @@ impl RelayConnection { | |||
| 334 | Ok(output.val) | 346 | Ok(output.val) |
| 335 | } | 347 | } |
| 336 | 348 | ||
| 337 | /// Subscribe to multiple filters at once | ||
| 338 | /// | ||
| 339 | /// Each filter creates its own subscription. Returns when all subscriptions | ||
| 340 | /// are established. This is useful for Layer 2 + 3 filters together. | ||
| 341 | /// | ||
| 342 | /// # Arguments | ||
| 343 | /// * `filters` - Vec of filters to subscribe to | ||
| 344 | /// | ||
| 345 | /// # Returns | ||
| 346 | /// * `Ok(Vec<SubscriptionId>)` - The subscription IDs on success | ||
| 347 | /// * `Err(String)` - Error description on failure | ||
| 348 | pub async fn subscribe_filters( | ||
| 349 | &self, | ||
| 350 | filters: Vec<Filter>, | ||
| 351 | ) -> Result<Vec<SubscriptionId>, String> { | ||
| 352 | if filters.is_empty() { | ||
| 353 | return Ok(vec![]); | ||
| 354 | } | ||
| 355 | |||
| 356 | let mut sub_ids = Vec::with_capacity(filters.len()); | ||
| 357 | for filter in filters { | ||
| 358 | let output = self | ||
| 359 | .client | ||
| 360 | .subscribe(filter, None) | ||
| 361 | .await | ||
| 362 | .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; | ||
| 363 | sub_ids.push(output.val); | ||
| 364 | } | ||
| 365 | Ok(sub_ids) | ||
| 366 | } | ||
| 367 | |||
| 368 | /// Get the relay URL | 349 | /// Get the relay URL |
| 369 | pub fn url(&self) -> &str { | 350 | pub fn url(&self) -> &str { |
| 370 | &self.url | 351 | &self.url |