upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/git/authorization.rs59
-rw-r--r--src/git/handlers.rs7
-rw-r--r--src/git/sync.rs234
-rw-r--r--src/http/mod.rs22
-rw-r--r--src/main.rs21
-rw-r--r--src/nostr/builder.rs33
-rw-r--r--src/nostr/policy/announcement.rs273
-rw-r--r--src/nostr/policy/deletion.rs498
-rw-r--r--src/nostr/policy/mod.rs2
-rw-r--r--src/nostr/policy/pr_event.rs8
-rw-r--r--src/nostr/policy/related.rs5
-rw-r--r--src/nostr/policy/state.rs75
-rw-r--r--src/purgatory/mod.rs659
-rw-r--r--src/purgatory/sync/context.rs24
-rw-r--r--src/purgatory/types.rs39
-rw-r--r--src/sync/algorithms.rs58
-rw-r--r--src/sync/filters.rs31
-rw-r--r--src/sync/mod.rs167
-rw-r--r--src/sync/self_subscriber.rs34
19 files changed, 2161 insertions, 88 deletions
diff --git a/src/git/authorization.rs b/src/git/authorization.rs
index 27107db..df780bb 100644
--- a/src/git/authorization.rs
+++ b/src/git/authorization.rs
@@ -287,6 +287,39 @@ pub async fn fetch_repository_data(
287 }) 287 })
288} 288}
289 289
290/// Fetch repository data including announcements from purgatory
291///
292/// This combines database announcements with purgatory announcements,
293/// which is needed for authorization when the announcement hasn't been
294/// promoted yet (no git data has arrived).
295pub async fn fetch_repository_data_with_purgatory(
296 database: &SharedDatabase,
297 purgatory: &crate::purgatory::Purgatory,
298 identifier: &str,
299) -> Result<RepositoryData> {
300 // First, fetch from database
301 let mut repo_data = fetch_repository_data(database, identifier).await?;
302
303 // Then, add announcements from purgatory
304 let purgatory_announcements = purgatory.get_announcements_by_identifier(identifier);
305 let purgatory_count = purgatory_announcements.len();
306
307 for entry in purgatory_announcements {
308 if let Ok(announcement) = RepositoryAnnouncement::from_event(entry.event) {
309 repo_data.announcements.push(announcement);
310 }
311 }
312
313 debug!(
314 "Fetched repository data with purgatory: {} announcements ({} from purgatory), {} states",
315 repo_data.announcements.len(),
316 purgatory_count,
317 repo_data.states.len()
318 );
319
320 Ok(repo_data)
321}
322
290pub fn pubkey_authorised_for_repo_owners( 323pub fn pubkey_authorised_for_repo_owners(
291 pubkey: &PublicKey, 324 pubkey: &PublicKey,
292 db_repo_data: &RepositoryData, 325 db_repo_data: &RepositoryData,
@@ -539,8 +572,9 @@ pub async fn get_state_authorization_for_specific_owner_repo(
539 use crate::git::list_refs; 572 use crate::git::list_refs;
540 use crate::purgatory::RefUpdate; 573 use crate::purgatory::RefUpdate;
541 574
542 // Fetch announcements only - we don't need database states 575 // Fetch announcements from database AND purgatory - needed for authorization
543 let repo_data = fetch_repository_data(database, identifier).await?; 576 // when the announcement hasn't been promoted yet (no git data has arrived)
577 let repo_data = fetch_repository_data_with_purgatory(database, purgatory, identifier).await?;
544 578
545 if repo_data.announcements.is_empty() { 579 if repo_data.announcements.is_empty() {
546 return Ok(AuthorizationResult::denied( 580 return Ok(AuthorizationResult::denied(
@@ -649,6 +683,27 @@ pub async fn get_state_authorization_for_specific_owner_repo(
649 .unwrap_or_else(|_| latest_authorized.pubkey.to_hex()) 683 .unwrap_or_else(|_| latest_authorized.pubkey.to_hex())
650 ); 684 );
651 685
686 // Extend purgatory announcement expiry for the owner.
687 //
688 // Per design doc decision #4: git auth extending a state event's expiry
689 // also extends the announcement's expiry. The repo is actively receiving
690 // git data, so the announcement should not expire prematurely.
691 // This also revives soft-expired announcements (recreates bare repo).
692 if let Ok(owner_pk) = PublicKey::parse(owner_pubkey) {
693 if purgatory.has_purgatory_announcement(&owner_pk, identifier) {
694 purgatory.extend_announcement_expiry(
695 &owner_pk,
696 identifier,
697 std::time::Duration::from_secs(1800),
698 );
699 debug!(
700 identifier = %identifier,
701 owner = %owner_pubkey,
702 "Extended purgatory announcement expiry due to git push authorization"
703 );
704 }
705 }
706
652 return Ok(AuthorizationResult { 707 return Ok(AuthorizationResult {
653 authorized: true, 708 authorized: true,
654 reason: "Authorized by state event in purgatory".to_string(), 709 reason: "Authorized by state event in purgatory".to_string(),
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 28cb47f..f43cbb6 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess;
17 17
18use crate::git::authorization::{authorize_push, parse_pushed_refs}; 18use crate::git::authorization::{authorize_push, parse_pushed_refs};
19use crate::git::sync::process_newly_available_git_data; 19use crate::git::sync::process_newly_available_git_data;
20use crate::nostr::builder::SharedDatabase; 20use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
21use crate::purgatory::Purgatory; 21use crate::purgatory::Purgatory;
22use crate::sync::rejected_index::RejectedEventsIndex;
22 23
23/// Handle GET /info/refs?service=git-{upload,receive}-pack 24/// Handle GET /info/refs?service=git-{upload,receive}-pack
24/// 25///
@@ -258,6 +259,8 @@ pub async fn handle_receive_pack(
258 purgatory: Arc<Purgatory>, 259 purgatory: Arc<Purgatory>,
259 git_data_path: &str, 260 git_data_path: &str,
260 git_protocol: Option<&str>, 261 git_protocol: Option<&str>,
262 write_policy: Arc<Nip34WritePolicy>,
263 rejected_events_index: Arc<RejectedEventsIndex>,
261) -> Result<Response<Full<Bytes>>, GitError> { 264) -> Result<Response<Full<Bytes>>, GitError> {
262 debug!("Handling receive-pack for {:?}", repo_path); 265 debug!("Handling receive-pack for {:?}", repo_path);
263 266
@@ -397,6 +400,8 @@ pub async fn handle_receive_pack(
397 Some(&relay), 400 Some(&relay),
398 &purgatory, 401 &purgatory,
399 git_data_path_buf, 402 git_data_path_buf,
403 Some(&write_policy),
404 Some(&rejected_events_index),
400 ) 405 )
401 .await 406 .await
402 { 407 {
diff --git a/src/git/sync.rs b/src/git/sync.rs
index b1a9b49..c24d16b 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -32,17 +32,20 @@
32use std::collections::{HashMap, HashSet}; 32use std::collections::{HashMap, HashSet};
33use std::path::Path; 33use std::path::Path;
34use std::process::Command; 34use std::process::Command;
35use std::sync::Arc;
35use tracing::{debug, info, warn}; 36use tracing::{debug, info, warn};
36 37
37use nostr_sdk::Event; 38use nostr_sdk::Event;
38 39
39use crate::git::authorization::{ 40use crate::git::authorization::{
40 collect_authorized_maintainers, fetch_repository_data, RepositoryData, 41 collect_authorized_maintainers, fetch_repository_data, fetch_repository_data_with_purgatory,
42 RepositoryData,
41}; 43};
42use crate::git::{self, oid_exists}; 44use crate::git::{self, oid_exists};
43use crate::nostr::builder::SharedDatabase; 45use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
44use crate::nostr::events::RepositoryState; 46use crate::nostr::events::RepositoryState;
45use crate::purgatory::{can_apply_state, Purgatory}; 47use crate::purgatory::{can_apply_state, Purgatory};
48use crate::sync::rejected_index::RejectedEventsIndex;
46 49
47/// Result of processing newly available git data. 50/// Result of processing newly available git data.
48/// 51///
@@ -51,6 +54,8 @@ use crate::purgatory::{can_apply_state, Purgatory};
51/// or from purgatory sync fetching OIDs from remote servers). 54/// or from purgatory sync fetching OIDs from remote servers).
52#[derive(Debug, Default, Clone)] 55#[derive(Debug, Default, Clone)]
53pub struct ProcessResult { 56pub struct ProcessResult {
57 /// Number of announcements released from purgatory
58 pub announcements_released: usize,
54 /// Number of state events released from purgatory 59 /// Number of state events released from purgatory
55 pub states_released: usize, 60 pub states_released: usize,
56 /// Number of PR events released from purgatory 61 /// Number of PR events released from purgatory
@@ -70,11 +75,12 @@ pub struct ProcessResult {
70impl ProcessResult { 75impl ProcessResult {
71 /// Check if any events were released 76 /// Check if any events were released
72 pub fn released_any(&self) -> bool { 77 pub fn released_any(&self) -> bool {
73 self.states_released > 0 || self.prs_released > 0 78 self.announcements_released > 0 || self.states_released > 0 || self.prs_released > 0
74 } 79 }
75 80
76 /// Merge another ProcessResult into this one 81 /// Merge another ProcessResult into this one
77 pub fn merge(&mut self, other: ProcessResult) { 82 pub fn merge(&mut self, other: ProcessResult) {
83 self.announcements_released += other.announcements_released;
78 self.states_released += other.states_released; 84 self.states_released += other.states_released;
79 self.prs_released += other.prs_released; 85 self.prs_released += other.prs_released;
80 self.repos_synced += other.repos_synced; 86 self.repos_synced += other.repos_synced;
@@ -815,6 +821,8 @@ pub async fn process_newly_available_git_data(
815 local_relay: Option<&nostr_relay_builder::LocalRelay>, 821 local_relay: Option<&nostr_relay_builder::LocalRelay>,
816 purgatory: &Purgatory, 822 purgatory: &Purgatory,
817 git_data_path: &Path, 823 git_data_path: &Path,
824 write_policy: Option<&Nip34WritePolicy>,
825 rejected_events_index: Option<&Arc<RejectedEventsIndex>>,
818) -> anyhow::Result<ProcessResult> { 826) -> anyhow::Result<ProcessResult> {
819 let mut result = ProcessResult::default(); 827 let mut result = ProcessResult::default();
820 828
@@ -836,6 +844,20 @@ pub async fn process_newly_available_git_data(
836 "Processing newly available git data" 844 "Processing newly available git data"
837 ); 845 );
838 846
847 // Process announcements from purgatory
848 let announcement_result = process_purgatory_announcements(
849 &identifier,
850 source_repo_path,
851 database,
852 local_relay,
853 purgatory,
854 git_data_path,
855 write_policy,
856 rejected_events_index,
857 )
858 .await;
859 result.merge(announcement_result);
860
839 // Process state events from purgatory 861 // Process state events from purgatory
840 let state_result = process_purgatory_state_events( 862 let state_result = process_purgatory_state_events(
841 &identifier, 863 &identifier,
@@ -863,6 +885,7 @@ pub async fn process_newly_available_git_data(
863 if result.released_any() { 885 if result.released_any() {
864 info!( 886 info!(
865 identifier = %identifier, 887 identifier = %identifier,
888 announcements_released = result.announcements_released,
866 states_released = result.states_released, 889 states_released = result.states_released,
867 prs_released = result.prs_released, 890 prs_released = result.prs_released,
868 repos_synced = result.repos_synced, 891 repos_synced = result.repos_synced,
@@ -907,7 +930,10 @@ async fn process_purgatory_state_events(
907 ); 930 );
908 931
909 // Fetch repository data once for all state events 932 // Fetch repository data once for all state events
910 let mut db_repo_data = match fetch_repository_data(database, identifier).await { 933 // IMPORTANT: Use fetch_repository_data_with_purgatory to include announcements
934 // that may still be in purgatory (not yet promoted). This ensures authorization
935 // works correctly even if the announcement promotion happens in the same batch.
936 let mut db_repo_data = match fetch_repository_data_with_purgatory(database, purgatory, identifier).await {
911 Ok(data) => data, 937 Ok(data) => data,
912 Err(e) => { 938 Err(e) => {
913 warn!( 939 warn!(
@@ -1151,6 +1177,9 @@ async fn process_purgatory_pr_events(
1151 ); 1177 );
1152 1178
1153 // Fetch repository data for syncing 1179 // Fetch repository data for syncing
1180 // NOTE: Only fetch from database, NOT purgatory. PR events should only be
1181 // released from purgatory when the announcement has been promoted (validated).
1182 // This ensures we don't accept PR events for announcements that fail validation.
1154 let db_repo_data = match fetch_repository_data(database, identifier).await { 1183 let db_repo_data = match fetch_repository_data(database, identifier).await {
1155 Ok(data) => data, 1184 Ok(data) => data,
1156 Err(e) => { 1185 Err(e) => {
@@ -1250,6 +1279,195 @@ async fn process_purgatory_pr_events(
1250 result 1279 result
1251} 1280}
1252 1281
1282/// Process announcements from purgatory that can now be promoted.
1283///
1284/// When git data arrives for a repository, any announcements in purgatory
1285/// for that repository should be promoted to the database and served to clients.
1286///
1287/// When `write_policy` and `rejected_events_index` are provided (git push path),
1288/// any maintainer announcements sitting in the hot cache are re-processed immediately
1289/// after the owner announcement is promoted, so they don't wait for the next sync cycle.
1290async fn process_purgatory_announcements(
1291 identifier: &str,
1292 source_repo_path: &Path,
1293 database: &SharedDatabase,
1294 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1295 purgatory: &Purgatory,
1296 git_data_path: &Path,
1297 write_policy: Option<&Nip34WritePolicy>,
1298 rejected_events_index: Option<&Arc<RejectedEventsIndex>>,
1299) -> ProcessResult {
1300 let mut result = ProcessResult::default();
1301
1302 // Extract owner pubkey from the source repo path
1303 let owner_pubkey = match extract_owner_from_repo_path(source_repo_path, git_data_path) {
1304 Some(npub) => npub,
1305 None => {
1306 debug!(
1307 identifier = %identifier,
1308 "Could not extract owner from repo path"
1309 );
1310 return result;
1311 }
1312 };
1313
1314 // Parse the npub back to PublicKey
1315 let owner = match nostr_sdk::PublicKey::parse(&owner_pubkey) {
1316 Ok(pk) => pk,
1317 Err(e) => {
1318 warn!(
1319 identifier = %identifier,
1320 owner_pubkey = %owner_pubkey,
1321 error = %e,
1322 "Failed to parse owner pubkey"
1323 );
1324 result.errors.push(format!("Failed to parse owner pubkey: {}", e));
1325 return result;
1326 }
1327 };
1328
1329 // Check if there's an announcement in purgatory for this owner and identifier
1330 let announcement_event = purgatory.promote_announcement(&owner, identifier);
1331
1332 if let Some(event) = announcement_event {
1333 // Save to database
1334 match database.save_event(&event).await {
1335 Ok(_) => {
1336 info!(
1337 identifier = %identifier,
1338 event_id = %event.id,
1339 "Promoted announcement from purgatory to database"
1340 );
1341
1342 // Notify WebSocket subscribers
1343 if let Some(relay) = local_relay {
1344 if relay.notify_event(event.clone()) {
1345 debug!(
1346 identifier = %identifier,
1347 event_id = %event.id,
1348 "Broadcast announcement event to WebSocket listeners"
1349 );
1350 }
1351 }
1352
1353 result.announcements_released += 1;
1354
1355 // Re-process any maintainer announcements sitting in the hot cache.
1356 //
1357 // When an owner announcement is promoted from purgatory via a git push,
1358 // maintainer announcements that arrived earlier (via relay sync) may have
1359 // been rejected and stored in the hot cache because the owner announcement
1360 // didn't exist in the DB yet. Now that the owner announcement is saved,
1361 // we must invalidate and re-process those cached events immediately.
1362 //
1363 // This only applies on the git push path (write_policy + rejected_events_index
1364 // are Some). The purgatory sync path already handles this via
1365 // SyncManager::process_event_static.
1366 if let (Some(wp), Some(rei), Some(relay)) =
1367 (write_policy, rejected_events_index, local_relay)
1368 {
1369 use crate::nostr::events::RepositoryAnnouncement;
1370 use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult};
1371 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1372
1373 if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) {
1374 if !announcement.maintainers.is_empty() {
1375 debug!(
1376 identifier = %identifier,
1377 event_id = %event.id,
1378 maintainer_count = announcement.maintainers.len(),
1379 "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements"
1380 );
1381
1382 for maintainer_hex in &announcement.maintainers {
1383 match nostr_sdk::PublicKey::from_hex(maintainer_hex) {
1384 Ok(maintainer_pubkey) => {
1385 let (removed, hot_events) = rei.invalidate_and_get(
1386 &maintainer_pubkey,
1387 &announcement.identifier,
1388 Some(crate::sync::rejected_index::EventType::Announcement),
1389 );
1390
1391 if removed > 0 {
1392 info!(
1393 maintainer = %maintainer_hex,
1394 identifier = %announcement.identifier,
1395 removed_from_cold_index = removed,
1396 hot_cache_events = hot_events.len(),
1397 "Invalidated rejected maintainer announcements after git push promotion"
1398 );
1399 }
1400
1401 // Re-process events from hot cache
1402 let dummy_addr = SocketAddr::new(
1403 IpAddr::V4(Ipv4Addr::LOCALHOST),
1404 0,
1405 );
1406 for hot_event in hot_events {
1407 info!(
1408 event_id = %hot_event.id,
1409 maintainer = %maintainer_hex,
1410 identifier = %announcement.identifier,
1411 "Re-processing maintainer announcement from hot cache after git push promotion"
1412 );
1413 match wp.admit_event(&hot_event, &dummy_addr).await {
1414 WritePolicyResult::Accept => {
1415 match database.save_event(&hot_event).await {
1416 Ok(_) => {
1417 relay.notify_event(hot_event.clone());
1418 info!(
1419 event_id = %hot_event.id,
1420 "Maintainer announcement accepted and saved on re-processing"
1421 );
1422 }
1423 Err(e) => {
1424 warn!(
1425 event_id = %hot_event.id,
1426 error = %e,
1427 "Failed to save re-processed maintainer announcement"
1428 );
1429 }
1430 }
1431 }
1432 _ => {
1433 warn!(
1434 event_id = %hot_event.id,
1435 "Maintainer announcement still rejected on re-processing"
1436 );
1437 }
1438 }
1439 }
1440 }
1441 Err(e) => {
1442 warn!(
1443 maintainer_hex = %maintainer_hex,
1444 error = %e,
1445 "Invalid maintainer public key in promoted announcement"
1446 );
1447 }
1448 }
1449 }
1450 }
1451 }
1452 }
1453 }
1454 Err(e) => {
1455 warn!(
1456 identifier = %identifier,
1457 event_id = %event.id,
1458 error = %e,
1459 "Failed to save announcement to database"
1460 );
1461 result
1462 .errors
1463 .push(format!("Failed to save announcement: {}", e));
1464 }
1465 }
1466 }
1467
1468 result
1469}
1470
1253/// Extract owner pubkey from a repository path. 1471/// Extract owner pubkey from a repository path.
1254/// 1472///
1255/// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. 1473/// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub.
@@ -1271,6 +1489,7 @@ mod tests {
1271 #[test] 1489 #[test]
1272 fn test_process_result_default() { 1490 fn test_process_result_default() {
1273 let result = ProcessResult::default(); 1491 let result = ProcessResult::default();
1492 assert_eq!(result.announcements_released, 0);
1274 assert_eq!(result.states_released, 0); 1493 assert_eq!(result.states_released, 0);
1275 assert_eq!(result.prs_released, 0); 1494 assert_eq!(result.prs_released, 0);
1276 assert_eq!(result.repos_synced, 0); 1495 assert_eq!(result.repos_synced, 0);
@@ -1282,6 +1501,10 @@ mod tests {
1282 let mut result = ProcessResult::default(); 1501 let mut result = ProcessResult::default();
1283 assert!(!result.released_any()); 1502 assert!(!result.released_any());
1284 1503
1504 result.announcements_released = 1;
1505 assert!(result.released_any());
1506
1507 result.announcements_released = 0;
1285 result.states_released = 1; 1508 result.states_released = 1;
1286 assert!(result.released_any()); 1509 assert!(result.released_any());
1287 1510
@@ -1293,6 +1516,7 @@ mod tests {
1293 #[test] 1516 #[test]
1294 fn test_process_result_merge() { 1517 fn test_process_result_merge() {
1295 let mut result1 = ProcessResult { 1518 let mut result1 = ProcessResult {
1519 announcements_released: 0,
1296 states_released: 1, 1520 states_released: 1,
1297 prs_released: 2, 1521 prs_released: 2,
1298 repos_synced: 3, 1522 repos_synced: 3,
@@ -1303,6 +1527,7 @@ mod tests {
1303 }; 1527 };
1304 1528
1305 let result2 = ProcessResult { 1529 let result2 = ProcessResult {
1530 announcements_released: 5,
1306 states_released: 10, 1531 states_released: 10,
1307 prs_released: 20, 1532 prs_released: 20,
1308 repos_synced: 30, 1533 repos_synced: 30,
@@ -1314,6 +1539,7 @@ mod tests {
1314 1539
1315 result1.merge(result2); 1540 result1.merge(result2);
1316 1541
1542 assert_eq!(result1.announcements_released, 5);
1317 assert_eq!(result1.states_released, 11); 1543 assert_eq!(result1.states_released, 11);
1318 assert_eq!(result1.prs_released, 22); 1544 assert_eq!(result1.prs_released, 22);
1319 assert_eq!(result1.repos_synced, 33); 1545 assert_eq!(result1.repos_synced, 33);
diff --git a/src/http/mod.rs b/src/http/mod.rs
index edc28a3..76ffef3 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -26,8 +26,9 @@ use tokio::net::TcpListener;
26use crate::config::Config; 26use crate::config::Config;
27use crate::git; 27use crate::git;
28use crate::metrics::Metrics; 28use crate::metrics::Metrics;
29use crate::nostr::builder::SharedDatabase; 29use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
30use crate::purgatory::Purgatory; 30use crate::purgatory::Purgatory;
31use crate::sync::rejected_index::RejectedEventsIndex;
31 32
32/// CORS headers required by GRASP-01 specification (lines 40-47) 33/// CORS headers required by GRASP-01 specification (lines 40-47)
33const CORS_ALLOW_ORIGIN: &str = "*"; 34const CORS_ALLOW_ORIGIN: &str = "*";
@@ -97,6 +98,10 @@ struct HttpService {
97 metrics: Option<Arc<Metrics>>, 98 metrics: Option<Arc<Metrics>>,
98 /// Purgatory for event/git coordination 99 /// Purgatory for event/git coordination
99 purgatory: Arc<Purgatory>, 100 purgatory: Arc<Purgatory>,
101 /// Write policy for re-processing hot-cache events after git push promotion
102 write_policy: Arc<Nip34WritePolicy>,
103 /// Rejected events index for hot-cache re-processing after git push promotion
104 rejected_events_index: Arc<RejectedEventsIndex>,
100} 105}
101 106
102impl HttpService { 107impl HttpService {
@@ -107,6 +112,8 @@ impl HttpService {
107 database: SharedDatabase, 112 database: SharedDatabase,
108 metrics: Option<Arc<Metrics>>, 113 metrics: Option<Arc<Metrics>>,
109 purgatory: Arc<Purgatory>, 114 purgatory: Arc<Purgatory>,
115 write_policy: Arc<Nip34WritePolicy>,
116 rejected_events_index: Arc<RejectedEventsIndex>,
110 ) -> Self { 117 ) -> Self {
111 Self { 118 Self {
112 relay, 119 relay,
@@ -115,6 +122,8 @@ impl HttpService {
115 database, 122 database,
116 metrics, 123 metrics,
117 purgatory, 124 purgatory,
125 write_policy,
126 rejected_events_index,
118 } 127 }
119 } 128 }
120} 129}
@@ -132,6 +141,8 @@ impl Service<Request<Incoming>> for HttpService {
132 let git_data_path = self.config.effective_git_data_path(); 141 let git_data_path = self.config.effective_git_data_path();
133 let database = self.database.clone(); 142 let database = self.database.clone();
134 let purgatory = self.purgatory.clone(); 143 let purgatory = self.purgatory.clone();
144 let write_policy = self.write_policy.clone();
145 let rejected_events_index = self.rejected_events_index.clone();
135 146
136 // Handle OPTIONS preflight requests (CORS) 147 // Handle OPTIONS preflight requests (CORS)
137 // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content 148 // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content
@@ -293,6 +304,8 @@ impl Service<Request<Incoming>> for HttpService {
293 purgatory.clone(), 304 purgatory.clone(),
294 &git_data_path, 305 &git_data_path,
295 git_protocol.as_deref(), 306 git_protocol.as_deref(),
307 write_policy.clone(),
308 rejected_events_index.clone(),
296 ) 309 )
297 .await; 310 .await;
298 311
@@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String {
557/// * `relay` - The LocalRelay for WebSocket connections 570/// * `relay` - The LocalRelay for WebSocket connections
558/// * `database` - The database for direct queries (e.g., push authorization) 571/// * `database` - The database for direct queries (e.g., push authorization)
559/// * `metrics` - Optional metrics for Prometheus endpoint 572/// * `metrics` - Optional metrics for Prometheus endpoint
573/// * `purgatory` - Purgatory for event/git coordination
574/// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion
575/// * `rejected_events_index` - Rejected events index for hot-cache re-processing
560pub async fn run_server( 576pub async fn run_server(
561 config: Config, 577 config: Config,
562 relay: LocalRelay, 578 relay: LocalRelay,
563 database: SharedDatabase, 579 database: SharedDatabase,
564 metrics: Option<Arc<Metrics>>, 580 metrics: Option<Arc<Metrics>>,
565 purgatory: Arc<Purgatory>, 581 purgatory: Arc<Purgatory>,
582 write_policy: Arc<Nip34WritePolicy>,
583 rejected_events_index: Arc<RejectedEventsIndex>,
566) -> anyhow::Result<()> { 584) -> anyhow::Result<()> {
567 let bind_addr: SocketAddr = config.bind_address.parse()?; 585 let bind_addr: SocketAddr = config.bind_address.parse()?;
568 586
@@ -582,6 +600,8 @@ pub async fn run_server(
582 database.clone(), 600 database.clone(),
583 metrics.clone(), 601 metrics.clone(),
584 purgatory.clone(), 602 purgatory.clone(),
603 write_policy.clone(),
604 rejected_events_index.clone(),
585 ); 605 );
586 606
587 tokio::spawn(async move { 607 tokio::spawn(async move {
diff --git a/src/main.rs b/src/main.rs
index dd2c903..bf3aefb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -130,7 +130,9 @@ async fn main() -> Result<()> {
130 } 130 }
131 131
132 // Get a reference to the rejected events index for shutdown persistence 132 // Get a reference to the rejected events index for shutdown persistence
133 // and for the HTTP server's git push path (hot-cache re-processing)
133 let shutdown_rejected_index = sync_manager.rejected_events_index(); 134 let shutdown_rejected_index = sync_manager.rejected_events_index();
135 let http_rejected_index = shutdown_rejected_index.clone();
134 136
135 tokio::spawn(async move { 137 tokio::spawn(async move {
136 sync_manager.run().await; 138 sync_manager.run().await;
@@ -142,11 +144,11 @@ async fn main() -> Result<()> {
142 let mut interval = tokio::time::interval(Duration::from_secs(60)); 144 let mut interval = tokio::time::interval(Duration::from_secs(60));
143 loop { 145 loop {
144 interval.tick().await; 146 interval.tick().await;
145 let (state_removed, pr_removed) = cleanup_purgatory.cleanup(); 147 let (announcement_removed, state_removed, pr_removed) = cleanup_purgatory.cleanup();
146 if state_removed > 0 || pr_removed > 0 { 148 if announcement_removed > 0 || state_removed > 0 || pr_removed > 0 {
147 info!( 149 info!(
148 "Purgatory cleanup: removed {} state events, {} PR events", 150 "Purgatory cleanup: removed {} announcements, {} state events, {} PR events",
149 state_removed, pr_removed 151 announcement_removed, state_removed, pr_removed
150 ); 152 );
151 } 153 }
152 } 154 }
@@ -206,12 +208,15 @@ async fn main() -> Result<()> {
206 // Start HTTP server with integrated relay and database 208 // Start HTTP server with integrated relay and database
207 info!("Starting HTTP server on {}", config.bind_address); 209 info!("Starting HTTP server on {}", config.bind_address);
208 210
211 // Wrap write_policy in Arc for sharing between HTTP server connections
212 let http_write_policy = Arc::new(relay_with_db.write_policy.clone());
213
209 // Run server until shutdown signal, then cleanup 214 // Run server until shutdown signal, then cleanup
210 #[cfg(unix)] 215 #[cfg(unix)]
211 { 216 {
212 use tokio::signal::unix::{signal, SignalKind}; 217 use tokio::signal::unix::{signal, SignalKind};
213 let mut sigterm = signal(SignalKind::terminate())?; 218 let mut sigterm = signal(SignalKind::terminate())?;
214 219
215 tokio::select! { 220 tokio::select! {
216 result = http::run_server( 221 result = http::run_server(
217 config, 222 config,
@@ -219,6 +224,8 @@ async fn main() -> Result<()> {
219 relay_with_db.database, 224 relay_with_db.database,
220 metrics, 225 metrics,
221 purgatory, 226 purgatory,
227 http_write_policy,
228 http_rejected_index,
222 ) => { 229 ) => {
223 result? 230 result?
224 } 231 }
@@ -230,7 +237,7 @@ async fn main() -> Result<()> {
230 } 237 }
231 } 238 }
232 } 239 }
233 240
234 #[cfg(not(unix))] 241 #[cfg(not(unix))]
235 { 242 {
236 tokio::select! { 243 tokio::select! {
@@ -240,6 +247,8 @@ async fn main() -> Result<()> {
240 relay_with_db.database, 247 relay_with_db.database,
241 metrics, 248 metrics,
242 purgatory, 249 purgatory,
250 http_write_policy,
251 http_rejected_index,
243 ) => { 252 ) => {
244 result? 253 result?
245 } 254 }
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 713c129..7a05348 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -14,10 +14,11 @@ use nostr_relay_builder::prelude::*;
14use crate::config::{Config, DatabaseBackend}; 14use crate::config::{Config, DatabaseBackend};
15use crate::nostr::events::RepositoryAnnouncement; 15use crate::nostr::events::RepositoryAnnouncement;
16use crate::nostr::policy::{ 16use crate::nostr::policy::{
17 AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, 17 AnnouncementPolicy, AnnouncementResult, DeletionPolicy, PolicyContext, PrEventPolicy,
18 RelatedEventPolicy, StatePolicy, StateResult, 18 ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult,
19}; 19};
20 20
21
21/// Type alias for the shared database used by the relay 22/// Type alias for the shared database used by the relay
22pub type SharedDatabase = Arc<dyn NostrDatabase>; 23pub type SharedDatabase = Arc<dyn NostrDatabase>;
23 24
@@ -28,6 +29,7 @@ pub type SharedDatabase = Arc<dyn NostrDatabase>;
28/// - `StatePolicy` - State event validation + ref alignment 29/// - `StatePolicy` - State event validation + ref alignment
29/// - `PrEventPolicy` - PR/PR Update validation 30/// - `PrEventPolicy` - PR/PR Update validation
30/// - `RelatedEventPolicy` - Forward/backward reference checking 31/// - `RelatedEventPolicy` - Forward/backward reference checking
32/// - `DeletionPolicy` - NIP-09 event deletion request handling
31/// 33///
32/// Uses stateful database queries to check event relationships. 34/// Uses stateful database queries to check event relationships.
33#[derive(Clone)] 35#[derive(Clone)]
@@ -37,6 +39,7 @@ pub struct Nip34WritePolicy {
37 state_policy: StatePolicy, 39 state_policy: StatePolicy,
38 pr_event_policy: PrEventPolicy, 40 pr_event_policy: PrEventPolicy,
39 related_event_policy: RelatedEventPolicy, 41 related_event_policy: RelatedEventPolicy,
42 deletion_policy: DeletionPolicy,
40} 43}
41 44
42impl std::fmt::Debug for Nip34WritePolicy { 45impl std::fmt::Debug for Nip34WritePolicy {
@@ -68,6 +71,7 @@ impl Nip34WritePolicy {
68 state_policy: StatePolicy::new(ctx.clone()), 71 state_policy: StatePolicy::new(ctx.clone()),
69 pr_event_policy: PrEventPolicy::new(ctx.clone()), 72 pr_event_policy: PrEventPolicy::new(ctx.clone()),
70 related_event_policy: RelatedEventPolicy::new(ctx.clone()), 73 related_event_policy: RelatedEventPolicy::new(ctx.clone()),
74 deletion_policy: DeletionPolicy::new(ctx.clone()),
71 ctx, 75 ctx,
72 } 76 }
73 } 77 }
@@ -205,6 +209,30 @@ impl Nip34WritePolicy {
205 } 209 }
206 } 210 }
207 } 211 }
212 AnnouncementResult::AcceptPurgatory => {
213 // New announcement - add to purgatory
214 match self.announcement_policy.add_to_purgatory(event) {
215 Ok(()) => {
216 tracing::info!(
217 "Accepted announcement to purgatory: {} (waiting for git data)",
218 event_id_str
219 );
220
221 WritePolicyResult::Reject {
222 status: true, // Client sees OK
223 message: "purgatory: won't be served until git data arrives".into(),
224 }
225 }
226 Err(e) => {
227 tracing::warn!(
228 "Failed to add announcement to purgatory {}: {}",
229 event_id_str,
230 e
231 );
232 WritePolicyResult::reject(e)
233 }
234 }
235 }
208 AnnouncementResult::AcceptMaintainer => { 236 AnnouncementResult::AcceptMaintainer => {
209 // Parse announcement to get details for logging 237 // Parse announcement to get details for logging
210 match RepositoryAnnouncement::from_event(event.clone()) { 238 match RepositoryAnnouncement::from_event(event.clone()) {
@@ -621,6 +649,7 @@ impl WritePolicy for Nip34WritePolicy {
621 ); 649 );
622 WritePolicyResult::Accept 650 WritePolicyResult::Accept
623 } 651 }
652 Kind::EventDeletion => self.deletion_policy.handle(event).await,
624 _ => self.handle_related_event(event, "Event").await, 653 _ => self.handle_related_event(event, "Event").await,
625 } 654 }
626 }) 655 })
diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs
index 15a6e58..b366f0b 100644
--- a/src/nostr/policy/announcement.rs
+++ b/src/nostr/policy/announcement.rs
@@ -3,6 +3,8 @@
3/// Handles validation of NIP-34 repository announcements (kind 30617) 3/// Handles validation of NIP-34 repository announcements (kind 30617)
4/// according to GRASP-01 specification. 4/// according to GRASP-01 specification.
5use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; 5use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag};
6use std::collections::HashSet;
7use std::time::Duration;
6 8
7use super::PolicyContext; 9use super::PolicyContext;
8use crate::config::Config; 10use crate::config::Config;
@@ -11,12 +13,14 @@ use crate::nostr::events::{validate_announcement, RepositoryAnnouncement};
11/// Result of announcement policy evaluation 13/// Result of announcement policy evaluation
12#[derive(Debug, Clone, PartialEq)] 14#[derive(Debug, Clone, PartialEq)]
13pub enum AnnouncementResult { 15pub enum AnnouncementResult {
14 /// Accept: Event lists our service (GRASP-01 compliant) 16 /// Accept: Event lists our service (GRASP-01 compliant) - replacement announcement
15 Accept, 17 Accept,
16 /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer) 18 /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer)
17 AcceptMaintainer, 19 AcceptMaintainer,
18 /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only) 20 /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only)
19 AcceptArchive, 21 AcceptArchive,
22 /// Accept to purgatory: New announcement, waiting for git data
23 AcceptPurgatory,
20 /// Reject: Event fails validation with reason 24 /// Reject: Event fails validation with reason
21 Reject(String), 25 Reject(String),
22} 26}
@@ -35,10 +39,13 @@ impl AnnouncementPolicy {
35 39
36 /// Validate a repository announcement event 40 /// Validate a repository announcement event
37 /// 41 ///
38 /// Returns `Accept` if the announcement lists the service properly, 42 /// Returns:
39 /// `AcceptMaintainer` if accepted via maintainer exception, 43 /// - `Accept` if this is a replacement announcement (active announcement exists in DB or
40 /// `AcceptArchive` if accepted via GRASP-05 archive config, 44 /// purgatory)
41 /// or `Reject` with reason. 45 /// - `AcceptPurgatory` if this is a new announcement (no active announcement exists)
46 /// - `AcceptMaintainer` if accepted via maintainer exception
47 /// - `AcceptArchive` if accepted via GRASP-05 archive config
48 /// - `Reject` with reason if validation fails
42 pub async fn validate(&self, event: &Event) -> AnnouncementResult { 49 pub async fn validate(&self, event: &Event) -> AnnouncementResult {
43 // First, try validation (GRASP-01 + GRASP-05) 50 // First, try validation (GRASP-01 + GRASP-05)
44 let validation_result = validate_announcement(event, &self.config); 51 let validation_result = validate_announcement(event, &self.config);
@@ -49,6 +56,23 @@ impl AnnouncementPolicy {
49 // GRASP-01 Exception: Accept announcements from recursive maintainers 56 // GRASP-01 Exception: Accept announcements from recursive maintainers
50 match RepositoryAnnouncement::from_event(event.clone()) { 57 match RepositoryAnnouncement::from_event(event.clone()) {
51 Ok(announcement) => { 58 Ok(announcement) => {
59 // If this pubkey+identifier has a purgatory entry AND the incoming
60 // event is strictly newer, the owner is sending a replacement that
61 // removes our service. Clear the purgatory entry and its bare repo.
62 //
63 // If the incoming event is older than the purgatory entry (e.g. a
64 // relay replay of a superseded announcement), ignore it — the newer
65 // purgatory entry takes precedence and must not be evicted.
66 let should_evict = self
67 .ctx
68 .purgatory
69 .find_announcement(&event.pubkey, &announcement.identifier)
70 .is_some_and(|entry| event.created_at > entry.event.created_at);
71
72 if should_evict {
73 self.remove_purgatory_announcement(&event.pubkey, &announcement.identifier);
74 }
75
52 match self 76 match self
53 .is_maintainer_in_any_announcement( 77 .is_maintainer_in_any_announcement(
54 &announcement.identifier, 78 &announcement.identifier,
@@ -67,11 +91,221 @@ impl AnnouncementPolicy {
67 Err(_) => AnnouncementResult::Reject(reason), 91 Err(_) => AnnouncementResult::Reject(reason),
68 } 92 }
69 } 93 }
70 // Accept, AcceptArchive, or AcceptMaintainer - return as-is 94 AnnouncementResult::Accept | AnnouncementResult::AcceptArchive => {
95 // Parse announcement to check for existing active announcement
96 match RepositoryAnnouncement::from_event(event.clone()) {
97 Ok(announcement) => {
98 let in_db = match self
99 .has_db_announcement(&event.pubkey, &announcement.identifier)
100 .await
101 {
102 Ok(v) => v,
103 Err(e) => {
104 tracing::warn!(
105 error = %e,
106 "Failed to check for existing DB announcement - rejecting"
107 );
108 return AnnouncementResult::Reject(format!(
109 "Database error checking existing announcement: {}",
110 e
111 ));
112 }
113 };
114
115 if in_db {
116 // Replacement announcement with DB entry - accept immediately
117 tracing::debug!(
118 identifier = %announcement.identifier,
119 "Replacement announcement (DB) - accepting immediately"
120 );
121 return validation_result;
122 }
123
124 let in_purgatory = self
125 .ctx
126 .purgatory
127 .has_purgatory_announcement(&event.pubkey, &announcement.identifier);
128
129 if in_purgatory {
130 // Replacement announcement with purgatory entry - replace it and
131 // extend expiry so the new announcement gets a fresh 30-minute window.
132 tracing::debug!(
133 identifier = %announcement.identifier,
134 "Replacement announcement (purgatory) - replacing purgatory entry"
135 );
136 self.replace_purgatory_announcement(event, &announcement);
137 // Return Accept (not AcceptPurgatory) - this is a replacement, not new
138 return validation_result;
139 }
140
141 // No existing announcement - route to purgatory
142 tracing::debug!(
143 identifier = %announcement.identifier,
144 "New announcement - routing to purgatory"
145 );
146 AnnouncementResult::AcceptPurgatory
147 }
148 Err(e) => AnnouncementResult::Reject(format!(
149 "Failed to parse announcement: {}",
150 e
151 )),
152 }
153 }
154 // AcceptPurgatory shouldn't come from validate_announcement, but handle it
71 result => result, 155 result => result,
72 } 156 }
73 } 157 }
74 158
159 /// Replace a purgatory announcement entry with a newer event.
160 ///
161 /// Called when a replacement announcement arrives for a (pubkey, identifier) pair
162 /// that is currently in purgatory. Updates the purgatory entry and extends the
163 /// expiry so the new announcement has a fresh waiting window.
164 fn replace_purgatory_announcement(
165 &self,
166 event: &Event,
167 announcement: &RepositoryAnnouncement,
168 ) {
169 let repo_path = self.ctx.git_data_path.join(announcement.repo_path());
170 let relays: HashSet<String> = announcement.relays.iter().cloned().collect();
171
172 // add_announcement uses the (owner, identifier) key so it overwrites the old entry
173 self.ctx.purgatory.add_announcement(
174 event.clone(),
175 announcement.identifier.clone(),
176 event.pubkey,
177 repo_path,
178 relays,
179 );
180
181 // Extend the announcement's expiry (reset to full 30 min window)
182 self.ctx.purgatory.extend_announcement_expiry(
183 &event.pubkey,
184 &announcement.identifier,
185 Duration::from_secs(1800),
186 );
187
188 // Also extend any state events waiting for this identifier
189 let state_entries = self.ctx.purgatory.find_state(&announcement.identifier);
190 if !state_entries.is_empty() {
191 let state_ids: Vec<_> = state_entries.iter().map(|e| e.event.id).collect();
192 self.ctx.purgatory.extend_expiry(
193 &announcement.identifier,
194 &state_ids,
195 Duration::from_secs(1800),
196 );
197 }
198 }
199
200 /// Remove a purgatory announcement and clean up associated resources.
201 ///
202 /// Called when a replacement announcement is rejected (owner removed our service).
203 /// Deletes the bare repository from disk and removes any state events waiting for
204 /// this identifier.
205 fn remove_purgatory_announcement(&self, pubkey: &PublicKey, identifier: &str) {
206 // Get the repo path before removing from purgatory
207 if let Some(entry) = self.ctx.purgatory.find_announcement(pubkey, identifier) {
208 // Delete the bare repository from disk
209 if entry.repo_path.exists() {
210 if let Err(e) = std::fs::remove_dir_all(&entry.repo_path) {
211 tracing::warn!(
212 path = %entry.repo_path.display(),
213 error = %e,
214 "Failed to delete bare repository during purgatory cleanup"
215 );
216 } else {
217 tracing::info!(
218 path = %entry.repo_path.display(),
219 "Deleted bare repository for rejected purgatory announcement"
220 );
221 }
222 }
223 }
224
225 // Remove the announcement from purgatory
226 self.ctx.purgatory.remove_announcement(pubkey, identifier);
227
228 // Only remove state events if no other owner still has an announcement in purgatory
229 // for this identifier. State events are keyed by identifier alone, so blindly removing
230 // them would also discard state events legitimately belonging to a different owner's
231 // repository that happens to share the same identifier string.
232 let other_owners_remain = !self
233 .ctx
234 .purgatory
235 .get_announcements_by_identifier(identifier)
236 .is_empty();
237
238 if !other_owners_remain {
239 self.ctx.purgatory.remove_state(identifier);
240 }
241
242 tracing::info!(
243 identifier = %identifier,
244 other_owners_remain = %other_owners_remain,
245 "Cleared purgatory entry: owner removed our service from announcement"
246 );
247 }
248
249 /// Check if there's an announcement in the database for this (pubkey, identifier).
250 ///
251 /// Only checks the database (promoted announcements). For purgatory checks use
252 /// `purgatory.has_purgatory_announcement()` directly.
253 async fn has_db_announcement(
254 &self,
255 pubkey: &PublicKey,
256 identifier: &str,
257 ) -> Result<bool, String> {
258 let filter = Filter::new()
259 .kind(Kind::GitRepoAnnouncement)
260 .author(*pubkey)
261 .custom_tag(
262 SingleLetterTag::lowercase(Alphabet::D),
263 identifier.to_string(),
264 );
265
266 let events: Vec<Event> = match self.ctx.database.query(filter).await {
267 Ok(events) => events.into_iter().collect(),
268 Err(e) => return Err(format!("Database query failed: {}", e)),
269 };
270
271 Ok(!events.is_empty())
272 }
273
274 /// Add an announcement to purgatory
275 ///
276 /// Creates the bare repository and stores the announcement in purgatory
277 /// until git data arrives.
278 pub fn add_to_purgatory(&self, event: &Event) -> Result<(), String> {
279 let announcement = RepositoryAnnouncement::from_event(event.clone())
280 .map_err(|e| format!("Failed to parse announcement: {}", e))?;
281
282 // Create bare repository
283 self.ensure_bare_repository(&announcement)?;
284
285 // Build repo path
286 let repo_path = self.ctx.git_data_path.join(announcement.repo_path());
287
288 // Extract relays from announcement
289 let relays: HashSet<String> = announcement.relays.iter().cloned().collect();
290
291 // Add to purgatory
292 self.ctx.purgatory.add_announcement(
293 event.clone(),
294 announcement.identifier.clone(),
295 event.pubkey,
296 repo_path,
297 relays,
298 );
299
300 tracing::info!(
301 identifier = %announcement.identifier,
302 event_id = %event.id,
303 "Added announcement to purgatory"
304 );
305
306 Ok(())
307 }
308
75 /// Create a bare git repository if it doesn't exist 309 /// Create a bare git repository if it doesn't exist
76 /// Path format: <git_data_path>/<npub>/<identifier>.git 310 /// Path format: <git_data_path>/<npub>/<identifier>.git
77 pub fn ensure_bare_repository( 311 pub fn ensure_bare_repository(
@@ -117,6 +351,11 @@ impl AnnouncementPolicy {
117 /// 351 ///
118 /// This enables accepting announcements from maintainers even when they don't list 352 /// This enables accepting announcements from maintainers even when they don't list
119 /// this GRASP server, for maintainer chain discovery and GRASP-02 sync. 353 /// this GRASP server, for maintainer chain discovery and GRASP-02 sync.
354 ///
355 /// Checks both the database (promoted announcements) and purgatory (announcements
356 /// waiting for git data). This is necessary because a maintainer's announcement
357 /// (which lists the recursive maintainer) may still be in purgatory when the
358 /// recursive maintainer's announcement arrives.
120 async fn is_maintainer_in_any_announcement( 359 async fn is_maintainer_in_any_announcement(
121 &self, 360 &self,
122 identifier: &str, 361 identifier: &str,
@@ -128,12 +367,26 @@ impl AnnouncementPolicy {
128 identifier.to_string(), 367 identifier.to_string(),
129 ); 368 );
130 369
131 let announcements: Vec<Event> = match self.ctx.database.query(filter).await { 370 let db_announcements: Vec<Event> = match self.ctx.database.query(filter).await {
132 Ok(events) => events.into_iter().collect(), 371 Ok(events) => events.into_iter().collect(),
133 Err(e) => return Err(format!("Database query failed: {}", e)), 372 Err(e) => return Err(format!("Database query failed: {}", e)),
134 }; 373 };
135 374
136 if announcements.is_empty() { 375 // Also collect purgatory announcements for this identifier
376 let purgatory_announcements: Vec<Event> = self
377 .ctx
378 .purgatory
379 .get_announcements_by_identifier(identifier)
380 .into_iter()
381 .map(|entry| entry.event)
382 .collect();
383
384 let all_announcements: Vec<&Event> = db_announcements
385 .iter()
386 .chain(purgatory_announcements.iter())
387 .collect();
388
389 if all_announcements.is_empty() {
137 // No existing announcements for this identifier - author cannot be a maintainer 390 // No existing announcements for this identifier - author cannot be a maintainer
138 return Ok(false); 391 return Ok(false);
139 } 392 }
@@ -141,14 +394,14 @@ impl AnnouncementPolicy {
141 let author_hex = author.to_hex(); 394 let author_hex = author.to_hex();
142 395
143 // Check each announcement to see if author is listed as a maintainer 396 // Check each announcement to see if author is listed as a maintainer
144 for event in &announcements { 397 for event in &all_announcements {
145 // Check if author is the owner of this announcement 398 // Check if author is the owner of this announcement
146 if event.pubkey == *author { 399 if event.pubkey == *author {
147 return Ok(true); 400 return Ok(true);
148 } 401 }
149 402
150 // Check if author is listed in the maintainers tag 403 // Check if author is listed in the maintainers tag
151 if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { 404 if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) {
152 if announcement.maintainers.contains(&author_hex) { 405 if announcement.maintainers.contains(&author_hex) {
153 return Ok(true); 406 return Ok(true);
154 } 407 }
diff --git a/src/nostr/policy/deletion.rs b/src/nostr/policy/deletion.rs
new file mode 100644
index 0000000..6457c90
--- /dev/null
+++ b/src/nostr/policy/deletion.rs
@@ -0,0 +1,498 @@
1/// Deletion Policy - NIP-09 event deletion request handling
2///
3/// Handles kind 5 (EventDeletion) events that request removal of purgatory entries
4/// for repository announcements (kind 30617) and state events (kind 30618).
5///
6/// ## NIP-09 Rules Enforced
7///
8/// - Only the event author can delete their own events (pubkey must match)
9/// - `e` tags reference specific event IDs to delete
10/// - `a` tags reference addressable events by coordinate (`<kind>:<pubkey>:<d-identifier>`)
11/// - When an `a` tag is used, all versions up to `created_at` of the deletion request
12/// are considered deleted
13///
14/// ## Purgatory Interaction
15///
16/// - Kind 30617 (announcement) in purgatory: entry removed, bare repo deleted from disk
17/// - Kind 30618 (state event) in purgatory: matching state event(s) removed by event ID
18/// or by (author, identifier) coordinate
19use nostr_relay_builder::prelude::{Event, WritePolicyResult};
20
21use super::PolicyContext;
22
23/// Policy for handling NIP-09 event deletion requests
24#[derive(Clone)]
25pub struct DeletionPolicy {
26 ctx: PolicyContext,
27}
28
29impl DeletionPolicy {
30 pub fn new(ctx: PolicyContext) -> Self {
31 Self { ctx }
32 }
33
34 /// Process a kind 5 (EventDeletion) event.
35 ///
36 /// Checks whether the deletion request targets any purgatory announcements
37 /// and removes them if so. The deletion event itself is always accepted
38 /// (relays should store deletion requests per NIP-09).
39 ///
40 /// Only the event author can delete their own events — this is enforced by
41 /// checking that the purgatory entry's owner matches `event.pubkey`.
42 pub async fn handle(&self, event: &Event) -> WritePolicyResult {
43 // Process purgatory removals synchronously (no async needed)
44 self.remove_purgatory_targets(event);
45
46 // Always accept the deletion event itself so it is stored and
47 // can prevent re-acceptance of the deleted event in the future.
48 WritePolicyResult::Accept
49 }
50
51 /// Remove any purgatory entries targeted by this deletion event.
52 ///
53 /// Handles both reference styles from NIP-09:
54 /// - `e` tags: event ID references — match against announcement or state event IDs
55 /// - `a` tags: addressable coordinate references — `30617:…` or `30618:…`
56 ///
57 /// Only removes entries where the purgatory entry's author matches the deletion
58 /// event's pubkey (enforces author-only deletion).
59 fn remove_purgatory_targets(&self, event: &Event) {
60 let author = &event.pubkey;
61
62 for tag in event.tags.iter() {
63 let tag_vec = tag.as_slice();
64 if tag_vec.len() < 2 {
65 continue;
66 }
67
68 match tag_vec[0].as_str() {
69 "e" => {
70 // Event ID reference: find purgatory announcement with this event ID
71 let target_id = &tag_vec[1];
72 self.remove_by_event_id(author, target_id, event.created_at.as_secs());
73 }
74 "a" => {
75 // Addressable coordinate reference: `<kind>:<pubkey>:<d-identifier>`
76 let coord = &tag_vec[1];
77 self.remove_by_coordinate(author, coord, event.created_at.as_secs());
78 }
79 _ => {}
80 }
81 }
82 }
83
84 /// Remove a purgatory entry (announcement, state event, or PR event) matched by event ID.
85 ///
86 /// Checks in order: announcements (30617), state events (30618), PR/PR-update events.
87 /// Only removes entries whose author matches `author`.
88 fn remove_by_event_id(
89 &self,
90 author: &nostr_relay_builder::prelude::PublicKey,
91 target_id_hex: &str,
92 _deletion_created_at: u64,
93 ) {
94 // --- Check PR events (kind 1617/1618) first — O(1) direct lookup ---
95 // PR purgatory is keyed by event ID hex, so this is the cheapest check.
96 // Only remove if the entry has an actual event (not a placeholder) and the
97 // event's author matches the deletion request author.
98 if let Some(entry) = self.ctx.purgatory.find_pr(target_id_hex) {
99 if let Some(ref event) = entry.event {
100 if event.pubkey == *author {
101 tracing::info!(
102 event_id = %target_id_hex,
103 author = %author.to_hex(),
104 "Deletion request: removing purgatory PR event by event ID"
105 );
106 self.ctx.purgatory.remove_pr(target_id_hex);
107 return;
108 }
109 }
110 // Entry exists but is a placeholder or wrong author — don't remove
111 return;
112 }
113
114 // --- Check announcements (kind 30617) ---
115 // The DashMap doesn't expose a direct "find by event ID" method, so we use
116 // the announcements_for_sync snapshot to enumerate all (repo_id, _) pairs.
117 let all = self.ctx.purgatory.announcements_for_sync();
118 for (repo_id, _) in all {
119 // repo_id format: "30617:{pubkey_hex}:{identifier}"
120 let parts: Vec<&str> = repo_id.splitn(3, ':').collect();
121 if parts.len() != 3 {
122 continue;
123 }
124 let entry_pubkey_hex = parts[1];
125 let identifier = parts[2];
126
127 if entry_pubkey_hex != author.to_hex() {
128 continue;
129 }
130
131 if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) {
132 if entry.event.id.to_hex() == target_id_hex {
133 tracing::info!(
134 event_id = %target_id_hex,
135 identifier = %identifier,
136 author = %author.to_hex(),
137 "Deletion request: removing purgatory announcement by event ID"
138 );
139 self.evict_purgatory_entry(author, identifier);
140 return; // event IDs are unique
141 }
142 }
143 }
144
145 // --- Check state events (kind 30618) ---
146 // State events are keyed by identifier; scan all identifiers for a match.
147 let state_identifiers = self.ctx.purgatory.get_all_identifiers();
148 for identifier in state_identifiers {
149 let entries = self.ctx.purgatory.find_state(&identifier);
150 for entry in entries {
151 if entry.author == *author && entry.event.id.to_hex() == target_id_hex {
152 tracing::info!(
153 event_id = %target_id_hex,
154 identifier = %identifier,
155 author = %author.to_hex(),
156 "Deletion request: removing purgatory state event by event ID"
157 );
158 self.ctx.purgatory.remove_state_event(&identifier, &entry.event.id);
159 return; // event IDs are unique
160 }
161 }
162 }
163 }
164
165 /// Remove a purgatory entry matched by addressable coordinate.
166 ///
167 /// The coordinate format is `<kind>:<pubkey>:<d-identifier>`.
168 /// Handles kind 30617 (announcements) and kind 30618 (state events).
169 ///
170 /// Per NIP-09, all versions up to `deletion_created_at` are considered deleted.
171 fn remove_by_coordinate(
172 &self,
173 author: &nostr_relay_builder::prelude::PublicKey,
174 coordinate: &str,
175 deletion_created_at: u64,
176 ) {
177 // Parse coordinate: `<kind>:<pubkey>:<d-identifier>`
178 let parts: Vec<&str> = coordinate.splitn(3, ':').collect();
179 if parts.len() != 3 {
180 return;
181 }
182
183 let kind_str = parts[0];
184 let coord_pubkey_hex = parts[1];
185 let identifier = parts[2];
186
187 // The coordinate pubkey must match the deletion event author
188 if coord_pubkey_hex != author.to_hex() {
189 tracing::debug!(
190 coord_pubkey = %coord_pubkey_hex,
191 deletion_author = %author.to_hex(),
192 "Ignoring deletion: coordinate pubkey does not match deletion author"
193 );
194 return;
195 }
196
197 match kind_str {
198 "30617" => {
199 // Announcement purgatory entry
200 if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) {
201 if entry.event.created_at.as_secs() <= deletion_created_at {
202 tracing::info!(
203 identifier = %identifier,
204 author = %author.to_hex(),
205 "Deletion request: removing purgatory announcement by coordinate"
206 );
207 self.evict_purgatory_entry(author, identifier);
208 } else {
209 tracing::debug!(
210 identifier = %identifier,
211 author = %author.to_hex(),
212 "Ignoring deletion: purgatory announcement is newer than deletion request"
213 );
214 }
215 }
216 }
217 "30618" => {
218 // State event purgatory entries for this (author, identifier).
219 // Remove all entries authored by `author` with created_at ≤ deletion_created_at.
220 let entries = self.ctx.purgatory.find_state(identifier);
221 let mut removed = 0usize;
222 for entry in entries {
223 if entry.author == *author
224 && entry.event.created_at.as_secs() <= deletion_created_at
225 {
226 self.ctx.purgatory.remove_state_event(identifier, &entry.event.id);
227 removed += 1;
228 }
229 }
230 if removed > 0 {
231 tracing::info!(
232 identifier = %identifier,
233 author = %author.to_hex(),
234 removed = %removed,
235 "Deletion request: removed purgatory state event(s) by coordinate"
236 );
237 }
238 }
239 _ => {
240 // Other kinds not handled
241 }
242 }
243 }
244
245 /// Remove a purgatory announcement and delete its bare repository from disk.
246 fn evict_purgatory_entry(
247 &self,
248 author: &nostr_relay_builder::prelude::PublicKey,
249 identifier: &str,
250 ) {
251 // Get repo path before removing
252 if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) {
253 if entry.repo_path.exists() {
254 if let Err(e) = std::fs::remove_dir_all(&entry.repo_path) {
255 tracing::warn!(
256 path = %entry.repo_path.display(),
257 error = %e,
258 "Failed to delete bare repository during deletion request processing"
259 );
260 } else {
261 tracing::info!(
262 path = %entry.repo_path.display(),
263 "Deleted bare repository for deletion-requested purgatory announcement"
264 );
265 }
266 }
267 }
268
269 self.ctx.purgatory.remove_announcement(author, identifier);
270
271 // Remove state events for this identifier only if no other owner's
272 // announcement remains in purgatory (state events are keyed by identifier alone)
273 let other_owners_remain = !self
274 .ctx
275 .purgatory
276 .get_announcements_by_identifier(identifier)
277 .is_empty();
278
279 if !other_owners_remain {
280 self.ctx.purgatory.remove_state(identifier);
281 }
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288 use crate::nostr::policy::PolicyContext;
289 use crate::purgatory::Purgatory;
290 use nostr_relay_builder::prelude::*;
291 use std::collections::HashSet;
292 use std::path::PathBuf;
293 use std::sync::Arc;
294
295 fn make_context() -> PolicyContext {
296 let db = Arc::new(MemoryDatabase::with_opts(MemoryDatabaseOptions {
297 events: true,
298 max_events: None,
299 }));
300 let purgatory = Arc::new(Purgatory::new(PathBuf::new()));
301 let config = crate::config::Config::for_testing();
302 PolicyContext::new("test.example.com", db, PathBuf::new(), purgatory, config)
303 }
304
305 fn make_announcement_event(keys: &Keys, identifier: &str) -> Event {
306 EventBuilder::new(Kind::GitRepoAnnouncement, "")
307 .tags(vec![
308 Tag::identifier(identifier),
309 Tag::custom(TagKind::custom("clone"), vec!["https://example.com/repo.git"]),
310 ])
311 .sign_with_keys(keys)
312 .unwrap()
313 }
314
315 fn add_to_purgatory(ctx: &PolicyContext, event: &Event, identifier: &str) {
316 ctx.purgatory.add_announcement(
317 event.clone(),
318 identifier.to_string(),
319 event.pubkey,
320 PathBuf::new(),
321 HashSet::new(),
322 );
323 }
324
325 #[tokio::test]
326 async fn test_deletion_by_event_id_removes_purgatory_entry() {
327 let ctx = make_context();
328 let keys = Keys::generate();
329 let identifier = "my-repo";
330
331 let announcement = make_announcement_event(&keys, identifier);
332 add_to_purgatory(&ctx, &announcement, identifier);
333
334 assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier));
335
336 // Build kind 5 deletion event referencing the announcement by event ID
337 let deletion = EventBuilder::new(Kind::EventDeletion, "")
338 .tags(vec![
339 Tag::event(announcement.id),
340 Tag::custom(TagKind::custom("k"), vec!["30617"]),
341 ])
342 .sign_with_keys(&keys)
343 .unwrap();
344
345 let policy = DeletionPolicy::new(ctx.clone());
346 let result = policy.handle(&deletion).await;
347
348 assert!(matches!(result, WritePolicyResult::Accept));
349 assert!(
350 !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier),
351 "Purgatory entry should have been removed"
352 );
353 }
354
355 #[tokio::test]
356 async fn test_deletion_by_coordinate_removes_purgatory_entry() {
357 let ctx = make_context();
358 let keys = Keys::generate();
359 let identifier = "my-repo";
360
361 let announcement = make_announcement_event(&keys, identifier);
362 add_to_purgatory(&ctx, &announcement, identifier);
363
364 assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier));
365
366 // Build kind 5 deletion event referencing the announcement by coordinate
367 let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier);
368 let deletion = EventBuilder::new(Kind::EventDeletion, "")
369 .tags(vec![
370 Tag::custom(TagKind::custom("a"), vec![coord]),
371 Tag::custom(TagKind::custom("k"), vec!["30617"]),
372 ])
373 .sign_with_keys(&keys)
374 .unwrap();
375
376 let policy = DeletionPolicy::new(ctx.clone());
377 let result = policy.handle(&deletion).await;
378
379 assert!(matches!(result, WritePolicyResult::Accept));
380 assert!(
381 !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier),
382 "Purgatory entry should have been removed"
383 );
384 }
385
386 #[tokio::test]
387 async fn test_deletion_by_wrong_author_does_not_remove() {
388 let ctx = make_context();
389 let owner_keys = Keys::generate();
390 let attacker_keys = Keys::generate();
391 let identifier = "my-repo";
392
393 let announcement = make_announcement_event(&owner_keys, identifier);
394 add_to_purgatory(&ctx, &announcement, identifier);
395
396 // Attacker tries to delete by event ID
397 let deletion = EventBuilder::new(Kind::EventDeletion, "")
398 .tags(vec![
399 Tag::event(announcement.id),
400 Tag::custom(TagKind::custom("k"), vec!["30617"]),
401 ])
402 .sign_with_keys(&attacker_keys)
403 .unwrap();
404
405 let policy = DeletionPolicy::new(ctx.clone());
406 let result = policy.handle(&deletion).await;
407
408 assert!(matches!(result, WritePolicyResult::Accept));
409 assert!(
410 ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier),
411 "Purgatory entry should NOT have been removed by wrong author"
412 );
413 }
414
415 #[tokio::test]
416 async fn test_deletion_by_coordinate_wrong_author_does_not_remove() {
417 let ctx = make_context();
418 let owner_keys = Keys::generate();
419 let attacker_keys = Keys::generate();
420 let identifier = "my-repo";
421
422 let announcement = make_announcement_event(&owner_keys, identifier);
423 add_to_purgatory(&ctx, &announcement, identifier);
424
425 // Attacker tries to delete by coordinate using owner's pubkey in coord
426 // but signs with their own key — coord pubkey != deletion author
427 let coord = format!("30617:{}:{}", owner_keys.public_key().to_hex(), identifier);
428 let deletion = EventBuilder::new(Kind::EventDeletion, "")
429 .tags(vec![
430 Tag::custom(TagKind::custom("a"), vec![coord]),
431 Tag::custom(TagKind::custom("k"), vec!["30617"]),
432 ])
433 .sign_with_keys(&attacker_keys)
434 .unwrap();
435
436 let policy = DeletionPolicy::new(ctx.clone());
437 let result = policy.handle(&deletion).await;
438
439 assert!(matches!(result, WritePolicyResult::Accept));
440 assert!(
441 ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier),
442 "Purgatory entry should NOT have been removed by wrong author"
443 );
444 }
445
446 #[tokio::test]
447 async fn test_deletion_of_nonexistent_entry_is_accepted() {
448 let ctx = make_context();
449 let keys = Keys::generate();
450
451 // No purgatory entry exists — deletion should still be accepted
452 let deletion = EventBuilder::new(Kind::EventDeletion, "")
453 .tags(vec![
454 Tag::custom(TagKind::custom("a"), vec![
455 format!("30617:{}:nonexistent", keys.public_key().to_hex())
456 ]),
457 ])
458 .sign_with_keys(&keys)
459 .unwrap();
460
461 let policy = DeletionPolicy::new(ctx.clone());
462 let result = policy.handle(&deletion).await;
463
464 assert!(matches!(result, WritePolicyResult::Accept));
465 }
466
467 #[tokio::test]
468 async fn test_deletion_by_coordinate_respects_created_at() {
469 let ctx = make_context();
470 let keys = Keys::generate();
471 let identifier = "my-repo";
472
473 // Create announcement with a future timestamp
474 let future_ts = Timestamp::now().as_secs() + 3600; // 1 hour in the future
475 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "")
476 .tags(vec![Tag::identifier(identifier)])
477 .custom_created_at(Timestamp::from(future_ts))
478 .sign_with_keys(&keys)
479 .unwrap();
480 add_to_purgatory(&ctx, &announcement, identifier);
481
482 // Deletion event with current timestamp (older than announcement)
483 let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier);
484 let deletion = EventBuilder::new(Kind::EventDeletion, "")
485 .tags(vec![Tag::custom(TagKind::custom("a"), vec![coord])])
486 .sign_with_keys(&keys)
487 .unwrap();
488
489 let policy = DeletionPolicy::new(ctx.clone());
490 let result = policy.handle(&deletion).await;
491
492 assert!(matches!(result, WritePolicyResult::Accept));
493 assert!(
494 ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier),
495 "Purgatory entry should NOT be removed: entry is newer than deletion request"
496 );
497 }
498}
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs
index 1566b6c..f5b981a 100644
--- a/src/nostr/policy/mod.rs
+++ b/src/nostr/policy/mod.rs
@@ -6,11 +6,13 @@
6/// - `PrEventPolicy` - PR/PR Update validation 6/// - `PrEventPolicy` - PR/PR Update validation
7/// - `RelatedEventPolicy` - Forward/backward reference checking 7/// - `RelatedEventPolicy` - Forward/backward reference checking
8mod announcement; 8mod announcement;
9mod deletion;
9mod pr_event; 10mod pr_event;
10mod related; 11mod related;
11mod state; 12mod state;
12 13
13pub use announcement::{AnnouncementPolicy, AnnouncementResult}; 14pub use announcement::{AnnouncementPolicy, AnnouncementResult};
15pub use deletion::DeletionPolicy;
14pub use pr_event::PrEventPolicy; 16pub use pr_event::PrEventPolicy;
15pub use related::{ReferenceResult, RelatedEventPolicy}; 17pub use related::{ReferenceResult, RelatedEventPolicy};
16pub use state::{StatePolicy, StateResult}; 18pub use state::{StatePolicy, StateResult};
diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs
index 00e09c3..072e445 100644
--- a/src/nostr/policy/pr_event.rs
+++ b/src/nostr/policy/pr_event.rs
@@ -127,6 +127,10 @@ impl PrEventPolicy {
127 .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; 127 .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?;
128 128
129 // Fetch repository data 129 // Fetch repository data
130 // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should
131 // only be accepted for announcements that have been promoted (validated).
132 // If the announcement is still in purgatory, the PR event should also go
133 // to purgatory and wait for the announcement to be promoted.
130 let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; 134 let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?;
131 135
132 // Extract owner pubkey from source repo path 136 // Extract owner pubkey from source repo path
@@ -203,6 +207,10 @@ impl PrEventPolicy {
203 let identifier = parts[2]; 207 let identifier = parts[2];
204 208
205 // 2. Fetch repo data 209 // 2. Fetch repo data
210 // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should
211 // only be accepted for announcements that have been promoted (validated).
212 // If the announcement is still in purgatory, the PR event should also go
213 // to purgatory and wait for the announcement to be promoted.
206 let db_repo_data = fetch_repository_data(&self.ctx.database, identifier).await?; 214 let db_repo_data = fetch_repository_data(&self.ctx.database, identifier).await?;
207 215
208 // 3. Extract list of maintainers from "a 30617:<maintainer>:<identifier>" tags 216 // 3. Extract list of maintainers from "a 30617:<maintainer>:<identifier>" tags
diff --git a/src/nostr/policy/related.rs b/src/nostr/policy/related.rs
index 7ce87db..cfe04a7 100644
--- a/src/nostr/policy/related.rs
+++ b/src/nostr/policy/related.rs
@@ -139,6 +139,11 @@ impl RelatedEventPolicy {
139 .push((addr, pubkey, identifier)); 139 .push((addr, pubkey, identifier));
140 } 140 }
141 141
142 // NOTE: Intentionally only checks the database (promoted announcements), not purgatory.
143 // Related events should only be accepted once the repository announcement has been
144 // validated (promoted via git data). Events referencing purgatory-only repositories
145 // are correctly rejected as orphans and can be re-submitted after promotion.
146
142 // Query each kind group 147 // Query each kind group
143 for (kind, refs) in by_kind { 148 for (kind, refs) in by_kind {
144 let authors: Vec<PublicKey> = refs.iter().map(|(_, pk, _)| *pk).collect(); 149 let authors: Vec<PublicKey> = refs.iter().map(|(_, pk, _)| *pk).collect();
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index 3411077..df743ae 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -1,3 +1,4 @@
1use std::collections::HashSet;
1use std::path::{Path, PathBuf}; 2use std::path::{Path, PathBuf};
2 3
3use anyhow::{Context, Result}; 4use anyhow::{Context, Result};
@@ -10,7 +11,7 @@ use nostr_relay_builder::prelude::Event;
10 11
11use super::PolicyContext; 12use super::PolicyContext;
12use crate::git; 13use crate::git;
13use crate::git::authorization::fetch_repository_data; 14use crate::git::authorization::fetch_repository_data_with_purgatory;
14use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; 15use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState};
15 16
16/// Result of state policy evaluation 17/// Result of state policy evaluation
@@ -76,7 +77,13 @@ impl StatePolicy {
76 } 77 }
77 78
78 // Get all repositories and state events from db with identifier 79 // Get all repositories and state events from db with identifier
79 let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; 80 // Include purgatory announcements for authorization
81 let db_repo_data = fetch_repository_data_with_purgatory(
82 &self.ctx.database,
83 &self.ctx.purgatory,
84 &state.identifier,
85 )
86 .await?;
80 87
81 // CRITICAL: Check if author is authorized via maintainer set 88 // CRITICAL: Check if author is authorized via maintainer set
82 // State events MUST be rejected if author is not in maintainer set of any accepted announcement 89 // State events MUST be rejected if author is not in maintainer set of any accepted announcement
@@ -139,6 +146,34 @@ impl StatePolicy {
139 "State event author authorized via maintainer set" 146 "State event author authorized via maintainer set"
140 ); 147 );
141 148
149 // Extend expiry for any purgatory announcements for this identifier.
150 //
151 // Per design doc decision #4: state event arrival extends the purgatory
152 // announcement's expiry (reset the 30-minute protocol timer). This prevents
153 // premature expiry during slow sync operations — the repo is actively receiving
154 // metadata so it should stay alive.
155 //
156 // We extend for all owners that authorized this state event, since the state
157 // event proves the repo is active regardless of which owner's announcement
158 // authorized it.
159 for owner_hex in &authorized_owners {
160 if let Ok(owner_pk) = nostr_sdk::PublicKey::from_hex(owner_hex) {
161 if self.ctx.purgatory.has_purgatory_announcement(&owner_pk, &state.identifier) {
162 self.ctx.purgatory.extend_announcement_expiry(
163 &owner_pk,
164 &state.identifier,
165 std::time::Duration::from_secs(1800),
166 );
167 tracing::debug!(
168 event_id = %event.id,
169 identifier = %state.identifier,
170 owner = %owner_hex,
171 "Extended purgatory announcement expiry due to state event arrival"
172 );
173 }
174 }
175 }
176
142 // Duplicate check in db 177 // Duplicate check in db
143 if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { 178 if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) {
144 tracing::debug!("processed state event duplicate (in db): {}", event.id); 179 tracing::debug!("processed state event duplicate (in db): {}", event.id);
@@ -186,6 +221,42 @@ impl StatePolicy {
186 } 221 }
187 } 222 }
188 223
224 // After copying OIDs to other owner repos, promote any purgatory announcements
225 // for those repos. This handles the case where two maintainers push to the same
226 // identifier on the same relay with identical commit hashes: the second maintainer's
227 // announcement sits in purgatory, and when their state event arrives the relay copies
228 // commits from the first maintainer's repo — but without this call the announcement
229 // would stay in purgatory indefinitely.
230 let local_relay = self.ctx.get_local_relay();
231 let empty_oids: HashSet<String> = HashSet::new();
232 for announcement in &db_repo_data.announcements {
233 let target_repo_path = self.ctx.git_data_path.join(announcement.repo_path());
234 if target_repo_path != repo_with_git_data {
235 // OIDs were copied to this repo by process_state_with_git_data;
236 // check if there's a purgatory announcement waiting for it.
237 if let Err(e) = crate::git::sync::process_newly_available_git_data(
238 &target_repo_path,
239 &empty_oids,
240 &self.ctx.database,
241 local_relay.as_ref(),
242 &self.ctx.purgatory,
243 &self.ctx.git_data_path,
244 None,
245 None,
246 )
247 .await
248 {
249 tracing::warn!(
250 identifier = %state.identifier,
251 event_id = %event.id,
252 repo_path = %target_repo_path.display(),
253 error = %e,
254 "Failed to process purgatory announcements for target repo after git sync copy"
255 );
256 }
257 }
258 }
259
189 // Event will be saved and broadcast by relay builder 260 // Event will be saved and broadcast by relay builder
190 Ok(WritePolicyResult::Accept) 261 Ok(WritePolicyResult::Accept)
191 } else { 262 } else {
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 2c278f6..bb6ff54 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -17,7 +17,7 @@ pub mod sync;
17mod types; 17mod types;
18 18
19pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs}; 19pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs};
20pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 20pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 21
22use dashmap::DashMap; 22use dashmap::DashMap;
23use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
@@ -34,6 +34,13 @@ pub use sync::SyncQueueEntry;
34/// Default expiry duration for purgatory entries (30 minutes) 34/// Default expiry duration for purgatory entries (30 minutes)
35const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); 35const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800);
36 36
37/// Extended expiry for soft-expired announcements (24 hours).
38///
39/// After the initial 30-minute expiry, the bare repo is deleted but the event is
40/// retained for this additional period. This allows revival if a state event arrives
41/// late (e.g. slow sync), without permanently blocking the repository.
42const SOFT_EXPIRY_EXTENDED: Duration = Duration::from_secs(86400);
43
37/// Default delay before syncing user-submitted events (3 minutes). 44/// Default delay before syncing user-submitted events (3 minutes).
38/// This gives time for the git push to arrive after the nostr event. 45/// This gives time for the git push to arrive after the nostr event.
39const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); 46const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180);
@@ -83,9 +90,35 @@ struct SerializablePrPurgatoryEntry {
83 source: types::EventSource, 90 source: types::EventSource,
84} 91}
85 92
93/// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets.
94///
95/// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp
96/// in `PurgatoryState`, allowing state to be persisted and restored across restarts.
97///
98/// Note: soft-expired entries (bare repo deleted) are NOT persisted — they have
99/// no git repo on disk and would be immediately cleaned up on restore anyway.
100#[derive(Debug, Clone, Serialize, Deserialize)]
101struct SerializableAnnouncementPurgatoryEntry {
102 /// The nostr announcement event (kind 30617)
103 event: Event,
104 /// The repository identifier from the event's 'd' tag
105 identifier: String,
106 /// The owner pubkey (event author)
107 owner: PublicKey,
108 /// Path to the bare git repository (must exist on disk)
109 repo_path: PathBuf,
110 /// Relay URLs from the announcement (for sync registration)
111 relays: HashSet<String>,
112 /// Duration offset from saved_at for created_at
113 created_at_offset_secs: u64,
114 /// Duration offset from saved_at for expires_at
115 expires_at_offset_secs: u64,
116}
117
86/// Serializable purgatory state for disk persistence. 118/// Serializable purgatory state for disk persistence.
87/// 119///
88/// Contains all purgatory data needed to restore state across restarts: 120/// Contains all purgatory data needed to restore state across restarts:
121/// - Announcement events (indexed by (owner, identifier)) — non-soft-expired only
89/// - State events (indexed by identifier) 122/// - State events (indexed by identifier)
90/// - PR events (indexed by event ID) 123/// - PR events (indexed by event ID)
91/// - Expired events (to prevent re-sync loops) 124/// - Expired events (to prevent re-sync loops)
@@ -97,6 +130,10 @@ struct PurgatoryState {
97 version: u32, 130 version: u32,
98 /// When this state was saved to disk 131 /// When this state was saved to disk
99 saved_at: SystemTime, 132 saved_at: SystemTime,
133 /// Announcement events indexed by "owner_hex:identifier"
134 /// Only non-soft-expired entries are persisted (bare repo must exist).
135 #[serde(default)]
136 announcement_purgatory: HashMap<String, SerializableAnnouncementPurgatoryEntry>,
100 /// State events indexed by repository identifier 137 /// State events indexed by repository identifier
101 state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>, 138 state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>,
102 /// PR events indexed by event ID (hex string) 139 /// PR events indexed by event ID (hex string)
@@ -107,7 +144,8 @@ struct PurgatoryState {
107 144
108/// Main purgatory structure holding events awaiting git data. 145/// Main purgatory structure holding events awaiting git data.
109/// 146///
110/// Provides thread-safe concurrent access to two separate stores: 147/// Provides thread-safe concurrent access to three separate stores:
148/// - Announcements indexed by (pubkey, identifier)
111/// - State events indexed by repository identifier 149/// - State events indexed by repository identifier
112/// - PR events indexed by event ID 150/// - PR events indexed by event ID
113/// 151///
@@ -128,6 +166,10 @@ struct PurgatoryState {
128/// that we've already determined have no git data available. 166/// that we've already determined have no git data available.
129#[derive(Clone)] 167#[derive(Clone)]
130pub struct Purgatory { 168pub struct Purgatory {
169 /// Repository announcements (kind 30617) indexed by (owner pubkey, identifier).
170 /// Key: (PublicKey, String) where String is the repository identifier.
171 announcement_purgatory: Arc<DashMap<(PublicKey, String), AnnouncementPurgatoryEntry>>,
172
131 /// State events (kind 30618) indexed by repository identifier. 173 /// State events (kind 30618) indexed by repository identifier.
132 /// Multiple state events can wait for the same identifier (different maintainers). 174 /// Multiple state events can wait for the same identifier (different maintainers).
133 state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>, 175 state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>,
@@ -152,6 +194,7 @@ impl Purgatory {
152 /// Create a new empty purgatory. 194 /// Create a new empty purgatory.
153 pub fn new(git_data_path: impl Into<PathBuf>) -> Self { 195 pub fn new(git_data_path: impl Into<PathBuf>) -> Self {
154 Self { 196 Self {
197 announcement_purgatory: Arc::new(DashMap::new()),
155 state_events: Arc::new(DashMap::new()), 198 state_events: Arc::new(DashMap::new()),
156 pr_events: Arc::new(DashMap::new()), 199 pr_events: Arc::new(DashMap::new()),
157 sync_queue: Arc::new(DashMap::new()), 200 sync_queue: Arc::new(DashMap::new()),
@@ -576,9 +619,245 @@ impl Purgatory {
576 self.pr_events.remove(event_id); 619 self.pr_events.remove(event_id);
577 } 620 }
578 621
622 // =========================================================================
623 // Announcement Purgatory Methods
624 // =========================================================================
625
626 /// Add a repository announcement to purgatory.
627 ///
628 /// The announcement will be held until git data arrives, at which point
629 /// it will be promoted to the database and served to clients.
630 ///
631 /// # Arguments
632 /// * `event` - The announcement event (kind 30617)
633 /// * `identifier` - The repository identifier from the 'd' tag
634 /// * `owner` - The owner pubkey (event author)
635 /// * `repo_path` - Path to the bare git repository
636 /// * `relays` - Relay URLs from the announcement (for sync registration)
637 pub fn add_announcement(
638 &self,
639 event: Event,
640 identifier: String,
641 owner: PublicKey,
642 repo_path: PathBuf,
643 relays: HashSet<String>,
644 ) {
645 let now = Instant::now();
646 let entry = AnnouncementPurgatoryEntry {
647 event,
648 identifier: identifier.clone(),
649 owner,
650 repo_path,
651 relays,
652 created_at: now,
653 expires_at: now + DEFAULT_EXPIRY,
654 soft_expired: false,
655 };
656
657 let key = (owner, identifier);
658 self.announcement_purgatory.insert(key.clone(), entry);
659
660 tracing::debug!(
661 owner = %key.0,
662 identifier = %key.1,
663 "Added announcement to purgatory"
664 );
665 }
666
667 /// Find an announcement in purgatory by owner and identifier.
668 ///
669 /// # Arguments
670 /// * `owner` - The owner pubkey
671 /// * `identifier` - The repository identifier
672 ///
673 /// # Returns
674 /// The announcement entry if found, None otherwise
675 pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option<AnnouncementPurgatoryEntry> {
676 let key = (*owner, identifier.to_string());
677 self.announcement_purgatory.get(&key).map(|entry| entry.clone())
678 }
679
680 /// Get all announcements in purgatory for a given identifier.
681 ///
682 /// This is used for authorization - state events and git pushes need to
683 /// check purgatory announcements for maintainer validation.
684 ///
685 /// # Arguments
686 /// * `identifier` - The repository identifier
687 ///
688 /// # Returns
689 /// Vector of announcement entries for this identifier
690 pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec<AnnouncementPurgatoryEntry> {
691 self.announcement_purgatory
692 .iter()
693 .filter(|entry| entry.key().1 == identifier)
694 .map(|entry| entry.value().clone())
695 .collect()
696 }
697
698 /// Remove an announcement from purgatory.
699 ///
700 /// # Arguments
701 /// * `owner` - The owner pubkey
702 /// * `identifier` - The repository identifier
703 pub fn remove_announcement(&self, owner: &PublicKey, identifier: &str) {
704 let key = (*owner, identifier.to_string());
705 self.announcement_purgatory.remove(&key);
706 tracing::debug!(
707 owner = %owner,
708 identifier = %identifier,
709 "Removed announcement from purgatory"
710 );
711 }
712
713 /// Promote an announcement from purgatory to active status.
714 ///
715 /// This is called when git data arrives. The announcement event is returned
716 /// so it can be saved to the database.
717 ///
718 /// # Arguments
719 /// * `owner` - The owner pubkey
720 /// * `identifier` - The repository identifier
721 ///
722 /// # Returns
723 /// The announcement event if found, None otherwise
724 pub fn promote_announcement(&self, owner: &PublicKey, identifier: &str) -> Option<Event> {
725 let key = (*owner, identifier.to_string());
726 self.announcement_purgatory.remove(&key).map(|(_, entry)| {
727 tracing::info!(
728 owner = %owner,
729 identifier = %identifier,
730 "Promoted announcement from purgatory to database"
731 );
732 entry.event
733 })
734 }
735
736 /// Check if there's an announcement in purgatory for the given owner and identifier.
737 ///
738 /// # Arguments
739 /// * `owner` - The owner pubkey
740 /// * `identifier` - The repository identifier
741 ///
742 /// # Returns
743 /// true if an announcement exists in purgatory, false otherwise
744 pub fn has_purgatory_announcement(&self, owner: &PublicKey, identifier: &str) -> bool {
745 let key = (*owner, identifier.to_string());
746 self.announcement_purgatory.contains_key(&key)
747 }
748
749 /// Extend the expiry for an announcement in purgatory.
750 ///
751 /// This is called when state events arrive for a purgatory announcement,
752 /// indicating the repository is actively receiving metadata.
753 ///
754 /// # Arguments
755 /// * `owner` - The owner pubkey
756 /// * `identifier` - The repository identifier
757 /// * `duration` - Minimum duration to guarantee from now
758 pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) {
759 let key = (*owner, identifier.to_string());
760
761 // Collect revival info before taking a mutable borrow
762 let revival_info: Option<(PathBuf, bool)> = self
763 .announcement_purgatory
764 .get(&key)
765 .map(|entry| (entry.repo_path.clone(), entry.soft_expired));
766
767 if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) {
768 let now = Instant::now();
769 let new_expiry = now + duration;
770 if entry.expires_at < new_expiry {
771 entry.expires_at = new_expiry;
772 }
773 // Always reset soft_expired when expiry is extended — the caller
774 // (state event or git auth) signals the repo is still active.
775 if entry.soft_expired {
776 entry.soft_expired = false;
777 }
778 }
779
780 // If the entry was soft-expired, recreate the bare repo outside the
781 // mutable borrow so we don't hold the DashMap lock during I/O.
782 if let Some((repo_path, was_soft_expired)) = revival_info {
783 if was_soft_expired {
784 if !repo_path.exists() {
785 match std::fs::create_dir_all(&repo_path) {
786 Ok(()) => {
787 // Initialise as a bare git repository
788 let status = std::process::Command::new("git")
789 .args(["init", "--bare"])
790 .arg(&repo_path)
791 .status();
792 match status {
793 Ok(s) if s.success() => {
794 tracing::info!(
795 path = %repo_path.display(),
796 owner = %owner,
797 identifier = %identifier,
798 "Recreated bare repository for revived soft-expired announcement"
799 );
800 }
801 Ok(s) => {
802 tracing::warn!(
803 path = %repo_path.display(),
804 exit_code = ?s.code(),
805 "git init --bare failed when reviving soft-expired announcement"
806 );
807 }
808 Err(e) => {
809 tracing::warn!(
810 path = %repo_path.display(),
811 error = %e,
812 "Failed to run git init --bare when reviving soft-expired announcement"
813 );
814 }
815 }
816 }
817 Err(e) => {
818 tracing::warn!(
819 path = %repo_path.display(),
820 error = %e,
821 "Failed to create directory when reviving soft-expired announcement"
822 );
823 }
824 }
825 }
826 tracing::info!(
827 owner = %owner,
828 identifier = %identifier,
829 "Revived soft-expired announcement (bare repo recreated, expiry extended)"
830 );
831 }
832 }
833 }
834
835 /// Get count of announcements in purgatory.
836 pub fn announcement_count(&self) -> usize {
837 self.announcement_purgatory.len()
838 }
839
840 /// Collect (repo_id, relay_urls) for all announcements currently in purgatory.
841 ///
842 /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable
843 /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory
844 /// announcement sync timer to register StateOnly entries in `repo_sync_index`.
845 pub fn announcements_for_sync(&self) -> Vec<(String, HashSet<String>)> {
846 self.announcement_purgatory
847 .iter()
848 .map(|entry| {
849 let (owner, identifier) = entry.key();
850 let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier);
851 let relays = entry.value().relays.clone();
852 (repo_id, relays)
853 })
854 .collect()
855 }
856
579 /// Get all event IDs currently stored in purgatory AND previously expired events. 857 /// Get all event IDs currently stored in purgatory AND previously expired events.
580 /// 858 ///
581 /// Returns a HashSet of all event IDs for: 859 /// Returns a HashSet of all event IDs for:
860 /// - Announcements currently held in purgatory
582 /// - State events currently held in purgatory 861 /// - State events currently held in purgatory
583 /// - PR events currently held in purgatory 862 /// - PR events currently held in purgatory
584 /// - Events that previously expired from purgatory without finding git data 863 /// - Events that previously expired from purgatory without finding git data
@@ -593,6 +872,11 @@ impl Purgatory {
593 pub fn event_ids(&self) -> HashSet<EventId> { 872 pub fn event_ids(&self) -> HashSet<EventId> {
594 let mut ids = HashSet::new(); 873 let mut ids = HashSet::new();
595 874
875 // Collect announcement event IDs
876 for entry in self.announcement_purgatory.iter() {
877 ids.insert(entry.value().event.id);
878 }
879
596 // Collect state event IDs 880 // Collect state event IDs
597 for entry in self.state_events.iter() { 881 for entry in self.state_events.iter() {
598 for state_entry in entry.value().iter() { 882 for state_entry in entry.value().iter() {
@@ -675,9 +959,86 @@ impl Purgatory {
675 /// to support migration scripts and operational monitoring. 959 /// to support migration scripts and operational monitoring.
676 /// 960 ///
677 /// # Returns 961 /// # Returns
678 /// Tuple of (num_state_removed, num_pr_removed) 962 /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed)
679 pub fn cleanup(&self) -> (usize, usize) { 963 pub fn cleanup(&self) -> (usize, usize, usize) {
680 let now = Instant::now(); 964 let now = Instant::now();
965
966 // Process expired announcements with two-phase soft expiry:
967 //
968 // Phase 1 (initial expiry, !soft_expired): Delete bare repo, set soft_expired=true,
969 // extend expiry by SOFT_EXPIRY_EXTENDED so the event is retained for revival.
970 // Phase 2 (extended expiry, soft_expired): Fully remove from purgatory.
971 //
972 // Collect entries that have passed their expires_at deadline.
973 let expired_announcements: Vec<(PublicKey, String, PathBuf, EventId, bool)> = self
974 .announcement_purgatory
975 .iter()
976 .filter(|entry| entry.value().expires_at <= now)
977 .map(|entry| {
978 let key = entry.key();
979 let v = entry.value();
980 (key.0.clone(), key.1.clone(), v.repo_path.clone(), v.event.id, v.soft_expired)
981 })
982 .collect();
983
984 let mut announcement_removed = 0;
985 for (owner, identifier, repo_path, event_id, already_soft_expired) in expired_announcements {
986 if already_soft_expired {
987 // Phase 2: fully remove
988 self.mark_expired(event_id);
989 self.announcement_purgatory.remove(&(owner.clone(), identifier.clone()));
990 announcement_removed += 1;
991 tracing::info!(
992 owner = %owner,
993 identifier = %identifier,
994 "Announcement fully expired from purgatory (soft expiry period elapsed)"
995 );
996 } else {
997 // Phase 1: soft expiry — delete bare repo, retain event.
998 //
999 // Only transition to soft_expired if the directory is gone (or never
1000 // existed). If removal fails we leave the entry untouched so the next
1001 // cleanup cycle retries the deletion automatically.
1002 let repo_gone = if repo_path.exists() {
1003 match std::fs::remove_dir_all(&repo_path) {
1004 Ok(()) => {
1005 tracing::info!(
1006 path = %repo_path.display(),
1007 owner = %owner,
1008 identifier = %identifier,
1009 "Deleted bare repository during soft expiry (event retained for revival)"
1010 );
1011 true
1012 }
1013 Err(e) => {
1014 tracing::warn!(
1015 path = %repo_path.display(),
1016 error = %e,
1017 "Failed to delete bare repository during soft expiry; will retry next cleanup cycle"
1018 );
1019 false
1020 }
1021 }
1022 } else {
1023 // Already gone (e.g. deleted externally)
1024 true
1025 };
1026
1027 if repo_gone {
1028 // Mark soft_expired and extend expiry
1029 if let Some(mut entry) = self.announcement_purgatory.get_mut(&(owner.clone(), identifier.clone())) {
1030 entry.soft_expired = true;
1031 entry.expires_at = now + SOFT_EXPIRY_EXTENDED;
1032 }
1033 tracing::debug!(
1034 owner = %owner,
1035 identifier = %identifier,
1036 "Announcement soft-expired: bare repo deleted, event retained for 24h"
1037 );
1038 }
1039 }
1040 }
1041
681 let mut state_removed = 0; 1042 let mut state_removed = 0;
682 1043
683 // Remove expired state events and mark them as expired 1044 // Remove expired state events and mark them as expired
@@ -823,17 +1184,17 @@ impl Purgatory {
823 self.pr_events.remove(&event_id_str); 1184 self.pr_events.remove(&event_id_str);
824 } 1185 }
825 1186
826 (state_removed, pr_removed) 1187 (announcement_removed, state_removed, pr_removed)
827 } 1188 }
828 1189
829 /// Remove expired entries from purgatory (legacy method). 1190 /// Remove expired entries from purgatory (legacy method).
830 /// 1191 ///
831 /// # Returns 1192 /// # Returns
832 /// Total number of entries removed (state + PR events) 1193 /// Total number of entries removed (announcement + state + PR events)
833 #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] 1194 #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")]
834 pub fn remove_expired(&self) -> usize { 1195 pub fn remove_expired(&self) -> usize {
835 let (state, pr) = self.cleanup(); 1196 let (announcement, state, pr) = self.cleanup();
836 state + pr 1197 announcement + state + pr
837 } 1198 }
838 1199
839 /// Remove old expired event records. 1200 /// Remove old expired event records.
@@ -867,11 +1228,12 @@ impl Purgatory {
867 /// Get current count of entries in purgatory. 1228 /// Get current count of entries in purgatory.
868 /// 1229 ///
869 /// # Returns 1230 /// # Returns
870 /// Tuple of (state_event_count, pr_event_count) 1231 /// Tuple of (announcement_count, state_event_count, pr_event_count)
871 pub fn count(&self) -> (usize, usize) { 1232 pub fn count(&self) -> (usize, usize, usize) {
1233 let announcement_count = self.announcement_purgatory.len();
872 let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); 1234 let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum();
873 let pr_count = self.pr_events.len(); 1235 let pr_count = self.pr_events.len();
874 (state_count, pr_count) 1236 (announcement_count, state_count, pr_count)
875 } 1237 }
876 1238
877 /// Get count of expired events being tracked. 1239 /// Get count of expired events being tracked.
@@ -885,6 +1247,7 @@ impl Purgatory {
885 /// Clear all entries from purgatory (for testing). 1247 /// Clear all entries from purgatory (for testing).
886 #[cfg(test)] 1248 #[cfg(test)]
887 pub fn clear(&self) { 1249 pub fn clear(&self) {
1250 self.announcement_purgatory.clear();
888 self.state_events.clear(); 1251 self.state_events.clear();
889 self.pr_events.clear(); 1252 self.pr_events.clear();
890 self.sync_queue.clear(); 1253 self.sync_queue.clear();
@@ -949,6 +1312,34 @@ impl Purgatory {
949 let saved_at = SystemTime::now(); 1312 let saved_at = SystemTime::now();
950 let now_instant = Instant::now(); 1313 let now_instant = Instant::now();
951 1314
1315 // Convert announcement_purgatory to serializable format.
1316 // Skip soft-expired entries: their bare repos have been deleted, so they
1317 // cannot be meaningfully restored (the repo path no longer exists on disk).
1318 let mut announcement_purgatory = HashMap::new();
1319 for entry in self.announcement_purgatory.iter() {
1320 let e = entry.value();
1321 if e.soft_expired {
1322 continue;
1323 }
1324 let created_offset =
1325 persistence::instant_to_offset(e.created_at, saved_at, now_instant);
1326 let expires_offset =
1327 persistence::instant_to_offset(e.expires_at, saved_at, now_instant);
1328 let key = format!("{}:{}", e.owner.to_hex(), e.identifier);
1329 announcement_purgatory.insert(
1330 key,
1331 SerializableAnnouncementPurgatoryEntry {
1332 event: e.event.clone(),
1333 identifier: e.identifier.clone(),
1334 owner: e.owner,
1335 repo_path: e.repo_path.clone(),
1336 relays: e.relays.clone(),
1337 created_at_offset_secs: created_offset.as_secs(),
1338 expires_at_offset_secs: expires_offset.as_secs(),
1339 },
1340 );
1341 }
1342
952 // Convert state_events to serializable format 1343 // Convert state_events to serializable format
953 let mut state_events = HashMap::new(); 1344 let mut state_events = HashMap::new();
954 for entry in self.state_events.iter() { 1345 for entry in self.state_events.iter() {
@@ -1013,6 +1404,7 @@ impl Purgatory {
1013 let state = PurgatoryState { 1404 let state = PurgatoryState {
1014 version: 1, 1405 version: 1,
1015 saved_at, 1406 saved_at,
1407 announcement_purgatory,
1016 state_events, 1408 state_events,
1017 pr_events, 1409 pr_events,
1018 expired_events, 1410 expired_events,
@@ -1024,6 +1416,7 @@ impl Purgatory {
1024 1416
1025 tracing::info!( 1417 tracing::info!(
1026 path = %path.display(), 1418 path = %path.display(),
1419 announcements = state.announcement_purgatory.len(),
1027 state_events = state.state_events.len(), 1420 state_events = state.state_events.len(),
1028 pr_events = state.pr_events.len(), 1421 pr_events = state.pr_events.len(),
1029 expired_events = state.expired_events.len(), 1422 expired_events = state.expired_events.len(),
@@ -1071,6 +1464,45 @@ impl Purgatory {
1071 1464
1072 let now_instant = Instant::now(); 1465 let now_instant = Instant::now();
1073 1466
1467 // Restore announcement_purgatory.
1468 // Skip entries whose bare repo no longer exists on disk — this can happen
1469 // if the repo was deleted externally between save and restore.
1470 for (_key, e) in state.announcement_purgatory {
1471 if !e.repo_path.exists() {
1472 tracing::warn!(
1473 owner = %e.owner,
1474 identifier = %e.identifier,
1475 repo_path = %e.repo_path.display(),
1476 "Skipping announcement restore: bare repo no longer exists"
1477 );
1478 continue;
1479 }
1480 let created_at = persistence::offset_to_instant(
1481 Duration::from_secs(e.created_at_offset_secs),
1482 state.saved_at,
1483 now_instant,
1484 );
1485 let expires_at = persistence::offset_to_instant(
1486 Duration::from_secs(e.expires_at_offset_secs),
1487 state.saved_at,
1488 now_instant,
1489 );
1490 let key = (e.owner, e.identifier.clone());
1491 self.announcement_purgatory.insert(
1492 key,
1493 AnnouncementPurgatoryEntry {
1494 event: e.event,
1495 identifier: e.identifier,
1496 owner: e.owner,
1497 repo_path: e.repo_path,
1498 relays: e.relays,
1499 created_at,
1500 expires_at,
1501 soft_expired: false,
1502 },
1503 );
1504 }
1505
1074 // Restore state_events 1506 // Restore state_events
1075 for (identifier, entries) in state.state_events { 1507 for (identifier, entries) in state.state_events {
1076 let restored_entries: Vec<StatePurgatoryEntry> = entries 1508 let restored_entries: Vec<StatePurgatoryEntry> = entries
@@ -1140,6 +1572,7 @@ impl Purgatory {
1140 1572
1141 tracing::info!( 1573 tracing::info!(
1142 path = %path.display(), 1574 path = %path.display(),
1575 announcements = self.announcement_purgatory.len(),
1143 state_events = self.state_events.len(), 1576 state_events = self.state_events.len(),
1144 pr_events = self.pr_events.len(), 1577 pr_events = self.pr_events.len(),
1145 expired_events = self.expired_events.len(), 1578 expired_events = self.expired_events.len(),
@@ -1162,7 +1595,8 @@ mod tests {
1162 #[test] 1595 #[test]
1163 fn test_purgatory_creation() { 1596 fn test_purgatory_creation() {
1164 let purgatory = Purgatory::new(PathBuf::new()); 1597 let purgatory = Purgatory::new(PathBuf::new());
1165 let (state_count, pr_count) = purgatory.count(); 1598 let (announcement_count, state_count, pr_count) = purgatory.count();
1599 assert_eq!(announcement_count, 0);
1166 assert_eq!(state_count, 0); 1600 assert_eq!(state_count, 0);
1167 assert_eq!(pr_count, 0); 1601 assert_eq!(pr_count, 0);
1168 } 1602 }
@@ -1190,7 +1624,8 @@ mod tests {
1190 false, 1624 false,
1191 ); 1625 );
1192 1626
1193 let (state_count, pr_count) = purgatory.count(); 1627 let (announcement_count, state_count, pr_count) = purgatory.count();
1628 assert_eq!(announcement_count, 0);
1194 assert_eq!(state_count, 1); 1629 assert_eq!(state_count, 1);
1195 assert_eq!(pr_count, 1); 1630 assert_eq!(pr_count, 1);
1196 } 1631 }
@@ -1407,7 +1842,7 @@ fn test_cleanup_removes_expired_entries() {
1407 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); 1842 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string());
1408 1843
1409 // Verify entries are there 1844 // Verify entries are there
1410 let (state_count, pr_count) = purgatory.count(); 1845 let (_, state_count, pr_count) = purgatory.count();
1411 assert_eq!(state_count, 1); 1846 assert_eq!(state_count, 1);
1412 assert_eq!(pr_count, 2); 1847 assert_eq!(pr_count, 2);
1413 1848
@@ -1425,14 +1860,14 @@ fn test_cleanup_removes_expired_entries() {
1425 } 1860 }
1426 1861
1427 // Run cleanup 1862 // Run cleanup
1428 let (state_removed, pr_removed) = purgatory.cleanup(); 1863 let (_, state_removed, pr_removed) = purgatory.cleanup();
1429 1864
1430 // Verify counts 1865 // Verify counts
1431 assert_eq!(state_removed, 1); 1866 assert_eq!(state_removed, 1);
1432 assert_eq!(pr_removed, 2); 1867 assert_eq!(pr_removed, 2);
1433 1868
1434 // Verify entries are gone 1869 // Verify entries are gone
1435 let (state_count, pr_count) = purgatory.count(); 1870 let (_, state_count, pr_count) = purgatory.count();
1436 assert_eq!(state_count, 0); 1871 assert_eq!(state_count, 0);
1437 assert_eq!(pr_count, 0); 1872 assert_eq!(pr_count, 0);
1438} 1873}
@@ -1464,14 +1899,14 @@ fn test_cleanup_preserves_non_expired_entries() {
1464 ); 1899 );
1465 1900
1466 // Run cleanup 1901 // Run cleanup
1467 let (state_removed, pr_removed) = purgatory.cleanup(); 1902 let (_, state_removed, pr_removed) = purgatory.cleanup();
1468 1903
1469 // Nothing should be removed 1904 // Nothing should be removed
1470 assert_eq!(state_removed, 0); 1905 assert_eq!(state_removed, 0);
1471 assert_eq!(pr_removed, 0); 1906 assert_eq!(pr_removed, 0);
1472 1907
1473 // Verify entries are still there 1908 // Verify entries are still there
1474 let (state_count, pr_count) = purgatory.count(); 1909 let (_, state_count, pr_count) = purgatory.count();
1475 assert_eq!(state_count, 1); 1910 assert_eq!(state_count, 1);
1476 assert_eq!(pr_count, 1); 1911 assert_eq!(pr_count, 1);
1477} 1912}
@@ -1518,14 +1953,14 @@ fn test_cleanup_mixed_expired_and_fresh() {
1518 } 1953 }
1519 1954
1520 // Run cleanup 1955 // Run cleanup
1521 let (state_removed, pr_removed) = purgatory.cleanup(); 1956 let (_, state_removed, pr_removed) = purgatory.cleanup();
1522 1957
1523 // One of each should be removed 1958 // One of each should be removed
1524 assert_eq!(state_removed, 1); 1959 assert_eq!(state_removed, 1);
1525 assert_eq!(pr_removed, 1); 1960 assert_eq!(pr_removed, 1);
1526 1961
1527 // Verify remaining counts 1962 // Verify remaining counts
1528 let (state_count, pr_count) = purgatory.count(); 1963 let (_, state_count, pr_count) = purgatory.count();
1529 assert_eq!(state_count, 1); // One state event remains 1964 assert_eq!(state_count, 1); // One state event remains
1530 assert_eq!(pr_count, 1); // One PR event remains 1965 assert_eq!(pr_count, 1); // One PR event remains
1531} 1966}
@@ -1595,7 +2030,7 @@ fn test_expired_event_tracking() {
1595 } 2030 }
1596 2031
1597 // Run cleanup 2032 // Run cleanup
1598 let (state_removed, pr_removed) = purgatory.cleanup(); 2033 let (_, state_removed, pr_removed) = purgatory.cleanup();
1599 assert_eq!(state_removed, 1); 2034 assert_eq!(state_removed, 1);
1600 assert_eq!(pr_removed, 1); 2035 assert_eq!(pr_removed, 1);
1601 2036
@@ -1705,7 +2140,7 @@ fn test_expired_events_prevent_readdition() {
1705 } 2140 }
1706 2141
1707 // Event should NOT be re-added 2142 // Event should NOT be re-added
1708 let (state_count, _) = purgatory.count(); 2143 let (_, state_count, _) = purgatory.count();
1709 assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); 2144 assert_eq!(state_count, 0, "Event should not be re-added to purgatory");
1710} 2145}
1711 2146
@@ -1724,7 +2159,7 @@ fn test_pr_placeholder_not_marked_expired() {
1724 } 2159 }
1725 2160
1726 // Run cleanup 2161 // Run cleanup
1727 let (_, pr_removed) = purgatory.cleanup(); 2162 let (_, _, pr_removed) = purgatory.cleanup();
1728 assert_eq!(pr_removed, 1); 2163 assert_eq!(pr_removed, 1);
1729 2164
1730 // Expired count should be 0 (placeholders don't have event IDs to track) 2165 // Expired count should be 0 (placeholders don't have event IDs to track)
@@ -1820,7 +2255,7 @@ async fn test_save_and_restore_state_events() {
1820 assert!(!state_file.exists()); 2255 assert!(!state_file.exists());
1821 2256
1822 // Verify state events were restored 2257 // Verify state events were restored
1823 let (state_count, _) = purgatory2.count(); 2258 let (_, state_count, _) = purgatory2.count();
1824 assert_eq!(state_count, 2); 2259 assert_eq!(state_count, 2);
1825 2260
1826 let restored_entries = purgatory2.find_state("test-repo"); 2261 let restored_entries = purgatory2.find_state("test-repo");
@@ -1877,7 +2312,7 @@ async fn test_save_and_restore_pr_events() {
1877 purgatory2.restore_from_disk(&state_file).unwrap(); 2312 purgatory2.restore_from_disk(&state_file).unwrap();
1878 2313
1879 // Verify PR event was restored 2314 // Verify PR event was restored
1880 let (_, pr_count) = purgatory2.count(); 2315 let (_, _, pr_count) = purgatory2.count();
1881 assert_eq!(pr_count, 1); 2316 assert_eq!(pr_count, 1);
1882 2317
1883 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); 2318 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap();
@@ -1906,7 +2341,7 @@ async fn test_save_and_restore_pr_placeholders() {
1906 purgatory2.restore_from_disk(&state_file).unwrap(); 2341 purgatory2.restore_from_disk(&state_file).unwrap();
1907 2342
1908 // Verify placeholder was restored 2343 // Verify placeholder was restored
1909 let (_, pr_count) = purgatory2.count(); 2344 let (_, _, pr_count) = purgatory2.count();
1910 assert_eq!(pr_count, 1); 2345 assert_eq!(pr_count, 1);
1911 2346
1912 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); 2347 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap();
@@ -1984,7 +2419,7 @@ async fn test_save_and_restore_empty_purgatory() {
1984 purgatory2.restore_from_disk(&state_file).unwrap(); 2419 purgatory2.restore_from_disk(&state_file).unwrap();
1985 2420
1986 // Verify purgatory is still empty 2421 // Verify purgatory is still empty
1987 let (state_count, pr_count) = purgatory2.count(); 2422 let (_, state_count, pr_count) = purgatory2.count();
1988 assert_eq!(state_count, 0); 2423 assert_eq!(state_count, 0);
1989 assert_eq!(pr_count, 0); 2424 assert_eq!(pr_count, 0);
1990 assert_eq!(purgatory2.expired_count(), 0); 2425 assert_eq!(purgatory2.expired_count(), 0);
@@ -2004,7 +2439,7 @@ async fn test_restore_missing_file() {
2004 assert!(result.is_err()); 2439 assert!(result.is_err());
2005 2440
2006 // Purgatory should remain empty 2441 // Purgatory should remain empty
2007 let (state_count, pr_count) = purgatory.count(); 2442 let (_, state_count, pr_count) = purgatory.count();
2008 assert_eq!(state_count, 0); 2443 assert_eq!(state_count, 0);
2009 assert_eq!(pr_count, 0); 2444 assert_eq!(pr_count, 0);
2010} 2445}
@@ -2026,7 +2461,7 @@ async fn test_restore_corrupted_json() {
2026 assert!(result.is_err()); 2461 assert!(result.is_err());
2027 2462
2028 // Purgatory should remain empty 2463 // Purgatory should remain empty
2029 let (state_count, pr_count) = purgatory.count(); 2464 let (_, state_count, pr_count) = purgatory.count();
2030 assert_eq!(state_count, 0); 2465 assert_eq!(state_count, 0);
2031 assert_eq!(pr_count, 0); 2466 assert_eq!(pr_count, 0);
2032} 2467}
@@ -2263,7 +2698,7 @@ async fn test_mixed_pr_events_and_placeholders() {
2263 purgatory2.restore_from_disk(&state_file).unwrap(); 2698 purgatory2.restore_from_disk(&state_file).unwrap();
2264 2699
2265 // Verify both were restored correctly 2700 // Verify both were restored correctly
2266 let (_, pr_count) = purgatory2.count(); 2701 let (_, _, pr_count) = purgatory2.count();
2267 assert_eq!(pr_count, 2); 2702 assert_eq!(pr_count, 2);
2268 2703
2269 // Verify PR event 2704 // Verify PR event
@@ -2310,6 +2745,141 @@ async fn test_file_cleanup_after_successful_restore() {
2310} 2745}
2311 2746
2312#[tokio::test] 2747#[tokio::test]
2748async fn test_save_and_restore_announcement_events() {
2749 use tempfile::tempdir;
2750
2751 let temp_dir = tempdir().unwrap();
2752 let state_file = temp_dir.path().join("purgatory_state.json");
2753
2754 // Create a real bare repo directory so the restore path-existence check passes
2755 let repo_dir = temp_dir.path().join("owner.git");
2756 std::fs::create_dir_all(&repo_dir).unwrap();
2757
2758 let purgatory = Purgatory::new(PathBuf::new());
2759 let keys = Keys::generate();
2760
2761 let ann_event = EventBuilder::text_note("announcement event")
2762 .sign_with_keys(&keys)
2763 .unwrap();
2764 let ann_event_id = ann_event.id;
2765
2766 let mut relays = HashSet::new();
2767 relays.insert("wss://relay.example.com".to_string());
2768
2769 purgatory.add_announcement(
2770 ann_event.clone(),
2771 "my-repo".to_string(),
2772 keys.public_key(),
2773 repo_dir.clone(),
2774 relays.clone(),
2775 );
2776
2777 // Save to disk
2778 purgatory.save_to_disk(&state_file).unwrap();
2779 assert!(state_file.exists());
2780
2781 // Create new purgatory and restore
2782 let purgatory2 = Purgatory::new(PathBuf::new());
2783 purgatory2.restore_from_disk(&state_file).unwrap();
2784
2785 // File should be deleted after restore
2786 assert!(!state_file.exists());
2787
2788 // Verify announcement was restored
2789 let (ann_count, _, _) = purgatory2.count();
2790 assert_eq!(ann_count, 1);
2791
2792 let restored = purgatory2
2793 .find_announcement(&keys.public_key(), "my-repo")
2794 .unwrap();
2795 assert_eq!(restored.event.id, ann_event_id);
2796 assert_eq!(restored.identifier, "my-repo");
2797 assert_eq!(restored.owner, keys.public_key());
2798 assert_eq!(restored.repo_path, repo_dir);
2799 assert_eq!(restored.relays, relays);
2800 assert!(!restored.soft_expired);
2801}
2802
2803#[tokio::test]
2804async fn test_soft_expired_announcements_not_persisted() {
2805 use tempfile::tempdir;
2806
2807 let temp_dir = tempdir().unwrap();
2808 let state_file = temp_dir.path().join("purgatory_state.json");
2809
2810 let repo_dir = temp_dir.path().join("owner.git");
2811 std::fs::create_dir_all(&repo_dir).unwrap();
2812
2813 let purgatory = Purgatory::new(PathBuf::new());
2814 let keys = Keys::generate();
2815
2816 let ann_event = EventBuilder::text_note("announcement event")
2817 .sign_with_keys(&keys)
2818 .unwrap();
2819
2820 purgatory.add_announcement(
2821 ann_event.clone(),
2822 "my-repo".to_string(),
2823 keys.public_key(),
2824 repo_dir.clone(),
2825 HashSet::new(),
2826 );
2827
2828 // Manually mark as soft-expired (bare repo deleted)
2829 let key = (keys.public_key(), "my-repo".to_string());
2830 if let Some(mut entry) = purgatory.announcement_purgatory.get_mut(&key) {
2831 entry.soft_expired = true;
2832 }
2833
2834 // Save to disk — soft-expired entry should be excluded
2835 purgatory.save_to_disk(&state_file).unwrap();
2836
2837 // Create new purgatory and restore
2838 let purgatory2 = Purgatory::new(PathBuf::new());
2839 purgatory2.restore_from_disk(&state_file).unwrap();
2840
2841 // Soft-expired announcement should NOT be restored
2842 let (ann_count, _, _) = purgatory2.count();
2843 assert_eq!(ann_count, 0);
2844}
2845
2846#[tokio::test]
2847async fn test_announcement_with_missing_repo_skipped_on_restore() {
2848 use tempfile::tempdir;
2849
2850 let temp_dir = tempdir().unwrap();
2851 let state_file = temp_dir.path().join("purgatory_state.json");
2852
2853 // Point to a repo path that does NOT exist
2854 let missing_repo = temp_dir.path().join("nonexistent.git");
2855
2856 let purgatory = Purgatory::new(PathBuf::new());
2857 let keys = Keys::generate();
2858
2859 let ann_event = EventBuilder::text_note("announcement event")
2860 .sign_with_keys(&keys)
2861 .unwrap();
2862
2863 purgatory.add_announcement(
2864 ann_event.clone(),
2865 "my-repo".to_string(),
2866 keys.public_key(),
2867 missing_repo.clone(),
2868 HashSet::new(),
2869 );
2870
2871 // Save to disk (repo path is serialized even though it doesn't exist)
2872 purgatory.save_to_disk(&state_file).unwrap();
2873
2874 // Create new purgatory and restore — entry should be skipped
2875 let purgatory2 = Purgatory::new(PathBuf::new());
2876 purgatory2.restore_from_disk(&state_file).unwrap();
2877
2878 let (ann_count, _, _) = purgatory2.count();
2879 assert_eq!(ann_count, 0);
2880}
2881
2882#[tokio::test]
2313async fn test_comprehensive_roundtrip() { 2883async fn test_comprehensive_roundtrip() {
2314 use nostr_sdk::{Kind, Tag, TagKind}; 2884 use nostr_sdk::{Kind, Tag, TagKind};
2315 use tempfile::tempdir; 2885 use tempfile::tempdir;
@@ -2317,10 +2887,27 @@ async fn test_comprehensive_roundtrip() {
2317 let temp_dir = tempdir().unwrap(); 2887 let temp_dir = tempdir().unwrap();
2318 let state_file = temp_dir.path().join("purgatory_state.json"); 2888 let state_file = temp_dir.path().join("purgatory_state.json");
2319 2889
2890 // Create a real bare repo directory for the announcement
2891 let repo_dir = temp_dir.path().join("owner.git");
2892 std::fs::create_dir_all(&repo_dir).unwrap();
2893
2320 let purgatory = Purgatory::new(PathBuf::new()); 2894 let purgatory = Purgatory::new(PathBuf::new());
2321 let keys1 = Keys::generate(); 2895 let keys1 = Keys::generate();
2322 let keys2 = Keys::generate(); 2896 let keys2 = Keys::generate();
2323 2897
2898 // Add announcement
2899 let ann_event = EventBuilder::text_note("announcement")
2900 .sign_with_keys(&keys1)
2901 .unwrap();
2902 let ann_event_id = ann_event.id;
2903 purgatory.add_announcement(
2904 ann_event,
2905 "repo1".to_string(),
2906 keys1.public_key(),
2907 repo_dir.clone(),
2908 HashSet::new(),
2909 );
2910
2324 // Add multiple state events 2911 // Add multiple state events
2325 let state1 = EventBuilder::text_note("state 1") 2912 let state1 = EventBuilder::text_note("state 1")
2326 .sign_with_keys(&keys1) 2913 .sign_with_keys(&keys1)
@@ -2380,7 +2967,8 @@ async fn test_comprehensive_roundtrip() {
2380 purgatory.cleanup(); 2967 purgatory.cleanup();
2381 2968
2382 // Verify initial state 2969 // Verify initial state
2383 let (state_count, pr_count) = purgatory.count(); 2970 let (ann_count, state_count, pr_count) = purgatory.count();
2971 assert_eq!(ann_count, 1); // announcement
2384 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) 2972 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up)
2385 assert_eq!(pr_count, 2); // pr-1, pr-2 2973 assert_eq!(pr_count, 2); // pr-1, pr-2
2386 assert_eq!(purgatory.expired_count(), 1); // expired_event 2974 assert_eq!(purgatory.expired_count(), 1); // expired_event
@@ -2393,11 +2981,18 @@ async fn test_comprehensive_roundtrip() {
2393 purgatory2.restore_from_disk(&state_file).unwrap(); 2981 purgatory2.restore_from_disk(&state_file).unwrap();
2394 2982
2395 // Verify all data was restored correctly 2983 // Verify all data was restored correctly
2396 let (state_count2, pr_count2) = purgatory2.count(); 2984 let (ann_count2, state_count2, pr_count2) = purgatory2.count();
2985 assert_eq!(ann_count2, 1);
2397 assert_eq!(state_count2, 2); 2986 assert_eq!(state_count2, 2);
2398 assert_eq!(pr_count2, 2); 2987 assert_eq!(pr_count2, 2);
2399 assert_eq!(purgatory2.expired_count(), 1); 2988 assert_eq!(purgatory2.expired_count(), 1);
2400 2989
2990 // Verify announcement
2991 let restored_ann = purgatory2
2992 .find_announcement(&keys1.public_key(), "repo1")
2993 .unwrap();
2994 assert_eq!(restored_ann.event.id, ann_event_id);
2995
2401 // Verify state events 2996 // Verify state events
2402 assert_eq!(purgatory2.find_state("repo1").len(), 1); 2997 assert_eq!(purgatory2.find_state("repo1").len(), 1);
2403 assert_eq!(purgatory2.find_state("repo2").len(), 1); 2998 assert_eq!(purgatory2.find_state("repo2").len(), 1);
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 904f8af..8297515 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -75,7 +75,12 @@ pub trait SyncContext: Send + Sync {
75 /// # Returns 75 /// # Returns
76 /// Set of clone URLs from PR events in purgatory for this identifier 76 /// Set of clone URLs from PR events in purgatory for this identifier
77 fn collect_pr_clone_urls(&self, identifier: &str) -> HashSet<String>; 77 fn collect_pr_clone_urls(&self, identifier: &str) -> HashSet<String>;
78 /// Get repository data (announcements, clone URLs, etc.) from the database. 78 /// Get repository data (announcements, clone URLs, etc.) from the database and purgatory.
79 ///
80 /// Checks both the database (promoted announcements) and purgatory (announcements
81 /// awaiting git data). This is necessary to obtain clone URLs when an announcement
82 /// has not yet been promoted - without purgatory data, the sync loop would have no
83 /// URLs to fetch from and the announcement could never be promoted (circular deadlock).
79 /// 84 ///
80 /// # Arguments 85 /// # Arguments
81 /// * `identifier` - The repository identifier (d-tag value) 86 /// * `identifier` - The repository identifier (d-tag value)
@@ -279,7 +284,16 @@ impl SyncContext for RealSyncContext {
279 } 284 }
280 285
281 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> { 286 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> {
282 crate::git::authorization::fetch_repository_data(&self.database, identifier).await 287 // Use the purgatory-aware variant so that clone URLs from announcements still
288 // in purgatory (not yet promoted) are available. Without this, the sync loop
289 // would find no URLs to fetch from and the announcement could never be promoted
290 // (circular deadlock: can't promote without git data, can't get git data without URLs).
291 crate::git::authorization::fetch_repository_data_with_purgatory(
292 &self.database,
293 &self.purgatory,
294 identifier,
295 )
296 .await
283 } 297 }
284 298
285 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { 299 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> {
@@ -487,7 +501,9 @@ impl SyncContext for RealSyncContext {
487 source_repo_path: &Path, 501 source_repo_path: &Path,
488 new_oids: &HashSet<String>, 502 new_oids: &HashSet<String>,
489 ) -> Result<ProcessResult> { 503 ) -> Result<ProcessResult> {
490 // Delegate to the unified function from git::sync 504 // Delegate to the unified function from git::sync.
505 // Pass None for write_policy and rejected_events_index: the purgatory sync path
506 // already handles hot-cache re-processing via SyncManager::process_event_static.
491 let result = crate::git::sync::process_newly_available_git_data( 507 let result = crate::git::sync::process_newly_available_git_data(
492 source_repo_path, 508 source_repo_path,
493 new_oids, 509 new_oids,
@@ -495,6 +511,8 @@ impl SyncContext for RealSyncContext {
495 self.local_relay.as_ref(), 511 self.local_relay.as_ref(),
496 &self.purgatory, 512 &self.purgatory,
497 &self.git_data_path, 513 &self.git_data_path,
514 None,
515 None,
498 ) 516 )
499 .await?; 517 .await?;
500 518
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index e37a3e1..1af5c4e 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -6,6 +6,8 @@
6 6
7use nostr_sdk::prelude::*; 7use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize}; 8use serde::{Deserialize, Serialize};
9use std::collections::HashSet;
10use std::path::PathBuf;
9use std::time::Instant; 11use std::time::Instant;
10 12
11/// Source of an event entering purgatory. 13/// Source of an event entering purgatory.
@@ -143,3 +145,40 @@ pub struct PrPurgatoryEntry {
143 #[serde(default)] 145 #[serde(default)]
144 pub source: EventSource, 146 pub source: EventSource,
145} 147}
148
149/// Entry for a repository announcement (kind 30617) waiting in purgatory.
150///
151/// Announcements are held in purgatory until git data arrives, proving
152/// the repository has actual content. This prevents serving announcements
153/// for empty repositories.
154///
155/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
156/// module to convert to/from serializable wrapper types.
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct AnnouncementPurgatoryEntry {
159 /// The nostr announcement event (kind 30617)
160 pub event: Event,
161
162 /// The repository identifier from the event's 'd' tag
163 pub identifier: String,
164
165 /// The owner pubkey (event author)
166 pub owner: PublicKey,
167
168 /// Path to the bare git repository
169 pub repo_path: PathBuf,
170
171 /// Relay URLs from the announcement (for sync registration)
172 pub relays: HashSet<String>,
173
174 /// When this entry was added to purgatory
175 #[serde(skip, default = "instant_now")]
176 pub created_at: Instant,
177
178 /// Expiry deadline (30 min from creation, may be extended)
179 #[serde(skip, default = "instant_now")]
180 pub expires_at: Instant,
181
182 /// Whether the bare repo has been deleted (soft expiry)
183 pub soft_expired: bool,
184}
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs
index 39788bc..9899abc 100644
--- a/src/sync/algorithms.rs
+++ b/src/sync/algorithms.rs
@@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState};
25/// this repo need to sync from", it's "what repos does this relay need to sync". 25/// this repo need to sync from", it's "what repos does this relay need to sync".
26#[derive(Debug, Clone, Default)] 26#[derive(Debug, Clone, Default)]
27pub struct RelaySyncNeeds { 27pub struct RelaySyncNeeds {
28 /// Repos that need to be synced from this relay 28 /// Repos that need full L2+L3 sync from this relay
29 pub repos: HashSet<String>, 29 pub repos: HashSet<String>,
30 /// Repos that only need state event sync (purgatory announcements)
31 pub state_only_repos: HashSet<String>,
30 /// Root events that need to be tracked from this relay 32 /// Root events that need to be tracked from this relay
31 pub root_events: HashSet<EventId>, 33 pub root_events: HashSet<EventId>,
32} 34}
@@ -67,8 +69,15 @@ pub fn derive_relay_targets(
67 for relay_url in &needs.relays { 69 for relay_url in &needs.relays {
68 let entry = relay_targets.entry(relay_url.clone()).or_default(); 70 let entry = relay_targets.entry(relay_url.clone()).or_default();
69 71
70 entry.repos.insert(repo_id.clone()); 72 match needs.sync_level {
71 entry.root_events.extend(needs.root_events.iter().cloned()); 73 super::SyncLevel::Full => {
74 entry.repos.insert(repo_id.clone());
75 entry.root_events.extend(needs.root_events.iter().cloned());
76 }
77 super::SyncLevel::StateOnly => {
78 entry.state_only_repos.insert(repo_id.clone());
79 }
80 }
72 } 81 }
73 } 82 }
74 83
@@ -96,7 +105,7 @@ pub fn compute_actions(
96 pending: &HashMap<String, Vec<PendingBatch>>, 105 pending: &HashMap<String, Vec<PendingBatch>>,
97 confirmed: &HashMap<String, RelayState>, 106 confirmed: &HashMap<String, RelayState>,
98) -> Vec<AddFilters> { 107) -> Vec<AddFilters> {
99 use crate::sync::filters::build_layer2_and_layer3_filters; 108 use crate::sync::filters::build_sync_level_aware_filters;
100 109
101 let mut actions = Vec::new(); 110 let mut actions = Vec::new();
102 111
@@ -140,14 +149,22 @@ pub fn compute_actions(
140 .map(|state| state.root_events.clone()) 149 .map(|state| state.root_events.clone())
141 .unwrap_or_default(); 150 .unwrap_or_default();
142 151
143 // Calculate what's NEW (not in pending, not in confirmed) 152 // Calculate what's NEW for full repos (not in pending, not in confirmed)
144 let new_repos: HashSet<String> = target_needs 153 let new_full_repos: HashSet<String> = target_needs
145 .repos 154 .repos
146 .difference(&pending_repos) 155 .difference(&pending_repos)
147 .filter(|repo| !confirmed_repos.contains(*repo)) 156 .filter(|repo| !confirmed_repos.contains(*repo))
148 .cloned() 157 .cloned()
149 .collect(); 158 .collect();
150 159
160 // Calculate what's NEW for state-only repos
161 let new_state_only_repos: HashSet<String> = target_needs
162 .state_only_repos
163 .difference(&pending_repos)
164 .filter(|repo| !confirmed_repos.contains(*repo))
165 .cloned()
166 .collect();
167
151 let new_events: HashSet<EventId> = target_needs 168 let new_events: HashSet<EventId> = target_needs
152 .root_events 169 .root_events
153 .difference(&pending_events) 170 .difference(&pending_events)
@@ -156,13 +173,23 @@ pub fn compute_actions(
156 .collect(); 173 .collect();
157 174
158 // If there's anything new, create an AddFilters action 175 // If there's anything new, create an AddFilters action
159 if !new_repos.is_empty() || !new_events.is_empty() { 176 if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty()
160 let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); 177 {
178 let filters = build_sync_level_aware_filters(
179 &new_full_repos,
180 &new_state_only_repos,
181 &new_events,
182 None,
183 );
184
185 // Combine all repos into pending items (pending tracking doesn't need sync level)
186 let mut all_new_repos = new_full_repos;
187 all_new_repos.extend(new_state_only_repos);
161 188
162 actions.push(AddFilters { 189 actions.push(AddFilters {
163 relay_url: relay_url.clone(), 190 relay_url: relay_url.clone(),
164 items: PendingItems { 191 items: PendingItems {
165 repos: new_repos, 192 repos: all_new_repos,
166 root_events: new_events, 193 root_events: new_events,
167 }, 194 },
168 filters, 195 filters,
@@ -204,6 +231,7 @@ mod tests {
204 ModRepoSyncNeeds { 231 ModRepoSyncNeeds {
205 relays, 232 relays,
206 root_events, 233 root_events,
234 sync_level: Default::default(),
207 }, 235 },
208 ); 236 );
209 237
@@ -229,6 +257,7 @@ mod tests {
229 ModRepoSyncNeeds { 257 ModRepoSyncNeeds {
230 relays, 258 relays,
231 root_events: HashSet::new(), 259 root_events: HashSet::new(),
260 sync_level: Default::default(),
232 }, 261 },
233 ); 262 );
234 } 263 }
@@ -252,6 +281,7 @@ mod tests {
252 ModRepoSyncNeeds { 281 ModRepoSyncNeeds {
253 relays, 282 relays,
254 root_events: HashSet::new(), 283 root_events: HashSet::new(),
284 sync_level: Default::default(),
255 }, 285 },
256 ); 286 );
257 287
@@ -285,6 +315,7 @@ mod tests {
285 ModRepoSyncNeeds { 315 ModRepoSyncNeeds {
286 relays: relays1, 316 relays: relays1,
287 root_events: root_events1, 317 root_events: root_events1,
318 sync_level: Default::default(),
288 }, 319 },
289 ); 320 );
290 321
@@ -299,6 +330,7 @@ mod tests {
299 ModRepoSyncNeeds { 330 ModRepoSyncNeeds {
300 relays: relays2, 331 relays: relays2,
301 root_events: root_events2, 332 root_events: root_events2,
333 sync_level: Default::default(),
302 }, 334 },
303 ); 335 );
304 336
@@ -332,6 +364,7 @@ mod tests {
332 "wss://relay1.com".to_string(), 364 "wss://relay1.com".to_string(),
333 RelaySyncNeeds { 365 RelaySyncNeeds {
334 repos: vec!["repo1".to_string()].into_iter().collect(), 366 repos: vec!["repo1".to_string()].into_iter().collect(),
367 state_only_repos: HashSet::new(),
335 root_events: HashSet::new(), 368 root_events: HashSet::new(),
336 }, 369 },
337 ); 370 );
@@ -366,6 +399,7 @@ mod tests {
366 "wss://relay1.com".to_string(), 399 "wss://relay1.com".to_string(),
367 RelaySyncNeeds { 400 RelaySyncNeeds {
368 repos: vec!["repo1".to_string()].into_iter().collect(), 401 repos: vec!["repo1".to_string()].into_iter().collect(),
402 state_only_repos: HashSet::new(),
369 root_events: HashSet::new(), 403 root_events: HashSet::new(),
370 }, 404 },
371 ); 405 );
@@ -389,6 +423,7 @@ mod tests {
389 "wss://relay1.com".to_string(), 423 "wss://relay1.com".to_string(),
390 RelaySyncNeeds { 424 RelaySyncNeeds {
391 repos: vec!["repo1".to_string()].into_iter().collect(), 425 repos: vec!["repo1".to_string()].into_iter().collect(),
426 state_only_repos: HashSet::new(),
392 root_events: HashSet::new(), 427 root_events: HashSet::new(),
393 }, 428 },
394 ); 429 );
@@ -428,6 +463,7 @@ mod tests {
428 "wss://relay1.com".to_string(), 463 "wss://relay1.com".to_string(),
429 RelaySyncNeeds { 464 RelaySyncNeeds {
430 repos: vec!["repo1".to_string()].into_iter().collect(), 465 repos: vec!["repo1".to_string()].into_iter().collect(),
466 state_only_repos: HashSet::new(),
431 root_events: HashSet::new(), 467 root_events: HashSet::new(),
432 }, 468 },
433 ); 469 );
@@ -465,6 +501,7 @@ mod tests {
465 "wss://relay1.com".to_string(), 501 "wss://relay1.com".to_string(),
466 RelaySyncNeeds { 502 RelaySyncNeeds {
467 repos: vec!["repo1".to_string()].into_iter().collect(), 503 repos: vec!["repo1".to_string()].into_iter().collect(),
504 state_only_repos: HashSet::new(),
468 root_events: HashSet::new(), 505 root_events: HashSet::new(),
469 }, 506 },
470 ); 507 );
@@ -510,6 +547,7 @@ mod tests {
510 ] 547 ]
511 .into_iter() 548 .into_iter()
512 .collect(), 549 .collect(),
550 state_only_repos: HashSet::new(),
513 root_events: HashSet::new(), 551 root_events: HashSet::new(),
514 }, 552 },
515 ); 553 );
@@ -572,6 +610,7 @@ mod tests {
572 "wss://relay1.com".to_string(), 610 "wss://relay1.com".to_string(),
573 RelaySyncNeeds { 611 RelaySyncNeeds {
574 repos: HashSet::new(), 612 repos: HashSet::new(),
613 state_only_repos: HashSet::new(),
575 root_events: vec![event_id].into_iter().collect(), 614 root_events: vec![event_id].into_iter().collect(),
576 }, 615 },
577 ); 616 );
@@ -599,6 +638,7 @@ mod tests {
599 "wss://new-relay.com".to_string(), 638 "wss://new-relay.com".to_string(),
600 RelaySyncNeeds { 639 RelaySyncNeeds {
601 repos: vec!["repo1".to_string()].into_iter().collect(), 640 repos: vec!["repo1".to_string()].into_iter().collect(),
641 state_only_repos: HashSet::new(),
602 root_events: HashSet::new(), 642 root_events: HashSet::new(),
603 }, 643 },
604 ); 644 );
diff --git a/src/sync/filters.rs b/src/sync/filters.rs
index 3592489..1215e81 100644
--- a/src/sync/filters.rs
+++ b/src/sync/filters.rs
@@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters(
245 filters 245 filters
246} 246}
247 247
248/// Builds filters respecting SyncLevel for each repo
249///
250/// StateOnly repos only get state event filters (kind 30618).
251/// Full repos get all L2/L3 filters (state + repo-tagging + root event).
252///
253/// # Arguments
254/// * `full_repos` - Repos needing full L2+L3 sync
255/// * `state_only_repos` - Repos needing only state event sync (purgatory)
256/// * `root_events` - Root event IDs (only used for Full repos)
257/// * `since` - Optional timestamp for incremental sync
258pub fn build_sync_level_aware_filters(
259 full_repos: &HashSet<String>,
260 state_only_repos: &HashSet<String>,
261 root_events: &HashSet<EventId>,
262 since: Option<Timestamp>,
263) -> Vec<Filter> {
264 let mut filters = Vec::new();
265
266 // All repos (both Full and StateOnly) need state event filters
267 let all_repos: HashSet<String> = full_repos.union(state_only_repos).cloned().collect();
268 filters.extend(state_event_filters_for_our_repos(&all_repos, since));
269
270 // Only Full repos get repo-tagging and root event filters
271 if !full_repos.is_empty() {
272 filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since));
273 }
274 filters.extend(tagged_one_of_our_root_event_filters(root_events, since));
275
276 filters
277}
278
248#[cfg(test)] 279#[cfg(test)]
249mod tests { 280mod tests {
250 use super::*; 281 use super::*;
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index d6634ff..cd62380 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex;
85// Supporting Data Structures 85// Supporting Data Structures
86// ============================================================================= 86// =============================================================================
87 87
88/// Level of sync needed for a repository
89///
90/// Purgatory announcements only need state events synced (to validate git data).
91/// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.).
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
93pub enum SyncLevel {
94 /// Full L2 + L3 sync (promoted repos with git data)
95 #[default]
96 Full,
97 /// Only state events (kind 30618) - for purgatory announcements
98 StateOnly,
99}
100
88/// What repos and root events need to be synced 101/// What repos and root events need to be synced
89#[derive(Debug, Clone, Default)] 102#[derive(Debug, Clone, Default)]
90pub struct RepoSyncNeeds { 103pub struct RepoSyncNeeds {
@@ -92,6 +105,8 @@ pub struct RepoSyncNeeds {
92 pub relays: HashSet<String>, 105 pub relays: HashSet<String>,
93 /// Root event IDs - 1617/1618/1621 - that reference this repo 106 /// Root event IDs - 1617/1618/1621 - that reference this repo
94 pub root_events: HashSet<EventId>, 107 pub root_events: HashSet<EventId>,
108 /// Sync level - StateOnly for purgatory, Full for promoted repos
109 pub sync_level: SyncLevel,
95} 110}
96 111
97/// Connection status for a relay 112/// Connection status for a relay
@@ -382,6 +397,40 @@ async fn run_daily_timer(
382 } 397 }
383} 398}
384 399
400/// Background task that periodically syncs purgatory announcements into repo_sync_index.
401///
402/// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`).
403/// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in
404/// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the
405/// relay URLs listed in the announcement and subscribes to state events (kind 30618).
406///
407/// This is the sole registration path for purgatory announcements:
408/// - Sync-path announcements: registered here within one interval of arriving.
409/// - User-submitted purgatory announcements: the SelfSubscriber never sees them
410/// (they're rejected from DB), so this timer is the only registration path.
411async fn run_purgatory_announcement_sync(
412 sync_manager: Arc<Mutex<SyncManager>>,
413 mut shutdown_rx: broadcast::Receiver<()>,
414) {
415 let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
416 Duration::from_millis(200)
417 } else {
418 Duration::from_secs(5)
419 };
420 loop {
421 tokio::select! {
422 _ = tokio::time::sleep(interval) => {
423 let mut manager = sync_manager.lock().await;
424 manager.sync_purgatory_announcements_to_index().await;
425 }
426 _ = shutdown_rx.recv() => {
427 tracing::debug!("Purgatory announcement sync timer received shutdown signal");
428 break;
429 }
430 }
431 }
432}
433
385// Combined Health and Metrics Checker 434// Combined Health and Metrics Checker
386 435
387/// Background task for cleaning up expired entries from the rejected events index 436/// Background task for cleaning up expired entries from the rejected events index
@@ -936,9 +985,29 @@ impl SyncManager {
936 985
937 // Create REQ+EOSE subscriptions using original semantic filters 986 // Create REQ+EOSE subscriptions using original semantic filters
938 // This queries by kind/author/tags instead of by ID, which may 987 // This queries by kind/author/tags instead of by ID, which may
939 // succeed even when ID-based queries fail 988 // succeed even when ID-based queries fail.
940 let fallback_filters = filters::build_layer2_and_layer3_filters( 989 // Split batch_repos by SyncLevel to avoid sending Layer 2 filters
941 &batch_repos, 990 // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be
991 // rejected as orphan and then silently dropped by nostr-sdk deduplication.
992 let (full_repos, state_only_repos) = {
993 let repo_index = self.repo_sync_index.read().await;
994 let mut full = HashSet::new();
995 let mut state_only = HashSet::new();
996 for repo_ref in &batch_repos {
997 match repo_index.get(repo_ref).map(|n| n.sync_level) {
998 Some(SyncLevel::StateOnly) => {
999 state_only.insert(repo_ref.clone());
1000 }
1001 _ => {
1002 full.insert(repo_ref.clone());
1003 }
1004 }
1005 }
1006 (full, state_only)
1007 };
1008 let fallback_filters = filters::build_sync_level_aware_filters(
1009 &full_repos,
1010 &state_only_repos,
942 &batch_root_events, 1011 &batch_root_events,
943 None, 1012 None,
944 ); 1013 );
@@ -1272,7 +1341,7 @@ impl SyncManager {
1272 /// to be batched and create Layer 2/3 filters before we mark sync complete. 1341 /// to be batched and create Layer 2/3 filters before we mark sync complete.
1273 /// 1342 ///
1274 /// The 6-second delay is based on: 1343 /// The 6-second delay is based on:
1275 /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) 1344 /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`)
1276 /// - Buffer for processing: 1 second 1345 /// - Buffer for processing: 1 second
1277 /// 1346 ///
1278 /// Called after each batch is confirmed to detect completion. 1347 /// Called after each batch is confirmed to detect completion.
@@ -1486,7 +1555,17 @@ impl SyncManager {
1486 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; 1555 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1487 }); 1556 });
1488 1557
1489 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1558 // 11. Spawn purgatory announcement sync timer (every 5s)
1559 // Ensures purgatory announcements (including user-submitted ones that never
1560 // touch the DB) are registered in repo_sync_index as StateOnly so that
1561 // state event subscriptions are established on their listed relay URLs.
1562 let purgatory_sync_manager = Arc::clone(&sync_manager);
1563 let purgatory_sync_shutdown = shutdown_tx.subscribe();
1564 tokio::spawn(async move {
1565 run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await;
1566 });
1567
1568 // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1490 loop { 1569 loop {
1491 // Wait for an event without holding the lock 1570 // Wait for an event without holding the lock
1492 tokio::select! { 1571 tokio::select! {
@@ -1719,6 +1798,10 @@ impl SyncManager {
1719 1798
1720 // For sync-triggered events that go to purgatory, trigger immediate sync 1799 // For sync-triggered events that go to purgatory, trigger immediate sync
1721 // (instead of the default 3-minute delay for user-submitted events) 1800 // (instead of the default 3-minute delay for user-submitted events)
1801 //
1802 // Note: announcement events (kind 30617) are registered in repo_sync_index
1803 // by the purgatory announcement sync timer (run_purgatory_announcement_sync)
1804 // rather than inline here.
1722 if result == ProcessResult::Purgatory { 1805 if result == ProcessResult::Purgatory {
1723 // State events (kind 30618) - extract identifier and trigger immediate sync 1806 // State events (kind 30618) - extract identifier and trigger immediate sync
1724 if event.kind.as_u16() == 30618 { 1807 if event.kind.as_u16() == 30618 {
@@ -2303,6 +2386,80 @@ impl SyncManager {
2303 } 2386 }
2304 } 2387 }
2305 2388
2389 /// Sync purgatory announcements into repo_sync_index as StateOnly entries.
2390 ///
2391 /// Called periodically by the purgatory announcement sync timer (every 5s).
2392 /// For each announcement currently in purgatory, ensures a `StateOnly` entry
2393 /// exists in `repo_sync_index`. New entries are then picked up by
2394 /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes
2395 /// to state events for that repo.
2396 ///
2397 /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full).
2398 async fn sync_purgatory_announcements_to_index(&mut self) {
2399 use crate::sync::algorithms::{compute_actions, derive_relay_targets};
2400
2401 // Collect all purgatory announcements (snapshot - no async holds)
2402 let announcements = self.purgatory.announcements_for_sync();
2403
2404 if announcements.is_empty() {
2405 return;
2406 }
2407
2408 // Register any new entries in repo_sync_index as StateOnly
2409 let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
2410 {
2411 let mut index = self.repo_sync_index.write().await;
2412 for (repo_id, relays) in &announcements {
2413 let entry = index.entry(repo_id.clone()).or_insert_with(|| {
2414 tracing::debug!(
2415 repo_id = %repo_id,
2416 "Registering purgatory announcement in repo_sync_index as StateOnly"
2417 );
2418 RepoSyncNeeds {
2419 relays: std::collections::HashSet::new(),
2420 root_events: std::collections::HashSet::new(),
2421 sync_level: SyncLevel::StateOnly,
2422 }
2423 });
2424 // Don't downgrade an already-Full entry
2425 // Add any new relay URLs
2426 for relay in relays {
2427 if entry.relays.insert(relay.clone()) {
2428 new_relay_urls.insert(relay.clone());
2429 }
2430 }
2431 }
2432 }
2433
2434 if new_relay_urls.is_empty() {
2435 return;
2436 }
2437
2438 // For any relay URLs that are new, compute and send AddFilters actions
2439 let all_targets = {
2440 let repo_index = self.repo_sync_index.read().await;
2441 derive_relay_targets(&repo_index)
2442 };
2443
2444 let actions = {
2445 let pending_index = self.pending_sync_index.read().await;
2446 let relay_index = self.relay_sync_index.read().await;
2447 compute_actions(&all_targets, &pending_index, &relay_index)
2448 };
2449
2450 for action in actions {
2451 // Only act on relays that have new URLs (avoids redundant work)
2452 if new_relay_urls.contains(&action.relay_url) {
2453 tracing::info!(
2454 relay = %action.relay_url,
2455 repos = action.items.repos.len(),
2456 "Purgatory sync timer: connecting to new relay from purgatory announcement"
2457 );
2458 self.handle_new_sync_filters(action).await;
2459 }
2460 }
2461 }
2462
2306 /// Handle a relay disconnection 2463 /// Handle a relay disconnection
2307 /// 2464 ///
2308 /// This method is called when the event loop terminates and sends a disconnect notification. 2465 /// This method is called when the event loop terminates and sends a disconnect notification.
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 86e4583..4d69c9a 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -18,7 +18,7 @@ use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase; 19use crate::nostr::builder::SharedDatabase;
20 20
21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel};
22 22
23// ============================================================================= 23// =============================================================================
24// LoopControl - Result of notification processing 24// LoopControl - Result of notification processing
@@ -60,6 +60,7 @@ impl PendingUpdates {
60 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { 60 let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds {
61 relays: HashSet::new(), 61 relays: HashSet::new(),
62 root_events: HashSet::new(), 62 root_events: HashSet::new(),
63 sync_level: SyncLevel::Full,
63 }); 64 });
64 entry.relays.extend(relays); 65 entry.relays.extend(relays);
65 entry.root_events.extend(root_events); 66 entry.root_events.extend(root_events);
@@ -132,14 +133,14 @@ impl SelfSubscriber {
132 133
133 /// Get batch window from environment or use default 134 /// Get batch window from environment or use default
134 /// 135 ///
135 /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. 136 /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution.
136 /// Default: 5000ms (5 seconds) 137 /// Default: 5000ms (5 seconds)
137 fn get_batch_window() -> Duration { 138 fn get_batch_window() -> Duration {
138 std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") 139 if std::env::var("NGIT_TEST").as_deref() == Ok("1") {
139 .ok() 140 Duration::from_millis(200)
140 .and_then(|s| s.parse::<u64>().ok()) 141 } else {
141 .map(Duration::from_millis) 142 Duration::from_millis(5000)
142 .unwrap_or(Duration::from_millis(5000)) 143 }
143 } 144 }
144 145
145 /// Load existing events from database on startup 146 /// Load existing events from database on startup
@@ -197,6 +198,7 @@ impl SelfSubscriber {
197 .or_insert_with(|| RepoSyncNeeds { 198 .or_insert_with(|| RepoSyncNeeds {
198 relays: HashSet::new(), 199 relays: HashSet::new(),
199 root_events: HashSet::new(), 200 root_events: HashSet::new(),
201 sync_level: SyncLevel::StateOnly,
200 }); 202 });
201 entry.relays.extend(needs.relays.clone()); 203 entry.relays.extend(needs.relays.clone());
202 } 204 }
@@ -570,7 +572,12 @@ impl SelfSubscriber {
570 .or_insert_with(|| RepoSyncNeeds { 572 .or_insert_with(|| RepoSyncNeeds {
571 relays: HashSet::new(), 573 relays: HashSet::new(),
572 root_events: HashSet::new(), 574 root_events: HashSet::new(),
575 sync_level: SyncLevel::Full,
573 }); 576 });
577 // Upgrade sync_level to Full - this handles the case where the entry
578 // already exists as StateOnly (purgatory announcement) and is now being
579 // promoted (git data arrived and the event was broadcast via notify_event).
580 entry.sync_level = SyncLevel::Full;
574 entry.relays.extend(needs.relays); 581 entry.relays.extend(needs.relays);
575 entry.root_events.extend(needs.root_events); 582 entry.root_events.extend(needs.root_events);
576 583
@@ -594,21 +601,26 @@ impl SelfSubscriber {
594 continue; 601 continue;
595 } 602 }
596 603
597 // Build filters for these repos 604 // Build filters for these repos (sync-level-aware)
598 let filters = crate::sync::filters::build_layer2_and_layer3_filters( 605 let filters = crate::sync::filters::build_sync_level_aware_filters(
599 &needs.repos, 606 &needs.repos,
607 &needs.state_only_repos,
600 &needs.root_events, 608 &needs.root_events,
601 None, 609 None,
602 ); 610 );
603 611
604 // Log before moving values 612 // Log before moving values
605 let repo_count = needs.repos.len(); 613 let repo_count = needs.repos.len() + needs.state_only_repos.len();
606 let event_count = needs.root_events.len(); 614 let event_count = needs.root_events.len();
607 615
616 // Combine all repos into pending items
617 let mut all_repos = needs.repos;
618 all_repos.extend(needs.state_only_repos);
619
608 let action = AddFilters { 620 let action = AddFilters {
609 relay_url: relay_url.clone(), 621 relay_url: relay_url.clone(),
610 items: crate::sync::PendingItems { 622 items: crate::sync::PendingItems {
611 repos: needs.repos, 623 repos: all_repos,
612 root_events: needs.root_events, 624 root_events: needs.root_events,
613 }, 625 },
614 filters, 626 filters,