diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-09 09:28:12 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-09 09:28:18 +0000 |
| commit | efaad1e2857914b87307cf78903a957a604697a8 (patch) | |
| tree | dadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/subscription.rs | |
| parent | 91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff) | |
basic sync stub
Diffstat (limited to 'src/sync/subscription.rs')
| -rw-r--r-- | src/sync/subscription.rs | 229 |
1 files changed, 0 insertions, 229 deletions
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs deleted file mode 100644 index bbeaa2a..0000000 --- a/src/sync/subscription.rs +++ /dev/null | |||
| @@ -1,229 +0,0 @@ | |||
| 1 | //! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions | ||
| 2 | //! | ||
| 3 | //! Manages dynamic subscription updates per connection, including: | ||
| 4 | //! - Tracking subscribed announcements and events | ||
| 5 | //! - Adding new subscriptions when announcements/PRs arrive | ||
| 6 | //! - Consolidating filters when count exceeds threshold | ||
| 7 | //! - Preventing duplicate subscriptions | ||
| 8 | //! | ||
| 9 | //! ## Dynamic Subscription Strategy | ||
| 10 | //! | ||
| 11 | //! Initial: Layer 1 (announcements) | ||
| 12 | //! ↓ (announcement received) | ||
| 13 | //! Add: Layer 2 (events for that repo) | ||
| 14 | //! ↓ (PR/Issue received) | ||
| 15 | //! Add: Layer 3 (events for that PR/Issue) | ||
| 16 | //! ↓ (filter count > 150) | ||
| 17 | //! Consolidate: Back to Layer 1 only | ||
| 18 | |||
| 19 | use std::collections::HashSet; | ||
| 20 | use std::sync::Arc; | ||
| 21 | |||
| 22 | use nostr_sdk::prelude::*; | ||
| 23 | |||
| 24 | use super::filter::FilterService; | ||
| 25 | |||
| 26 | /// Maximum number of filters before consolidation is triggered | ||
| 27 | const CONSOLIDATION_THRESHOLD: usize = 150; | ||
| 28 | |||
| 29 | /// Manages subscriptions for a single relay connection | ||
| 30 | /// | ||
| 31 | /// Tracks which announcements and events have been subscribed to, | ||
| 32 | /// and handles dynamic subscription updates as new events arrive. | ||
| 33 | #[derive(Debug)] | ||
| 34 | pub struct SubscriptionManager { | ||
| 35 | /// Event IDs of announcements we've subscribed to (for Layer 2) | ||
| 36 | subscribed_announcements: HashSet<String>, | ||
| 37 | /// Event IDs of PRs/Issues we've subscribed to (for Layer 3) | ||
| 38 | subscribed_events: HashSet<String>, | ||
| 39 | /// Whether we've consolidated back to Layer 1 only | ||
| 40 | is_consolidated: bool, | ||
| 41 | /// FilterService for building filters | ||
| 42 | filter_service: Arc<FilterService>, | ||
| 43 | /// Remote relay domain for Layer 2 filters | ||
| 44 | remote_domain: String, | ||
| 45 | } | ||
| 46 | |||
| 47 | impl SubscriptionManager { | ||
| 48 | /// Create a new SubscriptionManager | ||
| 49 | /// | ||
| 50 | /// # Arguments | ||
| 51 | /// * `filter_service` - FilterService for building subscription filters | ||
| 52 | /// * `remote_domain` - The domain of the remote relay we're syncing from | ||
| 53 | pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self { | ||
| 54 | Self { | ||
| 55 | subscribed_announcements: HashSet::new(), | ||
| 56 | subscribed_events: HashSet::new(), | ||
| 57 | is_consolidated: false, | ||
| 58 | filter_service, | ||
| 59 | remote_domain, | ||
| 60 | } | ||
| 61 | } | ||
| 62 | |||
| 63 | /// Add an announcement and return new filters to subscribe to | ||
| 64 | /// | ||
| 65 | /// When a new announcement (kind 30617/30618) arrives, this creates | ||
| 66 | /// Layer 2 filters to subscribe to events for that repository. | ||
| 67 | /// | ||
| 68 | /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed. | ||
| 69 | pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> { | ||
| 70 | let event_id = event.id.to_hex(); | ||
| 71 | |||
| 72 | // Check if already subscribed or consolidated | ||
| 73 | if self.is_consolidated || self.subscribed_announcements.contains(&event_id) { | ||
| 74 | return None; | ||
| 75 | } | ||
| 76 | |||
| 77 | // Add to tracked announcements | ||
| 78 | self.subscribed_announcements.insert(event_id); | ||
| 79 | |||
| 80 | // Build Layer 2 filters for this announcement | ||
| 81 | // Layer 2 filters target events with 'a' tags pointing to this repo | ||
| 82 | let filters = self.build_layer2_filter_for_announcement(event); | ||
| 83 | |||
| 84 | if filters.is_empty() { | ||
| 85 | None | ||
| 86 | } else { | ||
| 87 | Some(filters) | ||
| 88 | } | ||
| 89 | } | ||
| 90 | |||
| 91 | /// Add a PR/Issue/Patch event and return new filters to subscribe to | ||
| 92 | /// | ||
| 93 | /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives, | ||
| 94 | /// this creates Layer 3 filters to subscribe to related events. | ||
| 95 | /// | ||
| 96 | /// Returns `Some(filters)` if this is a new event, `None` if already subscribed. | ||
| 97 | pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> { | ||
| 98 | let event_id = event.id.to_hex(); | ||
| 99 | |||
| 100 | // Check if already subscribed or consolidated | ||
| 101 | if self.is_consolidated || self.subscribed_events.contains(&event_id) { | ||
| 102 | return None; | ||
| 103 | } | ||
| 104 | |||
| 105 | // Add to tracked events | ||
| 106 | self.subscribed_events.insert(event_id.clone()); | ||
| 107 | |||
| 108 | // Build Layer 3 filter for this event | ||
| 109 | // Layer 3 filters target events with 'e' tags pointing to this event | ||
| 110 | let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id); | ||
| 111 | |||
| 112 | Some(vec![filter]) | ||
| 113 | } | ||
| 114 | |||
| 115 | /// Check if consolidation is needed | ||
| 116 | /// | ||
| 117 | /// Returns true if the total filter count exceeds the threshold (150). | ||
| 118 | pub fn should_consolidate(&self) -> bool { | ||
| 119 | !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD | ||
| 120 | } | ||
| 121 | |||
| 122 | /// Consolidate all subscriptions back to Layer 1 only | ||
| 123 | /// | ||
| 124 | /// Clears all tracked announcements and events, marks as consolidated, | ||
| 125 | /// and returns the Layer 1 filters to re-subscribe to. | ||
| 126 | pub fn consolidate(&mut self) -> Vec<Filter> { | ||
| 127 | tracing::info!( | ||
| 128 | "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only", | ||
| 129 | self.subscribed_announcements.len(), | ||
| 130 | self.subscribed_events.len() | ||
| 131 | ); | ||
| 132 | |||
| 133 | // Clear tracked subscriptions | ||
| 134 | self.subscribed_announcements.clear(); | ||
| 135 | self.subscribed_events.clear(); | ||
| 136 | self.is_consolidated = true; | ||
| 137 | |||
| 138 | // Return Layer 1 filters | ||
| 139 | self.filter_service.get_layer1_filters() | ||
| 140 | } | ||
| 141 | |||
| 142 | /// Get the total count of active filters | ||
| 143 | /// | ||
| 144 | /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3), | ||
| 145 | /// plus the base Layer 1 filter count. | ||
| 146 | pub fn get_filter_count(&self) -> usize { | ||
| 147 | if self.is_consolidated { | ||
| 148 | // When consolidated, we only have Layer 1 filters | ||
| 149 | 1 | ||
| 150 | } else { | ||
| 151 | // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events) | ||
| 152 | 1 + self.subscribed_announcements.len() + self.subscribed_events.len() | ||
| 153 | } | ||
| 154 | } | ||
| 155 | |||
| 156 | /// Check if an announcement has been subscribed to | ||
| 157 | pub fn has_announcement(&self, event_id: &str) -> bool { | ||
| 158 | self.subscribed_announcements.contains(event_id) | ||
| 159 | } | ||
| 160 | |||
| 161 | /// Check if an event has been subscribed to | ||
| 162 | pub fn has_event(&self, event_id: &str) -> bool { | ||
| 163 | self.subscribed_events.contains(event_id) | ||
| 164 | } | ||
| 165 | |||
| 166 | /// Check if subscriptions have been consolidated | ||
| 167 | pub fn is_consolidated(&self) -> bool { | ||
| 168 | self.is_consolidated | ||
| 169 | } | ||
| 170 | |||
| 171 | /// Get the count of subscribed announcements | ||
| 172 | pub fn announcement_count(&self) -> usize { | ||
| 173 | self.subscribed_announcements.len() | ||
| 174 | } | ||
| 175 | |||
| 176 | /// Get the count of subscribed events | ||
| 177 | pub fn event_count(&self) -> usize { | ||
| 178 | self.subscribed_events.len() | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Build Layer 2 filter for a specific announcement event | ||
| 182 | /// | ||
| 183 | /// Creates a filter with an 'a' tag pointing to the announcement's coordinates. | ||
| 184 | fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> { | ||
| 185 | // Extract the d tag (identifier) from the event | ||
| 186 | let identifier = event.tags.iter().find_map(|tag| { | ||
| 187 | let tag_vec = tag.clone().to_vec(); | ||
| 188 | if tag_vec.len() >= 2 && tag_vec[0] == "d" { | ||
| 189 | Some(tag_vec[1].clone()) | ||
| 190 | } else { | ||
| 191 | None | ||
| 192 | } | ||
| 193 | }); | ||
| 194 | |||
| 195 | let identifier = match identifier { | ||
| 196 | Some(id) => id, | ||
| 197 | None => { | ||
| 198 | tracing::warn!( | ||
| 199 | "Announcement {} has no 'd' tag, cannot build Layer 2 filter", | ||
| 200 | event.id.to_hex() | ||
| 201 | ); | ||
| 202 | return Vec::new(); | ||
| 203 | } | ||
| 204 | }; | ||
| 205 | |||
| 206 | // Verify this is an announcement kind | ||
| 207 | if !matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) { | ||
| 208 | tracing::warn!( | ||
| 209 | "Event {} is not an announcement (kind {}), cannot build Layer 2 filter", | ||
| 210 | event.id.to_hex(), | ||
| 211 | event.kind | ||
| 212 | ); | ||
| 213 | return Vec::new(); | ||
| 214 | } | ||
| 215 | |||
| 216 | // Build the addressable coordinate: kind:pubkey:identifier | ||
| 217 | let coord = format!( | ||
| 218 | "{}:{}:{}", | ||
| 219 | event.kind.as_u16(), | ||
| 220 | event.pubkey.to_hex(), | ||
| 221 | identifier | ||
| 222 | ); | ||
| 223 | |||
| 224 | // Create filter with 'a' tag for this coordinate | ||
| 225 | let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord); | ||
| 226 | |||
| 227 | vec![filter] | ||
| 228 | } | ||
| 229 | } | ||