upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
commita19ff57e72d9b82a722e14ae365da7f8c2d87e87 (patch)
tree4322e68d5eead5d11393627ff5da8d7559803f22 /src/sync
parentf639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (diff)
feat(sync): Phase 4 - dynamic subscriptions
- Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/connection.rs132
-rw-r--r--src/sync/filter.rs1
-rw-r--r--src/sync/manager.rs31
-rw-r--r--src/sync/mod.rs2
-rw-r--r--src/sync/subscription.rs278
5 files changed, 442 insertions, 2 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 319cbbd..cd7a603 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -15,6 +15,13 @@
15//! - Health tracking with success/failure reporting 15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays 16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry 17//! - Dead relay detection and minimal retry
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates when new announcements/PRs arrive
22//! - Per-connection subscription tracking
23//! - Filter consolidation when count exceeds threshold (>150)
24//! - Duplicate subscription prevention
18 25
19use std::sync::Arc; 26use std::sync::Arc;
20use std::time::Duration; 27use std::time::Duration;
@@ -24,6 +31,7 @@ use tokio::sync::mpsc;
24 31
25use super::filter::FilterService; 32use super::filter::FilterService;
26use super::health::RelayHealthTracker; 33use super::health::RelayHealthTracker;
34use super::subscription::SubscriptionManager;
27 35
28/// Event received from the sync connection 36/// Event received from the sync connection
29#[derive(Debug, Clone)] 37#[derive(Debug, Clone)]
@@ -38,6 +46,7 @@ pub struct SyncConnection {
38 client: Client, 46 client: Client,
39 filter_service: Arc<FilterService>, 47 filter_service: Arc<FilterService>,
40 remote_domain: String, 48 remote_domain: String,
49 subscription_manager: SubscriptionManager,
41} 50}
42 51
43impl SyncConnection { 52impl SyncConnection {
@@ -57,18 +66,26 @@ impl SyncConnection {
57 66
58 tracing::info!("Sync connection established to {}", url); 67 tracing::info!("Sync connection established to {}", url);
59 68
69 // Create subscription manager for this connection
70 let subscription_manager = SubscriptionManager::new(
71 filter_service.clone(),
72 remote_domain.to_string(),
73 );
74
60 Ok(Self { 75 Ok(Self {
61 url: url.to_string(), 76 url: url.to_string(),
62 client, 77 client,
63 filter_service, 78 filter_service,
64 remote_domain: remote_domain.to_string(), 79 remote_domain: remote_domain.to_string(),
80 subscription_manager,
65 }) 81 })
66 } 82 }
67 83
68 /// Start receiving events and send them through the channel 84 /// Start receiving events and send them through the channel
69 /// 85 ///
70 /// This method runs indefinitely, handling events from all three filter layers. 86 /// This method runs indefinitely, handling events from all three filter layers.
71 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { 87 /// Dynamic subscription updates are triggered when new announcements or PRs arrive.
88 pub async fn run(mut self, tx: mpsc::Sender<SyncedEvent>) {
72 // Subscribe to all three filter layers 89 // Subscribe to all three filter layers
73 90
74 // Layer 1: Announcement discovery (kinds 30617 + 30618) 91 // Layer 1: Announcement discovery (kinds 30617 + 30618)
@@ -174,6 +191,119 @@ impl SyncConnection {
174 .await 191 .await
175 .ok(); 192 .ok();
176 } 193 }
194
195 /// Handle dynamic subscription updates based on incoming event kind
196 ///
197 /// - kind 30617/30618: New announcement → add Layer 2 subscription
198 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
199 async fn handle_dynamic_subscription(&mut self, event: &Event) {
200 let kind = event.kind.as_u16();
201
202 // Check if this is an announcement kind (triggers Layer 2 subscription)
203 if SubscriptionManager::is_announcement_kind(kind) {
204 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
205 tracing::info!(
206 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
207 event.id.to_hex(),
208 self.url,
209 new_filters.len(),
210 self.subscription_manager.get_filter_count()
211 );
212 self.subscribe_to_filters(new_filters, "Layer 2").await;
213 }
214 }
215
216 // Check if this is a PR/Issue kind (triggers Layer 3 subscription)
217 if SubscriptionManager::is_pr_issue_kind(kind) {
218 if let Some(new_filters) = self.subscription_manager.add_event(event) {
219 tracing::info!(
220 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
221 event.id.to_hex(),
222 self.url,
223 new_filters.len(),
224 self.subscription_manager.get_filter_count()
225 );
226 self.subscribe_to_filters(new_filters, "Layer 3").await;
227 }
228 }
229
230 // Check if we need to consolidate
231 if self.subscription_manager.should_consolidate() {
232 self.consolidate_subscriptions().await;
233 }
234 }
235
236 /// Subscribe to new filters
237 async fn subscribe_to_filters(&self, filters: Vec<Filter>, layer_name: &str) {
238 for filter in filters {
239 match self.client.subscribe(filter, None).await {
240 Ok(output) => {
241 tracing::debug!(
242 "Dynamic {} subscription on {} (subscription: {})",
243 layer_name,
244 self.url,
245 output.id()
246 );
247 }
248 Err(e) => {
249 tracing::warn!(
250 "Failed to add dynamic {} subscription on {}: {}",
251 layer_name,
252 self.url,
253 e
254 );
255 }
256 }
257 }
258 }
259
260 /// Consolidate subscriptions back to Layer 1 only
261 ///
262 /// This is triggered when the filter count exceeds 150.
263 /// All existing subscriptions are closed and only Layer 1 is re-subscribed.
264 async fn consolidate_subscriptions(&mut self) {
265 tracing::warn!(
266 "Filter count {} exceeds threshold, consolidating subscriptions on {}",
267 self.subscription_manager.get_filter_count(),
268 self.url
269 );
270
271 // Get consolidated filters (clears tracking and returns Layer 1 only)
272 let layer1_filters = self.subscription_manager.consolidate();
273
274 // Note: nostr-sdk doesn't provide a way to close all subscriptions easily
275 // The client will manage subscription count internally
276 // We just add the new Layer 1 subscription
277
278 for filter in layer1_filters {
279 match self.client.subscribe(filter, None).await {
280 Ok(output) => {
281 tracing::info!(
282 "Consolidated to Layer 1 subscription on {} (subscription: {})",
283 self.url,
284 output.id()
285 );
286 }
287 Err(e) => {
288 tracing::error!(
289 "Failed to subscribe Layer 1 after consolidation on {}: {}",
290 self.url,
291 e
292 );
293 }
294 }
295 }
296 }
297
298 /// Get the current filter count from the subscription manager
299 pub fn get_filter_count(&self) -> usize {
300 self.subscription_manager.get_filter_count()
301 }
302
303 /// Check if subscriptions have been consolidated
304 pub fn is_consolidated(&self) -> bool {
305 self.subscription_manager.is_consolidated()
306 }
177} 307}
178 308
179/// Reconnect loop with health-aware exponential backoff 309/// Reconnect loop with health-aware exponential backoff
diff --git a/src/sync/filter.rs b/src/sync/filter.rs
index 7168f72..56c531f 100644
--- a/src/sync/filter.rs
+++ b/src/sync/filter.rs
@@ -24,6 +24,7 @@ const KIND_MAINTAINER_LIST: u16 = 30618;
24/// 1. Layer 1: Discover new repository announcements and maintainer metadata 24/// 1. Layer 1: Discover new repository announcements and maintainer metadata
25/// 2. Layer 2: Sync events directly related to repositories we track 25/// 2. Layer 2: Sync events directly related to repositories we track
26/// 3. Layer 3: Sync discussions and updates related to Layer 2 events 26/// 3. Layer 3: Sync discussions and updates related to Layer 2 events
27#[derive(Debug)]
27pub struct FilterService { 28pub struct FilterService {
28 database: SharedDatabase, 29 database: SharedDatabase,
29 /// Our relay's domain for filtering 30 /// Our relay's domain for filtering
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 1f70f42..f594454 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -15,6 +15,14 @@
15//! - Health tracking with exponential backoff 15//! - Health tracking with exponential backoff
16//! - Dead relay detection after 24h of failures 16//! - Dead relay detection after 24h of failures
17//! - Startup jitter to prevent thundering herd 17//! - Startup jitter to prevent thundering herd
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates handled per-connection
22//! - Each connection manages its own SubscriptionManager
23//! - Announcements trigger Layer 2 subscriptions
24//! - PRs/Issues trigger Layer 3 subscriptions
25//! - Consolidation when filter count exceeds 150
18 26
19use std::collections::HashSet; 27use std::collections::HashSet;
20use std::sync::Arc; 28use std::sync::Arc;
@@ -225,17 +233,38 @@ impl SyncManager {
225 } 233 }
226 234
227 /// Process a single synced event 235 /// Process a single synced event
236 ///
237 /// Events are validated through the write policy and stored if accepted.
238 /// Dynamic subscription updates are handled by each connection's SubscriptionManager.
228 async fn process_event(&self, synced_event: SyncedEvent) { 239 async fn process_event(&self, synced_event: SyncedEvent) {
229 let event = &synced_event.event; 240 let event = &synced_event.event;
230 let event_id = event.id.to_hex(); 241 let event_id = event.id.to_hex();
242 let kind = event.kind.as_u16();
231 243
232 tracing::debug!( 244 tracing::debug!(
233 "Processing synced event {} (kind {}) from {}", 245 "Processing synced event {} (kind {}) from {}",
234 event_id, 246 event_id,
235 event.kind.as_u16(), 247 kind,
236 synced_event.source_url 248 synced_event.source_url
237 ); 249 );
238 250
251 // Log subscription-relevant events for debugging
252 match kind {
253 30617 | 30618 => {
254 tracing::debug!(
255 "Received announcement {} - connection will add Layer 2 subscription",
256 event_id
257 );
258 }
259 1617 | 1618 | 1619 | 1621 | 1622 => {
260 tracing::debug!(
261 "Received PR/Issue {} - connection will add Layer 3 subscription",
262 event_id
263 );
264 }
265 _ => {}
266 }
267
239 // Validate through write policy using SYNC_SOURCE_ADDR 268 // Validate through write policy using SYNC_SOURCE_ADDR
240 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; 269 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
241 270
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 653aa27..4dca160 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -21,10 +21,12 @@ mod connection;
21mod filter; 21mod filter;
22pub mod health; 22pub mod health;
23mod manager; 23mod manager;
24mod subscription;
24 25
25pub use filter::FilterService; 26pub use filter::FilterService;
26pub use health::{HealthState, RelayHealth, RelayHealthTracker}; 27pub use health::{HealthState, RelayHealth, RelayHealthTracker};
27pub use manager::SyncManager; 28pub use manager::SyncManager;
29pub use subscription::SubscriptionManager;
28 30
29use std::net::SocketAddr; 31use std::net::SocketAddr;
30 32
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs
new file mode 100644
index 0000000..c37404f
--- /dev/null
+++ b/src/sync/subscription.rs
@@ -0,0 +1,278 @@
1//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
2//!
3//! Manages dynamic subscription updates per connection, including:
4//! - Tracking subscribed announcements and events
5//! - Adding new subscriptions when announcements/PRs arrive
6//! - Consolidating filters when count exceeds threshold
7//! - Preventing duplicate subscriptions
8//!
9//! ## Dynamic Subscription Strategy
10//!
11//! Initial: Layer 1 (announcements)
12//! ↓ (announcement received)
13//! Add: Layer 2 (events for that repo)
14//! ↓ (PR/Issue received)
15//! Add: Layer 3 (events for that PR/Issue)
16//! ↓ (filter count > 150)
17//! Consolidate: Back to Layer 1 only
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use nostr_sdk::prelude::*;
23
24use super::filter::FilterService;
25
26/// Maximum number of filters before consolidation is triggered
27const CONSOLIDATION_THRESHOLD: usize = 150;
28
29/// Kind 30617 - Repository Announcement (NIP-34)
30const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617;
31
32/// Kind 30618 - Maintainer List (NIP-34)
33const KIND_MAINTAINER_LIST: u16 = 30618;
34
35/// Manages subscriptions for a single relay connection
36///
37/// Tracks which announcements and events have been subscribed to,
38/// and handles dynamic subscription updates as new events arrive.
39#[derive(Debug)]
40pub struct SubscriptionManager {
41 /// Event IDs of announcements we've subscribed to (for Layer 2)
42 subscribed_announcements: HashSet<String>,
43 /// Event IDs of PRs/Issues we've subscribed to (for Layer 3)
44 subscribed_events: HashSet<String>,
45 /// Whether we've consolidated back to Layer 1 only
46 is_consolidated: bool,
47 /// FilterService for building filters
48 filter_service: Arc<FilterService>,
49 /// Remote relay domain for Layer 2 filters
50 remote_domain: String,
51}
52
53impl SubscriptionManager {
54 /// Create a new SubscriptionManager
55 ///
56 /// # Arguments
57 /// * `filter_service` - FilterService for building subscription filters
58 /// * `remote_domain` - The domain of the remote relay we're syncing from
59 pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
60 Self {
61 subscribed_announcements: HashSet::new(),
62 subscribed_events: HashSet::new(),
63 is_consolidated: false,
64 filter_service,
65 remote_domain,
66 }
67 }
68
69 /// Add an announcement and return new filters to subscribe to
70 ///
71 /// When a new announcement (kind 30617/30618) arrives, this creates
72 /// Layer 2 filters to subscribe to events for that repository.
73 ///
74 /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed.
75 pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
76 let event_id = event.id.to_hex();
77
78 // Check if already subscribed or consolidated
79 if self.is_consolidated || self.subscribed_announcements.contains(&event_id) {
80 return None;
81 }
82
83 // Add to tracked announcements
84 self.subscribed_announcements.insert(event_id);
85
86 // Build Layer 2 filters for this announcement
87 // Layer 2 filters target events with 'a' tags pointing to this repo
88 let filters = self.build_layer2_filter_for_announcement(event);
89
90 if filters.is_empty() {
91 None
92 } else {
93 Some(filters)
94 }
95 }
96
97 /// Add a PR/Issue/Patch event and return new filters to subscribe to
98 ///
99 /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives,
100 /// this creates Layer 3 filters to subscribe to related events.
101 ///
102 /// Returns `Some(filters)` if this is a new event, `None` if already subscribed.
103 pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
104 let event_id = event.id.to_hex();
105
106 // Check if already subscribed or consolidated
107 if self.is_consolidated || self.subscribed_events.contains(&event_id) {
108 return None;
109 }
110
111 // Add to tracked events
112 self.subscribed_events.insert(event_id.clone());
113
114 // Build Layer 3 filter for this event
115 // Layer 3 filters target events with 'e' tags pointing to this event
116 let filter = Filter::new().custom_tag(
117 SingleLetterTag::lowercase(Alphabet::E),
118 event_id,
119 );
120
121 Some(vec![filter])
122 }
123
124 /// Check if consolidation is needed
125 ///
126 /// Returns true if the total filter count exceeds the threshold (150).
127 pub fn should_consolidate(&self) -> bool {
128 !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD
129 }
130
131 /// Consolidate all subscriptions back to Layer 1 only
132 ///
133 /// Clears all tracked announcements and events, marks as consolidated,
134 /// and returns the Layer 1 filters to re-subscribe to.
135 pub fn consolidate(&mut self) -> Vec<Filter> {
136 tracing::info!(
137 "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
138 self.subscribed_announcements.len(),
139 self.subscribed_events.len()
140 );
141
142 // Clear tracked subscriptions
143 self.subscribed_announcements.clear();
144 self.subscribed_events.clear();
145 self.is_consolidated = true;
146
147 // Return Layer 1 filters
148 self.filter_service.get_layer1_filters()
149 }
150
151 /// Get the total count of active filters
152 ///
153 /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3),
154 /// plus the base Layer 1 filter count.
155 pub fn get_filter_count(&self) -> usize {
156 if self.is_consolidated {
157 // When consolidated, we only have Layer 1 filters
158 1
159 } else {
160 // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events)
161 1 + self.subscribed_announcements.len() + self.subscribed_events.len()
162 }
163 }
164
165 /// Check if an announcement has been subscribed to
166 pub fn has_announcement(&self, event_id: &str) -> bool {
167 self.subscribed_announcements.contains(event_id)
168 }
169
170 /// Check if an event has been subscribed to
171 pub fn has_event(&self, event_id: &str) -> bool {
172 self.subscribed_events.contains(event_id)
173 }
174
175 /// Check if subscriptions have been consolidated
176 pub fn is_consolidated(&self) -> bool {
177 self.is_consolidated
178 }
179
180 /// Get the count of subscribed announcements
181 pub fn announcement_count(&self) -> usize {
182 self.subscribed_announcements.len()
183 }
184
185 /// Get the count of subscribed events
186 pub fn event_count(&self) -> usize {
187 self.subscribed_events.len()
188 }
189
190 /// Build Layer 2 filter for a specific announcement event
191 ///
192 /// Creates a filter with an 'a' tag pointing to the announcement's coordinates.
193 fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
194 // Extract the d tag (identifier) from the event
195 let identifier = event.tags.iter().find_map(|tag| {
196 let tag_vec = tag.clone().to_vec();
197 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
198 Some(tag_vec[1].clone())
199 } else {
200 None
201 }
202 });
203
204 let identifier = match identifier {
205 Some(id) => id,
206 None => {
207 tracing::warn!(
208 "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
209 event.id.to_hex()
210 );
211 return Vec::new();
212 }
213 };
214
215 // Determine the kind for the coordinate
216 let kind = event.kind.as_u16();
217 if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST {
218 tracing::warn!(
219 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
220 event.id.to_hex(),
221 kind
222 );
223 return Vec::new();
224 }
225
226 // Build the addressable coordinate: kind:pubkey:identifier
227 let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier);
228
229 // Create filter with 'a' tag for this coordinate
230 let filter = Filter::new().custom_tag(
231 SingleLetterTag::lowercase(Alphabet::A),
232 coord,
233 );
234
235 vec![filter]
236 }
237
238 /// Check if an event kind is an announcement kind
239 pub fn is_announcement_kind(kind: u16) -> bool {
240 kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST
241 }
242
243 /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3
244 pub fn is_pr_issue_kind(kind: u16) -> bool {
245 matches!(
246 kind,
247 1617 | // Patch proposal (NIP-34)
248 1618 | // PR
249 1619 | // PR Update
250 1621 | // Issue
251 1622 // Reply
252 )
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::SubscriptionManager;
259
260 #[test]
261 fn test_is_announcement_kind() {
262 assert!(SubscriptionManager::is_announcement_kind(30617));
263 assert!(SubscriptionManager::is_announcement_kind(30618));
264 assert!(!SubscriptionManager::is_announcement_kind(1));
265 assert!(!SubscriptionManager::is_announcement_kind(1617));
266 }
267
268 #[test]
269 fn test_is_pr_issue_kind() {
270 assert!(SubscriptionManager::is_pr_issue_kind(1617));
271 assert!(SubscriptionManager::is_pr_issue_kind(1618));
272 assert!(SubscriptionManager::is_pr_issue_kind(1619));
273 assert!(SubscriptionManager::is_pr_issue_kind(1621));
274 assert!(SubscriptionManager::is_pr_issue_kind(1622));
275 assert!(!SubscriptionManager::is_pr_issue_kind(30617));
276 assert!(!SubscriptionManager::is_pr_issue_kind(1));
277 }
278} \ No newline at end of file