upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 11:17:10 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 12:05:29 +0000
commit70749ea9df1f6061c332112c617b615f91d79d48 (patch)
tree6f4ace061681d8356ea79eb782fddb36c1f31d23
parent9f15929b10825c2f55434a98794fc551794cad2b (diff)
fix: re-process hot-cache maintainer announcements after git push promotion
When an owner announcement is promoted from purgatory via a git push, any maintainer announcements sitting in the rejected_events_index hot cache were never re-processed. The invalidate_and_get call only existed in SyncManager::process_event_static (the nostr sync path); the git push promotion path (http -> handlers -> git::sync) had no access to the rejected_events_index at all. Thread rejected_events_index and write_policy through the git push path: - process_purgatory_announcements: after saving the promoted announcement, parse its maintainers tag and call invalidate_and_get() for each, then re-process any returned hot-cache events via admit_event + save - process_newly_available_git_data: accept optional write_policy and rejected_events_index, pass them through to process_purgatory_announcements - handle_receive_pack: accept Arc<Nip34WritePolicy> and Arc<RejectedEventsIndex>, pass them to process_newly_available_git_data - HttpService / run_server: carry the two new fields, clone into each handle_receive_pack call - main.rs: obtain rejected_events_index from sync_manager before moving it into its task; wrap write_policy in Arc for the HTTP server - RealSyncContext::process_newly_available_git_data: pass None for both new params (purgatory sync path already handles this via SyncManager::process_event_static) Also rewrite the maintainer_reprocessing integration tests to correctly exercise the hot-cache path now that announcements require git data before being released from purgatory: - Start relay_b with relay_a as bootstrap so its SyncManager syncs maintainer announcements via negentropy before the owner git push - Use push_unique_git_data_to_relay (new helper) to give each maintainer a distinct commit hash, preventing git from skipping pack transfer - Make wait_for_event_on_relay poll in a retry loop so transient timing gaps between DB write and query do not cause false negatives
-rw-r--r--src/git/handlers.rs7
-rw-r--r--src/git/sync.rs113
-rw-r--r--src/http/mod.rs22
-rw-r--r--src/main.rs7
-rw-r--r--src/purgatory/sync/context.rs6
-rw-r--r--tests/sync/maintainer_reprocessing.rs235
6 files changed, 298 insertions, 92 deletions
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 017eee4..13d6ba0 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess;
17 17
18use crate::git::authorization::{authorize_push, parse_pushed_refs}; 18use crate::git::authorization::{authorize_push, parse_pushed_refs};
19use crate::git::sync::process_newly_available_git_data; 19use crate::git::sync::process_newly_available_git_data;
20use crate::nostr::builder::SharedDatabase; 20use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
21use crate::purgatory::Purgatory; 21use crate::purgatory::Purgatory;
22use crate::sync::rejected_index::RejectedEventsIndex;
22 23
23/// Handle GET /info/refs?service=git-{upload,receive}-pack 24/// Handle GET /info/refs?service=git-{upload,receive}-pack
24/// 25///
@@ -195,6 +196,8 @@ pub async fn handle_receive_pack(
195 purgatory: Arc<Purgatory>, 196 purgatory: Arc<Purgatory>,
196 git_data_path: &str, 197 git_data_path: &str,
197 git_protocol: Option<&str>, 198 git_protocol: Option<&str>,
199 write_policy: Arc<Nip34WritePolicy>,
200 rejected_events_index: Arc<RejectedEventsIndex>,
198) -> Result<Response<Full<Bytes>>, GitError> { 201) -> Result<Response<Full<Bytes>>, GitError> {
199 debug!("Handling receive-pack for {:?}", repo_path); 202 debug!("Handling receive-pack for {:?}", repo_path);
200 203
@@ -307,6 +310,8 @@ pub async fn handle_receive_pack(
307 Some(&relay), 310 Some(&relay),
308 &purgatory, 311 &purgatory,
309 git_data_path_buf, 312 git_data_path_buf,
313 Some(&write_policy),
314 Some(&rejected_events_index),
310 ) 315 )
311 .await 316 .await
312 { 317 {
diff --git a/src/git/sync.rs b/src/git/sync.rs
index 4b35023..8401736 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -32,6 +32,7 @@
32use std::collections::{HashMap, HashSet}; 32use std::collections::{HashMap, HashSet};
33use std::path::Path; 33use std::path::Path;
34use std::process::Command; 34use std::process::Command;
35use std::sync::Arc;
35use tracing::{debug, info, warn}; 36use tracing::{debug, info, warn};
36 37
37use nostr_sdk::Event; 38use nostr_sdk::Event;
@@ -41,9 +42,10 @@ use crate::git::authorization::{
41 RepositoryData, 42 RepositoryData,
42}; 43};
43use crate::git::{self, oid_exists}; 44use crate::git::{self, oid_exists};
44use crate::nostr::builder::SharedDatabase; 45use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
45use crate::nostr::events::RepositoryState; 46use crate::nostr::events::RepositoryState;
46use crate::purgatory::{can_apply_state, Purgatory}; 47use crate::purgatory::{can_apply_state, Purgatory};
48use crate::sync::rejected_index::RejectedEventsIndex;
47 49
48/// Result of processing newly available git data. 50/// Result of processing newly available git data.
49/// 51///
@@ -819,6 +821,8 @@ pub async fn process_newly_available_git_data(
819 local_relay: Option<&nostr_relay_builder::LocalRelay>, 821 local_relay: Option<&nostr_relay_builder::LocalRelay>,
820 purgatory: &Purgatory, 822 purgatory: &Purgatory,
821 git_data_path: &Path, 823 git_data_path: &Path,
824 write_policy: Option<&Nip34WritePolicy>,
825 rejected_events_index: Option<&Arc<RejectedEventsIndex>>,
822) -> anyhow::Result<ProcessResult> { 826) -> anyhow::Result<ProcessResult> {
823 let mut result = ProcessResult::default(); 827 let mut result = ProcessResult::default();
824 828
@@ -848,6 +852,8 @@ pub async fn process_newly_available_git_data(
848 local_relay, 852 local_relay,
849 purgatory, 853 purgatory,
850 git_data_path, 854 git_data_path,
855 write_policy,
856 rejected_events_index,
851 ) 857 )
852 .await; 858 .await;
853 result.merge(announcement_result); 859 result.merge(announcement_result);
@@ -1277,6 +1283,10 @@ async fn process_purgatory_pr_events(
1277/// 1283///
1278/// When git data arrives for a repository, any announcements in purgatory 1284/// When git data arrives for a repository, any announcements in purgatory
1279/// for that repository should be promoted to the database and served to clients. 1285/// for that repository should be promoted to the database and served to clients.
1286///
1287/// When `write_policy` and `rejected_events_index` are provided (git push path),
1288/// any maintainer announcements sitting in the hot cache are re-processed immediately
1289/// after the owner announcement is promoted, so they don't wait for the next sync cycle.
1280async fn process_purgatory_announcements( 1290async fn process_purgatory_announcements(
1281 identifier: &str, 1291 identifier: &str,
1282 source_repo_path: &Path, 1292 source_repo_path: &Path,
@@ -1284,6 +1294,8 @@ async fn process_purgatory_announcements(
1284 local_relay: Option<&nostr_relay_builder::LocalRelay>, 1294 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1285 purgatory: &Purgatory, 1295 purgatory: &Purgatory,
1286 git_data_path: &Path, 1296 git_data_path: &Path,
1297 write_policy: Option<&Nip34WritePolicy>,
1298 rejected_events_index: Option<&Arc<RejectedEventsIndex>>,
1287) -> ProcessResult { 1299) -> ProcessResult {
1288 let mut result = ProcessResult::default(); 1300 let mut result = ProcessResult::default();
1289 1301
@@ -1339,6 +1351,105 @@ async fn process_purgatory_announcements(
1339 } 1351 }
1340 1352
1341 result.announcements_released += 1; 1353 result.announcements_released += 1;
1354
1355 // Re-process any maintainer announcements sitting in the hot cache.
1356 //
1357 // When an owner announcement is promoted from purgatory via a git push,
1358 // maintainer announcements that arrived earlier (via relay sync) may have
1359 // been rejected and stored in the hot cache because the owner announcement
1360 // didn't exist in the DB yet. Now that the owner announcement is saved,
1361 // we must invalidate and re-process those cached events immediately.
1362 //
1363 // This only applies on the git push path (write_policy + rejected_events_index
1364 // are Some). The purgatory sync path already handles this via
1365 // SyncManager::process_event_static.
1366 if let (Some(wp), Some(rei), Some(relay)) =
1367 (write_policy, rejected_events_index, local_relay)
1368 {
1369 use crate::nostr::events::RepositoryAnnouncement;
1370 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult};
1371 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1372
1373 if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) {
1374 if !announcement.maintainers.is_empty() {
1375 debug!(
1376 identifier = %identifier,
1377 event_id = %event.id,
1378 maintainer_count = announcement.maintainers.len(),
1379 "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements"
1380 );
1381
1382 for maintainer_hex in &announcement.maintainers {
1383 match nostr_sdk::PublicKey::from_hex(maintainer_hex) {
1384 Ok(maintainer_pubkey) => {
1385 let (removed, hot_events) = rei.invalidate_and_get(
1386 &maintainer_pubkey,
1387 &announcement.identifier,
1388 Some(crate::sync::rejected_index::EventType::Announcement),
1389 );
1390
1391 if removed > 0 {
1392 info!(
1393 maintainer = %maintainer_hex,
1394 identifier = %announcement.identifier,
1395 removed_from_cold_index = removed,
1396 hot_cache_events = hot_events.len(),
1397 "Invalidated rejected maintainer announcements after git push promotion"
1398 );
1399 }
1400
1401 // Re-process events from hot cache
1402 let dummy_addr = SocketAddr::new(
1403 IpAddr::V4(Ipv4Addr::LOCALHOST),
1404 0,
1405 );
1406 for hot_event in hot_events {
1407 info!(
1408 event_id = %hot_event.id,
1409 maintainer = %maintainer_hex,
1410 identifier = %announcement.identifier,
1411 "Re-processing maintainer announcement from hot cache after git push promotion"
1412 );
1413 match wp.admit_event(&hot_event, &dummy_addr).await {
1414 WritePolicyResult::Accept => {
1415 match database.save_event(&hot_event).await {
1416 Ok(_) => {
1417 relay.notify_event(hot_event.clone());
1418 info!(
1419 event_id = %hot_event.id,
1420 "Maintainer announcement accepted and saved on re-processing"
1421 );
1422 }
1423 Err(e) => {
1424 warn!(
1425 event_id = %hot_event.id,
1426 error = %e,
1427 "Failed to save re-processed maintainer announcement"
1428 );
1429 }
1430 }
1431 }
1432 _ => {
1433 warn!(
1434 event_id = %hot_event.id,
1435 "Maintainer announcement still rejected on re-processing"
1436 );
1437 }
1438 }
1439 }
1440 }
1441 Err(e) => {
1442 warn!(
1443 maintainer_hex = %maintainer_hex,
1444 error = %e,
1445 "Invalid maintainer public key in promoted announcement"
1446 );
1447 }
1448 }
1449 }
1450 }
1451 }
1452 }
1342 } 1453 }
1343 Err(e) => { 1454 Err(e) => {
1344 warn!( 1455 warn!(
diff --git a/src/http/mod.rs b/src/http/mod.rs
index ffb1562..cfd7c52 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -26,8 +26,9 @@ use tokio::net::TcpListener;
26use crate::config::Config; 26use crate::config::Config;
27use crate::git; 27use crate::git;
28use crate::metrics::Metrics; 28use crate::metrics::Metrics;
29use crate::nostr::builder::SharedDatabase; 29use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
30use crate::purgatory::Purgatory; 30use crate::purgatory::Purgatory;
31use crate::sync::rejected_index::RejectedEventsIndex;
31 32
32/// CORS headers required by GRASP-01 specification (lines 40-47) 33/// CORS headers required by GRASP-01 specification (lines 40-47)
33const CORS_ALLOW_ORIGIN: &str = "*"; 34const CORS_ALLOW_ORIGIN: &str = "*";
@@ -97,6 +98,10 @@ struct HttpService {
97 metrics: Option<Arc<Metrics>>, 98 metrics: Option<Arc<Metrics>>,
98 /// Purgatory for event/git coordination 99 /// Purgatory for event/git coordination
99 purgatory: Arc<Purgatory>, 100 purgatory: Arc<Purgatory>,
101 /// Write policy for re-processing hot-cache events after git push promotion
102 write_policy: Arc<Nip34WritePolicy>,
103 /// Rejected events index for hot-cache re-processing after git push promotion
104 rejected_events_index: Arc<RejectedEventsIndex>,
100} 105}
101 106
102impl HttpService { 107impl HttpService {
@@ -107,6 +112,8 @@ impl HttpService {
107 database: SharedDatabase, 112 database: SharedDatabase,
108 metrics: Option<Arc<Metrics>>, 113 metrics: Option<Arc<Metrics>>,
109 purgatory: Arc<Purgatory>, 114 purgatory: Arc<Purgatory>,
115 write_policy: Arc<Nip34WritePolicy>,
116 rejected_events_index: Arc<RejectedEventsIndex>,
110 ) -> Self { 117 ) -> Self {
111 Self { 118 Self {
112 relay, 119 relay,
@@ -115,6 +122,8 @@ impl HttpService {
115 database, 122 database,
116 metrics, 123 metrics,
117 purgatory, 124 purgatory,
125 write_policy,
126 rejected_events_index,
118 } 127 }
119 } 128 }
120} 129}
@@ -132,6 +141,8 @@ impl Service<Request<Incoming>> for HttpService {
132 let git_data_path = self.config.effective_git_data_path(); 141 let git_data_path = self.config.effective_git_data_path();
133 let database = self.database.clone(); 142 let database = self.database.clone();
134 let purgatory = self.purgatory.clone(); 143 let purgatory = self.purgatory.clone();
144 let write_policy = self.write_policy.clone();
145 let rejected_events_index = self.rejected_events_index.clone();
135 146
136 // Handle OPTIONS preflight requests (CORS) 147 // Handle OPTIONS preflight requests (CORS)
137 // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content 148 // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content
@@ -293,6 +304,8 @@ impl Service<Request<Incoming>> for HttpService {
293 purgatory.clone(), 304 purgatory.clone(),
294 &git_data_path, 305 &git_data_path,
295 git_protocol.as_deref(), 306 git_protocol.as_deref(),
307 write_policy.clone(),
308 rejected_events_index.clone(),
296 ) 309 )
297 .await; 310 .await;
298 311
@@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String {
557/// * `relay` - The LocalRelay for WebSocket connections 570/// * `relay` - The LocalRelay for WebSocket connections
558/// * `database` - The database for direct queries (e.g., push authorization) 571/// * `database` - The database for direct queries (e.g., push authorization)
559/// * `metrics` - Optional metrics for Prometheus endpoint 572/// * `metrics` - Optional metrics for Prometheus endpoint
573/// * `purgatory` - Purgatory for event/git coordination
574/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion
575/// * `rejected_events_index` - Rejected events index for hot-cache re-processing
560pub async fn run_server( 576pub async fn run_server(
561 config: Config, 577 config: Config,
562 relay: LocalRelay, 578 relay: LocalRelay,
563 database: SharedDatabase, 579 database: SharedDatabase,
564 metrics: Option<Arc<Metrics>>, 580 metrics: Option<Arc<Metrics>>,
565 purgatory: Arc<Purgatory>, 581 purgatory: Arc<Purgatory>,
582 write_policy: Arc<Nip34WritePolicy>,
583 rejected_events_index: Arc<RejectedEventsIndex>,
566) -> anyhow::Result<()> { 584) -> anyhow::Result<()> {
567 let bind_addr: SocketAddr = config.bind_address.parse()?; 585 let bind_addr: SocketAddr = config.bind_address.parse()?;
568 586
@@ -582,6 +600,8 @@ pub async fn run_server(
582 database.clone(), 600 database.clone(),
583 metrics.clone(), 601 metrics.clone(),
584 purgatory.clone(), 602 purgatory.clone(),
603 write_policy.clone(),
604 rejected_events_index.clone(),
585 ); 605 );
586 606
587 tokio::spawn(async move { 607 tokio::spawn(async move {
diff --git a/src/main.rs b/src/main.rs
index ab6ede7..6769cf3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -130,7 +130,9 @@ async fn main() -> Result<()> {
130 } 130 }
131 131
132 // Get a reference to the rejected events index for shutdown persistence 132 // Get a reference to the rejected events index for shutdown persistence
133 // and for the HTTP server's git push path (hot-cache re-processing)
133 let shutdown_rejected_index = sync_manager.rejected_events_index(); 134 let shutdown_rejected_index = sync_manager.rejected_events_index();
135 let http_rejected_index = shutdown_rejected_index.clone();
134 136
135 tokio::spawn(async move { 137 tokio::spawn(async move {
136 sync_manager.run().await; 138 sync_manager.run().await;
@@ -206,6 +208,9 @@ async fn main() -> Result<()> {
206 // Start HTTP server with integrated relay and database 208 // Start HTTP server with integrated relay and database
207 info!("Starting HTTP server on {}", config.bind_address); 209 info!("Starting HTTP server on {}", config.bind_address);
208 210
211 // Wrap write_policy in Arc for sharing between HTTP server connections
212 let http_write_policy = Arc::new(relay_with_db.write_policy.clone());
213
209 // Run server until shutdown signal, then cleanup 214 // Run server until shutdown signal, then cleanup
210 tokio::select! { 215 tokio::select! {
211 result = http::run_server( 216 result = http::run_server(
@@ -214,6 +219,8 @@ async fn main() -> Result<()> {
214 relay_with_db.database, 219 relay_with_db.database,
215 metrics, 220 metrics,
216 purgatory, 221 purgatory,
222 http_write_policy,
223 http_rejected_index,
217 ) => { 224 ) => {
218 result? 225 result?
219 } 226 }
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 3568e89..ece8cd6 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -474,7 +474,9 @@ impl SyncContext for RealSyncContext {
474 source_repo_path: &Path, 474 source_repo_path: &Path,
475 new_oids: &HashSet<String>, 475 new_oids: &HashSet<String>,
476 ) -> Result<ProcessResult> { 476 ) -> Result<ProcessResult> {
477 // Delegate to the unified function from git::sync 477 // Delegate to the unified function from git::sync.
478 // Pass None for write_policy and rejected_events_index: the purgatory sync path
479 // already handles hot-cache re-processing via SyncManager::process_event_static.
478 let result = crate::git::sync::process_newly_available_git_data( 480 let result = crate::git::sync::process_newly_available_git_data(
479 source_repo_path, 481 source_repo_path,
480 new_oids, 482 new_oids,
@@ -482,6 +484,8 @@ impl SyncContext for RealSyncContext {
482 self.local_relay.as_ref(), 484 self.local_relay.as_ref(),
483 &self.purgatory, 485 &self.purgatory,
484 &self.git_data_path, 486 &self.git_data_path,
487 None,
488 None,
485 ) 489 )
486 .await?; 490 .await?;
487 491
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs
index 266a437..61d8e14 100644
--- a/tests/sync/maintainer_reprocessing.rs
+++ b/tests/sync/maintainer_reprocessing.rs
@@ -2,51 +2,61 @@
2//! 2//!
3//! Tests the two-tier rejected events index and immediate re-processing of 3//! Tests the two-tier rejected events index and immediate re-processing of
4//! maintainer announcements when owner announcements are accepted. 4//! maintainer announcements when owner announcements are accepted.
5//!
6//! ## Test design
7//!
8//! Announcements now require git data before they are released from purgatory and
9//! served to other relays. The hot-cache re-processing path we want to exercise is:
10//!
11//! relay_b syncs maintainer announcement from relay_a
12//! → write policy rejects it (no owner announcement in DB yet)
13//! → event stored in hot cache
14//! owner git push to relay_b promotes owner announcement from purgatory
15//! → our new code calls rejected_events_index.invalidate_and_get()
16//! → maintainer announcement re-processed and accepted
17//!
18//! To guarantee the maintainer announcements arrive at relay_b *before* the owner
19//! git push, relay_b is started with relay_a as its bootstrap relay. That way
20//! relay_b's SyncManager connects to relay_a immediately and syncs whatever is
21//! already in relay_a's DB. We push the maintainer git data first (so the
22//! announcements are in relay_a's DB), wait briefly for the sync round-trip, then
23//! send the owner announcement + git push.
5 24
6use std::time::Duration; 25use std::time::Duration;
7 26
8use nostr_sdk::prelude::*; 27use nostr_sdk::prelude::*;
9 28
10use crate::common::{ 29use crate::common::{sync_helpers::*, TestRelay};
11 sync_helpers::*,
12 TestRelay,
13};
14 30
15/// Test that maintainer announcements are re-processed immediately when owner announcement accepted 31/// Test that a maintainer announcement is re-processed immediately when the owner
32/// announcement is promoted from purgatory via a git push.
16/// 33///
17/// Flow: 34/// Flow:
18/// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b) 35/// 1. relay_a: Maintainer sends announcement + git data → accepted into relay_a's DB
19/// 2. relay_b: Owner sends announcement (lists relay_a + maintainer) 36/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs maintainer announcement
20/// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index 37/// → rejected by write policy (no owner in DB) → stored in hot cache
21/// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement 38/// 3. relay_b: Owner sends announcement → purgatory (no git data yet)
39/// 4. relay_b: Owner git push → owner announcement promoted from purgatory
40/// → hot-cache re-processing fires → maintainer announcement accepted
22/// 5. Both announcements should be in relay_b's database 41/// 5. Both announcements should be in relay_b's database
23///
24/// Expected time: <5 seconds (vs 24 hours without hot cache)
25#[tokio::test] 42#[tokio::test]
26async fn test_maintainer_announcement_reprocessed_immediately() { 43async fn test_maintainer_announcement_reprocessed_immediately() {
27 // Start relay_a (where maintainer announcement will be sent) 44 // Start relay_a (where maintainer announcement will be sent)
28 let relay_a = TestRelay::start().await; 45 let relay_a = TestRelay::start().await;
29 println!("relay_a started at {}", relay_a.url()); 46 println!("relay_a started at {}", relay_a.url());
30 47
31 // Start relay_b with sync enabled (will sync from relay_a)
32 let relay_b = TestRelay::start_with_sync(None).await;
33 println!("relay_b started at {}", relay_b.url());
34
35 // Create keys 48 // Create keys
36 let owner_keys = Keys::generate(); 49 let owner_keys = Keys::generate();
37 let maintainer_keys = Keys::generate(); 50 let maintainer_keys = Keys::generate();
38
39 let identifier = "test-repo"; 51 let identifier = "test-repo";
40 52
41 let start = std::time::Instant::now(); 53 // Step 1: Send maintainer announcement to relay_a then push git data so it lands in
42 54 // relay_a's DB. The announcement lists relay_a only (not relay_b), so relay_b's write
43 // Step 1: Send maintainer announcement to relay_a (will be rejected by relay_b - doesn't list relay_b) 55 // policy will reject it when it arrives via sync.
44 // Use HTTP clone URL pointing to relay_a's git endpoint so it can be released from purgatory
45 let maintainer_npub = maintainer_keys 56 let maintainer_npub = maintainer_keys
46 .public_key() 57 .public_key()
47 .to_bech32() 58 .to_bech32()
48 .expect("Failed to get npub"); 59 .expect("Failed to get npub");
49
50 let maintainer_announcement = 60 let maintainer_announcement =
51 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") 61 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository")
52 .tags(vec![ 62 .tags(vec![
@@ -60,27 +70,33 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
60 identifier 70 identifier
61 )], 71 )],
62 ), 72 ),
63 Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), 73 Tag::custom(
74 TagKind::custom("relays"),
75 vec![relay_a.url().to_string()],
76 ),
64 ]) 77 ])
65 .sign_with_keys(&maintainer_keys) 78 .sign_with_keys(&maintainer_keys)
66 .unwrap(); 79 .unwrap();
80 send_to_relay(&relay_a, &maintainer_announcement).await.unwrap();
81 let _git_dir_maintainer =
82 push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()])
83 .await;
84 println!("✓ Maintainer announcement + git data pushed to relay_a");
85
86 // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately.
87 // relay_b's initial negentropy sync will pick up the maintainer announcement and reject it
88 // (no owner announcement in relay_b's DB yet), storing it in the hot cache.
89 let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await;
90 println!("relay_b started at {}", relay_b.url());
67 91
68 send_to_relay(&relay_a, &maintainer_announcement) 92 // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a.
69 .await 93 tokio::time::sleep(Duration::from_secs(3)).await;
70 .unwrap(); 94 println!("✓ relay_b synced from relay_a (maintainer announcement should be in hot cache)");
71 println!("✓ Maintainer announcement sent to relay_a"); 95
72 96 let start = std::time::Instant::now();
73 // Push git data for maintainer's repo to relay_a → releases maintainer announcement from purgatory 97
74 let _git_dir_maintainer = push_git_data_to_relay( 98 // Step 3: Send owner announcement to relay_b → goes to purgatory (no git data yet).
75 &relay_a, 99 // The announcement lists relay_a + relay_b and names the maintainer.
76 &maintainer_keys,
77 identifier,
78 &[&relay_a.domain()],
79 )
80 .await;
81 println!("✓ Maintainer git data pushed to relay_a (announcement released from purgatory)");
82
83 // Step 2: Set up owner announcement on relay_b (lists relay_a + maintainer) with git data
84 let owner_npub = owner_keys 100 let owner_npub = owner_keys
85 .public_key() 101 .public_key()
86 .to_bech32() 102 .to_bech32()
@@ -111,19 +127,21 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
111 .unwrap(); 127 .unwrap();
112 128
113 send_to_relay(&relay_b, &owner_announcement).await.unwrap(); 129 send_to_relay(&relay_b, &owner_announcement).await.unwrap();
114 println!("✓ Owner announcement sent to relay_b"); 130 println!("✓ Owner announcement sent to relay_b (now in purgatory)");
115 131
116 // Push git data for owner's repo to relay_b → releases owner announcement from purgatory 132 // Step 4: Push owner git data to relay_b.
133 // This promotes the owner announcement from purgatory, which triggers hot-cache
134 // re-processing of the maintainer announcement via our new code path.
117 let _git_dir_owner = 135 let _git_dir_owner =
118 push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; 136 push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await;
119 println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); 137 println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)");
120 138
121 // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) 139 // Step 5: Wait briefly for async processing to complete.
122 tokio::time::sleep(Duration::from_secs(3)).await; 140 tokio::time::sleep(Duration::from_secs(1)).await;
123 141
124 let elapsed = start.elapsed(); 142 let elapsed = start.elapsed();
125 143
126 // Step 4: Verify both announcements are in relay_b's database 144 // Step 6: Verify both announcements are in relay_b's database.
127 let owner_filter = Filter::new() 145 let owner_filter = Filter::new()
128 .kind(Kind::GitRepoAnnouncement) 146 .kind(Kind::GitRepoAnnouncement)
129 .author(owner_keys.public_key()) 147 .author(owner_keys.public_key())
@@ -145,7 +163,6 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
145 "Maintainer announcement should be re-processed and accepted in relay_b" 163 "Maintainer announcement should be re-processed and accepted in relay_b"
146 ); 164 );
147 165
148 // Step 5: Verify it happened quickly (not 24 hours!)
149 assert!( 166 assert!(
150 elapsed.as_secs() < 15, 167 elapsed.as_secs() < 15,
151 "Re-processing should happen in <15 seconds, took {:?}", 168 "Re-processing should happen in <15 seconds, took {:?}",
@@ -258,13 +275,16 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() {
258 relay.stop().await; 275 relay.stop().await;
259} 276}
260 277
261/// Test multiple maintainers are all re-processed when owner announcement accepted 278/// Test that all maintainer announcements are re-processed when the owner announcement
279/// is promoted from purgatory via a git push.
262/// 280///
263/// Flow: 281/// Flow:
264/// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b) 282/// 1. relay_a: Three maintainers send announcements + git data → in relay_a's DB
265/// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers) 283/// 2. relay_b (bootstrapped from relay_a): SyncManager syncs all three maintainer
266/// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index 284/// announcements → all rejected (no owner in DB) → all in hot cache
267/// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements 285/// 3. relay_b: Owner sends announcement → purgatory
286/// 4. relay_b: Owner git push → owner promoted → hot-cache re-processing fires for
287/// all three maintainers
268/// 5. All four announcements should be in relay_b's database 288/// 5. All four announcements should be in relay_b's database
269#[tokio::test] 289#[tokio::test]
270async fn test_multiple_maintainers_all_reprocessed() { 290async fn test_multiple_maintainers_all_reprocessed() {
@@ -272,21 +292,23 @@ async fn test_multiple_maintainers_all_reprocessed() {
272 let relay_a = TestRelay::start().await; 292 let relay_a = TestRelay::start().await;
273 println!("relay_a started at {}", relay_a.url()); 293 println!("relay_a started at {}", relay_a.url());
274 294
275 // Start relay_b with sync enabled (will sync from relay_a)
276 let relay_b = TestRelay::start_with_sync(None).await;
277 println!("relay_b started at {}", relay_b.url());
278
279 // Create keys 295 // Create keys
280 let owner_keys = Keys::generate(); 296 let owner_keys = Keys::generate();
281 let maintainer1_keys = Keys::generate(); 297 let maintainer1_keys = Keys::generate();
282 let maintainer2_keys = Keys::generate(); 298 let maintainer2_keys = Keys::generate();
283 let maintainer3_keys = Keys::generate(); 299 let maintainer3_keys = Keys::generate();
284 300
285 let identifier = "multi-maintainer-repo"; 301 // Use a unique identifier per test run to avoid cross-test interference when
302 // tests run in parallel (each test gets its own namespace on relay_a).
303 let identifier = &format!(
304 "multi-maintainer-repo-{}",
305 owner_keys.public_key().to_hex()[..8].to_string()
306 );
286 307
287 // Step 1: Send three maintainer announcements to relay_a with git data 308 // Step 1: Send each maintainer announcement to relay_a then push git data so all three
288 // (purgatory requires git data before announcements are accepted) 309 // land in relay_a's DB. Each announcement lists relay_a only, so relay_b will reject
289 let mut git_dirs_maintainers = Vec::new(); 310 // them when syncing (no owner announcement in relay_b's DB yet).
311 let mut git_dirs = Vec::new();
290 for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] 312 for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys]
291 .iter() 313 .iter()
292 .enumerate() 314 .enumerate()
@@ -295,13 +317,12 @@ async fn test_multiple_maintainers_all_reprocessed() {
295 .public_key() 317 .public_key()
296 .to_bech32() 318 .to_bech32()
297 .expect("Failed to get npub"); 319 .expect("Failed to get npub");
298
299 let announcement = EventBuilder::new( 320 let announcement = EventBuilder::new(
300 Kind::GitRepoAnnouncement, 321 Kind::GitRepoAnnouncement,
301 format!("Maintainer {} repository", idx + 1), 322 format!("Maintainer {} repository", idx + 1),
302 ) 323 )
303 .tags(vec![ 324 .tags(vec![
304 Tag::identifier(identifier), 325 Tag::identifier(identifier.as_str()),
305 Tag::custom( 326 Tag::custom(
306 TagKind::custom("clone"), 327 TagKind::custom("clone"),
307 vec![format!( 328 vec![format!(
@@ -315,18 +336,53 @@ async fn test_multiple_maintainers_all_reprocessed() {
315 ]) 336 ])
316 .sign_with_keys(maintainer_keys) 337 .sign_with_keys(maintainer_keys)
317 .unwrap(); 338 .unwrap();
318
319 send_to_relay(&relay_a, &announcement).await.unwrap(); 339 send_to_relay(&relay_a, &announcement).await.unwrap();
340 // Use push_unique_git_data_to_relay so each maintainer gets a distinct commit
341 // hash. Identical hashes cause git to skip pack transfer when the object
342 // already exists on the server, leaving the announcement in purgatory.
343 let git_dir = push_unique_git_data_to_relay(
344 &relay_a,
345 maintainer_keys,
346 identifier,
347 &[&relay_a.domain()],
348 &m_npub,
349 )
350 .await;
351 git_dirs.push(git_dir);
352 }
353 println!("✓ Three maintainer announcements + git data pushed to relay_a");
320 354
321 // Push git data to release each maintainer's announcement from purgatory 355 // Confirm all three announcements are queryable on relay_a before starting relay_b.
322 let git_dir = 356 // This eliminates the race between relay_a's DB writes and relay_b's initial negentropy sync.
323 push_git_data_to_relay(&relay_a, maintainer_keys, identifier, &[&relay_a.domain()]) 357 for (name, keys) in [
324 .await; 358 ("maintainer1", &maintainer1_keys),
325 git_dirs_maintainers.push(git_dir); 359 ("maintainer2", &maintainer2_keys),
360 ("maintainer3", &maintainer3_keys),
361 ] {
362 let filter = Filter::new()
363 .kind(Kind::GitRepoAnnouncement)
364 .author(keys.public_key())
365 .identifier(identifier);
366 let found =
367 wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await;
368 assert!(found, "{} announcement should be in relay_a before starting relay_b", name);
326 } 369 }
327 println!("✓ Three maintainer announcements sent to relay_a with git data"); 370 println!("✓ All three maintainer announcements confirmed in relay_a's DB");
371
372 // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately.
373 // Because all three maintainer announcements are confirmed in relay_a's DB, relay_b's
374 // initial negentropy sync will pick them all up and reject them (no owner announcement
375 // in relay_b's DB yet), storing them in the hot cache.
376 let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await;
377 println!("relay_b started at {}", relay_b.url());
378
379 // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a.
380 // The negentropy sync completes within ~200ms (NGIT_SYNC_BATCH_WINDOW_MS=200), but we
381 // allow extra time for slow CI environments.
382 tokio::time::sleep(Duration::from_secs(3)).await;
383 println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)");
328 384
329 // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) 385 // Step 3: Send owner announcement to relay_b goes to purgatory.
330 let owner_npub = owner_keys 386 let owner_npub = owner_keys
331 .public_key() 387 .public_key()
332 .to_bech32() 388 .to_bech32()
@@ -361,17 +417,19 @@ async fn test_multiple_maintainers_all_reprocessed() {
361 .unwrap(); 417 .unwrap();
362 418
363 send_to_relay(&relay_b, &owner_announcement).await.unwrap(); 419 send_to_relay(&relay_b, &owner_announcement).await.unwrap();
364 println!("✓ Owner announcement sent to relay_b"); 420 println!("✓ Owner announcement sent to relay_b (now in purgatory)");
365 421
366 // Push git data for owner to relay_b → releases owner announcement from purgatory 422 // Step 4: Push owner git data to relay_b.
423 // This promotes the owner announcement from purgatory and triggers hot-cache
424 // re-processing for all three maintainer announcements.
367 let _git_dir_owner = 425 let _git_dir_owner =
368 push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; 426 push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await;
369 println!("✓ Owner git data pushed to relay_b (announcement released from purgatory)"); 427 println!("✓ Owner git data pushed to relay_b (hot-cache re-processing should fire)");
370 428
371 // Step 3: Wait for sync and re-processing 429 // Step 5: Wait briefly for async processing to complete.
372 tokio::time::sleep(Duration::from_secs(3)).await; 430 tokio::time::sleep(Duration::from_secs(1)).await;
373 431
374 // Step 4: Verify all four announcements are in relay_b's database 432 // Step 6: Verify all four announcements are in relay_b's database.
375 for (name, keys) in [ 433 for (name, keys) in [
376 ("owner", &owner_keys), 434 ("owner", &owner_keys),
377 ("maintainer1", &maintainer1_keys), 435 ("maintainer1", &maintainer1_keys),
@@ -396,10 +454,10 @@ async fn test_multiple_maintainers_all_reprocessed() {
396/// Test that invalid maintainer public keys don't cause panics 454/// Test that invalid maintainer public keys don't cause panics
397/// 455///
398/// Flow: 456/// Flow:
399/// 1. Maintainer announcement arrives → Rejected 457/// 1. Maintainer announcement arrives → Rejected (doesn't list our relay)
400/// 2. Owner announcement arrives with INVALID maintainer hex Should handle gracefully 458/// 2. Owner announcement + git push → accepted, with INVALID maintainer hex in maintainers tag
401/// 3. Owner announcement should still be accepted 459/// 3. Owner announcement should be accepted
402/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey) 460/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey can't be parsed)
403#[tokio::test] 461#[tokio::test]
404async fn test_invalid_maintainer_pubkey_handled_gracefully() { 462async fn test_invalid_maintainer_pubkey_handled_gracefully() {
405 let relay = TestRelay::start().await; 463 let relay = TestRelay::start().await;
@@ -410,8 +468,12 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() {
410 468
411 let identifier = "invalid-maintainer-repo"; 469 let identifier = "invalid-maintainer-repo";
412 470
471 // Create client using TestClient helper
472 let client = TestClient::new(relay.url(), owner_keys.clone())
473 .await
474 .expect("Failed to connect to relay");
475
413 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) 476 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay)
414 // This one uses example.com clone URL - it goes to purgatory on relay, never promoted
415 let maintainer_announcement = 477 let maintainer_announcement =
416 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") 478 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository")
417 .tags(vec![ 479 .tags(vec![
@@ -428,12 +490,13 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() {
428 .sign_with_keys(&maintainer_keys) 490 .sign_with_keys(&maintainer_keys)
429 .unwrap(); 491 .unwrap();
430 492
431 // Send maintainer announcement - expect it to be rejected (purgatory / policy) 493 // Send maintainer announcement - expect it to be rejected
432 send_to_relay(&relay, &maintainer_announcement).await.ok(); 494 let _ = client.send_event(&maintainer_announcement).await;
433 tokio::time::sleep(Duration::from_millis(200)).await; 495 tokio::time::sleep(Duration::from_millis(200)).await;
434 496
435 // Step 2: Set up owner announcement with INVALID maintainer hex and git data 497 // Step 2: Send owner announcement with INVALID maintainer hex, then push git data.
436 // Use HTTP clone URL to relay's git endpoint so it can be released from purgatory 498 // The announcement goes to purgatory first; the git push promotes it.
499 // The invalid maintainer hex should be handled gracefully (no panic).
437 let owner_npub = owner_keys 500 let owner_npub = owner_keys
438 .public_key() 501 .public_key()
439 .to_bech32() 502 .to_bech32()
@@ -461,13 +524,8 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() {
461 .unwrap(); 524 .unwrap();
462 525
463 send_to_relay(&relay, &owner_announcement).await.unwrap(); 526 send_to_relay(&relay, &owner_announcement).await.unwrap();
464
465 // Push git data to relay → releases owner announcement from purgatory
466 let _git_dir = 527 let _git_dir =
467 push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; 528 push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await;
468 println!("✓ Owner git data pushed to relay (announcement released from purgatory)");
469
470 // Wait for processing
471 tokio::time::sleep(Duration::from_millis(500)).await; 529 tokio::time::sleep(Duration::from_millis(500)).await;
472 530
473 // Step 3: Verify owner announcement accepted, maintainer not re-processed 531 // Step 3: Verify owner announcement accepted, maintainer not re-processed
@@ -497,5 +555,6 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() {
497 555
498 println!("✅ Invalid maintainer pubkey handled gracefully without panic"); 556 println!("✅ Invalid maintainer pubkey handled gracefully without panic");
499 557
558 client.disconnect().await;
500 relay.stop().await; 559 relay.stop().await;
501} 560}