From a19ff57e72d9b82a722e14ae365da7f8c2d87e87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:15:19 +0000 Subject: feat(sync): Phase 4 - dynamic subscriptions - Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150 --- src/sync/subscription.rs | 278 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 src/sync/subscription.rs (limited to 'src/sync/subscription.rs') diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs new file mode 100644 index 0000000..c37404f --- /dev/null +++ b/src/sync/subscription.rs @@ -0,0 +1,278 @@ +//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions +//! +//! Manages dynamic subscription updates per connection, including: +//! - Tracking subscribed announcements and events +//! - Adding new subscriptions when announcements/PRs arrive +//! - Consolidating filters when count exceeds threshold +//! - Preventing duplicate subscriptions +//! +//! ## Dynamic Subscription Strategy +//! +//! Initial: Layer 1 (announcements) +//! ↓ (announcement received) +//! Add: Layer 2 (events for that repo) +//! ↓ (PR/Issue received) +//! Add: Layer 3 (events for that PR/Issue) +//! ↓ (filter count > 150) +//! Consolidate: Back to Layer 1 only + +use std::collections::HashSet; +use std::sync::Arc; + +use nostr_sdk::prelude::*; + +use super::filter::FilterService; + +/// Maximum number of filters before consolidation is triggered +const CONSOLIDATION_THRESHOLD: usize = 150; + +/// Kind 30617 - Repository Announcement (NIP-34) +const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617; + +/// Kind 30618 - Maintainer List (NIP-34) +const KIND_MAINTAINER_LIST: u16 = 30618; + +/// Manages subscriptions for a single relay connection +/// +/// Tracks which announcements and events have been subscribed to, +/// and handles dynamic subscription updates as new events arrive. +#[derive(Debug)] +pub struct SubscriptionManager { + /// Event IDs of announcements we've subscribed to (for Layer 2) + subscribed_announcements: HashSet, + /// Event IDs of PRs/Issues we've subscribed to (for Layer 3) + subscribed_events: HashSet, + /// Whether we've consolidated back to Layer 1 only + is_consolidated: bool, + /// FilterService for building filters + filter_service: Arc, + /// Remote relay domain for Layer 2 filters + remote_domain: String, +} + +impl SubscriptionManager { + /// Create a new SubscriptionManager + /// + /// # Arguments + /// * `filter_service` - FilterService for building subscription filters + /// * `remote_domain` - The domain of the remote relay we're syncing from + pub fn new(filter_service: Arc, remote_domain: String) -> Self { + Self { + subscribed_announcements: HashSet::new(), + subscribed_events: HashSet::new(), + is_consolidated: false, + filter_service, + remote_domain, + } + } + + /// Add an announcement and return new filters to subscribe to + /// + /// When a new announcement (kind 30617/30618) arrives, this creates + /// Layer 2 filters to subscribe to events for that repository. + /// + /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed. + pub fn add_announcement(&mut self, event: &Event) -> Option> { + let event_id = event.id.to_hex(); + + // Check if already subscribed or consolidated + if self.is_consolidated || self.subscribed_announcements.contains(&event_id) { + return None; + } + + // Add to tracked announcements + self.subscribed_announcements.insert(event_id); + + // Build Layer 2 filters for this announcement + // Layer 2 filters target events with 'a' tags pointing to this repo + let filters = self.build_layer2_filter_for_announcement(event); + + if filters.is_empty() { + None + } else { + Some(filters) + } + } + + /// Add a PR/Issue/Patch event and return new filters to subscribe to + /// + /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives, + /// this creates Layer 3 filters to subscribe to related events. + /// + /// Returns `Some(filters)` if this is a new event, `None` if already subscribed. + pub fn add_event(&mut self, event: &Event) -> Option> { + let event_id = event.id.to_hex(); + + // Check if already subscribed or consolidated + if self.is_consolidated || self.subscribed_events.contains(&event_id) { + return None; + } + + // Add to tracked events + self.subscribed_events.insert(event_id.clone()); + + // Build Layer 3 filter for this event + // Layer 3 filters target events with 'e' tags pointing to this event + let filter = Filter::new().custom_tag( + SingleLetterTag::lowercase(Alphabet::E), + event_id, + ); + + Some(vec![filter]) + } + + /// Check if consolidation is needed + /// + /// Returns true if the total filter count exceeds the threshold (150). + pub fn should_consolidate(&self) -> bool { + !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD + } + + /// Consolidate all subscriptions back to Layer 1 only + /// + /// Clears all tracked announcements and events, marks as consolidated, + /// and returns the Layer 1 filters to re-subscribe to. + pub fn consolidate(&mut self) -> Vec { + tracing::info!( + "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only", + self.subscribed_announcements.len(), + self.subscribed_events.len() + ); + + // Clear tracked subscriptions + self.subscribed_announcements.clear(); + self.subscribed_events.clear(); + self.is_consolidated = true; + + // Return Layer 1 filters + self.filter_service.get_layer1_filters() + } + + /// Get the total count of active filters + /// + /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3), + /// plus the base Layer 1 filter count. + pub fn get_filter_count(&self) -> usize { + if self.is_consolidated { + // When consolidated, we only have Layer 1 filters + 1 + } else { + // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events) + 1 + self.subscribed_announcements.len() + self.subscribed_events.len() + } + } + + /// Check if an announcement has been subscribed to + pub fn has_announcement(&self, event_id: &str) -> bool { + self.subscribed_announcements.contains(event_id) + } + + /// Check if an event has been subscribed to + pub fn has_event(&self, event_id: &str) -> bool { + self.subscribed_events.contains(event_id) + } + + /// Check if subscriptions have been consolidated + pub fn is_consolidated(&self) -> bool { + self.is_consolidated + } + + /// Get the count of subscribed announcements + pub fn announcement_count(&self) -> usize { + self.subscribed_announcements.len() + } + + /// Get the count of subscribed events + pub fn event_count(&self) -> usize { + self.subscribed_events.len() + } + + /// Build Layer 2 filter for a specific announcement event + /// + /// Creates a filter with an 'a' tag pointing to the announcement's coordinates. + fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec { + // Extract the d tag (identifier) from the event + let identifier = event.tags.iter().find_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].clone()) + } else { + None + } + }); + + let identifier = match identifier { + Some(id) => id, + None => { + tracing::warn!( + "Announcement {} has no 'd' tag, cannot build Layer 2 filter", + event.id.to_hex() + ); + return Vec::new(); + } + }; + + // Determine the kind for the coordinate + let kind = event.kind.as_u16(); + if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST { + tracing::warn!( + "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", + event.id.to_hex(), + kind + ); + return Vec::new(); + } + + // Build the addressable coordinate: kind:pubkey:identifier + let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier); + + // Create filter with 'a' tag for this coordinate + let filter = Filter::new().custom_tag( + SingleLetterTag::lowercase(Alphabet::A), + coord, + ); + + vec![filter] + } + + /// Check if an event kind is an announcement kind + pub fn is_announcement_kind(kind: u16) -> bool { + kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST + } + + /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3 + pub fn is_pr_issue_kind(kind: u16) -> bool { + matches!( + kind, + 1617 | // Patch proposal (NIP-34) + 1618 | // PR + 1619 | // PR Update + 1621 | // Issue + 1622 // Reply + ) + } +} + +#[cfg(test)] +mod tests { + use super::SubscriptionManager; + + #[test] + fn test_is_announcement_kind() { + assert!(SubscriptionManager::is_announcement_kind(30617)); + assert!(SubscriptionManager::is_announcement_kind(30618)); + assert!(!SubscriptionManager::is_announcement_kind(1)); + assert!(!SubscriptionManager::is_announcement_kind(1617)); + } + + #[test] + fn test_is_pr_issue_kind() { + assert!(SubscriptionManager::is_pr_issue_kind(1617)); + assert!(SubscriptionManager::is_pr_issue_kind(1618)); + assert!(SubscriptionManager::is_pr_issue_kind(1619)); + assert!(SubscriptionManager::is_pr_issue_kind(1621)); + assert!(SubscriptionManager::is_pr_issue_kind(1622)); + assert!(!SubscriptionManager::is_pr_issue_kind(30617)); + assert!(!SubscriptionManager::is_pr_issue_kind(1)); + } +} \ No newline at end of file -- cgit v1.2.3