diff options
Diffstat (limited to 'src/sync/manager.rs')
| -rw-r--r-- | src/sync/manager.rs | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/src/sync/manager.rs b/src/sync/manager.rs index 1f70f42..f594454 100644 --- a/src/sync/manager.rs +++ b/src/sync/manager.rs | |||
| @@ -15,6 +15,14 @@ | |||
| 15 | //! - Health tracking with exponential backoff | 15 | //! - Health tracking with exponential backoff |
| 16 | //! - Dead relay detection after 24h of failures | 16 | //! - Dead relay detection after 24h of failures |
| 17 | //! - Startup jitter to prevent thundering herd | 17 | //! - Startup jitter to prevent thundering herd |
| 18 | //! | ||
| 19 | //! ## Phase 4 Features | ||
| 20 | //! | ||
| 21 | //! - Dynamic subscription updates handled per-connection | ||
| 22 | //! - Each connection manages its own SubscriptionManager | ||
| 23 | //! - Announcements trigger Layer 2 subscriptions | ||
| 24 | //! - PRs/Issues trigger Layer 3 subscriptions | ||
| 25 | //! - Consolidation when filter count exceeds 150 | ||
| 18 | 26 | ||
| 19 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| 20 | use std::sync::Arc; | 28 | use std::sync::Arc; |
| @@ -225,17 +233,38 @@ impl SyncManager { | |||
| 225 | } | 233 | } |
| 226 | 234 | ||
| 227 | /// Process a single synced event | 235 | /// Process a single synced event |
| 236 | /// | ||
| 237 | /// Events are validated through the write policy and stored if accepted. | ||
| 238 | /// Dynamic subscription updates are handled by each connection's SubscriptionManager. | ||
| 228 | async fn process_event(&self, synced_event: SyncedEvent) { | 239 | async fn process_event(&self, synced_event: SyncedEvent) { |
| 229 | let event = &synced_event.event; | 240 | let event = &synced_event.event; |
| 230 | let event_id = event.id.to_hex(); | 241 | let event_id = event.id.to_hex(); |
| 242 | let kind = event.kind.as_u16(); | ||
| 231 | 243 | ||
| 232 | tracing::debug!( | 244 | tracing::debug!( |
| 233 | "Processing synced event {} (kind {}) from {}", | 245 | "Processing synced event {} (kind {}) from {}", |
| 234 | event_id, | 246 | event_id, |
| 235 | event.kind.as_u16(), | 247 | kind, |
| 236 | synced_event.source_url | 248 | synced_event.source_url |
| 237 | ); | 249 | ); |
| 238 | 250 | ||
| 251 | // Log subscription-relevant events for debugging | ||
| 252 | match kind { | ||
| 253 | 30617 | 30618 => { | ||
| 254 | tracing::debug!( | ||
| 255 | "Received announcement {} - connection will add Layer 2 subscription", | ||
| 256 | event_id | ||
| 257 | ); | ||
| 258 | } | ||
| 259 | 1617 | 1618 | 1619 | 1621 | 1622 => { | ||
| 260 | tracing::debug!( | ||
| 261 | "Received PR/Issue {} - connection will add Layer 3 subscription", | ||
| 262 | event_id | ||
| 263 | ); | ||
| 264 | } | ||
| 265 | _ => {} | ||
| 266 | } | ||
| 267 | |||
| 239 | // Validate through write policy using SYNC_SOURCE_ADDR | 268 | // Validate through write policy using SYNC_SOURCE_ADDR |
| 240 | let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; | 269 | let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await; |
| 241 | 270 | ||