From 46b306dcfa4850a688367c04e9e06e8d9c2883dc Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 21:55:31 +0000 Subject: fix: enable Layer 3 sync by adding root events to pending queue When root events (issues/patches) are received via self-subscription, handle_root_event() was only updating the repo_sync_index directly. This caused process_batch() to early-return when pending.is_empty(), so Layer 3 filters for comments/replies were never created. The fix adds root events to both: 1. repo_sync_index (for immediate availability) 2. pending queue (to trigger Layer 3 filter creation in next batch) Critical: The pending entry must include relays from repo_sync_index so derive_relay_targets() knows where to send Layer 3 subscriptions. The Layer 3 test now verifies that events sent BEFORE the subscription is established are still synced - proving subscriptions without 'since' correctly fetch historical events. Enabled 4 previously ignored Layer 3 tests: - test_live_sync_layer3_events - test_layer3_sync_with_lowercase_e_tag - test_layer3_sync_with_uppercase_e_tag - test_layer3_sync_with_q_tag --- src/sync/self_subscriber.rs | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) (limited to 'src/sync') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 27e7e64..f83b081 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -167,13 +167,14 @@ impl SelfSubscriber { } } else { // For root event kinds (1617, 1618, 1619, 1621), - // process them to update the RepoSyncIndex + // process them to update the RepoSyncIndex AND add to pending + // for Layer 3 filter creation tracing::trace!( kind = %event.kind, event_id = %event.id, "Received root event" ); - self.handle_root_event(&event).await; + self.handle_root_event(&event, pending).await; } LoopControl::Continue } @@ -375,8 +376,9 @@ impl SelfSubscriber { /// Handle a root event (1617/1618/1619/1621) /// /// Extracts the 'a' tag to find the repo addressable reference, - /// then updates the RepoSyncIndex with the event ID. - async fn handle_root_event(&self, event: &Event) { + /// then updates the RepoSyncIndex with the event ID AND adds to pending + /// so that Layer 3 filters will be created in the next batch. + async fn handle_root_event(&self, event: &Event, pending: &mut PendingUpdates) { // Extract 'a' tag to find the repo addressable reference let repo_a_tag = event.tags.iter().find(|tag| { let tag_vec = tag.as_slice(); @@ -406,15 +408,31 @@ impl SelfSubscriber { } }; - // Look up repo in repo_sync_index + // Look up repo in repo_sync_index - add root event directly and also to pending let mut index = self.repo_sync_index.write().await; if let Some(repo_sync) = index.get_mut(&repo_ref) { - // Add event.id to root_events set + // Add event.id to root_events set in the index (immediate availability) repo_sync.root_events.insert(event.id); + + // Clone the relays before releasing the lock - Layer 3 filters need to be + // sent to the same relays as Layer 2 filters for this repo + let relays = repo_sync.relays.clone(); + + // Release lock before modifying pending + drop(index); + + // Also add root event to pending - this ensures batch processing runs + // and creates Layer 3 filters for events referencing this root event. + // CRITICAL: Include relays so derive_relay_targets knows where to send filters! + let mut root_events = HashSet::new(); + root_events.insert(event.id); + pending.add_repo(repo_ref.clone(), relays.clone(), root_events); + tracing::debug!( event_id = %event.id, repo_ref = %repo_ref, - "Added root event to repo" + relay_count = relays.len(), + "Added root event to index and pending for Layer 3 filter creation" ); } else { tracing::debug!( -- cgit v1.2.3