diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/self_subscriber.rs | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 27e7e64..f83b081 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -167,13 +167,14 @@ impl SelfSubscriber { | |||
| 167 | } | 167 | } |
| 168 | } else { | 168 | } else { |
| 169 | // For root event kinds (1617, 1618, 1619, 1621), | 169 | // For root event kinds (1617, 1618, 1619, 1621), |
| 170 | // process them to update the RepoSyncIndex | 170 | // process them to update the RepoSyncIndex AND add to pending |
| 171 | // for Layer 3 filter creation | ||
| 171 | tracing::trace!( | 172 | tracing::trace!( |
| 172 | kind = %event.kind, | 173 | kind = %event.kind, |
| 173 | event_id = %event.id, | 174 | event_id = %event.id, |
| 174 | "Received root event" | 175 | "Received root event" |
| 175 | ); | 176 | ); |
| 176 | self.handle_root_event(&event).await; | 177 | self.handle_root_event(&event, pending).await; |
| 177 | } | 178 | } |
| 178 | LoopControl::Continue | 179 | LoopControl::Continue |
| 179 | } | 180 | } |
| @@ -375,8 +376,9 @@ impl SelfSubscriber { | |||
| 375 | /// Handle a root event (1617/1618/1619/1621) | 376 | /// Handle a root event (1617/1618/1619/1621) |
| 376 | /// | 377 | /// |
| 377 | /// Extracts the 'a' tag to find the repo addressable reference, | 378 | /// Extracts the 'a' tag to find the repo addressable reference, |
| 378 | /// then updates the RepoSyncIndex with the event ID. | 379 | /// then updates the RepoSyncIndex with the event ID AND adds to pending |
| 379 | async fn handle_root_event(&self, event: &Event) { | 380 | /// so that Layer 3 filters will be created in the next batch. |
| 381 | async fn handle_root_event(&self, event: &Event, pending: &mut PendingUpdates) { | ||
| 380 | // Extract 'a' tag to find the repo addressable reference | 382 | // Extract 'a' tag to find the repo addressable reference |
| 381 | let repo_a_tag = event.tags.iter().find(|tag| { | 383 | let repo_a_tag = event.tags.iter().find(|tag| { |
| 382 | let tag_vec = tag.as_slice(); | 384 | let tag_vec = tag.as_slice(); |
| @@ -406,15 +408,31 @@ impl SelfSubscriber { | |||
| 406 | } | 408 | } |
| 407 | }; | 409 | }; |
| 408 | 410 | ||
| 409 | // Look up repo in repo_sync_index | 411 | // Look up repo in repo_sync_index - add root event directly and also to pending |
| 410 | let mut index = self.repo_sync_index.write().await; | 412 | let mut index = self.repo_sync_index.write().await; |
| 411 | if let Some(repo_sync) = index.get_mut(&repo_ref) { | 413 | if let Some(repo_sync) = index.get_mut(&repo_ref) { |
| 412 | // Add event.id to root_events set | 414 | // Add event.id to root_events set in the index (immediate availability) |
| 413 | repo_sync.root_events.insert(event.id); | 415 | repo_sync.root_events.insert(event.id); |
| 416 | |||
| 417 | // Clone the relays before releasing the lock - Layer 3 filters need to be | ||
| 418 | // sent to the same relays as Layer 2 filters for this repo | ||
| 419 | let relays = repo_sync.relays.clone(); | ||
| 420 | |||
| 421 | // Release lock before modifying pending | ||
| 422 | drop(index); | ||
| 423 | |||
| 424 | // Also add root event to pending - this ensures batch processing runs | ||
| 425 | // and creates Layer 3 filters for events referencing this root event. | ||
| 426 | // CRITICAL: Include relays so derive_relay_targets knows where to send filters! | ||
| 427 | let mut root_events = HashSet::new(); | ||
| 428 | root_events.insert(event.id); | ||
| 429 | pending.add_repo(repo_ref.clone(), relays.clone(), root_events); | ||
| 430 | |||
| 414 | tracing::debug!( | 431 | tracing::debug!( |
| 415 | event_id = %event.id, | 432 | event_id = %event.id, |
| 416 | repo_ref = %repo_ref, | 433 | repo_ref = %repo_ref, |
| 417 | "Added root event to repo" | 434 | relay_count = relays.len(), |
| 435 | "Added root event to index and pending for Layer 3 filter creation" | ||
| 418 | ); | 436 | ); |
| 419 | } else { | 437 | } else { |
| 420 | tracing::debug!( | 438 | tracing::debug!( |