upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/subscription.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
commita19ff57e72d9b82a722e14ae365da7f8c2d87e87 (patch)
tree4322e68d5eead5d11393627ff5da8d7559803f22 /src/sync/subscription.rs
parentf639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (diff)
feat(sync): Phase 4 - dynamic subscriptions
- Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150
Diffstat (limited to 'src/sync/subscription.rs')
-rw-r--r--src/sync/subscription.rs278
1 files changed, 278 insertions, 0 deletions
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs
new file mode 100644
index 0000000..c37404f
--- /dev/null
+++ b/src/sync/subscription.rs
@@ -0,0 +1,278 @@
1//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
2//!
3//! Manages dynamic subscription updates per connection, including:
4//! - Tracking subscribed announcements and events
5//! - Adding new subscriptions when announcements/PRs arrive
6//! - Consolidating filters when count exceeds threshold
7//! - Preventing duplicate subscriptions
8//!
9//! ## Dynamic Subscription Strategy
10//!
11//! Initial: Layer 1 (announcements)
12//! ↓ (announcement received)
13//! Add: Layer 2 (events for that repo)
14//! ↓ (PR/Issue received)
15//! Add: Layer 3 (events for that PR/Issue)
16//! ↓ (filter count > 150)
17//! Consolidate: Back to Layer 1 only
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use nostr_sdk::prelude::*;
23
24use super::filter::FilterService;
25
26/// Maximum number of filters before consolidation is triggered
27const CONSOLIDATION_THRESHOLD: usize = 150;
28
29/// Kind 30617 - Repository Announcement (NIP-34)
30const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617;
31
32/// Kind 30618 - Maintainer List (NIP-34)
33const KIND_MAINTAINER_LIST: u16 = 30618;
34
35/// Manages subscriptions for a single relay connection
36///
37/// Tracks which announcements and events have been subscribed to,
38/// and handles dynamic subscription updates as new events arrive.
39#[derive(Debug)]
40pub struct SubscriptionManager {
41 /// Event IDs of announcements we've subscribed to (for Layer 2)
42 subscribed_announcements: HashSet<String>,
43 /// Event IDs of PRs/Issues we've subscribed to (for Layer 3)
44 subscribed_events: HashSet<String>,
45 /// Whether we've consolidated back to Layer 1 only
46 is_consolidated: bool,
47 /// FilterService for building filters
48 filter_service: Arc<FilterService>,
49 /// Remote relay domain for Layer 2 filters
50 remote_domain: String,
51}
52
53impl SubscriptionManager {
54 /// Create a new SubscriptionManager
55 ///
56 /// # Arguments
57 /// * `filter_service` - FilterService for building subscription filters
58 /// * `remote_domain` - The domain of the remote relay we're syncing from
59 pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
60 Self {
61 subscribed_announcements: HashSet::new(),
62 subscribed_events: HashSet::new(),
63 is_consolidated: false,
64 filter_service,
65 remote_domain,
66 }
67 }
68
69 /// Add an announcement and return new filters to subscribe to
70 ///
71 /// When a new announcement (kind 30617/30618) arrives, this creates
72 /// Layer 2 filters to subscribe to events for that repository.
73 ///
74 /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed.
75 pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
76 let event_id = event.id.to_hex();
77
78 // Check if already subscribed or consolidated
79 if self.is_consolidated || self.subscribed_announcements.contains(&event_id) {
80 return None;
81 }
82
83 // Add to tracked announcements
84 self.subscribed_announcements.insert(event_id);
85
86 // Build Layer 2 filters for this announcement
87 // Layer 2 filters target events with 'a' tags pointing to this repo
88 let filters = self.build_layer2_filter_for_announcement(event);
89
90 if filters.is_empty() {
91 None
92 } else {
93 Some(filters)
94 }
95 }
96
97 /// Add a PR/Issue/Patch event and return new filters to subscribe to
98 ///
99 /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives,
100 /// this creates Layer 3 filters to subscribe to related events.
101 ///
102 /// Returns `Some(filters)` if this is a new event, `None` if already subscribed.
103 pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
104 let event_id = event.id.to_hex();
105
106 // Check if already subscribed or consolidated
107 if self.is_consolidated || self.subscribed_events.contains(&event_id) {
108 return None;
109 }
110
111 // Add to tracked events
112 self.subscribed_events.insert(event_id.clone());
113
114 // Build Layer 3 filter for this event
115 // Layer 3 filters target events with 'e' tags pointing to this event
116 let filter = Filter::new().custom_tag(
117 SingleLetterTag::lowercase(Alphabet::E),
118 event_id,
119 );
120
121 Some(vec![filter])
122 }
123
124 /// Check if consolidation is needed
125 ///
126 /// Returns true if the total filter count exceeds the threshold (150).
127 pub fn should_consolidate(&self) -> bool {
128 !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD
129 }
130
131 /// Consolidate all subscriptions back to Layer 1 only
132 ///
133 /// Clears all tracked announcements and events, marks as consolidated,
134 /// and returns the Layer 1 filters to re-subscribe to.
135 pub fn consolidate(&mut self) -> Vec<Filter> {
136 tracing::info!(
137 "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
138 self.subscribed_announcements.len(),
139 self.subscribed_events.len()
140 );
141
142 // Clear tracked subscriptions
143 self.subscribed_announcements.clear();
144 self.subscribed_events.clear();
145 self.is_consolidated = true;
146
147 // Return Layer 1 filters
148 self.filter_service.get_layer1_filters()
149 }
150
151 /// Get the total count of active filters
152 ///
153 /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3),
154 /// plus the base Layer 1 filter count.
155 pub fn get_filter_count(&self) -> usize {
156 if self.is_consolidated {
157 // When consolidated, we only have Layer 1 filters
158 1
159 } else {
160 // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events)
161 1 + self.subscribed_announcements.len() + self.subscribed_events.len()
162 }
163 }
164
165 /// Check if an announcement has been subscribed to
166 pub fn has_announcement(&self, event_id: &str) -> bool {
167 self.subscribed_announcements.contains(event_id)
168 }
169
170 /// Check if an event has been subscribed to
171 pub fn has_event(&self, event_id: &str) -> bool {
172 self.subscribed_events.contains(event_id)
173 }
174
175 /// Check if subscriptions have been consolidated
176 pub fn is_consolidated(&self) -> bool {
177 self.is_consolidated
178 }
179
180 /// Get the count of subscribed announcements
181 pub fn announcement_count(&self) -> usize {
182 self.subscribed_announcements.len()
183 }
184
185 /// Get the count of subscribed events
186 pub fn event_count(&self) -> usize {
187 self.subscribed_events.len()
188 }
189
190 /// Build Layer 2 filter for a specific announcement event
191 ///
192 /// Creates a filter with an 'a' tag pointing to the announcement's coordinates.
193 fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
194 // Extract the d tag (identifier) from the event
195 let identifier = event.tags.iter().find_map(|tag| {
196 let tag_vec = tag.clone().to_vec();
197 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
198 Some(tag_vec[1].clone())
199 } else {
200 None
201 }
202 });
203
204 let identifier = match identifier {
205 Some(id) => id,
206 None => {
207 tracing::warn!(
208 "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
209 event.id.to_hex()
210 );
211 return Vec::new();
212 }
213 };
214
215 // Determine the kind for the coordinate
216 let kind = event.kind.as_u16();
217 if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST {
218 tracing::warn!(
219 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
220 event.id.to_hex(),
221 kind
222 );
223 return Vec::new();
224 }
225
226 // Build the addressable coordinate: kind:pubkey:identifier
227 let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier);
228
229 // Create filter with 'a' tag for this coordinate
230 let filter = Filter::new().custom_tag(
231 SingleLetterTag::lowercase(Alphabet::A),
232 coord,
233 );
234
235 vec![filter]
236 }
237
238 /// Check if an event kind is an announcement kind
239 pub fn is_announcement_kind(kind: u16) -> bool {
240 kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST
241 }
242
243 /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3
244 pub fn is_pr_issue_kind(kind: u16) -> bool {
245 matches!(
246 kind,
247 1617 | // Patch proposal (NIP-34)
248 1618 | // PR
249 1619 | // PR Update
250 1621 | // Issue
251 1622 // Reply
252 )
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::SubscriptionManager;
259
260 #[test]
261 fn test_is_announcement_kind() {
262 assert!(SubscriptionManager::is_announcement_kind(30617));
263 assert!(SubscriptionManager::is_announcement_kind(30618));
264 assert!(!SubscriptionManager::is_announcement_kind(1));
265 assert!(!SubscriptionManager::is_announcement_kind(1617));
266 }
267
268 #[test]
269 fn test_is_pr_issue_kind() {
270 assert!(SubscriptionManager::is_pr_issue_kind(1617));
271 assert!(SubscriptionManager::is_pr_issue_kind(1618));
272 assert!(SubscriptionManager::is_pr_issue_kind(1619));
273 assert!(SubscriptionManager::is_pr_issue_kind(1621));
274 assert!(SubscriptionManager::is_pr_issue_kind(1622));
275 assert!(!SubscriptionManager::is_pr_issue_kind(30617));
276 assert!(!SubscriptionManager::is_pr_issue_kind(1));
277 }
278} \ No newline at end of file