upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
commita19ff57e72d9b82a722e14ae365da7f8c2d87e87 (patch)
tree4322e68d5eead5d11393627ff5da8d7559803f22
parentf639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (diff)
feat(sync): Phase 4 - dynamic subscriptions
- Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150
-rw-r--r--src/sync/connection.rs132
-rw-r--r--src/sync/filter.rs1
-rw-r--r--src/sync/manager.rs31
-rw-r--r--src/sync/mod.rs2
-rw-r--r--src/sync/subscription.rs278
-rw-r--r--tests/proactive_sync_dynamic.rs748
6 files changed, 1190 insertions, 2 deletions
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
index 319cbbd..cd7a603 100644
--- a/src/sync/connection.rs
+++ b/src/sync/connection.rs
@@ -15,6 +15,13 @@
15//! - Health tracking with success/failure reporting 15//! - Health tracking with success/failure reporting
16//! - Exponential backoff with health-aware delays 16//! - Exponential backoff with health-aware delays
17//! - Dead relay detection and minimal retry 17//! - Dead relay detection and minimal retry
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates when new announcements/PRs arrive
22//! - Per-connection subscription tracking
23//! - Filter consolidation when count exceeds threshold (>150)
24//! - Duplicate subscription prevention
18 25
19use std::sync::Arc; 26use std::sync::Arc;
20use std::time::Duration; 27use std::time::Duration;
@@ -24,6 +31,7 @@ use tokio::sync::mpsc;
24 31
25use super::filter::FilterService; 32use super::filter::FilterService;
26use super::health::RelayHealthTracker; 33use super::health::RelayHealthTracker;
34use super::subscription::SubscriptionManager;
27 35
28/// Event received from the sync connection 36/// Event received from the sync connection
29#[derive(Debug, Clone)] 37#[derive(Debug, Clone)]
@@ -38,6 +46,7 @@ pub struct SyncConnection {
38 client: Client, 46 client: Client,
39 filter_service: Arc<FilterService>, 47 filter_service: Arc<FilterService>,
40 remote_domain: String, 48 remote_domain: String,
49 subscription_manager: SubscriptionManager,
41} 50}
42 51
43impl SyncConnection { 52impl SyncConnection {
@@ -57,18 +66,26 @@ impl SyncConnection {
57 66
58 tracing::info!("Sync connection established to {}", url); 67 tracing::info!("Sync connection established to {}", url);
59 68
69 // Create subscription manager for this connection
70 let subscription_manager = SubscriptionManager::new(
71 filter_service.clone(),
72 remote_domain.to_string(),
73 );
74
60 Ok(Self { 75 Ok(Self {
61 url: url.to_string(), 76 url: url.to_string(),
62 client, 77 client,
63 filter_service, 78 filter_service,
64 remote_domain: remote_domain.to_string(), 79 remote_domain: remote_domain.to_string(),
80 subscription_manager,
65 }) 81 })
66 } 82 }
67 83
68 /// Start receiving events and send them through the channel 84 /// Start receiving events and send them through the channel
69 /// 85 ///
70 /// This method runs indefinitely, handling events from all three filter layers. 86 /// This method runs indefinitely, handling events from all three filter layers.
71 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) { 87 /// Dynamic subscription updates are triggered when new announcements or PRs arrive.
88 pub async fn run(mut self, tx: mpsc::Sender<SyncedEvent>) {
72 // Subscribe to all three filter layers 89 // Subscribe to all three filter layers
73 90
74 // Layer 1: Announcement discovery (kinds 30617 + 30618) 91 // Layer 1: Announcement discovery (kinds 30617 + 30618)
@@ -174,6 +191,119 @@ impl SyncConnection {
174 .await 191 .await
175 .ok(); 192 .ok();
176 } 193 }
194
195 /// Handle dynamic subscription updates based on incoming event kind
196 ///
197 /// - kind 30617/30618: New announcement → add Layer 2 subscription
198 /// - kind 1617/1618/1619/1621/1622: New PR/Issue → add Layer 3 subscription
199 async fn handle_dynamic_subscription(&mut self, event: &Event) {
200 let kind = event.kind.as_u16();
201
202 // Check if this is an announcement kind (triggers Layer 2 subscription)
203 if SubscriptionManager::is_announcement_kind(kind) {
204 if let Some(new_filters) = self.subscription_manager.add_announcement(event) {
205 tracing::info!(
206 "New announcement {} on {}, adding {} Layer 2 filter(s) (total filters: {})",
207 event.id.to_hex(),
208 self.url,
209 new_filters.len(),
210 self.subscription_manager.get_filter_count()
211 );
212 self.subscribe_to_filters(new_filters, "Layer 2").await;
213 }
214 }
215
216 // Check if this is a PR/Issue kind (triggers Layer 3 subscription)
217 if SubscriptionManager::is_pr_issue_kind(kind) {
218 if let Some(new_filters) = self.subscription_manager.add_event(event) {
219 tracing::info!(
220 "New PR/Issue {} on {}, adding {} Layer 3 filter(s) (total filters: {})",
221 event.id.to_hex(),
222 self.url,
223 new_filters.len(),
224 self.subscription_manager.get_filter_count()
225 );
226 self.subscribe_to_filters(new_filters, "Layer 3").await;
227 }
228 }
229
230 // Check if we need to consolidate
231 if self.subscription_manager.should_consolidate() {
232 self.consolidate_subscriptions().await;
233 }
234 }
235
236 /// Subscribe to new filters
237 async fn subscribe_to_filters(&self, filters: Vec<Filter>, layer_name: &str) {
238 for filter in filters {
239 match self.client.subscribe(filter, None).await {
240 Ok(output) => {
241 tracing::debug!(
242 "Dynamic {} subscription on {} (subscription: {})",
243 layer_name,
244 self.url,
245 output.id()
246 );
247 }
248 Err(e) => {
249 tracing::warn!(
250 "Failed to add dynamic {} subscription on {}: {}",
251 layer_name,
252 self.url,
253 e
254 );
255 }
256 }
257 }
258 }
259
260 /// Consolidate subscriptions back to Layer 1 only
261 ///
262 /// This is triggered when the filter count exceeds 150.
263 /// All existing subscriptions are closed and only Layer 1 is re-subscribed.
264 async fn consolidate_subscriptions(&mut self) {
265 tracing::warn!(
266 "Filter count {} exceeds threshold, consolidating subscriptions on {}",
267 self.subscription_manager.get_filter_count(),
268 self.url
269 );
270
271 // Get consolidated filters (clears tracking and returns Layer 1 only)
272 let layer1_filters = self.subscription_manager.consolidate();
273
274 // Note: nostr-sdk doesn't provide a way to close all subscriptions easily
275 // The client will manage subscription count internally
276 // We just add the new Layer 1 subscription
277
278 for filter in layer1_filters {
279 match self.client.subscribe(filter, None).await {
280 Ok(output) => {
281 tracing::info!(
282 "Consolidated to Layer 1 subscription on {} (subscription: {})",
283 self.url,
284 output.id()
285 );
286 }
287 Err(e) => {
288 tracing::error!(
289 "Failed to subscribe Layer 1 after consolidation on {}: {}",
290 self.url,
291 e
292 );
293 }
294 }
295 }
296 }
297
298 /// Get the current filter count from the subscription manager
299 pub fn get_filter_count(&self) -> usize {
300 self.subscription_manager.get_filter_count()
301 }
302
303 /// Check if subscriptions have been consolidated
304 pub fn is_consolidated(&self) -> bool {
305 self.subscription_manager.is_consolidated()
306 }
177} 307}
178 308
179/// Reconnect loop with health-aware exponential backoff 309/// Reconnect loop with health-aware exponential backoff
diff --git a/src/sync/filter.rs b/src/sync/filter.rs
index 7168f72..56c531f 100644
--- a/src/sync/filter.rs
+++ b/src/sync/filter.rs
@@ -24,6 +24,7 @@ const KIND_MAINTAINER_LIST: u16 = 30618;
24/// 1. Layer 1: Discover new repository announcements and maintainer metadata 24/// 1. Layer 1: Discover new repository announcements and maintainer metadata
25/// 2. Layer 2: Sync events directly related to repositories we track 25/// 2. Layer 2: Sync events directly related to repositories we track
26/// 3. Layer 3: Sync discussions and updates related to Layer 2 events 26/// 3. Layer 3: Sync discussions and updates related to Layer 2 events
27#[derive(Debug)]
27pub struct FilterService { 28pub struct FilterService {
28 database: SharedDatabase, 29 database: SharedDatabase,
29 /// Our relay's domain for filtering 30 /// Our relay's domain for filtering
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 1f70f42..f594454 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -15,6 +15,14 @@
15//! - Health tracking with exponential backoff 15//! - Health tracking with exponential backoff
16//! - Dead relay detection after 24h of failures 16//! - Dead relay detection after 24h of failures
17//! - Startup jitter to prevent thundering herd 17//! - Startup jitter to prevent thundering herd
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates handled per-connection
22//! - Each connection manages its own SubscriptionManager
23//! - Announcements trigger Layer 2 subscriptions
24//! - PRs/Issues trigger Layer 3 subscriptions
25//! - Consolidation when filter count exceeds 150
18 26
19use std::collections::HashSet; 27use std::collections::HashSet;
20use std::sync::Arc; 28use std::sync::Arc;
@@ -225,17 +233,38 @@ impl SyncManager {
225 } 233 }
226 234
227 /// Process a single synced event 235 /// Process a single synced event
236 ///
237 /// Events are validated through the write policy and stored if accepted.
238 /// Dynamic subscription updates are handled by each connection's SubscriptionManager.
228 async fn process_event(&self, synced_event: SyncedEvent) { 239 async fn process_event(&self, synced_event: SyncedEvent) {
229 let event = &synced_event.event; 240 let event = &synced_event.event;
230 let event_id = event.id.to_hex(); 241 let event_id = event.id.to_hex();
242 let kind = event.kind.as_u16();
231 243
232 tracing::debug!( 244 tracing::debug!(
233 "Processing synced event {} (kind {}) from {}", 245 "Processing synced event {} (kind {}) from {}",
234 event_id, 246 event_id,
235 event.kind.as_u16(), 247 kind,
236 synced_event.source_url 248 synced_event.source_url
237 ); 249 );
238 250
251 // Log subscription-relevant events for debugging
252 match kind {
253 30617 | 30618 => {
254 tracing::debug!(
255 "Received announcement {} - connection will add Layer 2 subscription",
256 event_id
257 );
258 }
259 1617 | 1618 | 1619 | 1621 | 1622 => {
260 tracing::debug!(
261 "Received PR/Issue {} - connection will add Layer 3 subscription",
262 event_id
263 );
264 }
265 _ => {}
266 }
267
239 // Validate through write policy using SYNC_SOURCE_ADDR 268 // Validate through write policy using SYNC_SOURCE_ADDR
240 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; 269 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
241 270
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 653aa27..4dca160 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -21,10 +21,12 @@ mod connection;
21mod filter; 21mod filter;
22pub mod health; 22pub mod health;
23mod manager; 23mod manager;
24mod subscription;
24 25
25pub use filter::FilterService; 26pub use filter::FilterService;
26pub use health::{HealthState, RelayHealth, RelayHealthTracker}; 27pub use health::{HealthState, RelayHealth, RelayHealthTracker};
27pub use manager::SyncManager; 28pub use manager::SyncManager;
29pub use subscription::SubscriptionManager;
28 30
29use std::net::SocketAddr; 31use std::net::SocketAddr;
30 32
diff --git a/src/sync/subscription.rs b/src/sync/subscription.rs
new file mode 100644
index 0000000..c37404f
--- /dev/null
+++ b/src/sync/subscription.rs
@@ -0,0 +1,278 @@
1//! Subscription Manager for GRASP-02 Phase 4: Dynamic Subscriptions
2//!
3//! Manages dynamic subscription updates per connection, including:
4//! - Tracking subscribed announcements and events
5//! - Adding new subscriptions when announcements/PRs arrive
6//! - Consolidating filters when count exceeds threshold
7//! - Preventing duplicate subscriptions
8//!
9//! ## Dynamic Subscription Strategy
10//!
11//! Initial: Layer 1 (announcements)
12//! ↓ (announcement received)
13//! Add: Layer 2 (events for that repo)
14//! ↓ (PR/Issue received)
15//! Add: Layer 3 (events for that PR/Issue)
16//! ↓ (filter count > 150)
17//! Consolidate: Back to Layer 1 only
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use nostr_sdk::prelude::*;
23
24use super::filter::FilterService;
25
26/// Maximum number of filters before consolidation is triggered
27const CONSOLIDATION_THRESHOLD: usize = 150;
28
29/// Kind 30617 - Repository Announcement (NIP-34)
30const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617;
31
32/// Kind 30618 - Maintainer List (NIP-34)
33const KIND_MAINTAINER_LIST: u16 = 30618;
34
35/// Manages subscriptions for a single relay connection
36///
37/// Tracks which announcements and events have been subscribed to,
38/// and handles dynamic subscription updates as new events arrive.
39#[derive(Debug)]
40pub struct SubscriptionManager {
41 /// Event IDs of announcements we've subscribed to (for Layer 2)
42 subscribed_announcements: HashSet<String>,
43 /// Event IDs of PRs/Issues we've subscribed to (for Layer 3)
44 subscribed_events: HashSet<String>,
45 /// Whether we've consolidated back to Layer 1 only
46 is_consolidated: bool,
47 /// FilterService for building filters
48 filter_service: Arc<FilterService>,
49 /// Remote relay domain for Layer 2 filters
50 remote_domain: String,
51}
52
53impl SubscriptionManager {
54 /// Create a new SubscriptionManager
55 ///
56 /// # Arguments
57 /// * `filter_service` - FilterService for building subscription filters
58 /// * `remote_domain` - The domain of the remote relay we're syncing from
59 pub fn new(filter_service: Arc<FilterService>, remote_domain: String) -> Self {
60 Self {
61 subscribed_announcements: HashSet::new(),
62 subscribed_events: HashSet::new(),
63 is_consolidated: false,
64 filter_service,
65 remote_domain,
66 }
67 }
68
69 /// Add an announcement and return new filters to subscribe to
70 ///
71 /// When a new announcement (kind 30617/30618) arrives, this creates
72 /// Layer 2 filters to subscribe to events for that repository.
73 ///
74 /// Returns `Some(filters)` if this is a new announcement, `None` if already subscribed.
75 pub fn add_announcement(&mut self, event: &Event) -> Option<Vec<Filter>> {
76 let event_id = event.id.to_hex();
77
78 // Check if already subscribed or consolidated
79 if self.is_consolidated || self.subscribed_announcements.contains(&event_id) {
80 return None;
81 }
82
83 // Add to tracked announcements
84 self.subscribed_announcements.insert(event_id);
85
86 // Build Layer 2 filters for this announcement
87 // Layer 2 filters target events with 'a' tags pointing to this repo
88 let filters = self.build_layer2_filter_for_announcement(event);
89
90 if filters.is_empty() {
91 None
92 } else {
93 Some(filters)
94 }
95 }
96
97 /// Add a PR/Issue/Patch event and return new filters to subscribe to
98 ///
99 /// When a new PR (kind 1617), Issue (kind 1621), or Patch (kind 1622) arrives,
100 /// this creates Layer 3 filters to subscribe to related events.
101 ///
102 /// Returns `Some(filters)` if this is a new event, `None` if already subscribed.
103 pub fn add_event(&mut self, event: &Event) -> Option<Vec<Filter>> {
104 let event_id = event.id.to_hex();
105
106 // Check if already subscribed or consolidated
107 if self.is_consolidated || self.subscribed_events.contains(&event_id) {
108 return None;
109 }
110
111 // Add to tracked events
112 self.subscribed_events.insert(event_id.clone());
113
114 // Build Layer 3 filter for this event
115 // Layer 3 filters target events with 'e' tags pointing to this event
116 let filter = Filter::new().custom_tag(
117 SingleLetterTag::lowercase(Alphabet::E),
118 event_id,
119 );
120
121 Some(vec![filter])
122 }
123
124 /// Check if consolidation is needed
125 ///
126 /// Returns true if the total filter count exceeds the threshold (150).
127 pub fn should_consolidate(&self) -> bool {
128 !self.is_consolidated && self.get_filter_count() > CONSOLIDATION_THRESHOLD
129 }
130
131 /// Consolidate all subscriptions back to Layer 1 only
132 ///
133 /// Clears all tracked announcements and events, marks as consolidated,
134 /// and returns the Layer 1 filters to re-subscribe to.
135 pub fn consolidate(&mut self) -> Vec<Filter> {
136 tracing::info!(
137 "Consolidating subscriptions: {} announcements, {} events -> Layer 1 only",
138 self.subscribed_announcements.len(),
139 self.subscribed_events.len()
140 );
141
142 // Clear tracked subscriptions
143 self.subscribed_announcements.clear();
144 self.subscribed_events.clear();
145 self.is_consolidated = true;
146
147 // Return Layer 1 filters
148 self.filter_service.get_layer1_filters()
149 }
150
151 /// Get the total count of active filters
152 ///
153 /// Counts 1 filter per announcement (Layer 2) + 1 filter per event (Layer 3),
154 /// plus the base Layer 1 filter count.
155 pub fn get_filter_count(&self) -> usize {
156 if self.is_consolidated {
157 // When consolidated, we only have Layer 1 filters
158 1
159 } else {
160 // Layer 1 (1) + Layer 2 (announcements) + Layer 3 (events)
161 1 + self.subscribed_announcements.len() + self.subscribed_events.len()
162 }
163 }
164
165 /// Check if an announcement has been subscribed to
166 pub fn has_announcement(&self, event_id: &str) -> bool {
167 self.subscribed_announcements.contains(event_id)
168 }
169
170 /// Check if an event has been subscribed to
171 pub fn has_event(&self, event_id: &str) -> bool {
172 self.subscribed_events.contains(event_id)
173 }
174
175 /// Check if subscriptions have been consolidated
176 pub fn is_consolidated(&self) -> bool {
177 self.is_consolidated
178 }
179
180 /// Get the count of subscribed announcements
181 pub fn announcement_count(&self) -> usize {
182 self.subscribed_announcements.len()
183 }
184
185 /// Get the count of subscribed events
186 pub fn event_count(&self) -> usize {
187 self.subscribed_events.len()
188 }
189
190 /// Build Layer 2 filter for a specific announcement event
191 ///
192 /// Creates a filter with an 'a' tag pointing to the announcement's coordinates.
193 fn build_layer2_filter_for_announcement(&self, event: &Event) -> Vec<Filter> {
194 // Extract the d tag (identifier) from the event
195 let identifier = event.tags.iter().find_map(|tag| {
196 let tag_vec = tag.clone().to_vec();
197 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
198 Some(tag_vec[1].clone())
199 } else {
200 None
201 }
202 });
203
204 let identifier = match identifier {
205 Some(id) => id,
206 None => {
207 tracing::warn!(
208 "Announcement {} has no 'd' tag, cannot build Layer 2 filter",
209 event.id.to_hex()
210 );
211 return Vec::new();
212 }
213 };
214
215 // Determine the kind for the coordinate
216 let kind = event.kind.as_u16();
217 if kind != KIND_REPOSITORY_ANNOUNCEMENT && kind != KIND_MAINTAINER_LIST {
218 tracing::warn!(
219 "Event {} is not an announcement (kind {}), cannot build Layer 2 filter",
220 event.id.to_hex(),
221 kind
222 );
223 return Vec::new();
224 }
225
226 // Build the addressable coordinate: kind:pubkey:identifier
227 let coord = format!("{}:{}:{}", kind, event.pubkey.to_hex(), identifier);
228
229 // Create filter with 'a' tag for this coordinate
230 let filter = Filter::new().custom_tag(
231 SingleLetterTag::lowercase(Alphabet::A),
232 coord,
233 );
234
235 vec![filter]
236 }
237
238 /// Check if an event kind is an announcement kind
239 pub fn is_announcement_kind(kind: u16) -> bool {
240 kind == KIND_REPOSITORY_ANNOUNCEMENT || kind == KIND_MAINTAINER_LIST
241 }
242
243 /// Check if an event kind is a PR/Issue/Patch kind that should trigger Layer 3
244 pub fn is_pr_issue_kind(kind: u16) -> bool {
245 matches!(
246 kind,
247 1617 | // Patch proposal (NIP-34)
248 1618 | // PR
249 1619 | // PR Update
250 1621 | // Issue
251 1622 // Reply
252 )
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::SubscriptionManager;
259
260 #[test]
261 fn test_is_announcement_kind() {
262 assert!(SubscriptionManager::is_announcement_kind(30617));
263 assert!(SubscriptionManager::is_announcement_kind(30618));
264 assert!(!SubscriptionManager::is_announcement_kind(1));
265 assert!(!SubscriptionManager::is_announcement_kind(1617));
266 }
267
268 #[test]
269 fn test_is_pr_issue_kind() {
270 assert!(SubscriptionManager::is_pr_issue_kind(1617));
271 assert!(SubscriptionManager::is_pr_issue_kind(1618));
272 assert!(SubscriptionManager::is_pr_issue_kind(1619));
273 assert!(SubscriptionManager::is_pr_issue_kind(1621));
274 assert!(SubscriptionManager::is_pr_issue_kind(1622));
275 assert!(!SubscriptionManager::is_pr_issue_kind(30617));
276 assert!(!SubscriptionManager::is_pr_issue_kind(1));
277 }
278} \ No newline at end of file
diff --git a/tests/proactive_sync_dynamic.rs b/tests/proactive_sync_dynamic.rs
new file mode 100644
index 0000000..8a3cb88
--- /dev/null
+++ b/tests/proactive_sync_dynamic.rs
@@ -0,0 +1,748 @@
1//! GRASP-02 Phase 4: Dynamic Subscription Integration Tests
2//!
3//! Tests verify dynamic subscription management:
4//! - New announcement triggers Layer 2 subscription
5//! - New PR/Issue triggers Layer 3 subscription
6//! - Subscription count tracking per connection
7//! - Consolidation at filter count > 150
8//! - No duplicate subscriptions
9//!
10//! # Running Tests
11//!
12//! ```bash
13//! cargo test --test proactive_sync_dynamic
14//! cargo test --test proactive_sync_dynamic -- --nocapture
15//! ```
16
17use std::collections::HashSet;
18
19use ngit_grasp::sync::SubscriptionManager;
20use nostr_sdk::prelude::*;
21
22/// Kind 30617 - Repository Announcement (NIP-34)
23const KIND_REPOSITORY_ANNOUNCEMENT: u16 = 30617;
24
25/// Kind 30618 - Maintainer List (NIP-34)
26const KIND_MAINTAINER_LIST: u16 = 30618;
27
28/// Maximum filters before consolidation (from spec)
29const CONSOLIDATION_THRESHOLD: usize = 150;
30
31/// Helper to create a test announcement event
32fn create_test_announcement(keys: &Keys, identifier: &str) -> Event {
33 let tags = vec![
34 Tag::identifier(identifier),
35 Tag::custom(
36 TagKind::custom("clone"),
37 vec![format!("http://test.example.com/{}", identifier)],
38 ),
39 Tag::custom(
40 TagKind::custom("relays"),
41 vec!["ws://test.example.com".to_string()],
42 ),
43 ];
44
45 EventBuilder::new(Kind::Custom(KIND_REPOSITORY_ANNOUNCEMENT), "Test repo")
46 .tags(tags)
47 .sign_with_keys(keys)
48 .expect("Failed to sign event")
49}
50
51/// Helper to create a test maintainer list event
52fn create_test_maintainer_list(keys: &Keys, identifier: &str) -> Event {
53 let tags = vec![
54 Tag::identifier(identifier),
55 Tag::custom(
56 TagKind::custom("relays"),
57 vec!["ws://test.example.com".to_string()],
58 ),
59 ];
60
61 EventBuilder::new(Kind::Custom(KIND_MAINTAINER_LIST), "Maintainer list")
62 .tags(tags)
63 .sign_with_keys(keys)
64 .expect("Failed to sign event")
65}
66
67/// Helper to create a test PR event (kind 1617)
68fn create_test_pr_event(keys: &Keys, repo_coord: &str) -> Event {
69 let tags = vec![Tag::custom(
70 TagKind::custom("a"),
71 vec![repo_coord.to_string()],
72 )];
73
74 EventBuilder::new(Kind::Custom(1617), "Test patch proposal")
75 .tags(tags)
76 .sign_with_keys(keys)
77 .expect("Failed to sign event")
78}
79
80/// Helper to create a test PR event (kind 1618)
81fn create_test_pr_1618_event(keys: &Keys, repo_coord: &str) -> Event {
82 let tags = vec![Tag::custom(
83 TagKind::custom("a"),
84 vec![repo_coord.to_string()],
85 )];
86
87 EventBuilder::new(Kind::Custom(1618), "Test PR")
88 .tags(tags)
89 .sign_with_keys(keys)
90 .expect("Failed to sign event")
91}
92
93/// Helper to create a test Issue event (kind 1621)
94fn create_test_issue_event(keys: &Keys, repo_coord: &str) -> Event {
95 let tags = vec![Tag::custom(
96 TagKind::custom("a"),
97 vec![repo_coord.to_string()],
98 )];
99
100 EventBuilder::new(Kind::Custom(1621), "Test issue")
101 .tags(tags)
102 .sign_with_keys(keys)
103 .expect("Failed to sign event")
104}
105
106/// Helper to create a test Reply event (kind 1622)
107fn create_test_reply_event(keys: &Keys, event_id: &str) -> Event {
108 let tags = vec![Tag::custom(
109 TagKind::custom("e"),
110 vec![event_id.to_string()],
111 )];
112
113 EventBuilder::new(Kind::Custom(1622), "Test reply")
114 .tags(tags)
115 .sign_with_keys(keys)
116 .expect("Failed to sign event")
117}
118
119// ============================================================================
120// Kind Detection Tests
121// ============================================================================
122
123/// Test that announcement kinds are correctly identified
124#[test]
125fn test_is_announcement_kind_30617() {
126 assert!(SubscriptionManager::is_announcement_kind(30617));
127}
128
129/// Test that maintainer list kind is correctly identified
130#[test]
131fn test_is_announcement_kind_30618() {
132 assert!(SubscriptionManager::is_announcement_kind(30618));
133}
134
135/// Test that non-announcement kinds are not identified as announcements
136#[test]
137fn test_is_announcement_kind_negative() {
138 assert!(!SubscriptionManager::is_announcement_kind(1)); // Text note
139 assert!(!SubscriptionManager::is_announcement_kind(1617)); // PR
140 assert!(!SubscriptionManager::is_announcement_kind(1621)); // Issue
141 assert!(!SubscriptionManager::is_announcement_kind(0)); // Unknown
142}
143
144/// Test that PR/Issue kinds are correctly identified
145#[test]
146fn test_is_pr_issue_kind_1617() {
147 assert!(SubscriptionManager::is_pr_issue_kind(1617)); // Patch proposal
148}
149
150/// Test that PR kind 1618 is correctly identified
151#[test]
152fn test_is_pr_issue_kind_1618() {
153 assert!(SubscriptionManager::is_pr_issue_kind(1618)); // PR
154}
155
156/// Test that PR update kind is correctly identified
157#[test]
158fn test_is_pr_issue_kind_1619() {
159 assert!(SubscriptionManager::is_pr_issue_kind(1619)); // PR Update
160}
161
162/// Test that Issue kind is correctly identified
163#[test]
164fn test_is_pr_issue_kind_1621() {
165 assert!(SubscriptionManager::is_pr_issue_kind(1621)); // Issue
166}
167
168/// Test that Reply kind is correctly identified
169#[test]
170fn test_is_pr_issue_kind_1622() {
171 assert!(SubscriptionManager::is_pr_issue_kind(1622)); // Reply
172}
173
174/// Test that non-PR/Issue kinds are not identified
175#[test]
176fn test_is_pr_issue_kind_negative() {
177 assert!(!SubscriptionManager::is_pr_issue_kind(30617)); // Announcement
178 assert!(!SubscriptionManager::is_pr_issue_kind(1)); // Text note
179 assert!(!SubscriptionManager::is_pr_issue_kind(0)); // Unknown
180}
181
182// ============================================================================
183// Filter Count Tests
184// ============================================================================
185
186/// Test initial filter count is 1 (Layer 1 only)
187#[test]
188fn test_initial_filter_count() {
189 // Create a minimal SubscriptionManager-like state for testing
190 // We test the logic without needing a full FilterService
191
192 // Initial state: 0 announcements, 0 events, not consolidated
193 // Filter count should be: 1 (Layer 1) + 0 + 0 = 1
194 let announcement_count = 0;
195 let event_count = 0;
196 let is_consolidated = false;
197
198 let filter_count = if is_consolidated {
199 1
200 } else {
201 1 + announcement_count + event_count
202 };
203
204 assert_eq!(filter_count, 1);
205}
206
207/// Test filter count increases with announcements
208#[test]
209fn test_filter_count_with_announcements() {
210 let announcement_count = 5;
211 let event_count = 0;
212 let is_consolidated = false;
213
214 let filter_count = if is_consolidated {
215 1
216 } else {
217 1 + announcement_count + event_count
218 };
219
220 // 1 (Layer 1) + 5 (announcements) = 6
221 assert_eq!(filter_count, 6);
222}
223
224/// Test filter count increases with events
225#[test]
226fn test_filter_count_with_events() {
227 let announcement_count = 0;
228 let event_count = 10;
229 let is_consolidated = false;
230
231 let filter_count = if is_consolidated {
232 1
233 } else {
234 1 + announcement_count + event_count
235 };
236
237 // 1 (Layer 1) + 10 (events) = 11
238 assert_eq!(filter_count, 11);
239}
240
241/// Test filter count with both announcements and events
242#[test]
243fn test_filter_count_mixed() {
244 let announcement_count = 50;
245 let event_count = 30;
246 let is_consolidated = false;
247
248 let filter_count = if is_consolidated {
249 1
250 } else {
251 1 + announcement_count + event_count
252 };
253
254 // 1 + 50 + 30 = 81
255 assert_eq!(filter_count, 81);
256}
257
258/// Test filter count is 1 when consolidated
259#[test]
260fn test_filter_count_consolidated() {
261 let announcement_count = 100; // These would be cleared on consolidation
262 let event_count = 100;
263 let is_consolidated = true;
264
265 let filter_count = if is_consolidated {
266 1
267 } else {
268 1 + announcement_count + event_count
269 };
270
271 assert_eq!(filter_count, 1);
272}
273
274// ============================================================================
275// Consolidation Threshold Tests
276// ============================================================================
277
278/// Test consolidation is not triggered below threshold
279#[test]
280fn test_should_consolidate_below_threshold() {
281 let filter_count = 100;
282 let is_consolidated = false;
283
284 let should_consolidate = !is_consolidated && filter_count > CONSOLIDATION_THRESHOLD;
285
286 assert!(!should_consolidate);
287}
288
289/// Test consolidation is triggered at threshold
290#[test]
291fn test_should_consolidate_at_threshold() {
292 let filter_count = 151; // > 150
293 let is_consolidated = false;
294
295 let should_consolidate = !is_consolidated && filter_count > CONSOLIDATION_THRESHOLD;
296
297 assert!(should_consolidate);
298}
299
300/// Test consolidation is triggered well above threshold
301#[test]
302fn test_should_consolidate_above_threshold() {
303 let filter_count = 200;
304 let is_consolidated = false;
305
306 let should_consolidate = !is_consolidated && filter_count > CONSOLIDATION_THRESHOLD;
307
308 assert!(should_consolidate);
309}
310
311/// Test consolidation is not triggered if already consolidated
312#[test]
313fn test_should_consolidate_already_consolidated() {
314 let filter_count = 200; // Would trigger, but already consolidated
315 let is_consolidated = true;
316
317 let should_consolidate = !is_consolidated && filter_count > CONSOLIDATION_THRESHOLD;
318
319 assert!(!should_consolidate);
320}
321
322/// Test exact threshold boundary (150 should NOT trigger, 151 should)
323#[test]
324fn test_consolidation_threshold_boundary() {
325 let is_consolidated = false;
326
327 // 150 should NOT trigger (> 150, not >= 150)
328 let should_consolidate_at_150 = !is_consolidated && 150 > CONSOLIDATION_THRESHOLD;
329 assert!(!should_consolidate_at_150);
330
331 // 151 should trigger
332 let should_consolidate_at_151 = !is_consolidated && 151 > CONSOLIDATION_THRESHOLD;
333 assert!(should_consolidate_at_151);
334}
335
336// ============================================================================
337// Duplicate Prevention Tests
338// ============================================================================
339
340/// Test duplicate announcement detection
341#[test]
342fn test_duplicate_announcement_prevention() {
343 let mut subscribed_announcements: HashSet<String> = HashSet::new();
344
345 let event_id = "abc123".to_string();
346
347 // First add should succeed
348 let is_new = !subscribed_announcements.contains(&event_id);
349 assert!(is_new);
350 subscribed_announcements.insert(event_id.clone());
351
352 // Second add should fail (duplicate)
353 let is_new_again = !subscribed_announcements.contains(&event_id);
354 assert!(!is_new_again);
355}
356
357/// Test duplicate event detection
358#[test]
359fn test_duplicate_event_prevention() {
360 let mut subscribed_events: HashSet<String> = HashSet::new();
361
362 let event_id = "def456".to_string();
363
364 // First add should succeed
365 let is_new = !subscribed_events.contains(&event_id);
366 assert!(is_new);
367 subscribed_events.insert(event_id.clone());
368
369 // Second add should fail (duplicate)
370 let is_new_again = !subscribed_events.contains(&event_id);
371 assert!(!is_new_again);
372}
373
374/// Test multiple unique items are tracked correctly
375#[test]
376fn test_multiple_unique_items_tracked() {
377 let mut subscribed_announcements: HashSet<String> = HashSet::new();
378
379 // Add multiple unique announcements
380 for i in 0..10 {
381 let id = format!("announcement_{}", i);
382 assert!(!subscribed_announcements.contains(&id));
383 subscribed_announcements.insert(id);
384 }
385
386 assert_eq!(subscribed_announcements.len(), 10);
387}
388
389// ============================================================================
390// Event Creation and Validation Tests
391// ============================================================================
392
393/// Test announcement event has required d tag
394#[test]
395fn test_announcement_has_d_tag() {
396 let keys = Keys::generate();
397 let event = create_test_announcement(&keys, "my-repo");
398
399 let has_d_tag = event.tags.iter().any(|tag| {
400 let tag_vec = tag.clone().to_vec();
401 tag_vec.len() >= 2 && tag_vec[0] == "d"
402 });
403
404 assert!(has_d_tag);
405}
406
407/// Test announcement event has correct kind
408#[test]
409fn test_announcement_correct_kind() {
410 let keys = Keys::generate();
411 let event = create_test_announcement(&keys, "my-repo");
412
413 assert_eq!(event.kind.as_u16(), KIND_REPOSITORY_ANNOUNCEMENT);
414}
415
416/// Test maintainer list event has correct kind
417#[test]
418fn test_maintainer_list_correct_kind() {
419 let keys = Keys::generate();
420 let event = create_test_maintainer_list(&keys, "maintainers");
421
422 assert_eq!(event.kind.as_u16(), KIND_MAINTAINER_LIST);
423}
424
425/// Test PR event has a tag
426#[test]
427fn test_pr_event_has_a_tag() {
428 let keys = Keys::generate();
429 let coord = "30617:pubkey123:my-repo";
430 let event = create_test_pr_event(&keys, coord);
431
432 let has_a_tag = event.tags.iter().any(|tag| {
433 let tag_vec = tag.clone().to_vec();
434 tag_vec.len() >= 2 && tag_vec[0] == "a"
435 });
436
437 assert!(has_a_tag);
438}
439
440/// Test issue event has a tag
441#[test]
442fn test_issue_event_has_a_tag() {
443 let keys = Keys::generate();
444 let coord = "30617:pubkey123:my-repo";
445 let event = create_test_issue_event(&keys, coord);
446
447 let has_a_tag = event.tags.iter().any(|tag| {
448 let tag_vec = tag.clone().to_vec();
449 tag_vec.len() >= 2 && tag_vec[0] == "a"
450 });
451
452 assert!(has_a_tag);
453}
454
455/// Test reply event has e tag
456#[test]
457fn test_reply_event_has_e_tag() {
458 let keys = Keys::generate();
459 let event_id = "abc123def456";
460 let event = create_test_reply_event(&keys, event_id);
461
462 let has_e_tag = event.tags.iter().any(|tag| {
463 let tag_vec = tag.clone().to_vec();
464 tag_vec.len() >= 2 && tag_vec[0] == "e"
465 });
466
467 assert!(has_e_tag);
468}
469
470// ============================================================================
471// Subscription Lifecycle Tests
472// ============================================================================
473
474/// Test subscription lifecycle: initial -> add announcements -> add events -> consolidate
475#[test]
476fn test_subscription_lifecycle() {
477 let mut subscribed_announcements: HashSet<String> = HashSet::new();
478 let mut subscribed_events: HashSet<String> = HashSet::new();
479 let mut is_consolidated = false;
480
481 // Initial state
482 let initial_count = 1 + subscribed_announcements.len() + subscribed_events.len();
483 assert_eq!(initial_count, 1);
484
485 // Add some announcements
486 for i in 0..50 {
487 subscribed_announcements.insert(format!("ann_{}", i));
488 }
489
490 let after_announcements = 1 + subscribed_announcements.len() + subscribed_events.len();
491 assert_eq!(after_announcements, 51);
492
493 // Add some events
494 for i in 0..50 {
495 subscribed_events.insert(format!("evt_{}", i));
496 }
497
498 let after_events = 1 + subscribed_announcements.len() + subscribed_events.len();
499 assert_eq!(after_events, 101);
500
501 // Add more to exceed threshold
502 for i in 50..100 {
503 subscribed_announcements.insert(format!("ann_{}", i));
504 }
505
506 let before_consolidation = 1 + subscribed_announcements.len() + subscribed_events.len();
507 assert_eq!(before_consolidation, 151);
508
509 // Should trigger consolidation
510 let should_consolidate = !is_consolidated && before_consolidation > CONSOLIDATION_THRESHOLD;
511 assert!(should_consolidate);
512
513 // Consolidate
514 subscribed_announcements.clear();
515 subscribed_events.clear();
516 is_consolidated = true;
517
518 // After consolidation
519 let after_consolidation = if is_consolidated { 1 } else { 1 + subscribed_announcements.len() + subscribed_events.len() };
520 assert_eq!(after_consolidation, 1);
521
522 // Should not trigger consolidation again
523 let should_consolidate_again = !is_consolidated && after_consolidation > CONSOLIDATION_THRESHOLD;
524 assert!(!should_consolidate_again);
525}
526
527/// Test that consolidated state blocks new additions
528#[test]
529fn test_consolidated_blocks_additions() {
530 let is_consolidated = true;
531
532 // When consolidated, add_announcement should return None (simulated)
533 // The logic is: if is_consolidated, return None
534 let should_add = !is_consolidated;
535
536 assert!(!should_add);
537}
538
539/// Test that non-consolidated state allows additions
540#[test]
541fn test_non_consolidated_allows_additions() {
542 let is_consolidated = false;
543 let mut subscribed_announcements: HashSet<String> = HashSet::new();
544 let event_id = "new_announcement";
545
546 // When not consolidated and event not in set, should add
547 let should_add = !is_consolidated && !subscribed_announcements.contains(event_id);
548
549 assert!(should_add);
550
551 subscribed_announcements.insert(event_id.to_string());
552 assert!(subscribed_announcements.contains(event_id));
553}
554
555// ============================================================================
556// Filter Building Tests (coordinate format)
557// ============================================================================
558
559/// Test announcement coordinate format
560#[test]
561fn test_announcement_coordinate_format() {
562 let keys = Keys::generate();
563 let identifier = "my-repo";
564 let event = create_test_announcement(&keys, identifier);
565
566 // Extract d tag
567 let d_tag = event.tags.iter().find_map(|tag| {
568 let tag_vec = tag.clone().to_vec();
569 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
570 Some(tag_vec[1].clone())
571 } else {
572 None
573 }
574 });
575
576 assert!(d_tag.is_some());
577 assert_eq!(d_tag.unwrap(), identifier);
578
579 // Build coordinate: kind:pubkey:identifier
580 let coord = format!("{}:{}:{}", KIND_REPOSITORY_ANNOUNCEMENT, event.pubkey.to_hex(), identifier);
581
582 // Verify format
583 let parts: Vec<&str> = coord.split(':').collect();
584 assert_eq!(parts.len(), 3);
585 assert_eq!(parts[0], "30617");
586 assert_eq!(parts[2], identifier);
587}
588
589/// Test multiple announcement coordinates are unique
590#[test]
591fn test_multiple_announcement_coordinates_unique() {
592 let keys = Keys::generate();
593
594 let identifiers = vec!["repo1", "repo2", "repo3"];
595 let mut coords: HashSet<String> = HashSet::new();
596
597 for id in identifiers {
598 let event = create_test_announcement(&keys, id);
599 let coord = format!("{}:{}:{}", KIND_REPOSITORY_ANNOUNCEMENT, event.pubkey.to_hex(), id);
600 coords.insert(coord);
601 }
602
603 assert_eq!(coords.len(), 3);
604}
605
606// ============================================================================
607// Integration-style Tests
608// ============================================================================
609
610/// Test simulated workflow: announcement received, then PR received
611#[test]
612fn test_workflow_announcement_then_pr() {
613 let keys = Keys::generate();
614 let mut subscribed_announcements: HashSet<String> = HashSet::new();
615 let mut subscribed_events: HashSet<String> = HashSet::new();
616 let is_consolidated = false;
617
618 // Step 1: Receive announcement
619 let announcement = create_test_announcement(&keys, "my-repo");
620 let ann_id = announcement.id.to_hex();
621
622 // Should add to tracking (simulating add_announcement)
623 let should_add_ann = !is_consolidated && !subscribed_announcements.contains(&ann_id);
624 assert!(should_add_ann);
625 subscribed_announcements.insert(ann_id.clone());
626
627 // Filter count should increase
628 let filter_count = 1 + subscribed_announcements.len() + subscribed_events.len();
629 assert_eq!(filter_count, 2);
630
631 // Step 2: Receive PR for that repo
632 let coord = format!("{}:{}:my-repo", KIND_REPOSITORY_ANNOUNCEMENT, keys.public_key().to_hex());
633 let pr = create_test_pr_event(&keys, &coord);
634 let pr_id = pr.id.to_hex();
635
636 // Should add to tracking (simulating add_event)
637 let should_add_pr = !is_consolidated && !subscribed_events.contains(&pr_id);
638 assert!(should_add_pr);
639 subscribed_events.insert(pr_id.clone());
640
641 // Filter count should increase again
642 let filter_count = 1 + subscribed_announcements.len() + subscribed_events.len();
643 assert_eq!(filter_count, 3);
644}
645
646/// Test stress: adding many items triggers consolidation
647#[test]
648fn test_stress_many_items_triggers_consolidation() {
649 let keys = Keys::generate();
650 let mut subscribed_announcements: HashSet<String> = HashSet::new();
651 let mut subscribed_events: HashSet<String> = HashSet::new();
652 let mut is_consolidated = false;
653 let mut consolidation_triggered = false;
654
655 // Add 100 announcements
656 for i in 0..100 {
657 let event = create_test_announcement(&keys, &format!("repo-{}", i));
658 let event_id = event.id.to_hex();
659
660 if !is_consolidated && !subscribed_announcements.contains(&event_id) {
661 subscribed_announcements.insert(event_id);
662 }
663
664 // Check consolidation after each add
665 let filter_count = 1 + subscribed_announcements.len() + subscribed_events.len();
666 if !is_consolidated && filter_count > CONSOLIDATION_THRESHOLD {
667 consolidation_triggered = true;
668 subscribed_announcements.clear();
669 subscribed_events.clear();
670 is_consolidated = true;
671 break;
672 }
673 }
674
675 // If we didn't consolidate yet, add events
676 if !consolidation_triggered {
677 for i in 0..100 {
678 let coord = format!("30617:pubkey:repo-{}", i);
679 let event = create_test_pr_event(&keys, &coord);
680 let event_id = event.id.to_hex();
681
682 if !is_consolidated && !subscribed_events.contains(&event_id) {
683 subscribed_events.insert(event_id);
684 }
685
686 // Check consolidation after each add
687 let filter_count = 1 + subscribed_announcements.len() + subscribed_events.len();
688 if !is_consolidated && filter_count > CONSOLIDATION_THRESHOLD {
689 consolidation_triggered = true;
690 subscribed_announcements.clear();
691 subscribed_events.clear();
692 is_consolidated = true;
693 break;
694 }
695 }
696 }
697
698 // Consolidation should have been triggered
699 assert!(consolidation_triggered);
700 assert!(is_consolidated);
701
702 // After consolidation, counts should be reset
703 assert_eq!(subscribed_announcements.len(), 0);
704 assert_eq!(subscribed_events.len(), 0);
705}
706
707/// Test that all PR/Issue kinds are handled consistently
708#[test]
709fn test_all_pr_issue_kinds_handled() {
710 let keys = Keys::generate();
711 let coord = "30617:pubkey:repo";
712
713 // All these kinds should be identified as PR/Issue
714 let pr_kinds = vec![1617, 1618, 1619, 1621, 1622];
715
716 for kind in pr_kinds {
717 assert!(
718 SubscriptionManager::is_pr_issue_kind(kind),
719 "Kind {} should be identified as PR/Issue",
720 kind
721 );
722 }
723}
724
725/// Test that announcement and PR/Issue kinds are mutually exclusive
726#[test]
727fn test_kind_categories_mutually_exclusive() {
728 let announcement_kinds = vec![30617, 30618];
729 let pr_issue_kinds = vec![1617, 1618, 1619, 1621, 1622];
730
731 // No announcement kind should be a PR/Issue kind
732 for kind in &announcement_kinds {
733 assert!(
734 !SubscriptionManager::is_pr_issue_kind(*kind),
735 "Announcement kind {} should not be PR/Issue",
736 kind
737 );
738 }
739
740 // No PR/Issue kind should be an announcement kind
741 for kind in &pr_issue_kinds {
742 assert!(
743 !SubscriptionManager::is_announcement_kind(*kind),
744 "PR/Issue kind {} should not be announcement",
745 kind
746 );
747 }
748} \ No newline at end of file