From a19ff57e72d9b82a722e14ae365da7f8c2d87e87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:15:19 +0000 Subject: feat(sync): Phase 4 - dynamic subscriptions - Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150 --- src/sync/connection.rs | 132 +++++++++++++++++++++- src/sync/filter.rs | 1 + src/sync/manager.rs | 31 +++++- src/sync/mod.rs | 2 + src/sync/subscription.rs | 278 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 442 insertions(+), 2 deletions(-) create mode 100644 src/sync/subscription.rs (limited to 'src') diff --git a/src/sync/connection.rs b/src/sync/connection.rs index 319cbbd..cd7a603 100644 --- a/src/sync/connection.rs +++ b/src/sync/connection.rs @@ -15,6 +15,13 @@ //! - Health tracking with success/failure reporting //! - Exponential backoff with health-aware delays //! - Dead relay detection and minimal retry +//! +//! ## Phase 4 Features +//! +//! - Dynamic subscription updates when new announcements/PRs arrive +//! - Per-connection subscription tracking +//! - Filter consolidation when count exceeds threshold (>150) +//! - Duplicate subscription prevention use std::sync::Arc; use std::time::Duration; @@ -24,6 +31,7 @@ use tokio::sync::mpsc; use super::filter::FilterService; use super::health::RelayHealthTracker; +use super::subscription::SubscriptionManager; /// Event received from the sync connection #[derive(Debug, Clone)] @@ -38,6 +46,7 @@ pub struct SyncConnection { client: Client, filter_service: Arc, remote_domain: String, + subscription_manager: SubscriptionManager, } impl SyncConnection { @@ -57,18 +66,26 @@ impl SyncConnection { tracing::info!("Sync connection established to {}", url); + // Create subscription manager for this connection + let subscription_manager = SubscriptionManager::new( + filter_service.clone(), + remote_domain.to_string(), + ); + Ok(Self { url: url.to_string(), client, filter_service, remote_domain: remote_domain.to_string(), + subscription_manager, }) } /// Start receiving events and send them through the channel /// /// This method runs indefinitely, handling events from all three filter layers. - pub async fn run(self, tx: mpsc::Sender) { + /// Dynamic subscription updates are triggered when new announcements or PRs arrive. + pub async fn run(mut self, tx: mpsc::Sender) { // Subscribe to all three filter layers // Layer 1: Announcement discovery (kinds 30617 + 30618) @@ -174,6 +191,119 @@ impl SyncConnection { .await .ok(); } + + /// Handle dynamic subscription updates based on incoming event kind + /// + /// - kind 30617/30618: New announcement → add Layer 2 subscription + /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription + async fn handle_dynamic_subscription(&mut self, event: &Event) { + let kind = event.kind.as_u16(); + + // Check if this is an announcement kind (triggers Layer 2 subscription) + if SubscriptionManager::is_announcement_kind(kind) { + if let Some(new_filters) = self.subscription_manager.add_announcement(event) { + tracing::info!( + "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})", + event.id.to_hex(), + self.url, + new_filters.len(), + self.subscription_manager.get_filter_count() + ); + self.subscribe_to_filters(new_filters, "Layer 2").await; + } + } + + // Check if this is a PR/Issue kind (triggers Layer 3 subscription) + if SubscriptionManager::is_pr_issue_kind(kind) { + if let Some(new_filters) = self.subscription_manager.add_event(event) { + tracing::info!( + "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})", + event.id.to_hex(), + self.url, + new_filters.len(), + self.subscription_manager.get_filter_count() + ); + self.subscribe_to_filters(new_filters, "Layer 3").await; + } + } + + // Check if we need to consolidate + if self.subscription_manager.should_consolidate() { + self.consolidate_subscriptions().await; + } + } + + /// Subscribe to new filters + async fn subscribe_to_filters(&self, filters: Vec, layer_name: &str) { + for filter in filters { + match self.client.subscribe(filter, None).await { + Ok(output) => { + tracing::debug!( + "Dynamic {} subscription on {} (subscription: {})", + layer_name, + self.url, + output.id() + ); + } + Err(e) => { + tracing::warn!( + "Failed to add dynamic {} subscription on {}: {}", + layer_name, + self.url, + e + ); + } + } + } + } + + /// Consolidate subscriptions back to Layer 1 only + /// + /// This is triggered when the filter count exceeds 150. + /// All existing subscriptions are closed and only Layer 1 is re-subscribed. + async fn consolidate_subscriptions(&mut self) { + tracing::warn!( + "Filter count {} exceeds threshold, consolidating subscriptions on {}", + self.subscription_manager.get_filter_count(), + self.url + ); + + // Get consolidated filters (clears tracking and returns Layer 1 only) + let layer1_filters = self.subscription_manager.consolidate(); + + // Note: nostr-sdk doesn't provide a way to close all subscriptions easily + // The client will manage subscription count internally + // We just add the new Layer 1 subscription + + for filter in layer1_filters { + match self.client.subscribe(filter, None).await { + Ok(output) => { + tracing::info!( + "Consolidated to Layer 1 subscription on {} (subscription: {})", + self.url, + output.id() + ); + } + Err(e) => { + tracing::error!( + "Failed to subscribe Layer 1 after consolidation on {}: {}", + self.url, + e + ); + } + } + } + } + + /// Get the current filter count from the subscription manager + pub fn get_filter_count(&self) -> usize { + self.subscription_manager.get_filter_count() + } + + /// Check if subscriptions have been consolidated + pub fn is_consolidated(&self) -> bool { + self.subscription_manager.is_consolidated() + } } /// Reconnect loop with health-aware exponential backoff diff --git a/src/sync/filter.rs b/src/sync/filter.rs index 7168f72..56c531f 100644 --- a/src/sync/filter.rs +++ b/src/sync/filter.rs @@ -24,6 +24,7 @@ const KIND_MAINTAINER_LIST: u16 = 30618; /// 1. Layer 1: Discover new repository announcements and maintainer metadata /// 2. Layer 2: Sync events directly related to repositories we track /// 3. Layer 3: Sync discussions and updates related to Layer 2 events +#[derive(Debug)] pub struct FilterService { database: SharedDatabase, /// Our relay's domain for filtering diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 1f70f42..f594454 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -15,6 +15,14 @@ //! - Health tracking with exponential backoff //! - Dead relay detection after 24h of failures //! - Startup jitter to prevent thundering herd +//! +//! ## Phase 4 Features +//! +//! - Dynamic subscription updates handled per-connection +//! - Each connection manages its own SubscriptionManager +//! - Announcements trigger Layer 2 subscriptions +//! - PRs/Issues trigger Layer 3 subscriptions +//! - Consolidation when filter count exceeds 150 use std::collections::HashSet; use std::sync::Arc; @@ -225,17 +233,38 @@ impl SyncManager { } /// Process a single synced event + /// + /// Events are validated through the write policy and stored if accepted. + /// Dynamic subscription updates are handled by each connection's SubscriptionManager. async fn process_event(&self, synced_event: SyncedEvent) { let event = &synced_event.event; let event_id = event.id.to_hex(); + let kind = event.kind.as_u16(); tracing::debug!( "Processing synced event {} (kind {}) from {}", event_id, - event.kind.as_u16(), + kind, synced_event.source_url ); + // Log subscription-relevant events for debugging + match kind { + 30617 | 30618 => { + tracing::debug!( + "Received announcement {} - connection will add Layer 2 subscription", + event_id + ); + } + 1617 | 1618 | 1619 | 1621 | 1622 => { + tracing::debug!( + "Received PR/Issue {} - connection will add Layer 3 subscription", + event_id + ); + } + _ => {} + } + // Validate through write policy using SYNC_SOURCE_ADDR let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 653aa27..4dca160 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -21,10 +21,12 @@ mod connection; mod filter; pub mod health; mod manager; +mod subscription; pub use filter::FilterService; pub use health::{HealthState, RelayHealth, RelayHealthTracker}; pub use manager::SyncManager; +pub use subscription::SubscriptionManager; use std::net::SocketAddr; diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs new file mode 100644 index 0000000..c37404f --- /dev/null +++ b/src/sync/subscription.rs @@ -0,0 +1,278 @@ +//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions +//! +//! Manages dynamic subscription updates per connection, including: +//! - Tracking subscribed announcements and events +//! - Adding new subscriptions when announcements/PRs arrive +//! - Consolidating filters when count exceeds threshold +//! - Preventing duplicate subscriptions +//! +//! ## Dynamic Subscription Strategy +//! +//! Initial: Layer 1 (announcements) +//! ↓ (announcement received) +//! Add: Layer 2 (events for that repo) +//! ↓ (PR/Issue received) +//! Add: Layer 3 (events for that PR/Issue) +//! ↓ (filter count > 150) +//! Consolidate: Back to Layer 1 only + +use std::collections::HashSet; +use std::sync::Arc; + +use nostr_sdk::prelude::*; + +use super::filter::FilterService; + +/// Maximum number of filters before consolidation is triggered +const CONSOLIDATION_THRESHOLD: usize = 150; + +/// Kind 30617 - Repository Announcement (NIP-34) +const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617; + +/// Kind 30618 - Maintainer List (NIP-34) +const KIND_MAINTAINER_LIST: u16 = 30618; + +/// Manages subscriptions for a single relay connection +/// +/// Tracks which announcements and events have been subscribed to, +/// and handles dynamic subscription updates as new events arrive. +#[derive(Debug)] +pub struct SubscriptionManager { + /// Event IDs of announcements we've subscribed to (for Layer 2) + subscribed_announcements: HashSet, + /// Event IDs of PRs/Issues we've subscribed to (for Layer 3) + subscribed_events: HashSet, + /// Whether we've consolidated back to Layer 1 only + is_consolidated: bool, + /// FilterService for building filters + filter_service: Arc, + /// Remote relay domain for Layer 2 filters + remote_domain: String, +} + +impl SubscriptionManager { + /// Create a new SubscriptionManager + /// + /// # Arguments + /// * `filter_service` - FilterService for building subscription filters + /// * `remote_domain` - The domain of the remote relay we're syncing from + pub fn new(filter_service: Arc, remote_domain: String) -> Self { + Self { + subscribed_announcements: HashSet::new(), + subscribed_events: HashSet::new(), + is_consolidated: false, + filter_service, + remote_domain, + } + } + + /// Add an announcement and return new filters to subscribe to + /// + /// When a new announcement (kind 30617/30618) arrives, this creates + /// Layer 2 filters to subscribe to events for that repository. + /// + /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed. + pub fn add_announcement(&mut self, event: &Event) -> Option> { + let event_id = event.id.to_hex(); + + // Check if already subscribed or consolidated + if self.is_consolidated || self.subscribed_announcements.contains(&event_id) { + return None; + } + + // Add to tracked announcements + self.subscribed_announcements.insert(event_id); + + // Build Layer 2 filters for this announcement + // Layer 2 filters target events with 'a' tags pointing to this repo + let filters = self.build_layer2_filter_for_announcement(event); + + if filters.is_empty() { + None + } else { + Some(filters) + } + } + + /// Add a PR/Issue/Patch event and return new filters to subscribe to + /// + /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives, + /// this creates Layer 3 filters to subscribe to related events. + /// + /// Returns `Some(filters)` if this is a new event, `None` if already subscribed. + pub fn add_event(&mut self, event: &Event) -> Option> { + let event_id = event.id.to_hex(); + + // Check if already subscribed or consolidated + if self.is_consolidated || self.subscribed_events.contains(&event_id) { + return None; + } + + // Add to tracked events + self.subscribed_events.insert(event_id.clone()); + + // Build Layer 3 filter for this event + // Layer 3 filters target events with 'e' tags pointing to this event + let filter = Filter::new().custom_tag( + SingleLetterTag::lowercase(Alphabet::E), + event_id, + ); + + Some(vec![filter]) + } + + /// Check if consolidation is needed + /// + /// Returns true if the total filter count exceeds the threshold (150). + pub fn should_consolidate(&self) -> bool { + !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD + } + + /// Consolidate all subscriptions back to Layer 1 only + /// + /// Clears all tracked announcements and events, marks as consolidated, + /// and returns the Layer 1 filters to re-subscribe to. + pub fn consolidate(&mut self) -> Vec { + tracing::info!( + "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only", + self.subscribed_announcements.len(), + self.subscribed_events.len() + ); + + // Clear tracked subscriptions + self.subscribed_announcements.clear(); + self.subscribed_events.clear(); + self.is_consolidated = true; + + // Return Layer 1 filters + self.filter_service.get_layer1_filters() + } + + /// Get the total count of active filters + /// + /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3), + /// plus the base Layer 1 filter count. + pub fn get_filter_count(&self) -> usize { + if self.is_consolidated { + // When consolidated, we only have Layer 1 filters + 1 + } else { + // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events) + 1 + self.subscribed_announcements.len() + self.subscribed_events.len() + } + } + + /// Check if an announcement has been subscribed to + pub fn has_announcement(&self, event_id: &str) -> bool { + self.subscribed_announcements.contains(event_id) + } + + /// Check if an event has been subscribed to + pub fn has_event(&self, event_id: &str) -> bool { + self.subscribed_events.contains(event_id) + } + + /// Check if subscriptions have been consolidated + pub fn is_consolidated(&self) -> bool { + self.is_consolidated + } + + /// Get the count of subscribed announcements + pub fn announcement_count(&self) -> usize { + self.subscribed_announcements.len() + } + + /// Get the count of subscribed events + pub fn event_count(&self) -> usize { + self.subscribed_events.len() + } + + /// Build Layer 2 filter for a specific announcement event + /// + /// Creates a filter with an 'a' tag pointing to the announcement's coordinates. + fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec { + // Extract the d tag (identifier) from the event + let identifier = event.tags.iter().find_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].clone()) + } else { + None + } + }); + + let identifier = match identifier { + Some(id) => id, + None => { + tracing::warn!( + "Announcement {} has no 'd' tag, cannot build Layer 2 filter", + event.id.to_hex() + ); + return Vec::new(); + } + }; + + // Determine the kind for the coordinate + let kind = event.kind.as_u16(); + if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST { + tracing::warn!( + "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", + event.id.to_hex(), + kind + ); + return Vec::new(); + } + + // Build the addressable coordinate: kind:pubkey:identifier + let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier); + + // Create filter with 'a' tag for this coordinate + let filter = Filter::new().custom_tag( + SingleLetterTag::lowercase(Alphabet::A), + coord, + ); + + vec![filter] + } + + /// Check if an event kind is an announcement kind + pub fn is_announcement_kind(kind: u16) -> bool { + kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST + } + + /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3 + pub fn is_pr_issue_kind(kind: u16) -> bool { + matches!( + kind, + 1617 | // Patch proposal (NIP-34) + 1618 | // PR + 1619 | // PR Update + 1621 | // Issue + 1622 // Reply + ) + } +} + +#[cfg(test)] +mod tests { + use super::SubscriptionManager; + + #[test] + fn test_is_announcement_kind() { + assert!(SubscriptionManager::is_announcement_kind(30617)); + assert!(SubscriptionManager::is_announcement_kind(30618)); + assert!(!SubscriptionManager::is_announcement_kind(1)); + assert!(!SubscriptionManager::is_announcement_kind(1617)); + } + + #[test] + fn test_is_pr_issue_kind() { + assert!(SubscriptionManager::is_pr_issue_kind(1617)); + assert!(SubscriptionManager::is_pr_issue_kind(1618)); + assert!(SubscriptionManager::is_pr_issue_kind(1619)); + assert!(SubscriptionManager::is_pr_issue_kind(1621)); + assert!(SubscriptionManager::is_pr_issue_kind(1622)); + assert!(!SubscriptionManager::is_pr_issue_kind(30617)); + assert!(!SubscriptionManager::is_pr_issue_kind(1)); + } +} \ No newline at end of file -- cgit v1.2.3