From d450e2bd2bab8a5e13084edcc0eec45194ac9fb6 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 14:02:14 +0000 Subject: refactor: deduplicate SelfSubscriber select branches (SIMPLIFY-2) --- src/sync/self_subscriber.rs | 158 +++++++++++++++++++++----------------------- 1 file changed, 75 insertions(+), 83 deletions(-) (limited to 'src') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 8113035..e9e61ff 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -13,10 +13,23 @@ use std::time::Duration; use nostr_sdk::prelude::*; use nostr_sdk::Timestamp; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; +// ============================================================================= +// LoopControl - Result of notification processing +// ============================================================================= + +/// Control flow result from processing a notification +enum LoopControl { + /// Continue processing the next notification + Continue, + /// Break out of the event loop + Break, +} + // ============================================================================= // PendingUpdates - Accumulator for batching // ============================================================================= @@ -118,6 +131,64 @@ impl SelfSubscriber { .unwrap_or(Duration::from_millis(5000)) } + /// Process a relay pool notification + /// + /// Handles incoming events from the subscription, queueing 30617 announcements + /// for batch processing and immediately processing root events. + /// + /// Returns `LoopControl::Break` if the loop should exit, `LoopControl::Continue` otherwise. + async fn process_notification( + &self, + notification: Result, + pending: &mut PendingUpdates, + ) -> LoopControl { + match notification { + Ok(RelayPoolNotification::Event { event, .. }) => { + // Only process 30617 events that list our relay + if event.kind == Kind::Custom(30617) { + if !self.lists_our_relay(&event) { + return LoopControl::Continue; + } + + // Extract repo ID and relays + if let Some(repo_id) = Self::extract_repo_id(&event) { + let relays = Self::extract_relay_urls(&event); + // 30617 announcements don't contribute to root_events - those are + // the 1617/1618/1619/1621 event IDs that get added when we receive + // root events via handle_root_event. See mod.rs:71 for details. + pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); + tracing::info!( + event_id = %event.id, + repo_id = %repo_id, + relay_count = relays.len(), + relays = ?relays, + "[DIAG] Queued 30617 announcement for batch processing" + ); + } + } else { + // For root event kinds (1617, 1618, 1619, 1621), + // process them to update the RepoSyncIndex + tracing::trace!( + kind = %event.kind, + event_id = %event.id, + "Received root event" + ); + self.handle_root_event(&event).await; + } + LoopControl::Continue + } + Ok(RelayPoolNotification::Shutdown) => { + tracing::info!("SelfSubscriber received shutdown notification"); + LoopControl::Break + } + Err(e) => { + tracing::error!(error = %e, "Error receiving notification"); + LoopControl::Break + } + _ => LoopControl::Continue, + } + } + /// Extract relay URLs from event tags /// /// Extracts URLs from: @@ -267,49 +338,8 @@ impl SelfSubscriber { if let Some(ref mut rx) = shutdown_rx { tokio::select! { notification = notifications.recv() => { - match notification { - Ok(RelayPoolNotification::Event { event, .. }) => { - // Only process 30617 events that list our relay - if event.kind == Kind::Custom(30617) { - if !self.lists_our_relay(&event) { - continue; - } - - // Extract repo ID and relays - if let Some(repo_id) = Self::extract_repo_id(&event) { - let relays = Self::extract_relay_urls(&event); - // 30617 announcements don't contribute to root_events - those are - // the 1617/1618/1619/1621 event IDs that get added when we receive - // root events via handle_root_event. See mod.rs:71 for details. - pending.add_repo(repo_id.clone(), relays.clone(), HashSet::new()); - tracing::info!( - event_id = %event.id, - repo_id = %repo_id, - relay_count = relays.len(), - relays = ?relays, - "[DIAG] Queued 30617 announcement for batch processing" - ); - } - } else { - // For root event kinds (1617, 1618, 1619, 1621), - // process them to update the RepoSyncIndex - tracing::trace!( - kind = %event.kind, - event_id = %event.id, - "Received root event" - ); - self.handle_root_event(&event).await; - } - } - Ok(RelayPoolNotification::Shutdown) => { - tracing::info!("SelfSubscriber received shutdown notification"); - break; - } - Err(e) => { - tracing::error!(error = %e, "Error receiving notification"); - break; - } - _ => {} + if let LoopControl::Break = self.process_notification(notification, &mut pending).await { + break; } } _ = timer.tick() => { @@ -326,46 +356,8 @@ impl SelfSubscriber { // No shutdown receiver - original behavior tokio::select! { notification = notifications.recv() => { - match notification { - Ok(RelayPoolNotification::Event { event, .. }) => { - // Only process 30617 events that list our relay - if event.kind == Kind::Custom(30617) { - if !self.lists_our_relay(&event) { - continue; - } - - // Extract repo ID and relays - if let Some(repo_id) = Self::extract_repo_id(&event) { - let relays = Self::extract_relay_urls(&event); - let mut root_events = HashSet::new(); - root_events.insert(event.id); - - pending.add_repo(repo_id, relays, root_events); - tracing::debug!( - event_id = %event.id, - "Queued 30617 announcement for batch processing" - ); - } - } else { - // For root event kinds (1617, 1618, 1619, 1621), - // process them to update the RepoSyncIndex - tracing::trace!( - kind = %event.kind, - event_id = %event.id, - "Received root event" - ); - self.handle_root_event(&event).await; - } - } - Ok(RelayPoolNotification::Shutdown) => { - tracing::info!("SelfSubscriber received shutdown notification"); - break; - } - Err(e) => { - tracing::error!(error = %e, "Error receiving notification"); - break; - } - _ => {} + if let LoopControl::Break = self.process_notification(notification, &mut pending).await { + break; } } _ = timer.tick() => { -- cgit v1.2.3