upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/manager.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-04 18:15:19 +0000
commita19ff57e72d9b82a722e14ae365da7f8c2d87e87 (patch)
tree4322e68d5eead5d11393627ff5da8d7559803f22 /src/sync/manager.rs
parentf639ecfac6687c9e8de4e3f305e168b2e4e1bb87 (diff)
feat(sync): Phase 4 - dynamic subscriptions
- Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150
Diffstat (limited to 'src/sync/manager.rs')
-rw-r--r--src/sync/manager.rs31
1 files changed, 30 insertions, 1 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 1f70f42..f594454 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -15,6 +15,14 @@
15//! - Health tracking with exponential backoff 15//! - Health tracking with exponential backoff
16//! - Dead relay detection after 24h of failures 16//! - Dead relay detection after 24h of failures
17//! - Startup jitter to prevent thundering herd 17//! - Startup jitter to prevent thundering herd
18//!
19//! ## Phase 4 Features
20//!
21//! - Dynamic subscription updates handled per-connection
22//! - Each connection manages its own SubscriptionManager
23//! - Announcements trigger Layer 2 subscriptions
24//! - PRs/Issues trigger Layer 3 subscriptions
25//! - Consolidation when filter count exceeds 150
18 26
19use std::collections::HashSet; 27use std::collections::HashSet;
20use std::sync::Arc; 28use std::sync::Arc;
@@ -225,17 +233,38 @@ impl SyncManager {
225 } 233 }
226 234
227 /// Process a single synced event 235 /// Process a single synced event
236 ///
237 /// Events are validated through the write policy and stored if accepted.
238 /// Dynamic subscription updates are handled by each connection's SubscriptionManager.
228 async fn process_event(&self, synced_event: SyncedEvent) { 239 async fn process_event(&self, synced_event: SyncedEvent) {
229 let event = &synced_event.event; 240 let event = &synced_event.event;
230 let event_id = event.id.to_hex(); 241 let event_id = event.id.to_hex();
242 let kind = event.kind.as_u16();
231 243
232 tracing::debug!( 244 tracing::debug!(
233 "Processing synced event {} (kind {}) from {}", 245 "Processing synced event {} (kind {}) from {}",
234 event_id, 246 event_id,
235 event.kind.as_u16(), 247 kind,
236 synced_event.source_url 248 synced_event.source_url
237 ); 249 );
238 250
251 // Log subscription-relevant events for debugging
252 match kind {
253 30617 | 30618 => {
254 tracing::debug!(
255 "Received announcement {} - connection will add Layer 2 subscription",
256 event_id
257 );
258 }
259 1617 | 1618 | 1619 | 1621 | 1622 => {
260 tracing::debug!(
261 "Received PR/Issue {} - connection will add Layer 3 subscription",
262 event_id
263 );
264 }
265 _ => {}
266 }
267
239 // Validate through write policy using SYNC_SOURCE_ADDR 268 // Validate through write policy using SYNC_SOURCE_ADDR
240 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; 269 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
241 270