upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 11:17:10 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-23 12:05:29 +0000
commit70749ea9df1f6061c332112c617b615f91d79d48 (patch)
tree6f4ace061681d8356ea79eb782fddb36c1f31d23 /src
parent9f15929b10825c2f55434a98794fc551794cad2b (diff)
fix: re-process hot-cache maintainer announcements after git push promotion
When an owner announcement is promoted from purgatory via a git push, any maintainer announcements sitting in the rejected_events_index hot cache were never re-processed. The invalidate_and_get call only existed in SyncManager::process_event_static (the nostr sync path); the git push promotion path (http -> handlers -> git::sync) had no access to the rejected_events_index at all. Thread rejected_events_index and write_policy through the git push path: - process_purgatory_announcements: after saving the promoted announcement, parse its maintainers tag and call invalidate_and_get() for each, then re-process any returned hot-cache events via admit_event + save - process_newly_available_git_data: accept optional write_policy and rejected_events_index, pass them through to process_purgatory_announcements - handle_receive_pack: accept Arc<Nip34WritePolicy> and Arc<RejectedEventsIndex>, pass them to process_newly_available_git_data - HttpService / run_server: carry the two new fields, clone into each handle_receive_pack call - main.rs: obtain rejected_events_index from sync_manager before moving it into its task; wrap write_policy in Arc for the HTTP server - RealSyncContext::process_newly_available_git_data: pass None for both new params (purgatory sync path already handles this via SyncManager::process_event_static) Also rewrite the maintainer_reprocessing integration tests to correctly exercise the hot-cache path now that announcements require git data before being released from purgatory: - Start relay_b with relay_a as bootstrap so its SyncManager syncs maintainer announcements via negentropy before the owner git push - Use push_unique_git_data_to_relay (new helper) to give each maintainer a distinct commit hash, preventing git from skipping pack transfer - Make wait_for_event_on_relay poll in a retry loop so transient timing gaps between DB write and query do not cause false negatives
Diffstat (limited to 'src')
-rw-r--r--src/git/handlers.rs7
-rw-r--r--src/git/sync.rs113
-rw-r--r--src/http/mod.rs22
-rw-r--r--src/main.rs7
-rw-r--r--src/purgatory/sync/context.rs6
5 files changed, 151 insertions, 4 deletions
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 017eee4..13d6ba0 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess;
17 17
18use crate::git::authorization::{authorize_push, parse_pushed_refs}; 18use crate::git::authorization::{authorize_push, parse_pushed_refs};
19use crate::git::sync::process_newly_available_git_data; 19use crate::git::sync::process_newly_available_git_data;
20use crate::nostr::builder::SharedDatabase; 20use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
21use crate::purgatory::Purgatory; 21use crate::purgatory::Purgatory;
22use crate::sync::rejected_index::RejectedEventsIndex;
22 23
23/// Handle GET /info/refs?service=git-{upload,receive}-pack 24/// Handle GET /info/refs?service=git-{upload,receive}-pack
24/// 25///
@@ -195,6 +196,8 @@ pub async fn handle_receive_pack(
195 purgatory: Arc<Purgatory>, 196 purgatory: Arc<Purgatory>,
196 git_data_path: &str, 197 git_data_path: &str,
197 git_protocol: Option<&str>, 198 git_protocol: Option<&str>,
199 write_policy: Arc<Nip34WritePolicy>,
200 rejected_events_index: Arc<RejectedEventsIndex>,
198) -> Result<Response<Full<Bytes>>, GitError> { 201) -> Result<Response<Full<Bytes>>, GitError> {
199 debug!("Handling receive-pack for {:?}", repo_path); 202 debug!("Handling receive-pack for {:?}", repo_path);
200 203
@@ -307,6 +310,8 @@ pub async fn handle_receive_pack(
307 Some(&relay), 310 Some(&relay),
308 &purgatory, 311 &purgatory,
309 git_data_path_buf, 312 git_data_path_buf,
313 Some(&write_policy),
314 Some(&rejected_events_index),
310 ) 315 )
311 .await 316 .await
312 { 317 {
diff --git a/src/git/sync.rs b/src/git/sync.rs
index 4b35023..8401736 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -32,6 +32,7 @@
32use std::collections::{HashMap, HashSet}; 32use std::collections::{HashMap, HashSet};
33use std::path::Path; 33use std::path::Path;
34use std::process::Command; 34use std::process::Command;
35use std::sync::Arc;
35use tracing::{debug, info, warn}; 36use tracing::{debug, info, warn};
36 37
37use nostr_sdk::Event; 38use nostr_sdk::Event;
@@ -41,9 +42,10 @@ use crate::git::authorization::{
41 RepositoryData, 42 RepositoryData,
42}; 43};
43use crate::git::{self, oid_exists}; 44use crate::git::{self, oid_exists};
44use crate::nostr::builder::SharedDatabase; 45use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
45use crate::nostr::events::RepositoryState; 46use crate::nostr::events::RepositoryState;
46use crate::purgatory::{can_apply_state, Purgatory}; 47use crate::purgatory::{can_apply_state, Purgatory};
48use crate::sync::rejected_index::RejectedEventsIndex;
47 49
48/// Result of processing newly available git data. 50/// Result of processing newly available git data.
49/// 51///
@@ -819,6 +821,8 @@ pub async fn process_newly_available_git_data(
819 local_relay: Option<&nostr_relay_builder::LocalRelay>, 821 local_relay: Option<&nostr_relay_builder::LocalRelay>,
820 purgatory: &Purgatory, 822 purgatory: &Purgatory,
821 git_data_path: &Path, 823 git_data_path: &Path,
824 write_policy: Option<&Nip34WritePolicy>,
825 rejected_events_index: Option<&Arc<RejectedEventsIndex>>,
822) -> anyhow::Result<ProcessResult> { 826) -> anyhow::Result<ProcessResult> {
823 let mut result = ProcessResult::default(); 827 let mut result = ProcessResult::default();
824 828
@@ -848,6 +852,8 @@ pub async fn process_newly_available_git_data(
848 local_relay, 852 local_relay,
849 purgatory, 853 purgatory,
850 git_data_path, 854 git_data_path,
855 write_policy,
856 rejected_events_index,
851 ) 857 )
852 .await; 858 .await;
853 result.merge(announcement_result); 859 result.merge(announcement_result);
@@ -1277,6 +1283,10 @@ async fn process_purgatory_pr_events(
1277/// 1283///
1278/// When git data arrives for a repository, any announcements in purgatory 1284/// When git data arrives for a repository, any announcements in purgatory
1279/// for that repository should be promoted to the database and served to clients. 1285/// for that repository should be promoted to the database and served to clients.
1286///
1287/// When `write_policy` and `rejected_events_index` are provided (git push path),
1288/// any maintainer announcements sitting in the hot cache are re-processed immediately
1289/// after the owner announcement is promoted, so they don't wait for the next sync cycle.
1280async fn process_purgatory_announcements( 1290async fn process_purgatory_announcements(
1281 identifier: &str, 1291 identifier: &str,
1282 source_repo_path: &Path, 1292 source_repo_path: &Path,
@@ -1284,6 +1294,8 @@ async fn process_purgatory_announcements(
1284 local_relay: Option<&nostr_relay_builder::LocalRelay>, 1294 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1285 purgatory: &Purgatory, 1295 purgatory: &Purgatory,
1286 git_data_path: &Path, 1296 git_data_path: &Path,
1297 write_policy: Option<&Nip34WritePolicy>,
1298 rejected_events_index: Option<&Arc<RejectedEventsIndex>>,
1287) -> ProcessResult { 1299) -> ProcessResult {
1288 let mut result = ProcessResult::default(); 1300 let mut result = ProcessResult::default();
1289 1301
@@ -1339,6 +1351,105 @@ async fn process_purgatory_announcements(
1339 } 1351 }
1340 1352
1341 result.announcements_released += 1; 1353 result.announcements_released += 1;
1354
1355 // Re-process any maintainer announcements sitting in the hot cache.
1356 //
1357 // When an owner announcement is promoted from purgatory via a git push,
1358 // maintainer announcements that arrived earlier (via relay sync) may have
1359 // been rejected and stored in the hot cache because the owner announcement
1360 // didn't exist in the DB yet. Now that the owner announcement is saved,
1361 // we must invalidate and re-process those cached events immediately.
1362 //
1363 // This only applies on the git push path (write_policy + rejected_events_index
1364 // are Some). The purgatory sync path already handles this via
1365 // SyncManager::process_event_static.
1366 if let (Some(wp), Some(rei), Some(relay)) =
1367 (write_policy, rejected_events_index, local_relay)
1368 {
1369 use crate::nostr::events::RepositoryAnnouncement;
1370 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult};
1371 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1372
1373 if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) {
1374 if !announcement.maintainers.is_empty() {
1375 debug!(
1376 identifier = %identifier,
1377 event_id = %event.id,
1378 maintainer_count = announcement.maintainers.len(),
1379 "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements"
1380 );
1381
1382 for maintainer_hex in &announcement.maintainers {
1383 match nostr_sdk::PublicKey::from_hex(maintainer_hex) {
1384 Ok(maintainer_pubkey) => {
1385 let (removed, hot_events) = rei.invalidate_and_get(
1386 &maintainer_pubkey,
1387 &announcement.identifier,
1388 Some(crate::sync::rejected_index::EventType::Announcement),
1389 );
1390
1391 if removed > 0 {
1392 info!(
1393 maintainer = %maintainer_hex,
1394 identifier = %announcement.identifier,
1395 removed_from_cold_index = removed,
1396 hot_cache_events = hot_events.len(),
1397 "Invalidated rejected maintainer announcements after git push promotion"
1398 );
1399 }
1400
1401 // Re-process events from hot cache
1402 let dummy_addr = SocketAddr::new(
1403 IpAddr::V4(Ipv4Addr::LOCALHOST),
1404 0,
1405 );
1406 for hot_event in hot_events {
1407 info!(
1408 event_id = %hot_event.id,
1409 maintainer = %maintainer_hex,
1410 identifier = %announcement.identifier,
1411 "Re-processing maintainer announcement from hot cache after git push promotion"
1412 );
1413 match wp.admit_event(&hot_event, &dummy_addr).await {
1414 WritePolicyResult::Accept => {
1415 match database.save_event(&hot_event).await {
1416 Ok(_) => {
1417 relay.notify_event(hot_event.clone());
1418 info!(
1419 event_id = %hot_event.id,
1420 "Maintainer announcement accepted and saved on re-processing"
1421 );
1422 }
1423 Err(e) => {
1424 warn!(
1425 event_id = %hot_event.id,
1426 error = %e,
1427 "Failed to save re-processed maintainer announcement"
1428 );
1429 }
1430 }
1431 }
1432 _ => {
1433 warn!(
1434 event_id = %hot_event.id,
1435 "Maintainer announcement still rejected on re-processing"
1436 );
1437 }
1438 }
1439 }
1440 }
1441 Err(e) => {
1442 warn!(
1443 maintainer_hex = %maintainer_hex,
1444 error = %e,
1445 "Invalid maintainer public key in promoted announcement"
1446 );
1447 }
1448 }
1449 }
1450 }
1451 }
1452 }
1342 } 1453 }
1343 Err(e) => { 1454 Err(e) => {
1344 warn!( 1455 warn!(
diff --git a/src/http/mod.rs b/src/http/mod.rs
index ffb1562..cfd7c52 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -26,8 +26,9 @@ use tokio::net::TcpListener;
26use crate::config::Config; 26use crate::config::Config;
27use crate::git; 27use crate::git;
28use crate::metrics::Metrics; 28use crate::metrics::Metrics;
29use crate::nostr::builder::SharedDatabase; 29use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
30use crate::purgatory::Purgatory; 30use crate::purgatory::Purgatory;
31use crate::sync::rejected_index::RejectedEventsIndex;
31 32
32/// CORS headers required by GRASP-01 specification (lines 40-47) 33/// CORS headers required by GRASP-01 specification (lines 40-47)
33const CORS_ALLOW_ORIGIN: &str = "*"; 34const CORS_ALLOW_ORIGIN: &str = "*";
@@ -97,6 +98,10 @@ struct HttpService {
97 metrics: Option<Arc<Metrics>>, 98 metrics: Option<Arc<Metrics>>,
98 /// Purgatory for event/git coordination 99 /// Purgatory for event/git coordination
99 purgatory: Arc<Purgatory>, 100 purgatory: Arc<Purgatory>,
101 /// Write policy for re-processing hot-cache events after git push promotion
102 write_policy: Arc<Nip34WritePolicy>,
103 /// Rejected events index for hot-cache re-processing after git push promotion
104 rejected_events_index: Arc<RejectedEventsIndex>,
100} 105}
101 106
102impl HttpService { 107impl HttpService {
@@ -107,6 +112,8 @@ impl HttpService {
107 database: SharedDatabase, 112 database: SharedDatabase,
108 metrics: Option<Arc<Metrics>>, 113 metrics: Option<Arc<Metrics>>,
109 purgatory: Arc<Purgatory>, 114 purgatory: Arc<Purgatory>,
115 write_policy: Arc<Nip34WritePolicy>,
116 rejected_events_index: Arc<RejectedEventsIndex>,
110 ) -> Self { 117 ) -> Self {
111 Self { 118 Self {
112 relay, 119 relay,
@@ -115,6 +122,8 @@ impl HttpService {
115 database, 122 database,
116 metrics, 123 metrics,
117 purgatory, 124 purgatory,
125 write_policy,
126 rejected_events_index,
118 } 127 }
119 } 128 }
120} 129}
@@ -132,6 +141,8 @@ impl Service<Request<Incoming>> for HttpService {
132 let git_data_path = self.config.effective_git_data_path(); 141 let git_data_path = self.config.effective_git_data_path();
133 let database = self.database.clone(); 142 let database = self.database.clone();
134 let purgatory = self.purgatory.clone(); 143 let purgatory = self.purgatory.clone();
144 let write_policy = self.write_policy.clone();
145 let rejected_events_index = self.rejected_events_index.clone();
135 146
136 // Handle OPTIONS preflight requests (CORS) 147 // Handle OPTIONS preflight requests (CORS)
137 // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content 148 // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content
@@ -293,6 +304,8 @@ impl Service<Request<Incoming>> for HttpService {
293 purgatory.clone(), 304 purgatory.clone(),
294 &git_data_path, 305 &git_data_path,
295 git_protocol.as_deref(), 306 git_protocol.as_deref(),
307 write_policy.clone(),
308 rejected_events_index.clone(),
296 ) 309 )
297 .await; 310 .await;
298 311
@@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String {
557/// * `relay` - The LocalRelay for WebSocket connections 570/// * `relay` - The LocalRelay for WebSocket connections
558/// * `database` - The database for direct queries (e.g., push authorization) 571/// * `database` - The database for direct queries (e.g., push authorization)
559/// * `metrics` - Optional metrics for Prometheus endpoint 572/// * `metrics` - Optional metrics for Prometheus endpoint
573/// * `purgatory` - Purgatory for event/git coordination
574/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion
575/// * `rejected_events_index` - Rejected events index for hot-cache re-processing
560pub async fn run_server( 576pub async fn run_server(
561 config: Config, 577 config: Config,
562 relay: LocalRelay, 578 relay: LocalRelay,
563 database: SharedDatabase, 579 database: SharedDatabase,
564 metrics: Option<Arc<Metrics>>, 580 metrics: Option<Arc<Metrics>>,
565 purgatory: Arc<Purgatory>, 581 purgatory: Arc<Purgatory>,
582 write_policy: Arc<Nip34WritePolicy>,
583 rejected_events_index: Arc<RejectedEventsIndex>,
566) -> anyhow::Result<()> { 584) -> anyhow::Result<()> {
567 let bind_addr: SocketAddr = config.bind_address.parse()?; 585 let bind_addr: SocketAddr = config.bind_address.parse()?;
568 586
@@ -582,6 +600,8 @@ pub async fn run_server(
582 database.clone(), 600 database.clone(),
583 metrics.clone(), 601 metrics.clone(),
584 purgatory.clone(), 602 purgatory.clone(),
603 write_policy.clone(),
604 rejected_events_index.clone(),
585 ); 605 );
586 606
587 tokio::spawn(async move { 607 tokio::spawn(async move {
diff --git a/src/main.rs b/src/main.rs
index ab6ede7..6769cf3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -130,7 +130,9 @@ async fn main() -> Result<()> {
130 } 130 }
131 131
132 // Get a reference to the rejected events index for shutdown persistence 132 // Get a reference to the rejected events index for shutdown persistence
133 // and for the HTTP server's git push path (hot-cache re-processing)
133 let shutdown_rejected_index = sync_manager.rejected_events_index(); 134 let shutdown_rejected_index = sync_manager.rejected_events_index();
135 let http_rejected_index = shutdown_rejected_index.clone();
134 136
135 tokio::spawn(async move { 137 tokio::spawn(async move {
136 sync_manager.run().await; 138 sync_manager.run().await;
@@ -206,6 +208,9 @@ async fn main() -> Result<()> {
206 // Start HTTP server with integrated relay and database 208 // Start HTTP server with integrated relay and database
207 info!("Starting HTTP server on {}", config.bind_address); 209 info!("Starting HTTP server on {}", config.bind_address);
208 210
211 // Wrap write_policy in Arc for sharing between HTTP server connections
212 let http_write_policy = Arc::new(relay_with_db.write_policy.clone());
213
209 // Run server until shutdown signal, then cleanup 214 // Run server until shutdown signal, then cleanup
210 tokio::select! { 215 tokio::select! {
211 result = http::run_server( 216 result = http::run_server(
@@ -214,6 +219,8 @@ async fn main() -> Result<()> {
214 relay_with_db.database, 219 relay_with_db.database,
215 metrics, 220 metrics,
216 purgatory, 221 purgatory,
222 http_write_policy,
223 http_rejected_index,
217 ) => { 224 ) => {
218 result? 225 result?
219 } 226 }
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 3568e89..ece8cd6 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -474,7 +474,9 @@ impl SyncContext for RealSyncContext {
474 source_repo_path: &Path, 474 source_repo_path: &Path,
475 new_oids: &HashSet<String>, 475 new_oids: &HashSet<String>,
476 ) -> Result<ProcessResult> { 476 ) -> Result<ProcessResult> {
477 // Delegate to the unified function from git::sync 477 // Delegate to the unified function from git::sync.
478 // Pass None for write_policy and rejected_events_index: the purgatory sync path
479 // already handles hot-cache re-processing via SyncManager::process_event_static.
478 let result = crate::git::sync::process_newly_available_git_data( 480 let result = crate::git::sync::process_newly_available_git_data(
479 source_repo_path, 481 source_repo_path,
480 new_oids, 482 new_oids,
@@ -482,6 +484,8 @@ impl SyncContext for RealSyncContext {
482 self.local_relay.as_ref(), 484 self.local_relay.as_ref(),
483 &self.purgatory, 485 &self.purgatory,
484 &self.git_data_path, 486 &self.git_data_path,
487 None,
488 None,
485 ) 489 )
486 .await?; 490 .await?;
487 491