From e922e14e3ec4b898c111b2100cd63dddbe2fcdb1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 19:59:36 +0000 Subject: feat: add SyncLevel to sync system for purgatory announcement state-only sync Purgatory announcements need state events (kind 30618) synced from external relays, but not full L2/L3 events (patches, issues, PRs) which would be rejected anyway. This implements the SyncLevel concept from the design doc (decision #6): - Add SyncLevel enum (Full vs StateOnly) to RepoSyncNeeds - When announcement enters purgatory during sync, register in RepoSyncIndex with SyncLevel::StateOnly - Add build_sync_level_aware_filters() that partitions repos by level: StateOnly repos only get state event filters (kind 30618) - Update derive_relay_targets to track state_only_repos separately - Update compute_actions to handle both repo sets - SelfSubscriber always uses SyncLevel::Full (promoted repos) --- src/sync/algorithms.rs | 58 +++++++++++++++++++++++++++++++++++++------- src/sync/filters.rs | 31 ++++++++++++++++++++++++ src/sync/mod.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++- src/sync/self_subscriber.rs | 17 +++++++++---- 4 files changed, 150 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 39788bc..9899abc 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState}; /// this repo need to sync from", it's "what repos does this relay need to sync". #[derive(Debug, Clone, Default)] pub struct RelaySyncNeeds { - /// Repos that need to be synced from this relay + /// Repos that need full L2+L3 sync from this relay pub repos: HashSet, + /// Repos that only need state event sync (purgatory announcements) + pub state_only_repos: HashSet, /// Root events that need to be tracked from this relay pub root_events: HashSet, } @@ -67,8 +69,15 @@ pub fn derive_relay_targets( for relay_url in &needs.relays { let entry = relay_targets.entry(relay_url.clone()).or_default(); - entry.repos.insert(repo_id.clone()); - entry.root_events.extend(needs.root_events.iter().cloned()); + match needs.sync_level { + super::SyncLevel::Full => { + entry.repos.insert(repo_id.clone()); + entry.root_events.extend(needs.root_events.iter().cloned()); + } + super::SyncLevel::StateOnly => { + entry.state_only_repos.insert(repo_id.clone()); + } + } } } @@ -96,7 +105,7 @@ pub fn compute_actions( pending: &HashMap>, confirmed: &HashMap, ) -> Vec { - use crate::sync::filters::build_layer2_and_layer3_filters; + use crate::sync::filters::build_sync_level_aware_filters; let mut actions = Vec::new(); @@ -140,14 +149,22 @@ pub fn compute_actions( .map(|state| state.root_events.clone()) .unwrap_or_default(); - // Calculate what's NEW (not in pending, not in confirmed) - let new_repos: HashSet = target_needs + // Calculate what's NEW for full repos (not in pending, not in confirmed) + let new_full_repos: HashSet = target_needs .repos .difference(&pending_repos) .filter(|repo| !confirmed_repos.contains(*repo)) .cloned() .collect(); + // Calculate what's NEW for state-only repos + let new_state_only_repos: HashSet = target_needs + .state_only_repos + .difference(&pending_repos) + .filter(|repo| !confirmed_repos.contains(*repo)) + .cloned() + .collect(); + let new_events: HashSet = target_needs .root_events .difference(&pending_events) @@ -156,13 +173,23 @@ pub fn compute_actions( .collect(); // If there's anything new, create an AddFilters action - if !new_repos.is_empty() || !new_events.is_empty() { - let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); + if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty() + { + let filters = build_sync_level_aware_filters( + &new_full_repos, + &new_state_only_repos, + &new_events, + None, + ); + + // Combine all repos into pending items (pending tracking doesn't need sync level) + let mut all_new_repos = new_full_repos; + all_new_repos.extend(new_state_only_repos); actions.push(AddFilters { relay_url: relay_url.clone(), items: PendingItems { - repos: new_repos, + repos: all_new_repos, root_events: new_events, }, filters, @@ -204,6 +231,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events, + sync_level: Default::default(), }, ); @@ -229,6 +257,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events: HashSet::new(), + sync_level: Default::default(), }, ); } @@ -252,6 +281,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events: HashSet::new(), + sync_level: Default::default(), }, ); @@ -285,6 +315,7 @@ mod tests { ModRepoSyncNeeds { relays: relays1, root_events: root_events1, + sync_level: Default::default(), }, ); @@ -299,6 +330,7 @@ mod tests { ModRepoSyncNeeds { relays: relays2, root_events: root_events2, + sync_level: Default::default(), }, ); @@ -332,6 +364,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -366,6 +399,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -389,6 +423,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -428,6 +463,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -465,6 +501,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -510,6 +547,7 @@ mod tests { ] .into_iter() .collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -572,6 +610,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: HashSet::new(), + state_only_repos: HashSet::new(), root_events: vec![event_id].into_iter().collect(), }, ); @@ -599,6 +638,7 @@ mod tests { "wss://new-relay.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 3592489..1215e81 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs @@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters( filters } +/// Builds filters respecting SyncLevel for each repo +/// +/// StateOnly repos only get state event filters (kind 30618). +/// Full repos get all L2/L3 filters (state + repo-tagging + root event). +/// +/// # Arguments +/// * `full_repos` - Repos needing full L2+L3 sync +/// * `state_only_repos` - Repos needing only state event sync (purgatory) +/// * `root_events` - Root event IDs (only used for Full repos) +/// * `since` - Optional timestamp for incremental sync +pub fn build_sync_level_aware_filters( + full_repos: &HashSet, + state_only_repos: &HashSet, + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + + // All repos (both Full and StateOnly) need state event filters + let all_repos: HashSet = full_repos.union(state_only_repos).cloned().collect(); + filters.extend(state_event_filters_for_our_repos(&all_repos, since)); + + // Only Full repos get repo-tagging and root event filters + if !full_repos.is_empty() { + filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since)); + } + filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); + + filters +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1ee1872..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex; // Supporting Data Structures // ============================================================================= +/// Level of sync needed for a repository +/// +/// Purgatory announcements only need state events synced (to validate git data). +/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SyncLevel { + /// Full L2 + L3 sync (promoted repos with git data) + #[default] + Full, + /// Only state events (kind 30618) - for purgatory announcements + StateOnly, +} + /// What repos and root events need to be synced #[derive(Debug, Clone, Default)] pub struct RepoSyncNeeds { @@ -92,6 +105,8 @@ pub struct RepoSyncNeeds { pub relays: HashSet, /// Root event IDs - 1617/1618/1621 - that reference this repo pub root_events: HashSet, + /// Sync level - StateOnly for purgatory, Full for promoted repos + pub sync_level: SyncLevel, } /// Connection status for a relay @@ -1677,6 +1692,7 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); + let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1719,8 +1735,49 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { + // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly + // so that state events (kind 30618) are synced for this purgatory announcement + if event.kind == Kind::GitRepoAnnouncement { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = format!("30617:{}:{}", event.pubkey, identifier); + + // Extract relay URLs from the purgatory entry + let relays = write_policy + .purgatory() + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + tracing::info!( + event_id = %event.id, + repo_id = %repo_id, + relay_count = relays.len(), + "Registering purgatory announcement in RepoSyncIndex with StateOnly level" + ); + + // Register in RepoSyncIndex with StateOnly level + let mut index = repo_sync_index.write().await; + let entry = index + .entry(repo_id) + .or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + root_events: HashSet::new(), + sync_level: SyncLevel::StateOnly, + }); + entry.relays.extend(relays); + // Don't upgrade sync_level if already Full + // (e.g., if announcement was promoted before this runs) + } + } // State events (kind 30618) - extract identifier and trigger immediate sync - if event.kind.as_u16() == 30618 { + else if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..db16c62 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -16,7 +16,7 @@ use nostr_sdk::Timestamp; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; -use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; +use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; // ============================================================================= // LoopControl - Result of notification processing @@ -58,6 +58,7 @@ impl PendingUpdates { let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), + sync_level: SyncLevel::Full, }); entry.relays.extend(relays); entry.root_events.extend(root_events); @@ -475,6 +476,7 @@ impl SelfSubscriber { .or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), + sync_level: SyncLevel::Full, }); entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); @@ -499,21 +501,26 @@ impl SelfSubscriber { continue; } - // Build filters for these repos - let filters = crate::sync::filters::build_layer2_and_layer3_filters( + // Build filters for these repos (sync-level-aware) + let filters = crate::sync::filters::build_sync_level_aware_filters( &needs.repos, + &needs.state_only_repos, &needs.root_events, None, ); // Log before moving values - let repo_count = needs.repos.len(); + let repo_count = needs.repos.len() + needs.state_only_repos.len(); let event_count = needs.root_events.len(); + // Combine all repos into pending items + let mut all_repos = needs.repos; + all_repos.extend(needs.state_only_repos); + let action = AddFilters { relay_url: relay_url.clone(), items: crate::sync::PendingItems { - repos: needs.repos, + repos: all_repos, root_events: needs.root_events, }, filters, -- cgit v1.2.3