From c1730d5cafc3af2d5ec8f3bdbed5c32bb15fcb74 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 10 Dec 2025 12:46:41 +0000 Subject: sync: enhance SelfSubscriber with reconnect and root event tracking --- src/sync/self_subscriber.rs | 103 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 12 deletions(-) (limited to 'src/sync') diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 1dec219..229d2e1 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -12,6 +12,7 @@ use std::collections::{HashMap, HashSet}; use std::time::Duration; use nostr_sdk::prelude::*; +use nostr_sdk::Timestamp; use tokio::sync::mpsc; use super::{RepoSyncIndex, RepoSyncNeeds}; @@ -101,6 +102,8 @@ pub struct SelfSubscriber { repo_sync_index: RepoSyncIndex, /// Channel to send actions to SyncManager action_tx: mpsc::Sender, + /// Last time we connected - used for since filter on reconnect + last_connected: Option, } impl SelfSubscriber { @@ -122,6 +125,7 @@ impl SelfSubscriber { relay_domain, repo_sync_index, action_tx, + last_connected: None, } } @@ -205,7 +209,7 @@ impl SelfSubscriber { /// /// Connects to own relay, subscribes to relevant event kinds, /// and batches updates before processing them. - pub async fn run(self) { + pub async fn run(mut self) { let client = Client::default(); // Add own relay @@ -223,13 +227,36 @@ impl SelfSubscriber { // Subscribe to announcement and root event kinds // Per v4 spec: 30617, 1617, 1618, 1619, 1621 (NOT 30618) - let filter = Filter::new().kinds(vec![ - Kind::Custom(30617), // Repository Announcements - Kind::Custom(1617), // Patches - Kind::Custom(1618), // Issues - Kind::Custom(1619), // Replies/Status - Kind::Custom(1621), // Pull Requests - ]); + // Check if we have a last_connected time for reconnect filtering + let filter = if let Some(last) = self.last_connected { + // Quick reconnect - use since filter (15 min buffer) + let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); + tracing::debug!( + since = %since, + "Using since filter for reconnect" + ); + Filter::new() + .kinds(vec![ + Kind::Custom(30617), // Repository Announcements + Kind::Custom(1617), // Patches + Kind::Custom(1618), // Issues + Kind::Custom(1619), // Replies/Status + Kind::Custom(1621), // Pull Requests + ]) + .since(since) + } else { + // First connection - no since filter + Filter::new().kinds(vec![ + Kind::Custom(30617), // Repository Announcements + Kind::Custom(1617), // Patches + Kind::Custom(1618), // Issues + Kind::Custom(1619), // Replies/Status + Kind::Custom(1621), // Pull Requests + ]) + }; + + // Update last_connected AFTER creating filter but BEFORE subscribing + self.last_connected = Some(Timestamp::now()); if let Err(e) = client.subscribe(filter, None).await { tracing::error!( @@ -278,14 +305,13 @@ impl SelfSubscriber { } } else { // For root event kinds (1617, 1618, 1619, 1621), - // we need to check if they reference repos we care about. - // For now, we'll track them in a simpler way. - // Full implementation would extract 'a' tag and match to known repos. + // process them to update the RepoSyncIndex tracing::trace!( kind = %event.kind, event_id = %event.id, - "Received root event (processing deferred)" + "Received root event" ); + self.handle_root_event(&event).await; } } Ok(RelayPoolNotification::Shutdown) => { @@ -310,6 +336,59 @@ impl SelfSubscriber { tracing::info!("SelfSubscriber stopped"); } + /// Handle a root event (1617/1618/1619/1621) + /// + /// Extracts the 'a' tag to find the repo addressable reference, + /// then updates the RepoSyncIndex with the event ID. + async fn handle_root_event(&self, event: &Event) { + // Extract 'a' tag to find the repo addressable reference + let repo_a_tag = event.tags.iter().find(|tag| { + let tag_vec = tag.as_slice(); + !tag_vec.is_empty() && tag_vec[0] == "a" + }); + + let repo_ref = match repo_a_tag { + Some(tag) => { + let tag_vec = tag.as_slice(); + // Get first value from tag (the 'a' tag value at index 1) + if tag_vec.len() >= 2 { + tag_vec[1].clone() + } else { + tracing::warn!( + event_id = %event.id, + "Root event has 'a' tag but no content" + ); + return; + } + } + None => { + tracing::warn!( + event_id = %event.id, + "Root event missing 'a' tag" + ); + return; + } + }; + + // Look up repo in repo_sync_index + let mut index = self.repo_sync_index.write().await; + if let Some(repo_sync) = index.get_mut(&repo_ref) { + // Add event.id to root_events set + repo_sync.root_events.insert(event.id); + tracing::debug!( + event_id = %event.id, + repo_ref = %repo_ref, + "Added root event to repo" + ); + } else { + tracing::debug!( + event_id = %event.id, + repo_ref = %repo_ref, + "Root event references unknown repo" + ); + } + } + /// Process accumulated batch /// /// Updates the RepoSyncIndex with discovered repos, then derives per-relay -- cgit v1.2.3