From a19ff57e72d9b82a722e14ae365da7f8c2d87e87 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 4 Dec 2025 18:15:19 +0000 Subject: feat(sync): Phase 4 - dynamic subscriptions - Add SubscriptionManager for per-connection tracking - Trigger subscription updates on new repo/PR events - Implement consolidation when filter count > 150 --- src/sync/manager.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) (limited to 'src/sync/manager.rs') diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 1f70f42..f594454 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs @@ -15,6 +15,14 @@ //! - Health tracking with exponential backoff //! - Dead relay detection after 24h of failures //! - Startup jitter to prevent thundering herd +//! +//! ## Phase 4 Features +//! +//! - Dynamic subscription updates handled per-connection +//! - Each connection manages its own SubscriptionManager +//! - Announcements trigger Layer 2 subscriptions +//! - PRs/Issues trigger Layer 3 subscriptions +//! - Consolidation when filter count exceeds 150 use std::collections::HashSet; use std::sync::Arc; @@ -225,17 +233,38 @@ impl SyncManager { } /// Process a single synced event + /// + /// Events are validated through the write policy and stored if accepted. + /// Dynamic subscription updates are handled by each connection's SubscriptionManager. async fn process_event(&self, synced_event: SyncedEvent) { let event = &synced_event.event; let event_id = event.id.to_hex(); + let kind = event.kind.as_u16(); tracing::debug!( "Processing synced event {} (kind {}) from {}", event_id, - event.kind.as_u16(), + kind, synced_event.source_url ); + // Log subscription-relevant events for debugging + match kind { + 30617 | 30618 => { + tracing::debug!( + "Received announcement {} - connection will add Layer 2 subscription", + event_id + ); + } + 1617 | 1618 | 1619 | 1621 | 1622 => { + tracing::debug!( + "Received PR/Issue {} - connection will add Layer 3 subscription", + event_id + ); + } + _ => {} + } + // Validate through write policy using SYNC_SOURCE_ADDR let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; -- cgit v1.2.3