From a19ff57e72d9b82a722e14ae365da7f8c2d87e87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:15:19 +0000 Subject: feat(sync): Phase 4 - dynamic subscriptions - Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150 --- src/sync/connection.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) (limited to 'src/sync/connection.rs') diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 319cbbd..cd7a603 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -15,6 +15,13 @@ //! - Health tracking with success/failure reporting //! - Exponential backoff with health-aware delays //! - Dead relay detection and minimal retry +//! +//! ## Phase 4 Features +//! +//! - Dynamic subscription updates when new announcements/PRs arrive +//! - Per-connection subscription tracking +//! - Filter consolidation when count exceeds threshold (>150) +//! - Duplicate subscription prevention use std::sync::Arc; use std::time::Duration; @@ -24,6 +31,7 @@ use tokio::sync::mpsc; use super::filter::FilterService; use super::health::RelayHealthTracker; +use super::subscription::SubscriptionManager; /// Event received from the sync connection #[derive(Debug, Clone)] @@ -38,6 +46,7 @@ pub struct SyncConnection { client: Client, filter_service: Arc, remote_domain: String, + subscription_manager: SubscriptionManager, } impl SyncConnection { @@ -57,18 +66,26 @@ impl SyncConnection { tracing::info!("Sync connection established to {}", url); + // Create subscription manager for this connection + let subscription_manager = SubscriptionManager::new( + filter_service.clone(), + remote_domain.to_string(), + ); + Ok(Self { url: url.to_string(), client, filter_service, remote_domain: remote_domain.to_string(), + subscription_manager, }) } /// Start receiving events and send them through the channel /// /// This method runs indefinitely, handling events from all three filter layers. - pub async fn run(self, tx: mpsc::Sender) { + /// Dynamic subscription updates are triggered when new announcements or PRs arrive. + pub async fn run(mut self, tx: mpsc::Sender) { // Subscribe to all three filter layers // Layer 1: Announcement discovery (kinds 30617 + 30618) @@ -174,6 +191,119 @@ impl SyncConnection { .await .ok(); } + + /// Handle dynamic subscription updates based on incoming event kind + /// + /// - kind 30617/30618: New announcement → add Layer 2 subscription + /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription + async fn handle_dynamic_subscription(&mut self, event: &Event) { + let kind = event.kind.as_u16(); + + // Check if this is an announcement kind (triggers Layer 2 subscription) + if SubscriptionManager::is_announcement_kind(kind) { + if let Some(new_filters) = self.subscription_manager.add_announcement(event) { + tracing::info!( + "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", + event.id.to_hex(), + self.url, + new_filters.len(), + self.subscription_manager.get_filter_count() + ); + self.subscribe_to_filters(new_filters, "Layer 2").await; + } + } + + // Check if this is a PR/Issue kind (triggers Layer 3 subscription) + if SubscriptionManager::is_pr_issue_kind(kind) { + if let Some(new_filters) = self.subscription_manager.add_event(event) { + tracing::info!( + "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", + event.id.to_hex(), + self.url, + new_filters.len(), + self.subscription_manager.get_filter_count() + ); + self.subscribe_to_filters(new_filters, "Layer 3").await; + } + } + + // Check if we need to consolidate + if self.subscription_manager.should_consolidate() { + self.consolidate_subscriptions().await; + } + } + + /// Subscribe to new filters + async fn subscribe_to_filters(&self, filters: Vec, layer_name: &str) { + for filter in filters { + match self.client.subscribe(filter, None).await { + Ok(output) => { + tracing::debug!( + "Dynamic {} subscription on {} (subscription: {})", + layer_name, + self.url, + output.id() + ); + } + Err(e) => { + tracing::warn!( + "Failed to add dynamic {} subscription on {}: {}", + layer_name, + self.url, + e + ); + } + } + } + } + + /// Consolidate subscriptions back to Layer 1 only + /// + /// This is triggered when the filter count exceeds 150. + /// All existing subscriptions are closed and only Layer 1 is re-subscribed. + async fn consolidate_subscriptions(&mut self) { + tracing::warn!( + "Filter count {} exceeds threshold, consolidating subscriptions on {}", + self.subscription_manager.get_filter_count(), + self.url + ); + + // Get consolidated filters (clears tracking and returns Layer 1 only) + let layer1_filters = self.subscription_manager.consolidate(); + + // Note: nostr-sdk doesn't provide a way to close all subscriptions easily + // The client will manage subscription count internally + // We just add the new Layer 1 subscription + + for filter in layer1_filters { + match self.client.subscribe(filter, None).await { + Ok(output) => { + tracing::info!( + "Consolidated to Layer 1 subscription on {} (subscription: {})", + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!( + "Failed to subscribe Layer 1 after consolidation on {}: {}", + self.url, + e + ); + } + } + } + } + + /// Get the current filter count from the subscription manager + pub fn get_filter_count(&self) -> usize { + self.subscription_manager.get_filter_count() + } + + /// Check if subscriptions have been consolidated + pub fn is_consolidated(&self) -> bool { + self.subscription_manager.is_consolidated() + } } /// Reconnect loop with health-aware exponential backoff -- cgit v1.2.3