upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 13:56:25 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-19 13:56:25 +0000
commit565715adf14cafd0f0155d553f583581334a8dac (patch)
tree8f692254da5fd91a8944acf4d815602672dd0ef4 /src
parentf41bd47bf95dabfa3d0e6cb110e751e7cd43f138 (diff)
sync: fix autoclose on EOSE for historic filters
Diffstat (limited to 'src')
-rw-r--r--src/sync/mod.rs20
-rw-r--r--src/sync/relay_connection.rs47
2 files changed, 22 insertions, 45 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 3f8e503..88608b1 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1614,15 +1614,7 @@ impl SyncManager {
1614 let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); 1614 let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS));
1615 1615
1616 // Re-subscribe to Layer 1 with since filter 1616 // Re-subscribe to Layer 1 with since filter
1617 let layer1_filter = filters::build_announcement_filter(Some(since)); 1617 self.sync_generic_filters(relay_url, Some(since)).await;
1618 if let Err(e) = connection.subscribe_filter(layer1_filter).await {
1619 tracing::error!(
1620 relay = %relay_url,
1621 error = %e,
1622 "Failed to re-subscribe to Layer 1 during consolidation"
1623 );
1624 }
1625
1626 // Rebuild Layer 2 and Layer 3 with since filter 1618 // Rebuild Layer 2 and Layer 3 with since filter
1627 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; 1619 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
1628 1620
@@ -1817,7 +1809,11 @@ impl SyncManager {
1817 1809
1818 for filter in filters.iter() { 1810 for filter in filters.iter() {
1819 // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit) 1811 // Filters should already NOT have a limit set (live subscription = limit 1 instead of 0 as we dont know whether some relays would treat this as no limit)
1820 match connection.subscribe_filter(filter.clone().limit(1)).await { 1812 // Live subscriptions do NOT auto-close - we want them to stay open for new events
1813 match connection
1814 .subscribe_filter(filter.clone().limit(1), false)
1815 .await
1816 {
1821 Ok(sub_id) => { 1817 Ok(sub_id) => {
1822 sub_ids.push(sub_id); 1818 sub_ids.push(sub_id);
1823 } 1819 }
@@ -2045,7 +2041,7 @@ impl SyncManager {
2045 let mut subscription_ids = HashSet::new(); 2041 let mut subscription_ids = HashSet::new();
2046 for (idx, filter) in ids_filters.iter().enumerate() { 2042 for (idx, filter) in ids_filters.iter().enumerate() {
2047 if let Some(conn) = self.connections.get(relay_url) { 2043 if let Some(conn) = self.connections.get(relay_url) {
2048 match conn.subscribe_filter(filter.clone()).await { 2044 match conn.subscribe_filter(filter.clone(), true).await {
2049 Ok(sub_id) => { 2045 Ok(sub_id) => {
2050 subscription_ids.insert(sub_id); 2046 subscription_ids.insert(sub_id);
2051 } 2047 }
@@ -2102,7 +2098,7 @@ impl SyncManager {
2102 ); 2098 );
2103 2099
2104 if let Some(conn) = self.connections.get(relay_url) { 2100 if let Some(conn) = self.connections.get(relay_url) {
2105 match conn.subscribe_filter(filter.clone()).await { 2101 match conn.subscribe_filter(filter.clone(), true).await {
2106 Ok(sub_id) => { 2102 Ok(sub_id) => {
2107 subscription_ids.insert(sub_id); 2103 subscription_ids.insert(sub_id);
2108 } 2104 }
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index fa229c4..37094be 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -307,21 +307,33 @@ impl RelayConnection {
307 /// 307 ///
308 /// # Arguments 308 /// # Arguments
309 /// * `filter` - The filter to subscribe to 309 /// * `filter` - The filter to subscribe to
310 /// * `auto_close` - If true, subscription automatically closes after EOSE (for historic sync). If false, stays open for new events (for live sync).
310 /// 311 ///
311 /// # Returns 312 /// # Returns
312 /// * `Ok(SubscriptionId)` - The subscription ID on success 313 /// * `Ok(SubscriptionId)` - The subscription ID on success
313 /// * `Err(String)` - Error description on failure 314 /// * `Err(String)` - Error description on failure
314 pub async fn subscribe_filter(&self, filter: Filter) -> Result<SubscriptionId, String> { 315 pub async fn subscribe_filter(
316 &self,
317 filter: Filter,
318 auto_close: bool,
319 ) -> Result<SubscriptionId, String> {
315 // DEBUG TRACING: Log the filter being subscribed to 320 // DEBUG TRACING: Log the filter being subscribed to
316 tracing::debug!( 321 tracing::debug!(
317 relay = %self.url, 322 relay = %self.url,
318 filter = ?filter, 323 filter = ?filter,
324 auto_close = auto_close,
319 "subscribe_filter called with filter" 325 "subscribe_filter called with filter"
320 ); 326 );
321 327
328 let opts = if auto_close {
329 Some(SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE))
330 } else {
331 None
332 };
333
322 let output = self 334 let output = self
323 .client 335 .client
324 .subscribe(filter, None) 336 .subscribe(filter, opts)
325 .await 337 .await
326 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?; 338 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
327 339
@@ -334,37 +346,6 @@ impl RelayConnection {
334 Ok(output.val) 346 Ok(output.val)
335 } 347 }
336 348
337 /// Subscribe to multiple filters at once
338 ///
339 /// Each filter creates its own subscription. Returns when all subscriptions
340 /// are established. This is useful for Layer 2 + 3 filters together.
341 ///
342 /// # Arguments
343 /// * `filters` - Vec of filters to subscribe to
344 ///
345 /// # Returns
346 /// * `Ok(Vec<SubscriptionId>)` - The subscription IDs on success
347 /// * `Err(String)` - Error description on failure
348 pub async fn subscribe_filters(
349 &self,
350 filters: Vec<Filter>,
351 ) -> Result<Vec<SubscriptionId>, String> {
352 if filters.is_empty() {
353 return Ok(vec![]);
354 }
355
356 let mut sub_ids = Vec::with_capacity(filters.len());
357 for filter in filters {
358 let output = self
359 .client
360 .subscribe(filter, None)
361 .await
362 .map_err(|e| format!("Failed to subscribe on {}: {}", self.url, e))?;
363 sub_ids.push(output.val);
364 }
365 Ok(sub_ids)
366 }
367
368 /// Get the relay URL 349 /// Get the relay URL
369 pub fn url(&self) -> &str { 350 pub fn url(&self) -> &str {
370 &self.url 351 &self.url