upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/subscription.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:12 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-09 09:28:18 +0000
commitefaad1e2857914b87307cf78903a957a604697a8 (patch)
treedadd0285727b324328166d06d86a6e1e6fb935cf /src/sync/subscription.rs
parent91dc5e8d718475a73815892452a58e1dbf56c8d9 (diff)
basic sync stub
Diffstat (limited to 'src/sync/subscription.rs')
-rw-r--r--src/sync/subscription.rs229
1 files changed, 0 insertions, 229 deletions
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs
deleted file mode 100644
index bbeaa2a..0000000
--- a/src/sync/subscription.rs
+++ /dev/null
@@ -1,229 +0,0 @@
1//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
2//!
3//! Manages dynamic subscription updates per connection, including:
4//! - Tracking subscribed announcements and events
5//! - Adding new subscriptions when announcements/PRs arrive
6//! - Consolidating filters when count exceeds threshold
7//! - Preventing duplicate subscriptions
8//!
9//! ## Dynamic Subscription Strategy
10//!
11//! Initial: Layer 1 (announcements)
12//! ↓ (announcement received)
13//! Add: Layer 2 (events for that repo)
14//! ↓ (PR/Issue received)
15//! Add: Layer 3 (events for that PR/Issue)
16//! ↓ (filter count > 150)
17//! Consolidate: Back to Layer 1 only
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use nostr_sdk::prelude::*;
23
24use super::filter::FilterService;
25
26/// Maximum number of filters before consolidation is triggered
27const CONSOLIDATION_THRESHOLD: usize = 150;
28
29/// Manages subscriptions for a single relay connection
30///
31/// Tracks which announcements and events have been subscribed to,
32/// and handles dynamic subscription updates as new events arrive.
33#[derive(Debug)]
34pub struct SubscriptionManager {
35 /// Event IDs of announcements we've subscribed to (for Layer 2)
36 subscribed_announcements: HashSet<String>,
37 /// Event IDs of PRs/Issues we've subscribed to (for Layer 3)
38 subscribed_events: HashSet<String>,
39 /// Whether we've consolidated back to Layer 1 only
40 is_consolidated: bool,
41 /// FilterService for building filters
42 filter_service: Arc<FilterService>,
43 /// Remote relay domain for Layer 2 filters
44 remote_domain: String,
45}
46
47impl SubscriptionManager {
48 /// Create a new SubscriptionManager
49 ///
50 /// # Arguments
51 /// * `filter_service` - FilterService for building subscription filters
52 /// * `remote_domain` - The domain of the remote relay we're syncing from
53 pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
54 Self {
55 subscribed_announcements: HashSet::new(),
56 subscribed_events: HashSet::new(),
57 is_consolidated: false,
58 filter_service,
59 remote_domain,
60 }
61 }
62
63 /// Add an announcement and return new filters to subscribe to
64 ///
65 /// When a new announcement (kind 30617/30618) arrives, this creates
66 /// Layer 2 filters to subscribe to events for that repository.
67 ///
68 /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed.
69 pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
70 let event_id = event.id.to_hex();
71
72 // Check if already subscribed or consolidated
73 if self.is_consolidated || self.subscribed_announcements.contains(&event_id) {
74 return None;
75 }
76
77 // Add to tracked announcements
78 self.subscribed_announcements.insert(event_id);
79
80 // Build Layer 2 filters for this announcement
81 // Layer 2 filters target events with 'a' tags pointing to this repo
82 let filters = self.build_layer2_filter_for_announcement(event);
83
84 if filters.is_empty() {
85 None
86 } else {
87 Some(filters)
88 }
89 }
90
91 /// Add a PR/Issue/Patch event and return new filters to subscribe to
92 ///
93 /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives,
94 /// this creates Layer 3 filters to subscribe to related events.
95 ///
96 /// Returns `Some(filters)` if this is a new event, `None` if already subscribed.
97 pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
98 let event_id = event.id.to_hex();
99
100 // Check if already subscribed or consolidated
101 if self.is_consolidated || self.subscribed_events.contains(&event_id) {
102 return None;
103 }
104
105 // Add to tracked events
106 self.subscribed_events.insert(event_id.clone());
107
108 // Build Layer 3 filter for this event
109 // Layer 3 filters target events with 'e' tags pointing to this event
110 let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::E), event_id);
111
112 Some(vec![filter])
113 }
114
115 /// Check if consolidation is needed
116 ///
117 /// Returns true if the total filter count exceeds the threshold (150).
118 pub fn should_consolidate(&self) -> bool {
119 !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD
120 }
121
122 /// Consolidate all subscriptions back to Layer 1 only
123 ///
124 /// Clears all tracked announcements and events, marks as consolidated,
125 /// and returns the Layer 1 filters to re-subscribe to.
126 pub fn consolidate(&mut self) -> Vec<Filter> {
127 tracing::info!(
128 "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
129 self.subscribed_announcements.len(),
130 self.subscribed_events.len()
131 );
132
133 // Clear tracked subscriptions
134 self.subscribed_announcements.clear();
135 self.subscribed_events.clear();
136 self.is_consolidated = true;
137
138 // Return Layer 1 filters
139 self.filter_service.get_layer1_filters()
140 }
141
142 /// Get the total count of active filters
143 ///
144 /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3),
145 /// plus the base Layer 1 filter count.
146 pub fn get_filter_count(&self) -> usize {
147 if self.is_consolidated {
148 // When consolidated, we only have Layer 1 filters
149 1
150 } else {
151 // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events)
152 1 + self.subscribed_announcements.len() + self.subscribed_events.len()
153 }
154 }
155
156 /// Check if an announcement has been subscribed to
157 pub fn has_announcement(&self, event_id: &str) -> bool {
158 self.subscribed_announcements.contains(event_id)
159 }
160
161 /// Check if an event has been subscribed to
162 pub fn has_event(&self, event_id: &str) -> bool {
163 self.subscribed_events.contains(event_id)
164 }
165
166 /// Check if subscriptions have been consolidated
167 pub fn is_consolidated(&self) -> bool {
168 self.is_consolidated
169 }
170
171 /// Get the count of subscribed announcements
172 pub fn announcement_count(&self) -> usize {
173 self.subscribed_announcements.len()
174 }
175
176 /// Get the count of subscribed events
177 pub fn event_count(&self) -> usize {
178 self.subscribed_events.len()
179 }
180
181 /// Build Layer 2 filter for a specific announcement event
182 ///
183 /// Creates a filter with an 'a' tag pointing to the announcement's coordinates.
184 fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
185 // Extract the d tag (identifier) from the event
186 let identifier = event.tags.iter().find_map(|tag| {
187 let tag_vec = tag.clone().to_vec();
188 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
189 Some(tag_vec[1].clone())
190 } else {
191 None
192 }
193 });
194
195 let identifier = match identifier {
196 Some(id) => id,
197 None => {
198 tracing::warn!(
199 "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
200 event.id.to_hex()
201 );
202 return Vec::new();
203 }
204 };
205
206 // Verify this is an announcement kind
207 if !matches!(event.kind, Kind::GitRepoAnnouncement | Kind::RepoState) {
208 tracing::warn!(
209 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
210 event.id.to_hex(),
211 event.kind
212 );
213 return Vec::new();
214 }
215
216 // Build the addressable coordinate: kind:pubkey:identifier
217 let coord = format!(
218 "{}:{}:{}",
219 event.kind.as_u16(),
220 event.pubkey.to_hex(),
221 identifier
222 );
223
224 // Create filter with 'a' tag for this coordinate
225 let filter = Filter::new().custom_tag(SingleLetterTag::lowercase(Alphabet::A), coord);
226
227 vec![filter]
228 }
229}