From 49b9405dfcbb872686acdd7abc12dc9c94adc2ab Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 18 Feb 2026 23:17:08 +0000 Subject: test: update sync tests to set up git data for purgatory flow All sync tests now create a local git repo, send announcement + state event to the source relay, and push git data to release both from purgatory before the syncing relay starts bootstrap sync. --- tests/sync/maintainer_reprocessing.rs | 153 ++++++++++++++++++++++++---------- 1 file changed, 110 insertions(+), 43 deletions(-) (limited to 'tests/sync/maintainer_reprocessing.rs') diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index df1bf78..266a437 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -7,7 +7,10 @@ use std::time::Duration; use nostr_sdk::prelude::*; -use crate::common::{sync_helpers::*, TestRelay}; +use crate::common::{ + sync_helpers::*, + TestRelay, +}; /// Test that maintainer announcements are re-processed immediately when owner announcement accepted /// @@ -37,10 +40,12 @@ async fn test_maintainer_announcement_reprocessed_immediately() { let start = std::time::Instant::now(); - // Step 1: Send maintainer announcement to relay_a (will be rejected - doesn't list relay_b) - let client_a = TestClient::new(relay_a.url(), maintainer_keys.clone()) - .await - .expect("Failed to connect to relay_a"); + // Step 1: Send maintainer announcement to relay_a (will be rejected by relay_b - doesn't list relay_b) + // Use HTTP clone URL pointing to relay_a's git endpoint so it can be released from purgatory + let maintainer_npub = maintainer_keys + .public_key() + .to_bech32() + .expect("Failed to get npub"); let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") @@ -48,27 +53,50 @@ async fn test_maintainer_announcement_reprocessed_immediately() { Tag::identifier(identifier), Tag::custom( TagKind::custom("clone"), - vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], + vec![format!( + "http://{}/{}/{}.git", + relay_a.domain(), + maintainer_npub, + identifier + )], ), Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), ]) .sign_with_keys(&maintainer_keys) .unwrap(); - client_a.send_event(&maintainer_announcement).await.unwrap(); + send_to_relay(&relay_a, &maintainer_announcement) + .await + .unwrap(); println!("✓ Maintainer announcement sent to relay_a"); - // Step 2: Send owner announcement to relay_b (lists relay_a + maintainer) - let client_b = TestClient::new(relay_b.url(), owner_keys.clone()) - .await - .expect("Failed to connect to relay_b"); + // Push git data for maintainer's repo to relay_a → releases maintainer announcement from purgatory + let _git_dir_maintainer = push_git_data_to_relay( + &relay_a, + &maintainer_keys, + identifier, + &[&relay_a.domain()], + ) + .await; + println!("✓ Maintainer git data pushed to relay_a (announcement released from purgatory)"); + + // Step 2: Set up owner announcement on relay_b (lists relay_a + maintainer) with git data + let owner_npub = owner_keys + .public_key() + .to_bech32() + .expect("Failed to get npub"); let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") .tags(vec![ Tag::identifier(identifier), Tag::custom( TagKind::custom("clone"), - vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], + vec![format!( + "http://{}/{}/{}.git", + relay_b.domain(), + owner_npub, + identifier + )], ), Tag::custom( TagKind::custom("relays"), @@ -82,9 +110,14 @@ async fn test_maintainer_announcement_reprocessed_immediately() { .sign_with_keys(&owner_keys) .unwrap(); - client_b.send_event(&owner_announcement).await.unwrap(); + send_to_relay(&relay_b, &owner_announcement).await.unwrap(); println!("✓ Owner announcement sent to relay_b"); + // Push git data for owner's repo to relay_b → releases owner announcement from purgatory + let _git_dir_owner = + push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; + println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) tokio::time::sleep(Duration::from_secs(3)).await; @@ -114,15 +147,13 @@ async fn test_maintainer_announcement_reprocessed_immediately() { // Step 5: Verify it happened quickly (not 24 hours!) assert!( - elapsed.as_secs() < 10, - "Re-processing should happen in <10 seconds, took {:?}", + elapsed.as_secs() < 15, + "Re-processing should happen in <15 seconds, took {:?}", elapsed ); println!("✅ Maintainer announcement re-processed in {:?}", elapsed); - client_a.disconnect().await; - client_b.disconnect().await; relay_a.stop().await; relay_b.stop().await; } @@ -253,15 +284,18 @@ async fn test_multiple_maintainers_all_reprocessed() { let identifier = "multi-maintainer-repo"; - // Step 1: Send three maintainer announcements to relay_a - let client_a = TestClient::new(relay_a.url(), maintainer1_keys.clone()) - .await - .expect("Failed to connect to relay_a"); - + // Step 1: Send three maintainer announcements to relay_a with git data + // (purgatory requires git data before announcements are accepted) + let mut git_dirs_maintainers = Vec::new(); for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] .iter() .enumerate() { + let m_npub = maintainer_keys + .public_key() + .to_bech32() + .expect("Failed to get npub"); + let announcement = EventBuilder::new( Kind::GitRepoAnnouncement, format!("Maintainer {} repository", idx + 1), @@ -270,28 +304,45 @@ async fn test_multiple_maintainers_all_reprocessed() { Tag::identifier(identifier), Tag::custom( TagKind::custom("clone"), - vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], + vec![format!( + "http://{}/{}/{}.git", + relay_a.domain(), + m_npub, + identifier + )], ), Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), ]) .sign_with_keys(maintainer_keys) .unwrap(); - client_a.send_event(&announcement).await.unwrap(); + send_to_relay(&relay_a, &announcement).await.unwrap(); + + // Push git data to release each maintainer's announcement from purgatory + let git_dir = + push_git_data_to_relay(&relay_a, maintainer_keys, identifier, &[&relay_a.domain()]) + .await; + git_dirs_maintainers.push(git_dir); } - println!("✓ Three maintainer announcements sent to relay_a"); + println!("✓ Three maintainer announcements sent to relay_a with git data"); // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) - let client_b = TestClient::new(relay_b.url(), owner_keys.clone()) - .await - .expect("Failed to connect to relay_b"); + let owner_npub = owner_keys + .public_key() + .to_bech32() + .expect("Failed to get npub"); let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") .tags(vec![ Tag::identifier(identifier), Tag::custom( TagKind::custom("clone"), - vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], + vec![format!( + "http://{}/{}/{}.git", + relay_b.domain(), + owner_npub, + identifier + )], ), Tag::custom( TagKind::custom("relays"), @@ -309,9 +360,14 @@ async fn test_multiple_maintainers_all_reprocessed() { .sign_with_keys(&owner_keys) .unwrap(); - client_b.send_event(&owner_announcement).await.unwrap(); + send_to_relay(&relay_b, &owner_announcement).await.unwrap(); println!("✓ Owner announcement sent to relay_b"); + // Push git data for owner to relay_b → releases owner announcement from purgatory + let _git_dir_owner = + push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; + println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + // Step 3: Wait for sync and re-processing tokio::time::sleep(Duration::from_secs(3)).await; @@ -333,8 +389,6 @@ async fn test_multiple_maintainers_all_reprocessed() { println!("✅ All three maintainer announcements re-processed successfully"); - client_a.disconnect().await; - client_b.disconnect().await; relay_a.stop().await; relay_b.stop().await; } @@ -356,12 +410,8 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { let identifier = "invalid-maintainer-repo"; - // Create client using TestClient helper - let client = TestClient::new(relay.url(), owner_keys.clone()) - .await - .expect("Failed to connect to relay"); - // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) + // This one uses example.com clone URL - it goes to purgatory on relay, never promoted let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -378,17 +428,28 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .sign_with_keys(&maintainer_keys) .unwrap(); - // Send maintainer announcement - expect it to be rejected - let _ = client.send_event(&maintainer_announcement).await; + // Send maintainer announcement - expect it to be rejected (purgatory / policy) + send_to_relay(&relay, &maintainer_announcement).await.ok(); tokio::time::sleep(Duration::from_millis(200)).await; - // Step 2: Send owner announcement with INVALID maintainer hex + // Step 2: Set up owner announcement with INVALID maintainer hex and git data + // Use HTTP clone URL to relay's git endpoint so it can be released from purgatory + let owner_npub = owner_keys + .public_key() + .to_bech32() + .expect("Failed to get npub"); + let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") .tags(vec![ Tag::identifier(identifier), Tag::custom( TagKind::custom("clone"), - vec![format!("https://{}/{}.git", relay.domain(), identifier)], + vec![format!( + "http://{}/{}/{}.git", + relay.domain(), + owner_npub, + identifier + )], ), Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]), Tag::custom( @@ -399,7 +460,14 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .sign_with_keys(&owner_keys) .unwrap(); - client.send_event(&owner_announcement).await.unwrap(); + send_to_relay(&relay, &owner_announcement).await.unwrap(); + + // Push git data to relay → releases owner announcement from purgatory + let _git_dir = + push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; + println!("✓ Owner git data pushed to relay (announcement released from purgatory)"); + + // Wait for processing tokio::time::sleep(Duration::from_millis(500)).await; // Step 3: Verify owner announcement accepted, maintainer not re-processed @@ -429,6 +497,5 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { println!("✅ Invalid maintainer pubkey handled gracefully without panic"); - client.disconnect().await; relay.stop().await; } -- cgit v1.2.3 From 70749ea9df1f6061c332112c617b615f91d79d48 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 11:17:10 +0000 Subject: fix: re-process hot-cache maintainer announcements after git push promotion When an owner announcement is promoted from purgatory via a git push, any maintainer announcements sitting in the rejected_events_index hot cache were never re-processed. The invalidate_and_get call only existed in SyncManager::process_event_static (the nostr sync path); the git push promotion path (http -> handlers -> git::sync) had no access to the rejected_events_index at all. Thread rejected_events_index and write_policy through the git push path: - process_purgatory_announcements: after saving the promoted announcement, parse its maintainers tag and call invalidate_and_get() for each, then re-process any returned hot-cache events via admit_event + save - process_newly_available_git_data: accept optional write_policy and rejected_events_index, pass them through to process_purgatory_announcements - handle_receive_pack: accept Arc and Arc, pass them to process_newly_available_git_data - HttpService / run_server: carry the two new fields, clone into each handle_receive_pack call - main.rs: obtain rejected_events_index from sync_manager before moving it into its task; wrap write_policy in Arc for the HTTP server - RealSyncContext::process_newly_available_git_data: pass None for both new params (purgatory sync path already handles this via SyncManager::process_event_static) Also rewrite the maintainer_reprocessing integration tests to correctly exercise the hot-cache path now that announcements require git data before being released from purgatory: - Start relay_b with relay_a as bootstrap so its SyncManager syncs maintainer announcements via negentropy before the owner git push - Use push_unique_git_data_to_relay (new helper) to give each maintainer a distinct commit hash, preventing git from skipping pack transfer - Make wait_for_event_on_relay poll in a retry loop so transient timing gaps between DB write and query do not cause false negatives --- src/git/handlers.rs | 7 +- src/git/sync.rs | 113 +++++++++++++++- src/http/mod.rs | 22 +++- src/main.rs | 7 + src/purgatory/sync/context.rs | 6 +- tests/sync/maintainer_reprocessing.rs | 235 +++++++++++++++++++++------------- 6 files changed, 298 insertions(+), 92 deletions(-) (limited to 'tests/sync/maintainer_reprocessing.rs') diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..13d6ba0 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs @@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess; use crate::git::authorization::{authorize_push, parse_pushed_refs}; use crate::git::sync::process_newly_available_git_data; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// Handle GET /info/refs?service=git-{upload,receive}-pack /// @@ -195,6 +196,8 @@ pub async fn handle_receive_pack( purgatory: Arc, git_data_path: &str, git_protocol: Option<&str>, + write_policy: Arc, + rejected_events_index: Arc, ) -> Result>, GitError> { debug!("Handling receive-pack for {:?}", repo_path); @@ -307,6 +310,8 @@ pub async fn handle_receive_pack( Some(&relay), &purgatory, git_data_path_buf, + Some(&write_policy), + Some(&rejected_events_index), ) .await { diff --git a/src/git/sync.rs b/src/git/sync.rs index 4b35023..8401736 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -32,6 +32,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::process::Command; +use std::sync::Arc; use tracing::{debug, info, warn}; use nostr_sdk::Event; @@ -41,9 +42,10 @@ use crate::git::authorization::{ RepositoryData, }; use crate::git::{self, oid_exists}; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::nostr::events::RepositoryState; use crate::purgatory::{can_apply_state, Purgatory}; +use crate::sync::rejected_index::RejectedEventsIndex; /// Result of processing newly available git data. /// @@ -819,6 +821,8 @@ pub async fn process_newly_available_git_data( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> anyhow::Result { let mut result = ProcessResult::default(); @@ -848,6 +852,8 @@ pub async fn process_newly_available_git_data( local_relay, purgatory, git_data_path, + write_policy, + rejected_events_index, ) .await; result.merge(announcement_result); @@ -1277,6 +1283,10 @@ async fn process_purgatory_pr_events( /// /// When git data arrives for a repository, any announcements in purgatory /// for that repository should be promoted to the database and served to clients. +/// +/// When `write_policy` and `rejected_events_index` are provided (git push path), +/// any maintainer announcements sitting in the hot cache are re-processed immediately +/// after the owner announcement is promoted, so they don't wait for the next sync cycle. async fn process_purgatory_announcements( identifier: &str, source_repo_path: &Path, @@ -1284,6 +1294,8 @@ async fn process_purgatory_announcements( local_relay: Option<&nostr_relay_builder::LocalRelay>, purgatory: &Purgatory, git_data_path: &Path, + write_policy: Option<&Nip34WritePolicy>, + rejected_events_index: Option<&Arc>, ) -> ProcessResult { let mut result = ProcessResult::default(); @@ -1339,6 +1351,105 @@ async fn process_purgatory_announcements( } result.announcements_released += 1; + + // Re-process any maintainer announcements sitting in the hot cache. + // + // When an owner announcement is promoted from purgatory via a git push, + // maintainer announcements that arrived earlier (via relay sync) may have + // been rejected and stored in the hot cache because the owner announcement + // didn't exist in the DB yet. Now that the owner announcement is saved, + // we must invalidate and re-process those cached events immediately. + // + // This only applies on the git push path (write_policy + rejected_events_index + // are Some). The purgatory sync path already handles this via + // SyncManager::process_event_static. + if let (Some(wp), Some(rei), Some(relay)) = + (write_policy, rejected_events_index, local_relay) + { + use crate::nostr::events::RepositoryAnnouncement; + use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { + if !announcement.maintainers.is_empty() { + debug!( + identifier = %identifier, + event_id = %event.id, + maintainer_count = announcement.maintainers.len(), + "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements" + ); + + for maintainer_hex in &announcement.maintainers { + match nostr_sdk::PublicKey::from_hex(maintainer_hex) { + Ok(maintainer_pubkey) => { + let (removed, hot_events) = rei.invalidate_and_get( + &maintainer_pubkey, + &announcement.identifier, + Some(crate::sync::rejected_index::EventType::Announcement), + ); + + if removed > 0 { + info!( + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + removed_from_cold_index = removed, + hot_cache_events = hot_events.len(), + "Invalidated rejected maintainer announcements after git push promotion" + ); + } + + // Re-process events from hot cache + let dummy_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 0, + ); + for hot_event in hot_events { + info!( + event_id = %hot_event.id, + maintainer = %maintainer_hex, + identifier = %announcement.identifier, + "Re-processing maintainer announcement from hot cache after git push promotion" + ); + match wp.admit_event(&hot_event, &dummy_addr).await { + WritePolicyResult::Accept => { + match database.save_event(&hot_event).await { + Ok(_) => { + relay.notify_event(hot_event.clone()); + info!( + event_id = %hot_event.id, + "Maintainer announcement accepted and saved on re-processing" + ); + } + Err(e) => { + warn!( + event_id = %hot_event.id, + error = %e, + "Failed to save re-processed maintainer announcement" + ); + } + } + } + _ => { + warn!( + event_id = %hot_event.id, + "Maintainer announcement still rejected on re-processing" + ); + } + } + } + } + Err(e) => { + warn!( + maintainer_hex = %maintainer_hex, + error = %e, + "Invalid maintainer public key in promoted announcement" + ); + } + } + } + } + } + } } Err(e) => { warn!( diff --git a/src/http/mod.rs b/src/http/mod.rs index ffb1562..cfd7c52 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -26,8 +26,9 @@ use tokio::net::TcpListener; use crate::config::Config; use crate::git; use crate::metrics::Metrics; -use crate::nostr::builder::SharedDatabase; +use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; use crate::purgatory::Purgatory; +use crate::sync::rejected_index::RejectedEventsIndex; /// CORS headers required by GRASP-01 specification (lines 40-47) const CORS_ALLOW_ORIGIN: &str = "*"; @@ -97,6 +98,10 @@ struct HttpService { metrics: Option>, /// Purgatory for event/git coordination purgatory: Arc, + /// Write policy for re-processing hot-cache events after git push promotion + write_policy: Arc, + /// Rejected events index for hot-cache re-processing after git push promotion + rejected_events_index: Arc, } impl HttpService { @@ -107,6 +112,8 @@ impl HttpService { database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> Self { Self { relay, @@ -115,6 +122,8 @@ impl HttpService { database, metrics, purgatory, + write_policy, + rejected_events_index, } } } @@ -132,6 +141,8 @@ impl Service> for HttpService { let git_data_path = self.config.effective_git_data_path(); let database = self.database.clone(); let purgatory = self.purgatory.clone(); + let write_policy = self.write_policy.clone(); + let rejected_events_index = self.rejected_events_index.clone(); // Handle OPTIONS preflight requests (CORS) // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content @@ -293,6 +304,8 @@ impl Service> for HttpService { purgatory.clone(), &git_data_path, git_protocol.as_deref(), + write_policy.clone(), + rejected_events_index.clone(), ) .await; @@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String { /// * `relay` - The LocalRelay for WebSocket connections /// * `database` - The database for direct queries (e.g., push authorization) /// * `metrics` - Optional metrics for Prometheus endpoint +/// * `purgatory` - Purgatory for event/git coordination +/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion +/// * `rejected_events_index` - Rejected events index for hot-cache re-processing pub async fn run_server( config: Config, relay: LocalRelay, database: SharedDatabase, metrics: Option>, purgatory: Arc, + write_policy: Arc, + rejected_events_index: Arc, ) -> anyhow::Result<()> { let bind_addr: SocketAddr = config.bind_address.parse()?; @@ -582,6 +600,8 @@ pub async fn run_server( database.clone(), metrics.clone(), purgatory.clone(), + write_policy.clone(), + rejected_events_index.clone(), ); tokio::spawn(async move { diff --git a/src/main.rs b/src/main.rs index ab6ede7..6769cf3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,7 +130,9 @@ async fn main() -> Result<()> { } // Get a reference to the rejected events index for shutdown persistence + // and for the HTTP server's git push path (hot-cache re-processing) let shutdown_rejected_index = sync_manager.rejected_events_index(); + let http_rejected_index = shutdown_rejected_index.clone(); tokio::spawn(async move { sync_manager.run().await; @@ -206,6 +208,9 @@ async fn main() -> Result<()> { // Start HTTP server with integrated relay and database info!("Starting HTTP server on {}", config.bind_address); + // Wrap write_policy in Arc for sharing between HTTP server connections + let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); + // Run server until shutdown signal, then cleanup tokio::select! { result = http::run_server( @@ -214,6 +219,8 @@ async fn main() -> Result<()> { relay_with_db.database, metrics, purgatory, + http_write_policy, + http_rejected_index, ) => { result? } diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..ece8cd6 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs @@ -474,7 +474,9 @@ impl SyncContext for RealSyncContext { source_repo_path: &Path, new_oids: &HashSet, ) -> Result { - // Delegate to the unified function from git::sync + // Delegate to the unified function from git::sync. + // Pass None for write_policy and rejected_events_index: the purgatory sync path + // already handles hot-cache re-processing via SyncManager::process_event_static. let result = crate::git::sync::process_newly_available_git_data( source_repo_path, new_oids, @@ -482,6 +484,8 @@ impl SyncContext for RealSyncContext { self.local_relay.as_ref(), &self.purgatory, &self.git_data_path, + None, + None, ) .await?; diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 266a437..61d8e14 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -2,51 +2,61 @@ //! //! Tests the two-tier rejected events index and immediate re-processing of //! maintainer announcements when owner announcements are accepted. +//! +//! ## Test design +//! +//! Announcements now require git data before they are released from purgatory and +//! served to other relays. The hot-cache re-processing path we want to exercise is: +//! +//! relay_b syncs maintainer announcement from relay_a +//! → write policy rejects it (no owner announcement in DB yet) +//! → event stored in hot cache +//! owner git push to relay_b promotes owner announcement from purgatory +//! → our new code calls rejected_events_index.invalidate_and_get() +//! → maintainer announcement re-processed and accepted +//! +//! To guarantee the maintainer announcements arrive at relay_b *before* the owner +//! git push, relay_b is started with relay_a as its bootstrap relay. That way +//! relay_b's SyncManager connects to relay_a immediately and syncs whatever is +//! already in relay_a's DB. We push the maintainer git data first (so the +//! announcements are in relay_a's DB), wait briefly for the sync round-trip, then +//! send the owner announcement + git push. use std::time::Duration; use nostr_sdk::prelude::*; -use crate::common::{ - sync_helpers::*, - TestRelay, -}; +use crate::common::{sync_helpers::*, TestRelay}; -/// Test that maintainer announcements are re-processed immediately when owner announcement accepted +/// Test that a maintainer announcement is re-processed immediately when the owner +/// announcement is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + maintainer) -/// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement +/// 1. relay_a: Maintainer sends announcement + git data → accepted into relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs maintainer announcement +/// → rejected by write policy (no owner in DB) → stored in hot cache +/// 3. relay_b: Owner sends announcement → purgatory (no git data yet) +/// 4. relay_b: Owner git push → owner announcement promoted from purgatory +/// → hot-cache re-processing fires → maintainer announcement accepted /// 5. Both announcements should be in relay_b's database -/// -/// Expected time: <5 seconds (vs 24 hours without hot cache) #[tokio::test] async fn test_maintainer_announcement_reprocessed_immediately() { // Start relay_a (where maintainer announcement will be sent) let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer_keys = Keys::generate(); - let identifier = "test-repo"; - let start = std::time::Instant::now(); - - // Step 1: Send maintainer announcement to relay_a (will be rejected by relay_b - doesn't list relay_b) - // Use HTTP clone URL pointing to relay_a's git endpoint so it can be released from purgatory + // Step 1: Send maintainer announcement to relay_a then push git data so it lands in + // relay_a's DB. The announcement lists relay_a only (not relay_b), so relay_b's write + // policy will reject it when it arrives via sync. let maintainer_npub = maintainer_keys .public_key() .to_bech32() .expect("Failed to get npub"); - let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -60,27 +70,33 @@ async fn test_maintainer_announcement_reprocessed_immediately() { identifier )], ), - Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), + Tag::custom( + TagKind::custom("relays"), + vec![relay_a.url().to_string()], + ), ]) .sign_with_keys(&maintainer_keys) .unwrap(); + send_to_relay(&relay_a, &maintainer_announcement).await.unwrap(); + let _git_dir_maintainer = + push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]) + .await; + println!("✓ Maintainer announcement + git data pushed to relay_a"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // relay_b's initial negentropy sync will pick up the maintainer announcement and reject it + // (no owner announcement in relay_b's DB yet), storing it in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); - send_to_relay(&relay_a, &maintainer_announcement) - .await - .unwrap(); - println!("✓ Maintainer announcement sent to relay_a"); - - // Push git data for maintainer's repo to relay_a → releases maintainer announcement from purgatory - let _git_dir_maintainer = push_git_data_to_relay( - &relay_a, - &maintainer_keys, - identifier, - &[&relay_a.domain()], - ) - .await; - println!("✓ Maintainer git data pushed to relay_a (announcement released from purgatory)"); - - // Step 2: Set up owner announcement on relay_b (lists relay_a + maintainer) with git data + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcement should be in hot cache)"); + + let start = std::time::Instant::now(); + + // Step 3: Send owner announcement to relay_b → goes to purgatory (no git data yet). + // The announcement lists relay_a + relay_b and names the maintainer. let owner_npub = owner_keys .public_key() .to_bech32() @@ -111,19 +127,21 @@ async fn test_maintainer_announcement_reprocessed_immediately() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner's repo to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory, which triggers hot-cache + // re-processing of the maintainer announcement via our new code path. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)"); - // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; let elapsed = start.elapsed(); - // Step 4: Verify both announcements are in relay_b's database + // Step 6: Verify both announcements are in relay_b's database. let owner_filter = Filter::new() .kind(Kind::GitRepoAnnouncement) .author(owner_keys.public_key()) @@ -145,7 +163,6 @@ async fn test_maintainer_announcement_reprocessed_immediately() { "Maintainer announcement should be re-processed and accepted in relay_b" ); - // Step 5: Verify it happened quickly (not 24 hours!) assert!( elapsed.as_secs() < 15, "Re-processing should happen in <15 seconds, took {:?}", @@ -258,13 +275,16 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { relay.stop().await; } -/// Test multiple maintainers are all re-processed when owner announcement accepted +/// Test that all maintainer announcements are re-processed when the owner announcement +/// is promoted from purgatory via a git push. /// /// Flow: -/// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b) -/// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers) -/// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index -/// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements +/// 1. relay_a: Three maintainers send announcements + git data → in relay_a's DB +/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs all three maintainer +/// announcements → all rejected (no owner in DB) → all in hot cache +/// 3. relay_b: Owner sends announcement → purgatory +/// 4. relay_b: Owner git push → owner promoted → hot-cache re-processing fires for +/// all three maintainers /// 5. All four announcements should be in relay_b's database #[tokio::test] async fn test_multiple_maintainers_all_reprocessed() { @@ -272,21 +292,23 @@ async fn test_multiple_maintainers_all_reprocessed() { let relay_a = TestRelay::start().await; println!("relay_a started at {}", relay_a.url()); - // Start relay_b with sync enabled (will sync from relay_a) - let relay_b = TestRelay::start_with_sync(None).await; - println!("relay_b started at {}", relay_b.url()); - // Create keys let owner_keys = Keys::generate(); let maintainer1_keys = Keys::generate(); let maintainer2_keys = Keys::generate(); let maintainer3_keys = Keys::generate(); - let identifier = "multi-maintainer-repo"; + // Use a unique identifier per test run to avoid cross-test interference when + // tests run in parallel (each test gets its own namespace on relay_a). + let identifier = &format!( + "multi-maintainer-repo-{}", + owner_keys.public_key().to_hex()[..8].to_string() + ); - // Step 1: Send three maintainer announcements to relay_a with git data - // (purgatory requires git data before announcements are accepted) - let mut git_dirs_maintainers = Vec::new(); + // Step 1: Send each maintainer announcement to relay_a then push git data so all three + // land in relay_a's DB. Each announcement lists relay_a only, so relay_b will reject + // them when syncing (no owner announcement in relay_b's DB yet). + let mut git_dirs = Vec::new(); for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] .iter() .enumerate() @@ -295,13 +317,12 @@ async fn test_multiple_maintainers_all_reprocessed() { .public_key() .to_bech32() .expect("Failed to get npub"); - let announcement = EventBuilder::new( Kind::GitRepoAnnouncement, format!("Maintainer {} repository", idx + 1), ) .tags(vec![ - Tag::identifier(identifier), + Tag::identifier(identifier.as_str()), Tag::custom( TagKind::custom("clone"), vec![format!( @@ -315,18 +336,53 @@ async fn test_multiple_maintainers_all_reprocessed() { ]) .sign_with_keys(maintainer_keys) .unwrap(); - send_to_relay(&relay_a, &announcement).await.unwrap(); + // Use push_unique_git_data_to_relay so each maintainer gets a distinct commit + // hash. Identical hashes cause git to skip pack transfer when the object + // already exists on the server, leaving the announcement in purgatory. + let git_dir = push_unique_git_data_to_relay( + &relay_a, + maintainer_keys, + identifier, + &[&relay_a.domain()], + &m_npub, + ) + .await; + git_dirs.push(git_dir); + } + println!("✓ Three maintainer announcements + git data pushed to relay_a"); - // Push git data to release each maintainer's announcement from purgatory - let git_dir = - push_git_data_to_relay(&relay_a, maintainer_keys, identifier, &[&relay_a.domain()]) - .await; - git_dirs_maintainers.push(git_dir); + // Confirm all three announcements are queryable on relay_a before starting relay_b. + // This eliminates the race between relay_a's DB writes and relay_b's initial negentropy sync. + for (name, keys) in [ + ("maintainer1", &maintainer1_keys), + ("maintainer2", &maintainer2_keys), + ("maintainer3", &maintainer3_keys), + ] { + let filter = Filter::new() + .kind(Kind::GitRepoAnnouncement) + .author(keys.public_key()) + .identifier(identifier); + let found = + wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; + assert!(found, "{} announcement should be in relay_a before starting relay_b", name); } - println!("✓ Three maintainer announcements sent to relay_a with git data"); + println!("✓ All three maintainer announcements confirmed in relay_a's DB"); + + // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. + // Because all three maintainer announcements are confirmed in relay_a's DB, relay_b's + // initial negentropy sync will pick them all up and reject them (no owner announcement + // in relay_b's DB yet), storing them in the hot cache. + let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; + println!("relay_b started at {}", relay_b.url()); + + // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. + // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // allow extra time for slow CI environments. + tokio::time::sleep(Duration::from_secs(3)).await; + println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); - // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) + // Step 3: Send owner announcement to relay_b → goes to purgatory. let owner_npub = owner_keys .public_key() .to_bech32() @@ -361,17 +417,19 @@ async fn test_multiple_maintainers_all_reprocessed() { .unwrap(); send_to_relay(&relay_b, &owner_announcement).await.unwrap(); - println!("✓ Owner announcement sent to relay_b"); + println!("✓ Owner announcement sent to relay_b (now in purgatory)"); - // Push git data for owner to relay_b → releases owner announcement from purgatory + // Step 4: Push owner git data to relay_b. + // This promotes the owner announcement from purgatory and triggers hot-cache + // re-processing for all three maintainer announcements. let _git_dir_owner = push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; - println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); + println!("✓ Owner git data pushed to relay_b (hot-cache re-processing should fire)"); - // Step 3: Wait for sync and re-processing - tokio::time::sleep(Duration::from_secs(3)).await; + // Step 5: Wait briefly for async processing to complete. + tokio::time::sleep(Duration::from_secs(1)).await; - // Step 4: Verify all four announcements are in relay_b's database + // Step 6: Verify all four announcements are in relay_b's database. for (name, keys) in [ ("owner", &owner_keys), ("maintainer1", &maintainer1_keys), @@ -396,10 +454,10 @@ async fn test_multiple_maintainers_all_reprocessed() { /// Test that invalid maintainer public keys don't cause panics /// /// Flow: -/// 1. Maintainer announcement arrives → Rejected -/// 2. Owner announcement arrives with INVALID maintainer hex → Should handle gracefully -/// 3. Owner announcement should still be accepted -/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey) +/// 1. Maintainer announcement arrives → Rejected (doesn't list our relay) +/// 2. Owner announcement + git push → accepted, with INVALID maintainer hex in maintainers tag +/// 3. Owner announcement should be accepted +/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey can't be parsed) #[tokio::test] async fn test_invalid_maintainer_pubkey_handled_gracefully() { let relay = TestRelay::start().await; @@ -410,8 +468,12 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { let identifier = "invalid-maintainer-repo"; + // Create client using TestClient helper + let client = TestClient::new(relay.url(), owner_keys.clone()) + .await + .expect("Failed to connect to relay"); + // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) - // This one uses example.com clone URL - it goes to purgatory on relay, never promoted let maintainer_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") .tags(vec![ @@ -428,12 +490,13 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .sign_with_keys(&maintainer_keys) .unwrap(); - // Send maintainer announcement - expect it to be rejected (purgatory / policy) - send_to_relay(&relay, &maintainer_announcement).await.ok(); + // Send maintainer announcement - expect it to be rejected + let _ = client.send_event(&maintainer_announcement).await; tokio::time::sleep(Duration::from_millis(200)).await; - // Step 2: Set up owner announcement with INVALID maintainer hex and git data - // Use HTTP clone URL to relay's git endpoint so it can be released from purgatory + // Step 2: Send owner announcement with INVALID maintainer hex, then push git data. + // The announcement goes to purgatory first; the git push promotes it. + // The invalid maintainer hex should be handled gracefully (no panic). let owner_npub = owner_keys .public_key() .to_bech32() @@ -461,13 +524,8 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { .unwrap(); send_to_relay(&relay, &owner_announcement).await.unwrap(); - - // Push git data to relay → releases owner announcement from purgatory let _git_dir = push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; - println!("✓ Owner git data pushed to relay (announcement released from purgatory)"); - - // Wait for processing tokio::time::sleep(Duration::from_millis(500)).await; // Step 3: Verify owner announcement accepted, maintainer not re-processed @@ -497,5 +555,6 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { println!("✅ Invalid maintainer pubkey handled gracefully without panic"); + client.disconnect().await; relay.stop().await; } -- cgit v1.2.3 From f62ef12fb84e2210f9a0a67a5e1e574a8ee66c16 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 23 Feb 2026 12:48:26 +0000 Subject: refactor: replace inline purgatory sync registration with timer-only approach Remove the redundant inline kind-30617 registration block from the sync event loop and the three is_generic/recompute_new_sync_filters_for_relay calls from confirm_batch error paths. The purgatory announcement sync timer (run_purgatory_announcement_sync) is now the sole registration path. Consolidate NGIT_SYNC_BATCH_WINDOW_MS and NGIT_PURGATORY_SYNC_INTERVAL_MS into a single NGIT_TEST=1 flag that sets both timers to 200ms, replacing two ad-hoc env vars with one reusable test-mode flag. --- src/sync/mod.rs | 103 ++++++---------------------------- src/sync/self_subscriber.rs | 12 ++-- tests/common/relay.rs | 2 +- tests/sync/maintainer_reprocessing.rs | 2 +- 4 files changed, 26 insertions(+), 93 deletions(-) (limited to 'tests/sync/maintainer_reprocessing.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed5b6e7..44efbf0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -399,21 +399,24 @@ async fn run_daily_timer( /// Background task that periodically syncs purgatory announcements into repo_sync_index. /// -/// Runs every 5 seconds. For each announcement currently in purgatory, ensures there -/// is a `StateOnly` entry in `repo_sync_index`. New entries trigger `handle_new_sync_filters` -/// which connects to the relay URLs listed in the announcement and subscribes to state -/// events (kind 30618). +/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). +/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in +/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the +/// relay URLs listed in the announcement and subscribes to state events (kind 30618). /// -/// This covers two cases: -/// - Sync-path announcements: registered inline during event processing, but this -/// provides a safety net in case the inline registration was missed. +/// This is the sole registration path for purgatory announcements: +/// - Sync-path announcements: registered here within one interval of arriving. /// - User-submitted purgatory announcements: the SelfSubscriber never sees them -/// (they're rejected from DB), so this timer is the primary registration path. +/// (they're rejected from DB), so this timer is the only registration path. async fn run_purgatory_announcement_sync( sync_manager: Arc>, mut shutdown_rx: broadcast::Receiver<()>, ) { - let interval = Duration::from_secs(5); + let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_secs(5) + }; loop { tokio::select! { _ = tokio::time::sleep(interval) => { @@ -1084,24 +1087,12 @@ impl SyncManager { { let mut completed_batch = batches.remove(idx); completed_batch.failed = true; // Mark as failed - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_fallback); } drop(pending); self.confirm_batch(&relay_url_for_fallback, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_fallback, - ) - .await; - } } } return; @@ -1195,24 +1186,12 @@ impl SyncManager { if let Some(batches) = pending.get_mut(&relay_url_for_retry) { if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { let completed_batch = batches.remove(idx); - let is_generic = - completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); if batches.is_empty() { pending.remove(&relay_url_for_retry); } drop(pending); self.confirm_batch(&relay_url_for_retry, completed_batch) .await; - // For generic filter (announcement) batches, recompute filters - // so any purgatory repos registered during this batch get - // state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay( - &relay_url_for_retry, - ) - .await; - } } } return; @@ -1223,8 +1202,6 @@ impl SyncManager { // 3. Batch complete - extract and remove let completed_batch = batches.remove(batch_idx); - let is_generic = completed_batch.items.repos.is_empty() - && completed_batch.items.root_events.is_empty(); // Clean up empty relay entry if batches.is_empty() { @@ -1236,12 +1213,6 @@ impl SyncManager { // 4. Confirm the batch (moves items to RelayState) self.confirm_batch(relay_url, completed_batch).await; - - // 5. For generic filter (announcement) batches, recompute sync filters so any - // purgatory repos registered during this batch get state-only subscriptions triggered. - if is_generic { - self.recompute_new_sync_filters_for_relay(relay_url).await; - } } /// Confirm a completed batch by moving items to RelayState @@ -1370,7 +1341,7 @@ impl SyncManager { /// to be batched and create Layer 2/3 filters before we mark sync complete. /// /// The 6-second delay is based on: - /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) + /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) /// - Buffer for processing: 1 second /// /// Called after each batch is confirmed to detect completion. @@ -1785,7 +1756,6 @@ impl SyncManager { let eose_tx = self.eose_tx.as_ref().unwrap().clone(); let metrics_clone = self.metrics.clone(); let pending_sync_index = Arc::clone(&self.pending_sync_index); - let repo_sync_index = Arc::clone(&self.repo_sync_index); let health_tracker = Arc::clone(&self.health_tracker); let rejected_events_index = Arc::clone(&self.rejected_events_index); @@ -1827,50 +1797,13 @@ impl SyncManager { // For sync-triggered events that go to purgatory, trigger immediate sync // (instead of the default 3-minute delay for user-submitted events) + // + // Note: announcement events (kind 30617) are registered in repo_sync_index + // by the purgatory announcement sync timer (run_purgatory_announcement_sync) + // rather than inline here. if result == ProcessResult::Purgatory { - // Announcement events (kind 30617) - register in RepoSyncIndex with StateOnly - // so that state events (kind 30618) are synced for this purgatory announcement - if event.kind == Kind::GitRepoAnnouncement { - if let Some(identifier) = event.tags.iter().find_map(|tag| { - let tag_vec = tag.as_slice(); - if tag_vec.len() >= 2 && tag_vec[0] == "d" { - Some(tag_vec[1].to_string()) - } else { - None - } - }) { - let repo_id = format!("30617:{}:{}", event.pubkey, identifier); - - // Extract relay URLs from the purgatory entry - let relays = write_policy - .purgatory() - .find_announcement(&event.pubkey, &identifier) - .map(|entry| entry.relays) - .unwrap_or_default(); - - tracing::info!( - event_id = %event.id, - repo_id = %repo_id, - relay_count = relays.len(), - "Registering purgatory announcement in RepoSyncIndex with StateOnly level" - ); - - // Register in RepoSyncIndex with StateOnly level - let mut index = repo_sync_index.write().await; - let entry = index - .entry(repo_id) - .or_insert_with(|| RepoSyncNeeds { - relays: HashSet::new(), - root_events: HashSet::new(), - sync_level: SyncLevel::StateOnly, - }); - entry.relays.extend(relays); - // Don't upgrade sync_level if already Full - // (e.g., if announcement was promoted before this runs) - } - } // State events (kind 30618) - extract identifier and trigger immediate sync - else if event.kind.as_u16() == 30618 { + if event.kind.as_u16() == 30618 { if let Some(identifier) = event.tags.iter().find_map(|tag| { let tag_vec = tag.clone().to_vec(); if tag_vec.len() >= 2 && tag_vec[0] == "d" { diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 70c3dbf..ab10c49 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs @@ -126,14 +126,14 @@ impl SelfSubscriber { /// Get batch window from environment or use default /// - /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. + /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. /// Default: 5000ms (5 seconds) fn get_batch_window() -> Duration { - std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") - .ok() - .and_then(|s| s.parse::().ok()) - .map(Duration::from_millis) - .unwrap_or(Duration::from_millis(5000)) + if std::env::var("NGIT_TEST").as_deref() == Ok("1") { + Duration::from_millis(200) + } else { + Duration::from_millis(5000) + } } /// Process a relay pool notification diff --git a/tests/common/relay.rs b/tests/common/relay.rs index 0ec9a2e..b1e96cf 100644 --- a/tests/common/relay.rs +++ b/tests/common/relay.rs @@ -204,7 +204,7 @@ impl TestRelay { .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) .env("NGIT_DATABASE_BACKEND", "memory") // Force in-memory database for isolation .env("NGIT_OWNER_NPUB", &test_npub) - .env("NGIT_SYNC_BATCH_WINDOW_MS", "200") // Fast batch window for tests (200ms instead of 5s default) + .env("NGIT_TEST", "1") // Enable test mode: fast timers (200ms batch window, 200ms purgatory sync) .env("NGIT_SYNC_STARTUP_DELAY_SECS", "0") // No startup delay for faster tests .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // No jitter for tests .env("NGIT_SYNC_DISCONNECT_CHECK_INTERVAL_SECS", "1") // Fast reconnect attempts for tests diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 61d8e14..ff1eb43 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs @@ -377,7 +377,7 @@ async fn test_multiple_maintainers_all_reprocessed() { println!("relay_b started at {}", relay_b.url()); // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. - // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we + // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we // allow extra time for slow CI environments. tokio::time::sleep(Duration::from_secs(3)).await; println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); -- cgit v1.2.3