From 8fc4078d60f0ccf16318fe7fa765fcdd3627fe1f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 12 Feb 2026 14:02:22 +0000 Subject: chore: fix clippy warnings - Derive Default for config structs instead of manual impl - Fix doc comment formatting in ArchiveConfig::matches - Collapse nested if statement in validate_announcement - Allow too_many_arguments for SyncManager::new --- src/sync/mod.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index bc8c428..1ee1872 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -584,6 +584,7 @@ impl SyncManager { /// * `config` - Configuration for sync settings /// * `data_path` - Path to git data directory (for persistence) /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) + #[allow(clippy::too_many_arguments)] pub fn new( bootstrap_relay_url: Option, service_domain: String, -- cgit v1.2.3 From 1d09e4bdea7e328cf2740818df9df660c5532a99 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 13:24:46 +0000 Subject: feat: implement announcement purgatory core (breaks archive sync test) Route new announcements to purgatory instead of accepting immediately. Announcements are promoted to the database when git data arrives, ensuring we only serve announcements for repos with actual content. Implemented: - AnnouncementPurgatoryEntry type and DashMap store - Route new announcements to purgatory (replacement announcements skip) - Promote announcements on git data arrival (process_purgatory_announcements) - Authorization checks purgatory announcements (fetch_repository_data_with_purgatory) - State policy uses purgatory announcements for maintainer validation - Cleanup task handles announcement expiry - Updated count()/cleanup() to 3-tuples Known broken: - test_archive_read_only_creates_bare_repo fails: sync module does not treat purgatory announcements as confirmed repos, so per-repo sync (state events, PRs) is never triggered for purgatory announcements - Announcement persistence (save/restore) not implemented - SyncLevel (StateOnly vs Full) not implemented - Soft expiry two-phase not implemented - Expiry extension on state event / git auth not wired up --- src/git/authorization.rs | 38 +++++- src/git/sync.rs | 110 ++++++++++++++++- src/main.rs | 8 +- src/nostr/builder.rs | 23 ++++ src/nostr/policy/announcement.rs | 117 +++++++++++++++++- src/nostr/policy/state.rs | 10 +- src/purgatory/mod.rs | 260 ++++++++++++++++++++++++++++++++++----- src/purgatory/sync/context.rs | 7 +- src/purgatory/types.rs | 39 ++++++ src/sync/mod.rs | 68 +++++++++- tests/archive_read_only.rs | 59 ++++++--- tests/purgatory.rs | 4 +- tests/purgatory_persistence.rs | 26 ++-- 13 files changed, 691 insertions(+), 78 deletions(-) (limited to 'src/sync') diff --git a/src/git/authorization.rs b/src/git/authorization.rs index e174b51..9d53c4f 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs @@ -287,6 +287,39 @@ pub async fn fetch_repository_data( }) } +/// Fetch repository data including announcements from purgatory +/// +/// This combines database announcements with purgatory announcements, +/// which is needed for authorization when the announcement hasn't been +/// promoted yet (no git data has arrived). +pub async fn fetch_repository_data_with_purgatory( + database: &SharedDatabase, + purgatory: &crate::purgatory::Purgatory, + identifier: &str, +) -> Result { + // First, fetch from database + let mut repo_data = fetch_repository_data(database, identifier).await?; + + // Then, add announcements from purgatory + let purgatory_announcements = purgatory.get_announcements_by_identifier(identifier); + let purgatory_count = purgatory_announcements.len(); + + for entry in purgatory_announcements { + if let Ok(announcement) = RepositoryAnnouncement::from_event(entry.event) { + repo_data.announcements.push(announcement); + } + } + + debug!( + "Fetched repository data with purgatory: {} announcements ({} from purgatory), {} states", + repo_data.announcements.len(), + purgatory_count, + repo_data.states.len() + ); + + Ok(repo_data) +} + pub fn pubkey_authorised_for_repo_owners( pubkey: &PublicKey, db_repo_data: &RepositoryData, @@ -539,8 +572,9 @@ pub async fn get_state_authorization_for_specific_owner_repo( use crate::git::list_refs; use crate::purgatory::RefUpdate; - // Fetch announcements only - we don't need database states - let repo_data = fetch_repository_data(database, identifier).await?; + // Fetch announcements from database AND purgatory - needed for authorization + // when the announcement hasn't been promoted yet (no git data has arrived) + let repo_data = fetch_repository_data_with_purgatory(database, purgatory, identifier).await?; if repo_data.announcements.is_empty() { return Ok(AuthorizationResult::denied( diff --git a/src/git/sync.rs b/src/git/sync.rs index e8e9655..13f30b6 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -51,6 +51,8 @@ use crate::purgatory::{can_apply_state, Purgatory}; /// or from purgatory sync fetching OIDs from remote servers). #[derive(Debug, Default, Clone)] pub struct ProcessResult { + /// Number of announcements released from purgatory + pub announcements_released: usize, /// Number of state events released from purgatory pub states_released: usize, /// Number of PR events released from purgatory @@ -70,11 +72,12 @@ pub struct ProcessResult { impl ProcessResult { /// Check if any events were released pub fn released_any(&self) -> bool { - self.states_released > 0 || self.prs_released > 0 + self.announcements_released > 0 || self.states_released > 0 || self.prs_released > 0 } /// Merge another ProcessResult into this one pub fn merge(&mut self, other: ProcessResult) { + self.announcements_released += other.announcements_released; self.states_released += other.states_released; self.prs_released += other.prs_released; self.repos_synced += other.repos_synced; @@ -836,6 +839,18 @@ pub async fn process_newly_available_git_data( "Processing newly available git data" ); + // Process announcements from purgatory + let announcement_result = process_purgatory_announcements( + &identifier, + source_repo_path, + database, + local_relay, + purgatory, + git_data_path, + ) + .await; + result.merge(announcement_result); + // Process state events from purgatory let state_result = process_purgatory_state_events( &identifier, @@ -863,6 +878,7 @@ pub async fn process_newly_available_git_data( if result.released_any() { info!( identifier = %identifier, + announcements_released = result.announcements_released, states_released = result.states_released, prs_released = result.prs_released, repos_synced = result.repos_synced, @@ -1250,6 +1266,90 @@ async fn process_purgatory_pr_events( result } +/// Process announcements from purgatory that can now be promoted. +/// +/// When git data arrives for a repository, any announcements in purgatory +/// for that repository should be promoted to the database and served to clients. +async fn process_purgatory_announcements( + identifier: &str, + source_repo_path: &Path, + database: &SharedDatabase, + local_relay: Option<&nostr_relay_builder::LocalRelay>, + purgatory: &Purgatory, + git_data_path: &Path, +) -> ProcessResult { + let mut result = ProcessResult::default(); + + // Extract owner pubkey from the source repo path + let owner_pubkey = match extract_owner_from_repo_path(source_repo_path, git_data_path) { + Some(npub) => npub, + None => { + debug!( + identifier = %identifier, + "Could not extract owner from repo path" + ); + return result; + } + }; + + // Parse the npub back to PublicKey + let owner = match nostr_sdk::PublicKey::parse(&owner_pubkey) { + Ok(pk) => pk, + Err(e) => { + warn!( + identifier = %identifier, + owner_pubkey = %owner_pubkey, + error = %e, + "Failed to parse owner pubkey" + ); + result.errors.push(format!("Failed to parse owner pubkey: {}", e)); + return result; + } + }; + + // Check if there's an announcement in purgatory for this owner and identifier + let announcement_event = purgatory.promote_announcement(&owner, identifier); + + if let Some(event) = announcement_event { + // Save to database + match database.save_event(&event).await { + Ok(_) => { + info!( + identifier = %identifier, + event_id = %event.id, + "Promoted announcement from purgatory to database" + ); + + // Notify WebSocket subscribers + if let Some(relay) = local_relay { + if relay.notify_event(event.clone()) { + debug!( + identifier = %identifier, + event_id = %event.id, + "Broadcast announcement event to WebSocket listeners" + ); + } + } + + result.announcements_released += 1; + } + Err(e) => { + warn!( + identifier = %identifier, + event_id = %event.id, + error = %e, + "Failed to save announcement to database" + ); + result + .errors + .push(format!("Failed to save announcement: {}", e)); + } + } + } + + result +} + /// Extract owner pubkey from a repository path. /// /// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. @@ -1271,6 +1371,7 @@ mod tests { #[test] fn test_process_result_default() { let result = ProcessResult::default(); + assert_eq!(result.announcements_released, 0); assert_eq!(result.states_released, 0); assert_eq!(result.prs_released, 0); assert_eq!(result.repos_synced, 0); @@ -1282,6 +1383,10 @@ mod tests { let mut result = ProcessResult::default(); assert!(!result.released_any()); + result.announcements_released = 1; + assert!(result.released_any()); + + result.announcements_released = 0; result.states_released = 1; assert!(result.released_any()); @@ -1293,6 +1398,7 @@ mod tests { #[test] fn test_process_result_merge() { let mut result1 = ProcessResult { + announcements_released: 0, states_released: 1, prs_released: 2, repos_synced: 3, @@ -1303,6 +1409,7 @@ mod tests { }; let result2 = ProcessResult { + announcements_released: 5, states_released: 10, prs_released: 20, repos_synced: 30, @@ -1314,6 +1421,7 @@ mod tests { result1.merge(result2); + assert_eq!(result1.announcements_released, 5); assert_eq!(result1.states_released, 11); assert_eq!(result1.prs_released, 22); assert_eq!(result1.repos_synced, 33); diff --git a/src/main.rs b/src/main.rs index 5e5b83a..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -142,11 +142,11 @@ async fn main() -> Result<()> { let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { interval.tick().await; - let (state_removed, pr_removed) = cleanup_purgatory.cleanup(); - if state_removed > 0 || pr_removed > 0 { + let (announcement_removed, state_removed, pr_removed) = cleanup_purgatory.cleanup(); + if announcement_removed > 0 || state_removed > 0 || pr_removed > 0 { info!( - "Purgatory cleanup: removed {} state events, {} PR events", - state_removed, pr_removed + "Purgatory cleanup: removed {} announcements, {} state events, {} PR events", + announcement_removed, state_removed, pr_removed ); } } diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 34014db..aff12a6 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -138,6 +138,29 @@ impl Nip34WritePolicy { } } } + AnnouncementResult::AcceptPurgatory => { + // New announcement - add to purgatory + match self.announcement_policy.add_to_purgatory(event) { + Ok(()) => { + tracing::info!( + "Accepted announcement to purgatory: {} (waiting for git data)", + event_id_str + ); + WritePolicyResult::Reject { + status: true, // Client sees OK + message: "purgatory: won't be served until git data arrives".into(), + } + } + Err(e) => { + tracing::warn!( + "Failed to add announcement to purgatory {}: {}", + event_id_str, + e + ); + WritePolicyResult::reject(e) + } + } + } AnnouncementResult::AcceptMaintainer => { // Parse announcement to get details for logging match RepositoryAnnouncement::from_event(event.clone()) { diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 15a6e58..1118497 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs @@ -3,6 +3,7 @@ /// Handles validation of NIP-34 repository announcements (kind 30617) /// according to GRASP-01 specification. use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; +use std::collections::HashSet; use super::PolicyContext; use crate::config::Config; @@ -11,12 +12,14 @@ use crate::nostr::events::{validate_announcement, RepositoryAnnouncement}; /// Result of announcement policy evaluation #[derive(Debug, Clone, PartialEq)] pub enum AnnouncementResult { - /// Accept: Event lists our service (GRASP-01 compliant) + /// Accept: Event lists our service (GRASP-01 compliant) - replacement announcement Accept, /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer) AcceptMaintainer, /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only) AcceptArchive, + /// Accept to purgatory: New announcement, waiting for git data + AcceptPurgatory, /// Reject: Event fails validation with reason Reject(String), } @@ -35,10 +38,12 @@ impl AnnouncementPolicy { /// Validate a repository announcement event /// - /// Returns `Accept` if the announcement lists the service properly, - /// `AcceptMaintainer` if accepted via maintainer exception, - /// `AcceptArchive` if accepted via GRASP-05 archive config, - /// or `Reject` with reason. + /// Returns: + /// - `Accept` if this is a replacement announcement (active announcement exists) + /// - `AcceptPurgatory` if this is a new announcement (no active announcement exists) + /// - `AcceptMaintainer` if accepted via maintainer exception + /// - `AcceptArchive` if accepted via GRASP-05 archive config + /// - `Reject` with reason if validation fails pub async fn validate(&self, event: &Event) -> AnnouncementResult { // First, try validation (GRASP-01 + GRASP-05) let validation_result = validate_announcement(event, &self.config); @@ -67,11 +72,111 @@ impl AnnouncementPolicy { Err(_) => AnnouncementResult::Reject(reason), } } - // Accept, AcceptArchive, or AcceptMaintainer - return as-is + AnnouncementResult::Accept | AnnouncementResult::AcceptArchive => { + // Parse announcement to check for existing active announcement + match RepositoryAnnouncement::from_event(event.clone()) { + Ok(announcement) => { + // Check if there's already an active announcement for this (pubkey, identifier) + match self + .has_active_announcement(&event.pubkey, &announcement.identifier) + .await + { + Ok(true) => { + // Replacement announcement - accept immediately + tracing::debug!( + identifier = %announcement.identifier, + "Replacement announcement - accepting immediately" + ); + validation_result + } + Ok(false) => { + // New announcement - route to purgatory + tracing::debug!( + identifier = %announcement.identifier, + "New announcement - routing to purgatory" + ); + AnnouncementResult::AcceptPurgatory + } + Err(e) => { + tracing::warn!( + error = %e, + "Failed to check for existing announcement - rejecting" + ); + AnnouncementResult::Reject(format!( + "Database error checking existing announcement: {}", + e + )) + } + } + } + Err(e) => AnnouncementResult::Reject(format!( + "Failed to parse announcement: {}", + e + )), + } + } + // AcceptPurgatory shouldn't come from validate_announcement, but handle it result => result, } } + /// Check if there's an active announcement in the database for this (pubkey, identifier) + async fn has_active_announcement( + &self, + pubkey: &PublicKey, + identifier: &str, + ) -> Result { + let filter = Filter::new() + .kind(Kind::GitRepoAnnouncement) + .author(*pubkey) + .custom_tag( + SingleLetterTag::lowercase(Alphabet::D), + identifier.to_string(), + ); + + let events: Vec = match self.ctx.database.query(filter).await { + Ok(events) => events.into_iter().collect(), + Err(e) => return Err(format!("Database query failed: {}", e)), + }; + + Ok(!events.is_empty()) + } + + /// Add an announcement to purgatory + /// + /// Creates the bare repository and stores the announcement in purgatory + /// until git data arrives. + pub fn add_to_purgatory(&self, event: &Event) -> Result<(), String> { + let announcement = RepositoryAnnouncement::from_event(event.clone()) + .map_err(|e| format!("Failed to parse announcement: {}", e))?; + + // Create bare repository + self.ensure_bare_repository(&announcement)?; + + // Build repo path + let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); + + // Extract relays from announcement + let relays: HashSet = announcement.relays.iter().cloned().collect(); + + // Add to purgatory + self.ctx.purgatory.add_announcement( + event.clone(), + announcement.identifier.clone(), + event.pubkey, + repo_path, + relays, + ); + + tracing::info!( + identifier = %announcement.identifier, + event_id = %event.id, + "Added announcement to purgatory" + ); + + Ok(()) + } + /// Create a bare git repository if it doesn't exist /// Path format: //.git pub fn ensure_bare_repository( diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index f94f004..4bfb513 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -10,7 +10,7 @@ use nostr_relay_builder::prelude::Event; use super::PolicyContext; use crate::git; -use crate::git::authorization::fetch_repository_data; +use crate::git::authorization::fetch_repository_data_with_purgatory; use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; /// Result of state policy evaluation @@ -76,7 +76,13 @@ impl StatePolicy { } // Get all repositories and state events from db with identifier - let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; + // Include purgatory announcements for authorization + let db_repo_data = fetch_repository_data_with_purgatory( + &self.ctx.database, + &self.ctx.purgatory, + &state.identifier, + ) + .await?; // CRITICAL: Check if author is authorized via maintainer set // State events MUST be rejected if author is not in maintainer set of any accepted announcement diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 47798a6..3b5514b 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -17,7 +17,7 @@ pub mod sync; mod types; pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; -pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; +pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; use dashmap::DashMap; use nostr_sdk::prelude::*; @@ -100,7 +100,8 @@ struct PurgatoryState { /// Main purgatory structure holding events awaiting git data. /// -/// Provides thread-safe concurrent access to two separate stores: +/// Provides thread-safe concurrent access to three separate stores: +/// - Announcements indexed by (pubkey, identifier) /// - State events indexed by repository identifier /// - PR events indexed by event ID /// @@ -121,6 +122,10 @@ struct PurgatoryState { /// that we've already determined have no git data available. #[derive(Clone)] pub struct Purgatory { + /// Repository announcements (kind 30617) indexed by (owner pubkey, identifier). + /// Key: (PublicKey, String) where String is the repository identifier. + announcement_purgatory: Arc>, + /// State events (kind 30618) indexed by repository identifier. /// Multiple state events can wait for the same identifier (different maintainers). state_events: Arc>>, @@ -145,6 +150,7 @@ impl Purgatory { /// Create a new empty purgatory. pub fn new(git_data_path: impl Into) -> Self { Self { + announcement_purgatory: Arc::new(DashMap::new()), state_events: Arc::new(DashMap::new()), pr_events: Arc::new(DashMap::new()), sync_queue: Arc::new(DashMap::new()), @@ -513,9 +519,171 @@ impl Purgatory { self.pr_events.remove(event_id); } + // ========================================================================= + // Announcement Purgatory Methods + // ========================================================================= + + /// Add a repository announcement to purgatory. + /// + /// The announcement will be held until git data arrives, at which point + /// it will be promoted to the database and served to clients. + /// + /// # Arguments + /// * `event` - The announcement event (kind 30617) + /// * `identifier` - The repository identifier from the 'd' tag + /// * `owner` - The owner pubkey (event author) + /// * `repo_path` - Path to the bare git repository + /// * `relays` - Relay URLs from the announcement (for sync registration) + pub fn add_announcement( + &self, + event: Event, + identifier: String, + owner: PublicKey, + repo_path: PathBuf, + relays: HashSet, + ) { + let now = Instant::now(); + let entry = AnnouncementPurgatoryEntry { + event, + identifier: identifier.clone(), + owner, + repo_path, + relays, + created_at: now, + expires_at: now + DEFAULT_EXPIRY, + soft_expired: false, + }; + + let key = (owner, identifier); + self.announcement_purgatory.insert(key.clone(), entry); + + tracing::debug!( + owner = %key.0, + identifier = %key.1, + "Added announcement to purgatory" + ); + } + + /// Find an announcement in purgatory by owner and identifier. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// The announcement entry if found, None otherwise + pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.get(&key).map(|entry| entry.clone()) + } + + /// Get all announcements in purgatory for a given identifier. + /// + /// This is used for authorization - state events and git pushes need to + /// check purgatory announcements for maintainer validation. + /// + /// # Arguments + /// * `identifier` - The repository identifier + /// + /// # Returns + /// Vector of announcement entries for this identifier + pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec { + self.announcement_purgatory + .iter() + .filter(|entry| entry.key().1 == identifier) + .map(|entry| entry.value().clone()) + .collect() + } + + /// Remove an announcement from purgatory. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + pub fn remove_announcement(&self, owner: &PublicKey, identifier: &str) { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.remove(&key); + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Removed announcement from purgatory" + ); + } + + /// Promote an announcement from purgatory to active status. + /// + /// This is called when git data arrives. The announcement event is returned + /// so it can be saved to the database. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// The announcement event if found, None otherwise + pub fn promote_announcement(&self, owner: &PublicKey, identifier: &str) -> Option { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.remove(&key).map(|(_, entry)| { + tracing::info!( + owner = %owner, + identifier = %identifier, + "Promoted announcement from purgatory to database" + ); + entry.event + }) + } + + /// Check if there's an announcement in purgatory for the given owner and identifier. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// + /// # Returns + /// true if an announcement exists in purgatory, false otherwise + pub fn has_purgatory_announcement(&self, owner: &PublicKey, identifier: &str) -> bool { + let key = (*owner, identifier.to_string()); + self.announcement_purgatory.contains_key(&key) + } + + /// Extend the expiry for an announcement in purgatory. + /// + /// This is called when state events arrive for a purgatory announcement, + /// indicating the repository is actively receiving metadata. + /// + /// # Arguments + /// * `owner` - The owner pubkey + /// * `identifier` - The repository identifier + /// * `duration` - Minimum duration to guarantee from now + pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) { + let key = (*owner, identifier.to_string()); + if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) { + let now = Instant::now(); + let new_expiry = now + duration; + if entry.expires_at < new_expiry { + entry.expires_at = new_expiry; + // If soft-expired, revive it + if entry.soft_expired { + entry.soft_expired = false; + tracing::debug!( + owner = %owner, + identifier = %identifier, + "Revived soft-expired announcement" + ); + } + } + } + } + + /// Get count of announcements in purgatory. + pub fn announcement_count(&self) -> usize { + self.announcement_purgatory.len() + } + /// Get all event IDs currently stored in purgatory AND previously expired events. /// /// Returns a HashSet of all event IDs for: + /// - Announcements currently held in purgatory /// - State events currently held in purgatory /// - PR events currently held in purgatory /// - Events that previously expired from purgatory without finding git data @@ -530,6 +698,11 @@ impl Purgatory { pub fn event_ids(&self) -> HashSet { let mut ids = HashSet::new(); + // Collect announcement event IDs + for entry in self.announcement_purgatory.iter() { + ids.insert(entry.value().event.id); + } + // Collect state event IDs for entry in self.state_events.iter() { for state_entry in entry.value().iter() { @@ -609,9 +782,28 @@ impl Purgatory { /// will be filtered out during future negentropy/REQ sync operations. /// /// # Returns - /// Tuple of (num_state_removed, num_pr_removed) - pub fn cleanup(&self) -> (usize, usize) { + /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) + pub fn cleanup(&self) -> (usize, usize, usize) { let now = Instant::now(); + + // Remove expired announcements and mark them as expired + let expired_announcements: Vec<(PublicKey, String, EventId)> = self + .announcement_purgatory + .iter() + .filter(|entry| entry.value().expires_at <= now) + .map(|entry| { + let key = entry.key(); + let event_id = entry.value().event.id; + (key.0.clone(), key.1.clone(), event_id) + }) + .collect(); + + let announcement_removed = expired_announcements.len(); + for (owner, identifier, event_id) in expired_announcements { + self.mark_expired(event_id); + self.announcement_purgatory.remove(&(owner, identifier)); + } + let mut state_removed = 0; // Remove expired state events and mark them as expired @@ -655,17 +847,17 @@ impl Purgatory { self.pr_events.remove(&event_id_str); } - (state_removed, pr_removed) + (announcement_removed, state_removed, pr_removed) } /// Remove expired entries from purgatory (legacy method). /// /// # Returns - /// Total number of entries removed (state + PR events) + /// Total number of entries removed (announcement + state + PR events) #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] pub fn remove_expired(&self) -> usize { - let (state, pr) = self.cleanup(); - state + pr + let (announcement, state, pr) = self.cleanup(); + announcement + state + pr } /// Remove old expired event records. @@ -699,11 +891,12 @@ impl Purgatory { /// Get current count of entries in purgatory. /// /// # Returns - /// Tuple of (state_event_count, pr_event_count) - pub fn count(&self) -> (usize, usize) { + /// Tuple of (announcement_count, state_event_count, pr_event_count) + pub fn count(&self) -> (usize, usize, usize) { + let announcement_count = self.announcement_purgatory.len(); let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); let pr_count = self.pr_events.len(); - (state_count, pr_count) + (announcement_count, state_count, pr_count) } /// Get count of expired events being tracked. @@ -717,6 +910,7 @@ impl Purgatory { /// Clear all entries from purgatory (for testing). #[cfg(test)] pub fn clear(&self) { + self.announcement_purgatory.clear(); self.state_events.clear(); self.pr_events.clear(); self.sync_queue.clear(); @@ -990,7 +1184,8 @@ mod tests { #[test] fn test_purgatory_creation() { let purgatory = Purgatory::new(PathBuf::new()); - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1008,7 +1203,8 @@ mod tests { purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0); assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } @@ -1213,7 +1409,7 @@ fn test_cleanup_removes_expired_entries() { purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); // Verify entries are there - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); assert_eq!(pr_count, 2); @@ -1231,14 +1427,14 @@ fn test_cleanup_removes_expired_entries() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // Verify counts assert_eq!(state_removed, 1); assert_eq!(pr_removed, 2); // Verify entries are gone - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1260,14 +1456,14 @@ fn test_cleanup_preserves_non_expired_entries() { purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // Nothing should be removed assert_eq!(state_removed, 0); assert_eq!(pr_removed, 0); // Verify entries are still there - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); assert_eq!(pr_count, 1); } @@ -1314,14 +1510,14 @@ fn test_cleanup_mixed_expired_and_fresh() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); // One of each should be removed assert_eq!(state_removed, 1); assert_eq!(pr_removed, 1); // Verify remaining counts - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 1); // One state event remains assert_eq!(pr_count, 1); // One PR event remains } @@ -1391,7 +1587,7 @@ fn test_expired_event_tracking() { } // Run cleanup - let (state_removed, pr_removed) = purgatory.cleanup(); + let (_, state_removed, pr_removed) = purgatory.cleanup(); assert_eq!(state_removed, 1); assert_eq!(pr_removed, 1); @@ -1501,7 +1697,7 @@ fn test_expired_events_prevent_readdition() { } // Event should NOT be re-added - let (state_count, _) = purgatory.count(); + let (_, state_count, _) = purgatory.count(); assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); } @@ -1520,7 +1716,7 @@ fn test_pr_placeholder_not_marked_expired() { } // Run cleanup - let (_, pr_removed) = purgatory.cleanup(); + let (_, _, pr_removed) = purgatory.cleanup(); assert_eq!(pr_removed, 1); // Expired count should be 0 (placeholders don't have event IDs to track) @@ -1606,7 +1802,7 @@ async fn test_save_and_restore_state_events() { assert!(!state_file.exists()); // Verify state events were restored - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 2); let restored_entries = purgatory2.find_state("test-repo"); @@ -1662,7 +1858,7 @@ async fn test_save_and_restore_pr_events() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify PR event was restored - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 1); let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); @@ -1691,7 +1887,7 @@ async fn test_save_and_restore_pr_placeholders() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify placeholder was restored - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 1); let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); @@ -1769,7 +1965,7 @@ async fn test_save_and_restore_empty_purgatory() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify purgatory is still empty - let (state_count, pr_count) = purgatory2.count(); + let (_, state_count, pr_count) = purgatory2.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); assert_eq!(purgatory2.expired_count(), 0); @@ -1789,7 +1985,7 @@ async fn test_restore_missing_file() { assert!(result.is_err()); // Purgatory should remain empty - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -1811,7 +2007,7 @@ async fn test_restore_corrupted_json() { assert!(result.is_err()); // Purgatory should remain empty - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -2044,7 +2240,7 @@ async fn test_mixed_pr_events_and_placeholders() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify both were restored correctly - let (_, pr_count) = purgatory2.count(); + let (_, _, pr_count) = purgatory2.count(); assert_eq!(pr_count, 2); // Verify PR event @@ -2141,7 +2337,7 @@ async fn test_comprehensive_roundtrip() { purgatory.cleanup(); // Verify initial state - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) assert_eq!(pr_count, 2); // pr-1, pr-2 assert_eq!(purgatory.expired_count(), 1); // expired_event @@ -2154,7 +2350,7 @@ async fn test_comprehensive_roundtrip() { purgatory2.restore_from_disk(&state_file).unwrap(); // Verify all data was restored correctly - let (state_count2, pr_count2) = purgatory2.count(); + let (_, state_count2, pr_count2) = purgatory2.count(); assert_eq!(state_count2, 2); assert_eq!(pr_count2, 2); assert_eq!(purgatory2.expired_count(), 1); diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 33c2d12..778cdb8 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -279,7 +279,12 @@ impl SyncContext for RealSyncContext { } async fn fetch_repository_data(&self, identifier: &str) -> Result { - crate::git::authorization::fetch_repository_data(&self.database, identifier).await + crate::git::authorization::fetch_repository_data_with_purgatory( + &self.database, + &self.purgatory, + identifier, + ) + .await } fn collect_needed_oids(&self, identifier: &str) -> HashSet { diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 919504b..d891bc9 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs @@ -6,6 +6,8 @@ use nostr_sdk::prelude::*; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::path::PathBuf; use std::time::Instant; /// Default value for Instant fields during deserialization @@ -113,3 +115,40 @@ pub struct PrPurgatoryEntry { #[serde(skip, default = "instant_now")] pub expires_at: Instant, } + +/// Entry for a repository announcement (kind 30617) waiting in purgatory. +/// +/// Announcements are held in purgatory until git data arrives, proving +/// the repository has actual content. This prevents serving announcements +/// for empty repositories. +/// +/// Note: `Instant` fields cannot be serialized directly. Use the `persistence` +/// module to convert to/from serializable wrapper types. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AnnouncementPurgatoryEntry { + /// The nostr announcement event (kind 30617) + pub event: Event, + + /// The repository identifier from the event's 'd' tag + pub identifier: String, + + /// The owner pubkey (event author) + pub owner: PublicKey, + + /// Path to the bare git repository + pub repo_path: PathBuf, + + /// Relay URLs from the announcement (for sync registration) + pub relays: HashSet, + + /// When this entry was added to purgatory + #[serde(skip, default = "instant_now")] + pub created_at: Instant, + + /// Expiry deadline (30 min from creation, may be extended) + #[serde(skip, default = "instant_now")] + pub expires_at: Instant, + + /// Whether the bare repo has been deleted (soft expiry) + pub soft_expired: bool, +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1ee1872..872df66 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1719,8 +1719,50 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { + // Announcements (kind 30617) - re-process rejected state events + // When an announcement goes to purgatory, state events that were + // previously rejected ("no announcement exists") can now be authorized + // via fetch_repository_data_with_purgatory. + if event.kind == Kind::GitRepoAnnouncement { + use crate::nostr::events::RepositoryAnnouncement; + + if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) { + // Re-process rejected state events for this announcement + let (removed, hot_events) = rejected_events_index.invalidate_and_get( + &event.pubkey, + &announcement.identifier, + Some(rejected_index::EventType::State), + ); + + if removed > 0 { + tracing::info!( + pubkey = %event.pubkey, + identifier = %announcement.identifier, + removed_from_cold_index = removed, + hot_cache_events = hot_events.len(), + "Invalidated rejected state events (announcement now in purgatory)" + ); + } + + // Re-process state events from hot cache immediately + if !hot_events.is_empty() { + let _stats = Self::reprocess_events_from_hot_cache( + hot_events, + "state event (announcement in purgatory)", + &event.pubkey, + &announcement.identifier, + &relay_url_clone, + &database, + &write_policy, + &local_relay, + &rejected_events_index, + ) + .await; + } + } + } // State events (kind 30618) - extract identifier and trigger immediate sync - if event.kind.as_u16() == 30618 { + else if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { @@ -1754,7 +1796,9 @@ impl SyncManager { // Track pagination state for this subscription (REQ+EOSE) // and received event IDs for negentropy batches - if result == ProcessResult::Saved || result == ProcessResult::Duplicate { + // Include Purgatory results so announcements in purgatory still trigger + // per-repo sync (state events, PR events) from the source relay. + if result == ProcessResult::Saved || result == ProcessResult::Duplicate || result == ProcessResult::Purgatory { let mut pending = pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_clone) { for batch in batches.iter_mut() { @@ -2506,6 +2550,26 @@ impl SyncManager { "{} added to purgatory (waiting for git data)", context ); + // Trigger immediate sync for re-processed events that go to purgatory + // (same as sync-triggered events in the main event loop) + if event.kind.as_u16() == 30618 { + // State event - extract identifier from 'd' tag + if let Some(id) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.clone().to_vec(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].clone()) + } else { + None + } + }) { + write_policy.purgatory().enqueue_sync_immediate(&id); + } + } else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 { + // PR event - extract identifier from 'a' tag + if let Some(id) = crate::git::sync::extract_identifier_from_pr_event(&event) { + write_policy.purgatory().enqueue_sync_immediate(&id); + } + } } ProcessResult::Rejected => { stats.rejected += 1; diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index be6959b..e39b4b2 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -165,6 +165,7 @@ async fn test_archive_read_only_creates_bare_repo() { // c) Put state event in purgatory (git data missing on archive relay) // d) Fetch git data from source relay's clone URL // e) Release the state event from purgatory + let found = wait_for_event_served( archive_relay.url(), &state_event_id, @@ -267,11 +268,13 @@ async fn test_archive_read_only_creates_bare_repo() { /// This verifies the security model: archive mode only syncs git data /// when there are state events to validate against. /// -/// Scenario: -/// 1. Start source relay with announcement only (no state events) -/// 2. Start archive relay syncing from source -/// 3. Archive relay syncs announcement (creates bare repo) -/// 4. Verify git data is NOT synced (no state events to trigger purgatory sync) +/// With announcement purgatory, the flow is: +/// 1. Send announcement to source relay (goes to purgatory) +/// 2. Send state event to source relay (goes to purgatory) +/// 3. Push git data to source relay (promotes announcement and state event) +/// 4. Start archive relay with sync from source +/// 5. Archive relay syncs the promoted announcement +/// 6. Verify git data is NOT synced (archive has no state event to authorize git fetch) #[tokio::test] async fn test_archive_without_state_events_does_not_sync_git() { // 1. Start source relay @@ -290,7 +293,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { let npub = keys.public_key().to_bech32().expect("Failed to get npub"); - // 3. Create and send announcement listing BOTH relays (but NO state event) + // 3. Create and send announcement listing BOTH relays let announcement = create_repo_announcement( &keys, &[&source_relay.domain(), &archive_domain], @@ -306,7 +309,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { tokio::time::sleep(Duration::from_millis(500)).await; - // Send announcement to source relay + // Send announcement to source relay (goes to purgatory) source_client .send_event(&announcement) .await @@ -314,11 +317,39 @@ async fn test_archive_without_state_events_does_not_sync_git() { tokio::time::sleep(Duration::from_millis(200)).await; - // 4. Push git data to source relay (but no state event to authorize it) - // This push will fail because there's no state event in purgatory - // That's expected - we're testing that archive mode doesn't blindly fetch git data + // 4. Create and send state event to source relay (goes to purgatory) + let clone_url = format!( + "http://{}/{}/{}.git", + source_relay.domain(), + npub, + identifier + ); + let relay_url = source_relay.url().to_string(); + + let state_event = create_state_event( + &keys, + identifier, + &[("main", &commit_hash)], + &[], + &[&clone_url], + &[&relay_url], + ) + .expect("Failed to create state event"); + + source_client + .send_event(&state_event) + .await + .expect("Failed to send state event to source"); + + tokio::time::sleep(Duration::from_millis(200)).await; + + // 5. Push git data to source relay (promotes announcement and state event) + push_to_relay(temp_dir.path(), &source_relay.domain(), &npub, identifier) + .expect("Push to source should succeed"); + + tokio::time::sleep(Duration::from_millis(500)).await; - // 5. Start archive relay + // 6. Start archive relay (without state event - we don't send state event to archive) let archive_relay = TestRelay::start_with_archive_and_sync( archive_port, Some(source_relay.url().to_string()), @@ -333,10 +364,10 @@ async fn test_archive_without_state_events_does_not_sync_git() { .await .expect("Sync connection should establish"); - // Give time for any potential git sync to happen + // Give time for sync to fetch announcement tokio::time::sleep(Duration::from_secs(3)).await; - // 6. Verify bare repository was created (announcement was accepted) + // 7. Verify bare repository was created (announcement was synced and accepted to purgatory) let repo_path = archive_relay .git_data_path() .join(format!("{}/{}.git", npub, identifier)); @@ -346,7 +377,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { "Bare repository should be created for archive announcement" ); - // 7. Verify git data was NOT synced (no state events to trigger purgatory sync) + // 8. Verify git data was NOT synced (no state events on archive to trigger git fetch) // Check that the commit does NOT exist in the archive relay's repo let output = tokio::process::Command::new("git") .args(["cat-file", "-t", &commit_hash]) diff --git a/tests/purgatory.rs b/tests/purgatory.rs index e99540b..efc28c9 100644 --- a/tests/purgatory.rs +++ b/tests/purgatory.rs @@ -58,10 +58,10 @@ macro_rules! isolated_purgatory_test { } // ============================================================ -// Announcement Purgatory Tests (commented out - feature not yet implemented) +// Announcement Purgatory Tests // ============================================================ -// isolated_purgatory_test!(test_announcement_not_served_before_git_data); +isolated_purgatory_test!(test_announcement_not_served_before_git_data); isolated_purgatory_test!(test_announcement_served_after_git_push); isolated_purgatory_test!(test_bare_repo_exists_for_purgatory_announcement); isolated_purgatory_test!(test_state_event_accepted_for_purgatory_announcement); diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs index fe37c33..5abbf15 100644 --- a/tests/purgatory_persistence.rs +++ b/tests/purgatory_persistence.rs @@ -120,7 +120,8 @@ async fn test_full_purgatory_save_restore_cycle() { // so we'll focus on testing state and PR events persistence // Verify initial counts - let (state_count, pr_count) = purgatory.count(); + let (announcement_count, state_count, pr_count) = purgatory.count(); + assert_eq!(announcement_count, 0, "Should have 0 announcements"); assert_eq!(state_count, 2, "Should have 2 state events"); assert_eq!( pr_count, 3, @@ -142,7 +143,8 @@ async fn test_full_purgatory_save_restore_cycle() { ); // Verify all data was restored - let (state_count2, pr_count2) = purgatory2.count(); + let (announcement_count2, state_count2, pr_count2) = purgatory2.count(); + assert_eq!(announcement_count2, 0, "Should have 0 announcements after restore"); assert_eq!(state_count2, 2, "Should have 2 state events after restore"); assert_eq!( pr_count2, 3, @@ -275,7 +277,7 @@ async fn test_purgatory_downtime_adjustment() { purgatory2.restore_from_disk(&state_path).unwrap(); // Verify event is still there (downtime was accounted for) - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 1); let repo1_states = purgatory2.find_state("repo1"); @@ -401,7 +403,7 @@ async fn test_purgatory_restore_missing_file() { assert!(result.is_err(), "Should error on missing file"); // Purgatory should still be usable (empty state) - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); @@ -410,7 +412,7 @@ async fn test_purgatory_restore_missing_file() { let event = create_test_event(&keys, "test").await; purgatory.add_state(event, "repo1".to_string(), keys.public_key()); - let (state_count, _) = purgatory.count(); + let (_, state_count, _) = purgatory.count(); assert_eq!(state_count, 1); } @@ -461,7 +463,7 @@ async fn test_purgatory_restore_corrupted_file() { assert!(result.is_err(), "Should error on corrupted file"); // Purgatory should still be usable - let (state_count, pr_count) = purgatory.count(); + let (_, state_count, pr_count) = purgatory.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); } @@ -504,7 +506,7 @@ async fn test_empty_purgatory_save_restore() { purgatory2.restore_from_disk(&state_path).unwrap(); // Verify empty state - let (state_count, pr_count) = purgatory2.count(); + let (_, state_count, pr_count) = purgatory2.count(); assert_eq!(state_count, 0); assert_eq!(pr_count, 0); assert_eq!(purgatory2.expired_count(), 0); @@ -591,7 +593,7 @@ async fn test_purgatory_continues_working_after_restore() { purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key()); // Verify both old and new events work - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 2); let repo1_states = purgatory2.find_state("repo1"); @@ -603,7 +605,7 @@ async fn test_purgatory_continues_working_after_restore() { assert_eq!(repo2_states[0].event.id, event2.id); // Verify cleanup still works - let (state_removed, pr_removed) = purgatory2.cleanup(); + let (_, state_removed, pr_removed) = purgatory2.cleanup(); // Nothing should be expired yet assert_eq!(state_removed, 0); assert_eq!(pr_removed, 0); @@ -684,15 +686,15 @@ async fn test_purgatory_entries_expired_during_downtime() { purgatory2.restore_from_disk(&state_path).unwrap(); // Event should be restored - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 1); // Cleanup should work (even if nothing is expired yet) - let (state_removed, _) = purgatory2.cleanup(); + let (_, state_removed, _) = purgatory2.cleanup(); // Nothing expired yet since we didn't wait 30 minutes assert_eq!(state_removed, 0); - let (state_count, _) = purgatory2.count(); + let (_, state_count, _) = purgatory2.count(); assert_eq!(state_count, 1); } -- cgit v1.2.3 From 8c903c9449d387c9b0edefa5aa283b176a3ed0cb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 17:42:08 +0000 Subject: fix: revert wrong sync approach for purgatory announcements The partial fix treating ProcessResult::Purgatory as confirmed in pending_sync_index would trigger full L2/L3 sync for purgatory announcements. Per design (decision #6), purgatory announcements should only sync state events via SyncLevel::StateOnly (not yet implemented). Ignore test_archive_read_only_creates_bare_repo until SyncLevel is implemented in Phase 3. --- src/purgatory/sync/context.rs | 7 +---- src/sync/mod.rs | 68 ++----------------------------------------- tests/archive_read_only.rs | 1 + 3 files changed, 4 insertions(+), 72 deletions(-) (limited to 'src/sync') diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 778cdb8..33c2d12 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -279,12 +279,7 @@ impl SyncContext for RealSyncContext { } async fn fetch_repository_data(&self, identifier: &str) -> Result { - crate::git::authorization::fetch_repository_data_with_purgatory( - &self.database, - &self.purgatory, - identifier, - ) - .await + crate::git::authorization::fetch_repository_data(&self.database, identifier).await } fn collect_needed_oids(&self, identifier: &str) -> HashSet { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 872df66..1ee1872 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1719,50 +1719,8 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { - // Announcements (kind 30617) - re-process rejected state events - // When an announcement goes to purgatory, state events that were - // previously rejected ("no announcement exists") can now be authorized - // via fetch_repository_data_with_purgatory. - if event.kind == Kind::GitRepoAnnouncement { - use crate::nostr::events::RepositoryAnnouncement; - - if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) { - // Re-process rejected state events for this announcement - let (removed, hot_events) = rejected_events_index.invalidate_and_get( - &event.pubkey, - &announcement.identifier, - Some(rejected_index::EventType::State), - ); - - if removed > 0 { - tracing::info!( - pubkey = %event.pubkey, - identifier = %announcement.identifier, - removed_from_cold_index = removed, - hot_cache_events = hot_events.len(), - "Invalidated rejected state events (announcement now in purgatory)" - ); - } - - // Re-process state events from hot cache immediately - if !hot_events.is_empty() { - let _stats = Self::reprocess_events_from_hot_cache( - hot_events, - "state event (announcement in purgatory)", - &event.pubkey, - &announcement.identifier, - &relay_url_clone, - &database, - &write_policy, - &local_relay, - &rejected_events_index, - ) - .await; - } - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { @@ -1796,9 +1754,7 @@ impl SyncManager { // Track pagination state for this subscription (REQ+EOSE) // and received event IDs for negentropy batches - // Include Purgatory results so announcements in purgatory still trigger - // per-repo sync (state events, PR events) from the source relay. - if result == ProcessResult::Saved || result == ProcessResult::Duplicate || result == ProcessResult::Purgatory { + if result == ProcessResult::Saved || result == ProcessResult::Duplicate { let mut pending = pending_sync_index.write().await; if let Some(batches) = pending.get_mut(&relay_url_clone) { for batch in batches.iter_mut() { @@ -2550,26 +2506,6 @@ impl SyncManager { "{} added to purgatory (waiting for git data)", context ); - // Trigger immediate sync for re-processed events that go to purgatory - // (same as sync-triggered events in the main event loop) - if event.kind.as_u16() == 30618 { - // State event - extract identifier from 'd' tag - if let Some(id) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.clone().to_vec(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].clone()) - } else { - None - } - }) { - write_policy.purgatory().enqueue_sync_immediate(&id); - } - } else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 { - // PR event - extract identifier from 'a' tag - if let Some(id) = crate::git::sync::extract_identifier_from_pr_event(&event) { - write_policy.purgatory().enqueue_sync_immediate(&id); - } - } } ProcessResult::Rejected => { stats.rejected += 1; diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index e39b4b2..e388ae5 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -55,6 +55,7 @@ use std::time::Duration; /// 5. Verify bare repository is created and git data is synced /// 6. Verify git pushes are rejected (read-only mode) #[tokio::test] +#[ignore] // Requires SyncLevel implementation (Phase 3) - purgatory announcements don't trigger per-repo sync yet async fn test_archive_read_only_creates_bare_repo() { // 1. Start source relay let source_relay = TestRelay::start().await; -- cgit v1.2.3 From e922e14e3ec4b898c111b2100cd63dddbe2fcdb1 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Feb 2026 19:59:36 +0000 Subject: feat: add SyncLevel to sync system for purgatory announcement state-only sync Purgatory announcements need state events (kind 30618) synced from external relays, but not full L2/L3 events (patches, issues, PRs) which would be rejected anyway. This implements the SyncLevel concept from the design doc (decision #6): - Add SyncLevel enum (Full vs StateOnly) to RepoSyncNeeds - When announcement enters purgatory during sync, register in RepoSyncIndex with SyncLevel::StateOnly - Add build_sync_level_aware_filters() that partitions repos by level: StateOnly repos only get state event filters (kind 30618) - Update derive_relay_targets to track state_only_repos separately - Update compute_actions to handle both repo sets - SelfSubscriber always uses SyncLevel::Full (promoted repos) --- src/sync/algorithms.rs | 58 +++++++++++++++++++++++++++++++++++++------- src/sync/filters.rs | 31 ++++++++++++++++++++++++ src/sync/mod.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++- src/sync/self_subscriber.rs | 17 +++++++++---- 4 files changed, 150 insertions(+), 15 deletions(-) (limited to 'src/sync') diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 39788bc..9899abc 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs @@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState}; /// this repo need to sync from", it's "what repos does this relay need to sync". #[derive(Debug, Clone, Default)] pub struct RelaySyncNeeds { - /// Repos that need to be synced from this relay + /// Repos that need full L2+L3 sync from this relay pub repos: HashSet, + /// Repos that only need state event sync (purgatory announcements) + pub state_only_repos: HashSet, /// Root events that need to be tracked from this relay pub root_events: HashSet, } @@ -67,8 +69,15 @@ pub fn derive_relay_targets( for relay_url in &needs.relays { let entry = relay_targets.entry(relay_url.clone()).or_default(); - entry.repos.insert(repo_id.clone()); - entry.root_events.extend(needs.root_events.iter().cloned()); + match needs.sync_level { + super::SyncLevel::Full => { + entry.repos.insert(repo_id.clone()); + entry.root_events.extend(needs.root_events.iter().cloned()); + } + super::SyncLevel::StateOnly => { + entry.state_only_repos.insert(repo_id.clone()); + } + } } } @@ -96,7 +105,7 @@ pub fn compute_actions( pending: &HashMap>, confirmed: &HashMap, ) -> Vec { - use crate::sync::filters::build_layer2_and_layer3_filters; + use crate::sync::filters::build_sync_level_aware_filters; let mut actions = Vec::new(); @@ -140,14 +149,22 @@ pub fn compute_actions( .map(|state| state.root_events.clone()) .unwrap_or_default(); - // Calculate what's NEW (not in pending, not in confirmed) - let new_repos: HashSet = target_needs + // Calculate what's NEW for full repos (not in pending, not in confirmed) + let new_full_repos: HashSet = target_needs .repos .difference(&pending_repos) .filter(|repo| !confirmed_repos.contains(*repo)) .cloned() .collect(); + // Calculate what's NEW for state-only repos + let new_state_only_repos: HashSet = target_needs + .state_only_repos + .difference(&pending_repos) + .filter(|repo| !confirmed_repos.contains(*repo)) + .cloned() + .collect(); + let new_events: HashSet = target_needs .root_events .difference(&pending_events) @@ -156,13 +173,23 @@ pub fn compute_actions( .collect(); // If there's anything new, create an AddFilters action - if !new_repos.is_empty() || !new_events.is_empty() { - let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); + if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty() + { + let filters = build_sync_level_aware_filters( + &new_full_repos, + &new_state_only_repos, + &new_events, + None, + ); + + // Combine all repos into pending items (pending tracking doesn't need sync level) + let mut all_new_repos = new_full_repos; + all_new_repos.extend(new_state_only_repos); actions.push(AddFilters { relay_url: relay_url.clone(), items: PendingItems { - repos: new_repos, + repos: all_new_repos, root_events: new_events, }, filters, @@ -204,6 +231,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events, + sync_level: Default::default(), }, ); @@ -229,6 +257,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events: HashSet::new(), + sync_level: Default::default(), }, ); } @@ -252,6 +281,7 @@ mod tests { ModRepoSyncNeeds { relays, root_events: HashSet::new(), + sync_level: Default::default(), }, ); @@ -285,6 +315,7 @@ mod tests { ModRepoSyncNeeds { relays: relays1, root_events: root_events1, + sync_level: Default::default(), }, ); @@ -299,6 +330,7 @@ mod tests { ModRepoSyncNeeds { relays: relays2, root_events: root_events2, + sync_level: Default::default(), }, ); @@ -332,6 +364,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -366,6 +399,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -389,6 +423,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -428,6 +463,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -465,6 +501,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -510,6 +547,7 @@ mod tests { ] .into_iter() .collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); @@ -572,6 +610,7 @@ mod tests { "wss://relay1.com".to_string(), RelaySyncNeeds { repos: HashSet::new(), + state_only_repos: HashSet::new(), root_events: vec![event_id].into_iter().collect(), }, ); @@ -599,6 +638,7 @@ mod tests { "wss://new-relay.com".to_string(), RelaySyncNeeds { repos: vec!["repo1".to_string()].into_iter().collect(), + state_only_repos: HashSet::new(), root_events: HashSet::new(), }, ); diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 3592489..1215e81 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs @@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters( filters } +/// Builds filters respecting SyncLevel for each repo +/// +/// StateOnly repos only get state event filters (kind 30618). +/// Full repos get all L2/L3 filters (state + repo-tagging + root event). +/// +/// # Arguments +/// * `full_repos` - Repos needing full L2+L3 sync +/// * `state_only_repos` - Repos needing only state event sync (purgatory) +/// * `root_events` - Root event IDs (only used for Full repos) +/// * `since` - Optional timestamp for incremental sync +pub fn build_sync_level_aware_filters( + full_repos: &HashSet, + state_only_repos: &HashSet, + root_events: &HashSet, + since: Option, +) -> Vec { + let mut filters = Vec::new(); + + // All repos (both Full and StateOnly) need state event filters + let all_repos: HashSet = full_repos.union(state_only_repos).cloned().collect(); + filters.extend(state_event_filters_for_our_repos(&all_repos, since)); + + // Only Full repos get repo-tagging and root event filters + if !full_repos.is_empty() { + filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since)); + } + filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); + + filters +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1ee1872..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex; // Supporting Data Structures // ============================================================================= +/// Level of sync needed for a repository +/// +/// Purgatory announcements only need state events synced (to validate git data). +/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SyncLevel { + /// Full L2 + L3 sync (promoted repos with git data) + #[default] + Full, + /// Only state events (kind 30618) - for purgatory announcements + StateOnly, +} + /// What repos and root events need to be synced #[derive(Debug, Clone, Default)] pub struct RepoSyncNeeds { @@ -92,6 +105,8 @@ pub struct RepoSyncNeeds { pub relays: HashSet, /// Root event IDs - 1617/1618/1621 - that reference this repo pub root_events: HashSet, + /// Sync level - StateOnly for purgatory, Full for promoted repos + pub sync_level: SyncLevel, } /// Connection status for a relay @@ -1677,6 +1692,7 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); + let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1719,8 +1735,49 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) if result == ProcessResult::Purgatory { + // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly + // so that state events (kind 30618) are synced for this purgatory announcement + if event.kind == Kind::GitRepoAnnouncement { + if let Some(identifier) = event.tags.iter().find_map(|tag| { + let tag_vec = tag.as_slice(); + if tag_vec.len() >= 2 && tag_vec[0] == "d" { + Some(tag_vec[1].to_string()) + } else { + None + } + }) { + let repo_id = format!("30617:{}:{}", event.pubkey, identifier); + + // Extract relay URLs from the purgatory entry + let relays = write_policy + .purgatory() + .find_announcement(&event.pubkey, &identifier) + .map(|entry| entry.relays) + .unwrap_or_default(); + + tracing::info!( + event_id = %event.id, + repo_id = %repo_id, + relay_count = relays.len(), + "Registering purgatory announcement in RepoSyncIndex with StateOnly level" + ); + + // Register in RepoSyncIndex with StateOnly level + let mut index = repo_sync_index.write().await; + let entry = index + .entry(repo_id) + .or_insert_with(|| RepoSyncNeeds { + relays: HashSet::new(), + root_events: HashSet::new(), + sync_level: SyncLevel::StateOnly, + }); + entry.relays.extend(relays); + // Don't upgrade sync_level if already Full + // (e.g., if announcement was promoted before this runs) + } + } // State events (kind 30618) - extract identifier and trigger immediate sync - if event.kind.as_u16() == 30618 { + else if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..db16c62 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -16,7 +16,7 @@ use nostr_sdk::Timestamp; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; -use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; +use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; // ============================================================================= // LoopControl - Result of notification processing @@ -58,6 +58,7 @@ impl PendingUpdates { let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), + sync_level: SyncLevel::Full, }); entry.relays.extend(relays); entry.root_events.extend(root_events); @@ -475,6 +476,7 @@ impl SelfSubscriber { .or_insert_with(|| RepoSyncNeeds { relays: HashSet::new(), root_events: HashSet::new(), + sync_level: SyncLevel::Full, }); entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); @@ -499,21 +501,26 @@ impl SelfSubscriber { continue; } - // Build filters for these repos - let filters = crate::sync::filters::build_layer2_and_layer3_filters( + // Build filters for these repos (sync-level-aware) + let filters = crate::sync::filters::build_sync_level_aware_filters( &needs.repos, + &needs.state_only_repos, &needs.root_events, None, ); // Log before moving values - let repo_count = needs.repos.len(); + let repo_count = needs.repos.len() + needs.state_only_repos.len(); let event_count = needs.root_events.len(); + // Combine all repos into pending items + let mut all_repos = needs.repos; + all_repos.extend(needs.state_only_repos); + let action = AddFilters { relay_url: relay_url.clone(), items: crate::sync::PendingItems { - repos: needs.repos, + repos: all_repos, root_events: needs.root_events, }, filters, -- cgit v1.2.3 From 806936e7d1aab5dfd0c2ad6b98a115122dc1785c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 17:12:04 +0000 Subject: fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery StateOnly repos in a pending batch had their repo IDs included in the negentropy REQ+EOSE fallback, which called build_layer2_and_layer3_filters. This generated #a/#A/#q tag filters for repos whose announcements were still in purgatory (not yet promoted to the database). When the remote relay responded with PR events matching those filters, the write policy correctly rejected them as 'orphan' (no accepted repo in DB yet). However, nostr-sdk's client-level deduplication then silently dropped the same event on all subsequent deliveries, making it permanently unavailable even after the announcement was promoted. Fix: split batch_repos into full vs state-only by consulting repo_sync_index at fallback time, then call build_sync_level_aware_filters which only generates #a/#A/#q filters for Full repos. StateOnly repos only get the kind 30618 + #d filter they were originally subscribed with. --- src/sync/mod.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 108 insertions(+), 7 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..6ab8d33 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -557,6 +557,13 @@ pub struct SyncManager { /// Purgatory for read-only access to events awaiting git data purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) + // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters + // actions when user-submitted purgatory announcements need to trigger relay discovery. + /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) + #[allow(dead_code)] + action_tx: Option>, + /// Receiver for AddFilters actions (taken by run() when the event loop starts) + action_rx: Option>, local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, @@ -643,6 +650,11 @@ impl SyncManager { } } + // Create action channel upfront so callers (e.g. write policy) can send AddFilters + // actions before run() is called (e.g. when user-submitted purgatory announcements + // need to trigger relay discovery). + let (action_tx, action_rx) = tokio::sync::mpsc::channel::(100); + Self { bootstrap_relay_url, service_domain, @@ -663,9 +675,22 @@ impl SyncManager { connect_tx: None, shutdown_tx: None, metrics: sync_metrics, + action_tx: Some(action_tx), + action_rx: Some(action_rx), } } + /// Get a clone of the action sender for external use. + /// + /// This allows the write policy to send AddFilters actions to the SyncManager + /// when user-submitted purgatory announcements need to trigger relay discovery. + /// + /// # Returns + /// Clone of the action sender, or None if the channel was never created. + pub fn action_tx(&self) -> Option> { + self.action_tx.clone() + } + /// Generate a unique batch ID /// /// Increments the internal counter and returns the new value. @@ -686,6 +711,17 @@ impl SyncManager { self.rejected_events_index.clone() } + /// Get a clone of the repo sync index. + /// + /// This allows access to the repo sync index for upgrading sync levels + /// when announcements are promoted from purgatory. + /// + /// # Returns + /// Clone of the repo sync index (Arc>) + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Save rejected events index to disk. /// /// This is called during shutdown to persist the rejected events cache, @@ -949,11 +985,31 @@ impl SyncManager { // Drop the lock before async operations drop(pending); - // Create REQ+EOSE subscriptions using original semantic filters + // Create REQ+EOSE subscriptions using sync-level-aware filters. // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. + // + // CRITICAL: Use build_sync_level_aware_filters to avoid generating + // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements + // are still in purgatory. If we send Layer 2 filters too early, the + // remote relay may return PR events that our write policy rejects as + // "orphan" (no promoted repo). nostr-sdk deduplication then silently + // drops the event on retry, making it permanently unavailable. + let (full_repos, state_only_repos) = { + let index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_id in &batch_repos { + match index.get(repo_id).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } + _ => { full.insert(repo_id.clone()); } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1037,8 +1093,20 @@ impl SyncManager { pending.remove(&relay_url_for_fallback); } drop(pending); + let is_generic_filter = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + + // Trigger filter recomputation for generic filter batches + if is_generic_filter { + tracing::info!( + relay = %relay_url_for_fallback, + "Announcement batch complete (fallback path) - triggering filter recomputation" + ); + self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) + .await; + } } } return; @@ -1136,8 +1204,20 @@ impl SyncManager { pending.remove(&relay_url_for_retry); } drop(pending); + let is_generic_filter = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + + // Trigger filter recomputation for generic filter batches + if is_generic_filter { + tracing::info!( + relay = %relay_url_for_retry, + "Announcement batch complete (retry path) - triggering filter recomputation" + ); + self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) + .await; + } } } return; @@ -1158,7 +1238,20 @@ impl SyncManager { drop(pending); // 4. Confirm the batch (moves items to RelayState) + let is_generic_filter = + completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter batches (announcements), trigger filter recomputation + // to subscribe to state events for purgatory announcements that were registered + // during event processing. + if is_generic_filter { + tracing::info!( + relay = %relay_url, + "Announcement batch complete - triggering filter recomputation for purgatory repos" + ); + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState @@ -1437,8 +1530,16 @@ impl SyncManager { "SyncManager starting" ); - // 1. Create action channel for self-subscriber -> manager communication - let (action_tx, mut action_rx) = mpsc::channel::(100); + // 1. Take action channel receiver (created in new()) - sender is shared with write policy + let mut action_rx = self + .action_rx + .take() + .expect("action_rx should be set in new()"); + // Get a clone of action_tx for the self-subscriber + let action_tx_for_subscriber = self + .action_tx + .clone() + .expect("action_tx should be set in new()"); // 2. Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -1457,7 +1558,7 @@ impl SyncManager { format!("ws://{}", self.config.bind_address), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), - action_tx, + action_tx_for_subscriber, ); let subscriber_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); -- cgit v1.2.3 From a804164468d3beafb243ece12555b4d1692a075d Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:28:44 +0000 Subject: Revert "fix: use sync-level-aware filters in negentropy fallback to prevent premature PR event delivery" This reverts commit 806936e7d1aab5dfd0c2ad6b98a115122dc1785c. --- src/sync/mod.rs | 115 ++++---------------------------------------------------- 1 file changed, 7 insertions(+), 108 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6ab8d33..519017b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -557,13 +557,6 @@ pub struct SyncManager { /// Purgatory for read-only access to events awaiting git data purgatory: Arc, /// Local relay for submitting synced events (enables broadcast to WebSocket subscribers) - // NOTE: action_tx is also used by external callers (e.g. write policy) to send AddFilters - // actions when user-submitted purgatory announcements need to trigger relay discovery. - /// Sender for AddFilters actions (pre-created so it can be cloned before run() is called) - #[allow(dead_code)] - action_tx: Option>, - /// Receiver for AddFilters actions (taken by run() when the event loop starts) - action_rx: Option>, local_relay: LocalRelay, /// Configuration reference for sync settings config: Config, @@ -650,11 +643,6 @@ impl SyncManager { } } - // Create action channel upfront so callers (e.g. write policy) can send AddFilters - // actions before run() is called (e.g. when user-submitted purgatory announcements - // need to trigger relay discovery). - let (action_tx, action_rx) = tokio::sync::mpsc::channel::(100); - Self { bootstrap_relay_url, service_domain, @@ -675,22 +663,9 @@ impl SyncManager { connect_tx: None, shutdown_tx: None, metrics: sync_metrics, - action_tx: Some(action_tx), - action_rx: Some(action_rx), } } - /// Get a clone of the action sender for external use. - /// - /// This allows the write policy to send AddFilters actions to the SyncManager - /// when user-submitted purgatory announcements need to trigger relay discovery. - /// - /// # Returns - /// Clone of the action sender, or None if the channel was never created. - pub fn action_tx(&self) -> Option> { - self.action_tx.clone() - } - /// Generate a unique batch ID /// /// Increments the internal counter and returns the new value. @@ -711,17 +686,6 @@ impl SyncManager { self.rejected_events_index.clone() } - /// Get a clone of the repo sync index. - /// - /// This allows access to the repo sync index for upgrading sync levels - /// when announcements are promoted from purgatory. - /// - /// # Returns - /// Clone of the repo sync index (Arc>) - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Save rejected events index to disk. /// /// This is called during shutdown to persist the rejected events cache, @@ -985,31 +949,11 @@ impl SyncManager { // Drop the lock before async operations drop(pending); - // Create REQ+EOSE subscriptions using sync-level-aware filters. + // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail. - // - // CRITICAL: Use build_sync_level_aware_filters to avoid generating - // Layer 2 (#a/#A/#q) filters for StateOnly repos whose announcements - // are still in purgatory. If we send Layer 2 filters too early, the - // remote relay may return PR events that our write policy rejects as - // "orphan" (no promoted repo). nostr-sdk deduplication then silently - // drops the event on retry, making it permanently unavailable. - let (full_repos, state_only_repos) = { - let index = self.repo_sync_index.read().await; - let mut full = HashSet::new(); - let mut state_only = HashSet::new(); - for repo_id in &batch_repos { - match index.get(repo_id).map(|n| n.sync_level) { - Some(SyncLevel::StateOnly) => { state_only.insert(repo_id.clone()); } - _ => { full.insert(repo_id.clone()); } - } - } - (full, state_only) - }; - let fallback_filters = filters::build_sync_level_aware_filters( - &full_repos, - &state_only_repos, + // succeed even when ID-based queries fail + let fallback_filters = filters::build_layer2_and_layer3_filters( + &batch_repos, &batch_root_events, None, ); @@ -1093,20 +1037,8 @@ impl SyncManager { pending.remove(&relay_url_for_fallback); } drop(pending); - let is_generic_filter = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - - // Trigger filter recomputation for generic filter batches - if is_generic_filter { - tracing::info!( - relay = %relay_url_for_fallback, - "Announcement batch complete (fallback path) - triggering filter recomputation" - ); - self.recompute_new_sync_filters_for_relay(&relay_url_for_fallback) - .await; - } } } return; @@ -1204,20 +1136,8 @@ impl SyncManager { pending.remove(&relay_url_for_retry); } drop(pending); - let is_generic_filter = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - - // Trigger filter recomputation for generic filter batches - if is_generic_filter { - tracing::info!( - relay = %relay_url_for_retry, - "Announcement batch complete (retry path) - triggering filter recomputation" - ); - self.recompute_new_sync_filters_for_relay(&relay_url_for_retry) - .await; - } } } return; @@ -1238,20 +1158,7 @@ impl SyncManager { drop(pending); // 4. Confirm the batch (moves items to RelayState) - let is_generic_filter = - completed_batch.items.repos.is_empty() && completed_batch.items.root_events.is_empty(); self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter batches (announcements), trigger filter recomputation - // to subscribe to state events for purgatory announcements that were registered - // during event processing. - if is_generic_filter { - tracing::info!( - relay = %relay_url, - "Announcement batch complete - triggering filter recomputation for purgatory repos" - ); - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1530,16 +1437,8 @@ impl SyncManager { "SyncManager starting" ); - // 1. Take action channel receiver (created in new()) - sender is shared with write policy - let mut action_rx = self - .action_rx - .take() - .expect("action_rx should be set in new()"); - // Get a clone of action_tx for the self-subscriber - let action_tx_for_subscriber = self - .action_tx - .clone() - .expect("action_tx should be set in new()"); + // 1. Create action channel for self-subscriber -> manager communication + let (action_tx, mut action_rx) = mpsc::channel::(100); // 2. Create disconnect channel for spawned tasks -> manager communication let (disconnect_tx, mut disconnect_rx) = mpsc::channel::(100); @@ -1558,7 +1457,7 @@ impl SyncManager { format!("ws://{}", self.config.bind_address), self.service_domain.clone(), Arc::clone(&self.repo_sync_index), - action_tx_for_subscriber, + action_tx, ); let subscriber_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); -- cgit v1.2.3 From e22021f0b248ebcf3bd09210d59b2cdb4701032f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 19:41:29 +0000 Subject: fix: simplify purgatory sync - fix SelfSubscriber sync_level upgrade and negentropy fallback Three targeted fixes for purgatory announcement sync: 1. SelfSubscriber sync_level upgrade: After or_insert_with in process_batch, always set entry.sync_level = SyncLevel::Full so that when a promoted announcement is broadcast via notify_event and SelfSubscriber receives it, an existing StateOnly entry gets upgraded to Full and PR event subscriptions are triggered immediately (not delayed up to 24h). 2. Negentropy fallback filter split: In handle_eose, when falling back from negentropy to REQ+EOSE, split batch_repos by SyncLevel and call build_sync_level_aware_filters instead of build_layer2_and_layer3_filters. Prevents StateOnly (purgatory) repos from getting Layer 2 #a/#A/#q filters prematurely, which caused nostr-sdk client deduplication to permanently drop PR events after orphan rejection. 3. Recompute sync filters after announcement batch EOSE: Add recompute_new_sync_filters_for_relay calls at all three batch-completion paths in handle_eose for generic filter (announcement) batches. This triggers state-only subscriptions for any purgatory repos registered during that batch, fixing the 24h delay before state event sync starts. 4. User-submitted purgatory announcements: Add repo_sync_index field to PolicyContext with setter/getter, wire in main.rs after SyncManager creation, and register in AcceptPurgatory handler so user-submitted announcements get StateOnly sync started immediately. 5. Update archive tests: test_archive_without_state_events_does_not_sync_git updated to reflect that StateOnly subscription now proactively fetches state events from source relays. test_archive_read_only_creates_bare_repo un-ignored as it now works end-to-end. --- src/main.rs | 7 +++++ src/nostr/builder.rs | 54 +++++++++++++++++++++++++++++++++++++ src/nostr/policy/mod.rs | 19 +++++++++++++ src/sync/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++++++--- src/sync/self_subscriber.rs | 4 +++ tests/archive_read_only.rs | 63 +++++++++++++++++++++++++++---------------- 6 files changed, 187 insertions(+), 26 deletions(-) (limited to 'src/sync') diff --git a/src/main.rs b/src/main.rs index ab6ede7..ebe05a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,13 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); + // Wire repo_sync_index into write policy so user-submitted purgatory announcements + // get registered for state event sync immediately (Fix 3). + let repo_sync_index = sync_manager.repo_sync_index(); + relay_with_db + .write_policy + .set_repo_sync_index(repo_sync_index); + tokio::spawn(async move { sync_manager.run().await; }); diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..8d1e461 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -17,6 +17,7 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; +use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -98,6 +99,14 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } + /// Set the repo sync index so that user-submitted purgatory announcements can + /// be registered for state event sync immediately. + /// + /// This must be called after SyncManager is created. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + self.ctx.set_repo_sync_index(index); + } + /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -146,6 +155,51 @@ impl Nip34WritePolicy { "Accepted announcement to purgatory: {} (waiting for git data)", event_id_str ); + + // Register repo in repo_sync_index with StateOnly level so that + // state event sync starts promptly via the next batch EOSE recompute. + // This handles user-submitted purgatory announcements - the SelfSubscriber + // only sees DB events, so it won't pick these up automatically. + if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { + if let Ok(announcement) = + RepositoryAnnouncement::from_event(event.clone()) + { + use std::collections::HashSet; + let repo_id = format!( + "30617:{}:{}", + event.pubkey, + announcement.identifier + ); + + // Extract relay URLs from the announcement event tags + let relays: HashSet = event + .tags + .iter() + .flat_map(|tag| { + let tag_vec = tag.as_slice(); + if !tag_vec.is_empty() && tag_vec[0] == "relays" { + tag_vec[1..].iter().map(|s| s.to_string()).collect::>() + } else { + vec![] + } + }) + .collect(); + + let mut index = repo_sync_index.write().await; + index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { + relays, + root_events: HashSet::new(), + sync_level: SyncLevel::StateOnly, + }); + drop(index); + + tracing::debug!( + repo_id = %repo_id, + "Registered purgatory announcement in repo_sync_index as StateOnly" + ); + } + } + WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..c958586 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; +use crate::sync::RepoSyncIndex; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -34,6 +35,8 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, + /// Repo sync index for registering purgatory announcements (set after SyncManager creation) + pub repo_sync_index: Arc>>, } impl PolicyContext { @@ -51,6 +54,7 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, + repo_sync_index: Arc::new(std::sync::RwLock::new(None)), } } @@ -68,4 +72,19 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } + + /// Set the repo sync index after SyncManager has been created. + /// + /// This allows purgatory announcements submitted by users to be registered + /// in the sync index so state event sync starts promptly. + pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { + let mut guard = self.repo_sync_index.write().unwrap(); + *guard = Some(index); + } + + /// Get a clone of the repo sync index if it has been set. + pub fn get_repo_sync_index(&self) -> Option { + let guard = self.repo_sync_index.read().unwrap(); + guard.clone() + } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..916e2b0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -700,6 +700,14 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } + /// Get a clone of the repo sync index Arc. + /// + /// This allows the write policy to register user-submitted purgatory announcements + /// in the sync index so that state event sync starts promptly. + pub fn repo_sync_index(&self) -> RepoSyncIndex { + self.repo_sync_index.clone() + } + /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -951,9 +959,29 @@ impl SyncManager { // Create REQ+EOSE subscriptions using original semantic filters // This queries by kind/author/tags instead of by ID, which may - // succeed even when ID-based queries fail - let fallback_filters = filters::build_layer2_and_layer3_filters( - &batch_repos, + // succeed even when ID-based queries fail. + // Split batch_repos by SyncLevel to avoid sending Layer 2 filters + // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be + // rejected as orphan and then silently dropped by nostr-sdk deduplication. + let (full_repos, state_only_repos) = { + let repo_index = self.repo_sync_index.read().await; + let mut full = HashSet::new(); + let mut state_only = HashSet::new(); + for repo_ref in &batch_repos { + match repo_index.get(repo_ref).map(|n| n.sync_level) { + Some(SyncLevel::StateOnly) => { + state_only.insert(repo_ref.clone()); + } + _ => { + full.insert(repo_ref.clone()); + } + } + } + (full, state_only) + }; + let fallback_filters = filters::build_sync_level_aware_filters( + &full_repos, + &state_only_repos, &batch_root_events, None, ); @@ -1033,12 +1061,24 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_fallback, + ) + .await; + } } } return; @@ -1132,12 +1172,24 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); + let is_generic = + completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; + // For generic filter (announcement) batches, recompute filters + // so any purgatory repos registered during this batch get + // state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay( + &relay_url_for_retry, + ) + .await; + } } } return; @@ -1148,6 +1200,8 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); + let is_generic = completed_batch.items.repos.is_empty() + && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1159,6 +1213,12 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; + + // 5. For generic filter (announcement) batches, recompute sync filters so any + // purgatory repos registered during this batch get state-only subscriptions triggered. + if is_generic { + self.recompute_new_sync_filters_for_relay(relay_url).await; + } } /// Confirm a completed batch by moving items to RelayState diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index db16c62..70c3dbf 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -478,6 +478,10 @@ impl SelfSubscriber { root_events: HashSet::new(), sync_level: SyncLevel::Full, }); + // Upgrade sync_level to Full - this handles the case where the entry + // already exists as StateOnly (purgatory announcement) and is now being + // promoted (git data arrived and the event was broadcast via notify_event). + entry.sync_level = SyncLevel::Full; entry.relays.extend(needs.relays); entry.root_events.extend(needs.root_events); diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs index e388ae5..069b3b7 100644 --- a/tests/archive_read_only.rs +++ b/tests/archive_read_only.rs @@ -55,7 +55,6 @@ use std::time::Duration; /// 5. Verify bare repository is created and git data is synced /// 6. Verify git pushes are rejected (read-only mode) #[tokio::test] -#[ignore] // Requires SyncLevel implementation (Phase 3) - purgatory announcements don't trigger per-repo sync yet async fn test_archive_read_only_creates_bare_repo() { // 1. Start source relay let source_relay = TestRelay::start().await; @@ -264,24 +263,24 @@ async fn test_archive_read_only_creates_bare_repo() { source_relay.stop().await; } -/// Test that archive mode without state events does NOT sync git data. +/// Test that archive mode proactively syncs state events and git data +/// when the source relay has state events available. /// -/// This verifies the security model: archive mode only syncs git data -/// when there are state events to validate against. +/// With StateOnly sync now implemented, purgatory announcements subscribe +/// to state events from the relays listed in the announcement. This means +/// the archive relay will: +/// 1. Sync the announcement → purgatory → register as StateOnly in repo_sync_index +/// 2. Subscribe to state events (kind 30618) on source relay +/// 3. Receive the state event → purgatory sync triggered +/// 4. Fetch git data from source relay's clone URL /// -/// With announcement purgatory, the flow is: -/// 1. Send announcement to source relay (goes to purgatory) -/// 2. Send state event to source relay (goes to purgatory) -/// 3. Push git data to source relay (promotes announcement and state event) -/// 4. Start archive relay with sync from source -/// 5. Archive relay syncs the promoted announcement -/// 6. Verify git data is NOT synced (archive has no state event to authorize git fetch) +/// This test verifies the full sync chain works end-to-end for archive mode. #[tokio::test] -async fn test_archive_without_state_events_does_not_sync_git() { +async fn test_archive_syncs_state_events_and_git_data_via_state_only_subscription() { // 1. Start source relay let source_relay = TestRelay::start().await; let keys = Keys::generate(); - let identifier = "archive-no-state-repo"; + let identifier = "archive-state-only-sync-repo"; // Pre-allocate archive relay port let archive_port = TestRelay::find_free_port(); @@ -295,6 +294,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { let npub = keys.public_key().to_bech32().expect("Failed to get npub"); // 3. Create and send announcement listing BOTH relays + // The archive relay will subscribe to state events on BOTH listed relays let announcement = create_repo_announcement( &keys, &[&source_relay.domain(), &archive_domain], @@ -337,6 +337,8 @@ async fn test_archive_without_state_events_does_not_sync_git() { ) .expect("Failed to create state event"); + let state_event_id = state_event.id; + source_client .send_event(&state_event) .await @@ -348,9 +350,12 @@ async fn test_archive_without_state_events_does_not_sync_git() { push_to_relay(temp_dir.path(), &source_relay.domain(), &npub, identifier) .expect("Push to source should succeed"); - tokio::time::sleep(Duration::from_millis(500)).await; + // Wait for state event to be promoted on source relay + wait_for_event_served(source_relay.url(), &state_event_id, Duration::from_secs(5)) + .await + .expect("State event should be served on source relay after push"); - // 6. Start archive relay (without state event - we don't send state event to archive) + // 6. Start archive relay - StateOnly subscription will proactively fetch state events let archive_relay = TestRelay::start_with_archive_and_sync( archive_port, Some(source_relay.url().to_string()), @@ -360,15 +365,28 @@ async fn test_archive_without_state_events_does_not_sync_git() { ) .await; - // Wait for sync + // Wait for sync connection wait_for_sync_connection(archive_relay.url(), 1, Duration::from_secs(5)) .await .expect("Sync connection should establish"); - // Give time for sync to fetch announcement - tokio::time::sleep(Duration::from_secs(3)).await; + // 7. Wait for state event to be served on archive relay + // The StateOnly subscription fetches the state event from source relay, + // which then triggers purgatory sync and git data fetch. + let found = wait_for_event_served( + archive_relay.url(), + &state_event_id, + Duration::from_secs(30), // Allow time for sync + git fetch + ) + .await; + + assert!( + found.is_ok(), + "State event should be served on archive after StateOnly subscription fetches it: {:?}", + found.err() + ); - // 7. Verify bare repository was created (announcement was synced and accepted to purgatory) + // 8. Verify bare repository was created let repo_path = archive_relay .git_data_path() .join(format!("{}/{}.git", npub, identifier)); @@ -378,8 +396,7 @@ async fn test_archive_without_state_events_does_not_sync_git() { "Bare repository should be created for archive announcement" ); - // 8. Verify git data was NOT synced (no state events on archive to trigger git fetch) - // Check that the commit does NOT exist in the archive relay's repo + // 9. Verify git data was synced via the state event chain let output = tokio::process::Command::new("git") .args(["cat-file", "-t", &commit_hash]) .current_dir(&repo_path) @@ -389,8 +406,8 @@ async fn test_archive_without_state_events_does_not_sync_git() { let commit_exists = output.map(|o| o.status.success()).unwrap_or(false); assert!( - !commit_exists, - "Git data should NOT be synced without state events (security: validates against Nostr state)" + commit_exists, + "Git data should be synced via StateOnly subscription → state event → git fetch chain" ); // Cleanup -- cgit v1.2.3 From ee113a654e2971a6ebdb07398cc5638dbe59b48c Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 20:32:13 +0000 Subject: fix: replace repo_sync_index wiring with purgatory announcement sync timer Instead of threading repo_sync_index through PolicyContext/builder.rs/main.rs to handle user-submitted purgatory announcements, add a simple background timer (run_purgatory_announcement_sync, every 5s) that scans the purgatory for announcement entries and registers them in repo_sync_index as StateOnly. This is simpler and covers both flows: - Sync-path announcements: inline registration still happens during event processing (sync/mod.rs:1839+), timer provides a safety net - User-submitted announcements: SelfSubscriber never sees them (rejected from DB), timer is the primary registration path The timer calls sync_purgatory_announcements_to_index() which: 1. Snapshots purgatory via new announcements_for_sync() public method 2. Or_inserts StateOnly entries (never downgrades Full entries) 3. Detects newly added relay URLs and calls handle_new_sync_filters to connect and subscribe - fixing the failing test that expected relay discovery from a user-submitted purgatory announcement Removes: repo_sync_index field from PolicyContext, set/get_repo_sync_index methods, set_repo_sync_index on Nip34WritePolicy, wiring in main.rs, and the inline AcceptPurgatory registration block in builder.rs. --- src/main.rs | 7 --- src/nostr/builder.rs | 54 +-------------------- src/nostr/policy/mod.rs | 19 -------- src/purgatory/mod.rs | 17 +++++++ src/sync/mod.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++---- 5 files changed, 134 insertions(+), 88 deletions(-) (limited to 'src/sync') diff --git a/src/main.rs b/src/main.rs index ebe05a3..ab6ede7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,13 +132,6 @@ async fn main() -> Result<()> { // Get a reference to the rejected events index for shutdown persistence let shutdown_rejected_index = sync_manager.rejected_events_index(); - // Wire repo_sync_index into write policy so user-submitted purgatory announcements - // get registered for state event sync immediately (Fix 3). - let repo_sync_index = sync_manager.repo_sync_index(); - relay_with_db - .write_policy - .set_repo_sync_index(repo_sync_index); - tokio::spawn(async move { sync_manager.run().await; }); diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 8d1e461..c2d4939 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -17,7 +17,7 @@ use crate::nostr::policy::{ AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, }; -use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; + /// Type alias for the shared database used by the relay pub type SharedDatabase = Arc; @@ -99,14 +99,6 @@ impl Nip34WritePolicy { self.ctx.set_local_relay(relay); } - /// Set the repo sync index so that user-submitted purgatory announcements can - /// be registered for state event sync immediately. - /// - /// This must be called after SyncManager is created. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - self.ctx.set_repo_sync_index(index); - } - /// Handle repository announcement event async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); @@ -156,50 +148,6 @@ impl Nip34WritePolicy { event_id_str ); - // Register repo in repo_sync_index with StateOnly level so that - // state event sync starts promptly via the next batch EOSE recompute. - // This handles user-submitted purgatory announcements - the SelfSubscriber - // only sees DB events, so it won't pick these up automatically. - if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { - if let Ok(announcement) = - RepositoryAnnouncement::from_event(event.clone()) - { - use std::collections::HashSet; - let repo_id = format!( - "30617:{}:{}", - event.pubkey, - announcement.identifier - ); - - // Extract relay URLs from the announcement event tags - let relays: HashSet = event - .tags - .iter() - .flat_map(|tag| { - let tag_vec = tag.as_slice(); - if !tag_vec.is_empty() && tag_vec[0] == "relays" { - tag_vec[1..].iter().map(|s| s.to_string()).collect::>() - } else { - vec![] - } - }) - .collect(); - - let mut index = repo_sync_index.write().await; - index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { - relays, - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - drop(index); - - tracing::debug!( - repo_id = %repo_id, - "Registered purgatory announcement in repo_sync_index as StateOnly" - ); - } - } - WritePolicyResult::Reject { status: true, // Client sees OK message: "purgatory: won't be served until git data arrives".into(), diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index c958586..1566b6c 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs @@ -20,7 +20,6 @@ pub use crate::git::sync::AlignmentResult; use super::SharedDatabase; use crate::purgatory::Purgatory; -use crate::sync::RepoSyncIndex; use nostr_relay_builder::LocalRelay; use std::sync::Arc; @@ -35,8 +34,6 @@ pub struct PolicyContext { pub local_relay: Arc>>, /// Configuration reference for policy settings (includes blacklists) pub config: crate::config::Config, - /// Repo sync index for registering purgatory announcements (set after SyncManager creation) - pub repo_sync_index: Arc>>, } impl PolicyContext { @@ -54,7 +51,6 @@ impl PolicyContext { purgatory, local_relay: Arc::new(std::sync::RwLock::new(None)), config, - repo_sync_index: Arc::new(std::sync::RwLock::new(None)), } } @@ -72,19 +68,4 @@ impl PolicyContext { let guard = self.local_relay.read().unwrap(); guard.clone() } - - /// Set the repo sync index after SyncManager has been created. - /// - /// This allows purgatory announcements submitted by users to be registered - /// in the sync index so state event sync starts promptly. - pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { - let mut guard = self.repo_sync_index.write().unwrap(); - *guard = Some(index); - } - - /// Get a clone of the repo sync index if it has been set. - pub fn get_repo_sync_index(&self) -> Option { - let guard = self.repo_sync_index.read().unwrap(); - guard.clone() - } } diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 3b5514b..1894738 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs @@ -680,6 +680,23 @@ impl Purgatory { self.announcement_purgatory.len() } + /// Collect (repo_id, relay_urls) for all announcements currently in purgatory. + /// + /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable + /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory + /// announcement sync timer to register StateOnly entries in `repo_sync_index`. + pub fn announcements_for_sync(&self) -> Vec<(String, HashSet)> { + self.announcement_purgatory + .iter() + .map(|entry| { + let (owner, identifier) = entry.key(); + let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); + let relays = entry.value().relays.clone(); + (repo_id, relays) + }) + .collect() + } + /// Get all event IDs currently stored in purgatory AND previously expired events. /// /// Returns a HashSet of all event IDs for: diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 916e2b0..ed5b6e7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -397,6 +397,37 @@ async fn run_daily_timer( } } +/// Background task that periodically syncs purgatory announcements into repo_sync_index. +/// +/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there +/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` +/// which connects to the relay URLs listed in the announcement and subscribes to state +/// events (kind 30618). +/// +/// This covers two cases: +/// - Sync-path announcements: registered inline during event processing, but this +/// provides a safety net in case the inline registration was missed. +/// - User-submitted purgatory announcements: the SelfSubscriber never sees them +/// (they're rejected from DB), so this timer is the primary registration path. +async fn run_purgatory_announcement_sync( + sync_manager: Arc>, + mut shutdown_rx: broadcast::Receiver<()>, +) { + let interval = Duration::from_secs(5); + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + let mut manager = sync_manager.lock().await; + manager.sync_purgatory_announcements_to_index().await; + } + _ = shutdown_rx.recv() => { + tracing::debug!("Purgatory announcement sync timer received shutdown signal"); + break; + } + } + } +} + // Combined Health and Metrics Checker /// Background task for cleaning up expired entries from the rejected events index @@ -700,14 +731,6 @@ impl SyncManager { self.rejected_events_index.save_to_disk(path) } - /// Get a clone of the repo sync index Arc. - /// - /// This allows the write policy to register user-submitted purgatory announcements - /// in the sync index so that state event sync starts promptly. - pub fn repo_sync_index(&self) -> RepoSyncIndex { - self.repo_sync_index.clone() - } - /// Handle EOSE (End Of Stored Events) for a subscription /// /// This method: @@ -1560,7 +1583,17 @@ impl SyncManager { run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; }); - // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications + // 11. Spawn purgatory announcement sync timer (every 5s) + // Ensures purgatory announcements (including user-submitted ones that never + // touch the DB) are registered in repo_sync_index as StateOnly so that + // state event subscriptions are established on their listed relay URLs. + let purgatory_sync_manager = Arc::clone(&sync_manager); + let purgatory_sync_shutdown = shutdown_tx.subscribe(); + tokio::spawn(async move { + run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; + }); + + // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications loop { // Wait for an event without holding the lock tokio::select! { @@ -2419,6 +2452,80 @@ impl SyncManager { } } + /// Sync purgatory announcements into repo_sync_index as StateOnly entries. + /// + /// Called periodically by the purgatory announcement sync timer (every 5s). + /// For each announcement currently in purgatory, ensures a `StateOnly` entry + /// exists in `repo_sync_index`. New entries are then picked up by + /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes + /// to state events for that repo. + /// + /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). + async fn sync_purgatory_announcements_to_index(&mut self) { + use crate::sync::algorithms::{compute_actions, derive_relay_targets}; + + // Collect all purgatory announcements (snapshot - no async holds) + let announcements = self.purgatory.announcements_for_sync(); + + if announcements.is_empty() { + return; + } + + // Register any new entries in repo_sync_index as StateOnly + let mut new_relay_urls: std::collections::HashSet = std::collections::HashSet::new(); + { + let mut index = self.repo_sync_index.write().await; + for (repo_id, relays) in &announcements { + let entry = index.entry(repo_id.clone()).or_insert_with(|| { + tracing::debug!( + repo_id = %repo_id, + "Registering purgatory announcement in repo_sync_index as StateOnly" + ); + RepoSyncNeeds { + relays: std::collections::HashSet::new(), + root_events: std::collections::HashSet::new(), + sync_level: SyncLevel::StateOnly, + } + }); + // Don't downgrade an already-Full entry + // Add any new relay URLs + for relay in relays { + if entry.relays.insert(relay.clone()) { + new_relay_urls.insert(relay.clone()); + } + } + } + } + + if new_relay_urls.is_empty() { + return; + } + + // For any relay URLs that are new, compute and send AddFilters actions + let all_targets = { + let repo_index = self.repo_sync_index.read().await; + derive_relay_targets(&repo_index) + }; + + let actions = { + let pending_index = self.pending_sync_index.read().await; + let relay_index = self.relay_sync_index.read().await; + compute_actions(&all_targets, &pending_index, &relay_index) + }; + + for action in actions { + // Only act on relays that have new URLs (avoids redundant work) + if new_relay_urls.contains(&action.relay_url) { + tracing::info!( + relay = %action.relay_url, + repos = action.items.repos.len(), + "Purgatory sync timer: connecting to new relay from purgatory announcement" + ); + self.handle_new_sync_filters(action).await; + } + } + } + /// Handle a relay disconnection /// /// This method is called when the event loop terminates and sends a disconnect notification. -- cgit v1.2.3 From f62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:48:26 +0000 Subject: refactor: replace inline purgatory sync registration with timer-only approach Remove the redundant inline kind-30617 registration block from the sync event loop and the three is_generic/recompute_new_sync_filters_for_relay calls from confirm_batch error paths. The purgatory announcement sync timer (run_purgatory_announcement_sync) is now the sole registration path. Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing two ad-hoc env vars with one reusable test-mode flag. --- src/sync/mod.rs | 103 ++++++---------------------------- src/sync/self_subscriber.rs | 12 ++-- tests/common/relay.rs | 2 +- tests/sync/maintainer_reprocessing.rs | 2 +- 4 files changed, 26 insertions(+), 93 deletions(-) (limited to 'src/sync') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -399,21 +399,24 @@ async fn run_daily_timer( /// Background task that periodically syncs purgatory announcements into repo_sync_index. /// -/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there -/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` -/// which connects to the relay URLs listed in the announcement and subscribes to state -/// events (kind 30618). +/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). +/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in +/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the +/// relay URLs listed in the announcement and subscribes to state events (kind 30618). /// -/// This covers two cases: -/// - Sync-path announcements: registered inline during event processing, but this -/// provides a safety net in case the inline registration was missed. +/// This is the sole registration path for purgatory announcements: +/// - Sync-path announcements: registered here within one interval of arriving. /// - User-submitted purgatory announcements: the SelfSubscriber never sees them -/// (they're rejected from DB), so this timer is the primary registration path. +/// (they're rejected from DB), so this timer is the only registration path. async fn run_purgatory_announcement_sync( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, ) { - let interval = Duration::from_secs(5); + let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_secs(5) + }; loop { tokio::select! { _ = tokio::time::sleep(interval) => { @@ -1084,24 +1087,12 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_fallback, - ) - .await; - } } } return; @@ -1195,24 +1186,12 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_retry, - ) - .await; - } } } return; @@ -1223,8 +1202,6 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); - let is_generic = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1236,12 +1213,6 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter (announcement) batches, recompute sync filters so any - // purgatory repos registered during this batch get state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1370,7 +1341,7 @@ impl SyncManager { /// to be batched and create Layer 2/3 filters before we mark sync complete. /// /// The 6-second delay is based on: - /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) + /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) /// - Buffer for processing: 1 second /// /// Called after each batch is confirmed to detect completion. @@ -1785,7 +1756,6 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); - let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1827,50 +1797,13 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) + // + // Note: announcement events (kind 30617) are registered in repo_sync_index + // by the purgatory announcement sync timer (run_purgatory_announcement_sync) + // rather than inline here. if result == ProcessResult::Purgatory { - // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly - // so that state events (kind 30618) are synced for this purgatory announcement - if event.kind == Kind::GitRepoAnnouncement { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = format!("30617:{}:{}", event.pubkey, identifier); - - // Extract relay URLs from the purgatory entry - let relays = write_policy - .purgatory() - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - tracing::info!( - event_id = %event.id, - repo_id = %repo_id, - relay_count = relays.len(), - "Registering purgatory announcement in RepoSyncIndex with StateOnly level" - ); - - // Register in RepoSyncIndex with StateOnly level - let mut index = repo_sync_index.write().await; - let entry = index - .entry(repo_id) - .or_insert_with(|| RepoSyncNeeds { - relays: HashSet::new(), - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - entry.relays.extend(relays); - // Don't upgrade sync_level if already Full - // (e.g., if announcement was promoted before this runs) - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -126,14 +126,14 @@ impl SelfSubscriber { /// Get batch window from environment or use default /// - /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. + /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. /// Default: 5000ms (5 seconds) fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse::().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_millis(5000)) + if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_millis(5000) + } } /// Process a relay pool notification diff --git a/tests/common/relay.rs b/tests/common/relay.rs index 0ec9a2e..b1e96cf 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs @@ -204,7 +204,7 @@ impl TestRelay { .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation .env("NGIT_OWNER_NPUB", &test_npub) - .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) + .env("NGIT_TEST", "1") // Enable test mode: fast timers (200ms batch window, 200ms purgatory sync) .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 61d8e14..ff1eb43 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -377,7 +377,7 @@ async fn test_multiple_maintainers_all_reprocessed() { println!("relay_b started at {}", relay_b.url()); // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. - // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we // allow extra time for slow CI environments. tokio::time::sleep(Duration::from_secs(3)).await; println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); -- cgit v1.2.3