upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/connection.rs')
-rw-r--r--src/sync/connection.rs132
1 files changed, 131 insertions, 1 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 319cbbd..cd7a603 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -15,6 +15,13 @@
15//! - Health tracking with success/failure reporting 15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays 16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry 17//! - Dead relay detection and minimal retry
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates when new announcements/PRs arrive
22//! - Per-connection subscription tracking
23//! - Filter consolidation when count exceeds threshold (>150)
24//! - Duplicate subscription prevention
18 25
19use std::sync::Arc; 26use std::sync::Arc;
20use std::time::Duration; 27use std::time::Duration;
@@ -24,6 +31,7 @@ use tokio::sync::mpsc;
24 31
25use super::filter::FilterService; 32use super::filter::FilterService;
26use super::health::RelayHealthTracker; 33use super::health::RelayHealthTracker;
34use super::subscription::SubscriptionManager;
27 35
28/// Event received from the sync connection 36/// Event received from the sync connection
29#[derive(Debug, Clone)] 37#[derive(Debug, Clone)]
@@ -38,6 +46,7 @@ pub struct SyncConnection {
38 client: Client, 46 client: Client,
39 filter_service: Arc<FilterService>, 47 filter_service: Arc<FilterService>,
40 remote_domain: String, 48 remote_domain: String,
49 subscription_manager: SubscriptionManager,
41} 50}
42 51
43impl SyncConnection { 52impl SyncConnection {
@@ -57,18 +66,26 @@ impl SyncConnection {
57 66
58 tracing::info!("Sync connection established to {}", url); 67 tracing::info!("Sync connection established to {}", url);
59 68
69 // Create subscription manager for this connection
70 let subscription_manager = SubscriptionManager::new(
71 filter_service.clone(),
72 remote_domain.to_string(),
73 );
74
60 Ok(Self { 75 Ok(Self {
61 url: url.to_string(), 76 url: url.to_string(),
62 client, 77 client,
63 filter_service, 78 filter_service,
64 remote_domain: remote_domain.to_string(), 79 remote_domain: remote_domain.to_string(),
80 subscription_manager,
65 }) 81 })
66 } 82 }
67 83
68 /// Start receiving events and send them through the channel 84 /// Start receiving events and send them through the channel
69 /// 85 ///
70 /// This method runs indefinitely, handling events from all three filter layers. 86 /// This method runs indefinitely, handling events from all three filter layers.
71 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { 87 /// Dynamic subscription updates are triggered when new announcements or PRs arrive.
88 pub async fn run(mut self, tx: mpsc::Sender<SyncedEvent>) {
72 // Subscribe to all three filter layers 89 // Subscribe to all three filter layers
73 90
74 // Layer 1: Announcement discovery (kinds 30617 + 30618) 91 // Layer 1: Announcement discovery (kinds 30617 + 30618)
@@ -174,6 +191,119 @@ impl SyncConnection {
174 .await 191 .await
175 .ok(); 192 .ok();
176 } 193 }
194
195 /// Handle dynamic subscription updates based on incoming event kind
196 ///
197 /// - kind 30617/30618: New announcement → add Layer 2 subscription
198 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
199 async fn handle_dynamic_subscription(&mut self, event: &Event) {
200 let kind = event.kind.as_u16();
201
202 // Check if this is an announcement kind (triggers Layer 2 subscription)
203 if SubscriptionManager::is_announcement_kind(kind) {
204 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
205 tracing::info!(
206 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
207 event.id.to_hex(),
208 self.url,
209 new_filters.len(),
210 self.subscription_manager.get_filter_count()
211 );
212 self.subscribe_to_filters(new_filters, "Layer 2").await;
213 }
214 }
215
216 // Check if this is a PR/Issue kind (triggers Layer 3 subscription)
217 if SubscriptionManager::is_pr_issue_kind(kind) {
218 if let Some(new_filters) = self.subscription_manager.add_event(event) {
219 tracing::info!(
220 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
221 event.id.to_hex(),
222 self.url,
223 new_filters.len(),
224 self.subscription_manager.get_filter_count()
225 );
226 self.subscribe_to_filters(new_filters, "Layer 3").await;
227 }
228 }
229
230 // Check if we need to consolidate
231 if self.subscription_manager.should_consolidate() {
232 self.consolidate_subscriptions().await;
233 }
234 }
235
236 /// Subscribe to new filters
237 async fn subscribe_to_filters(&self, filters: Vec<Filter>, layer_name: &str) {
238 for filter in filters {
239 match self.client.subscribe(filter, None).await {
240 Ok(output) => {
241 tracing::debug!(
242 "Dynamic {} subscription on {} (subscription: {})",
243 layer_name,
244 self.url,
245 output.id()
246 );
247 }
248 Err(e) => {
249 tracing::warn!(
250 "Failed to add dynamic {} subscription on {}: {}",
251 layer_name,
252 self.url,
253 e
254 );
255 }
256 }
257 }
258 }
259
260 /// Consolidate subscriptions back to Layer 1 only
261 ///
262 /// This is triggered when the filter count exceeds 150.
263 /// All existing subscriptions are closed and only Layer 1 is re-subscribed.
264 async fn consolidate_subscriptions(&mut self) {
265 tracing::warn!(
266 "Filter count {} exceeds threshold, consolidating subscriptions on {}",
267 self.subscription_manager.get_filter_count(),
268 self.url
269 );
270
271 // Get consolidated filters (clears tracking and returns Layer 1 only)
272 let layer1_filters = self.subscription_manager.consolidate();
273
274 // Note: nostr-sdk doesn't provide a way to close all subscriptions easily
275 // The client will manage subscription count internally
276 // We just add the new Layer 1 subscription
277
278 for filter in layer1_filters {
279 match self.client.subscribe(filter, None).await {
280 Ok(output) => {
281 tracing::info!(
282 "Consolidated to Layer 1 subscription on {} (subscription: {})",
283 self.url,
284 output.id()
285 );
286 }
287 Err(e) => {
288 tracing::error!(
289 "Failed to subscribe Layer 1 after consolidation on {}: {}",
290 self.url,
291 e
292 );
293 }
294 }
295 }
296 }
297
298 /// Get the current filter count from the subscription manager
299 pub fn get_filter_count(&self) -> usize {
300 self.subscription_manager.get_filter_count()
301 }
302
303 /// Check if subscriptions have been consolidated
304 pub fn is_consolidated(&self) -> bool {
305 self.subscription_manager.is_consolidated()
306 }
177} 307}
178 308
179/// Reconnect loop with health-aware exponential backoff 309/// Reconnect loop with health-aware exponential backoff