upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/git/authorization.rs38
-rw-r--r--src/git/sync.rs110
-rw-r--r--src/main.rs8
-rw-r--r--src/nostr/builder.rs23
-rw-r--r--src/nostr/policy/announcement.rs117
-rw-r--r--src/nostr/policy/state.rs10
-rw-r--r--src/purgatory/mod.rs260
-rw-r--r--src/purgatory/sync/context.rs7
-rw-r--r--src/purgatory/types.rs39
-rw-r--r--src/sync/mod.rs68
-rw-r--r--tests/archive_read_only.rs59
-rw-r--r--tests/purgatory.rs4
-rw-r--r--tests/purgatory_persistence.rs26
13 files changed, 691 insertions, 78 deletions
diff --git a/src/git/authorization.rs b/src/git/authorization.rs
index e174b51..9d53c4f 100644
--- a/src/git/authorization.rs
+++ b/src/git/authorization.rs
@@ -287,6 +287,39 @@ pub async fn fetch_repository_data(
287 }) 287 })
288} 288}
289 289
290/// Fetch repository data including announcements from purgatory
291///
292/// This combines database announcements with purgatory announcements,
293/// which is needed for authorization when the announcement hasn't been
294/// promoted yet (no git data has arrived).
295pub async fn fetch_repository_data_with_purgatory(
296 database: &SharedDatabase,
297 purgatory: &crate::purgatory::Purgatory,
298 identifier: &str,
299) -> Result<RepositoryData> {
300 // First, fetch from database
301 let mut repo_data = fetch_repository_data(database, identifier).await?;
302
303 // Then, add announcements from purgatory
304 let purgatory_announcements = purgatory.get_announcements_by_identifier(identifier);
305 let purgatory_count = purgatory_announcements.len();
306
307 for entry in purgatory_announcements {
308 if let Ok(announcement) = RepositoryAnnouncement::from_event(entry.event) {
309 repo_data.announcements.push(announcement);
310 }
311 }
312
313 debug!(
314 "Fetched repository data with purgatory: {} announcements ({} from purgatory), {} states",
315 repo_data.announcements.len(),
316 purgatory_count,
317 repo_data.states.len()
318 );
319
320 Ok(repo_data)
321}
322
290pub fn pubkey_authorised_for_repo_owners( 323pub fn pubkey_authorised_for_repo_owners(
291 pubkey: &PublicKey, 324 pubkey: &PublicKey,
292 db_repo_data: &RepositoryData, 325 db_repo_data: &RepositoryData,
@@ -539,8 +572,9 @@ pub async fn get_state_authorization_for_specific_owner_repo(
539 use crate::git::list_refs; 572 use crate::git::list_refs;
540 use crate::purgatory::RefUpdate; 573 use crate::purgatory::RefUpdate;
541 574
542 // Fetch announcements only - we don't need database states 575 // Fetch announcements from database AND purgatory - needed for authorization
543 let repo_data = fetch_repository_data(database, identifier).await?; 576 // when the announcement hasn't been promoted yet (no git data has arrived)
577 let repo_data = fetch_repository_data_with_purgatory(database, purgatory, identifier).await?;
544 578
545 if repo_data.announcements.is_empty() { 579 if repo_data.announcements.is_empty() {
546 return Ok(AuthorizationResult::denied( 580 return Ok(AuthorizationResult::denied(
diff --git a/src/git/sync.rs b/src/git/sync.rs
index e8e9655..13f30b6 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -51,6 +51,8 @@ use crate::purgatory::{can_apply_state, Purgatory};
51/// or from purgatory sync fetching OIDs from remote servers). 51/// or from purgatory sync fetching OIDs from remote servers).
52#[derive(Debug, Default, Clone)] 52#[derive(Debug, Default, Clone)]
53pub struct ProcessResult { 53pub struct ProcessResult {
54 /// Number of announcements released from purgatory
55 pub announcements_released: usize,
54 /// Number of state events released from purgatory 56 /// Number of state events released from purgatory
55 pub states_released: usize, 57 pub states_released: usize,
56 /// Number of PR events released from purgatory 58 /// Number of PR events released from purgatory
@@ -70,11 +72,12 @@ pub struct ProcessResult {
70impl ProcessResult { 72impl ProcessResult {
71 /// Check if any events were released 73 /// Check if any events were released
72 pub fn released_any(&self) -> bool { 74 pub fn released_any(&self) -> bool {
73 self.states_released > 0 || self.prs_released > 0 75 self.announcements_released > 0 || self.states_released > 0 || self.prs_released > 0
74 } 76 }
75 77
76 /// Merge another ProcessResult into this one 78 /// Merge another ProcessResult into this one
77 pub fn merge(&mut self, other: ProcessResult) { 79 pub fn merge(&mut self, other: ProcessResult) {
80 self.announcements_released += other.announcements_released;
78 self.states_released += other.states_released; 81 self.states_released += other.states_released;
79 self.prs_released += other.prs_released; 82 self.prs_released += other.prs_released;
80 self.repos_synced += other.repos_synced; 83 self.repos_synced += other.repos_synced;
@@ -836,6 +839,18 @@ pub async fn process_newly_available_git_data(
836 "Processing newly available git data" 839 "Processing newly available git data"
837 ); 840 );
838 841
842 // Process announcements from purgatory
843 let announcement_result = process_purgatory_announcements(
844 &identifier,
845 source_repo_path,
846 database,
847 local_relay,
848 purgatory,
849 git_data_path,
850 )
851 .await;
852 result.merge(announcement_result);
853
839 // Process state events from purgatory 854 // Process state events from purgatory
840 let state_result = process_purgatory_state_events( 855 let state_result = process_purgatory_state_events(
841 &identifier, 856 &identifier,
@@ -863,6 +878,7 @@ pub async fn process_newly_available_git_data(
863 if result.released_any() { 878 if result.released_any() {
864 info!( 879 info!(
865 identifier = %identifier, 880 identifier = %identifier,
881 announcements_released = result.announcements_released,
866 states_released = result.states_released, 882 states_released = result.states_released,
867 prs_released = result.prs_released, 883 prs_released = result.prs_released,
868 repos_synced = result.repos_synced, 884 repos_synced = result.repos_synced,
@@ -1250,6 +1266,90 @@ async fn process_purgatory_pr_events(
1250 result 1266 result
1251} 1267}
1252 1268
1269/// Process announcements from purgatory that can now be promoted.
1270///
1271/// When git data arrives for a repository, any announcements in purgatory
1272/// for that repository should be promoted to the database and served to clients.
1273async fn process_purgatory_announcements(
1274 identifier: &str,
1275 source_repo_path: &Path,
1276 database: &SharedDatabase,
1277 local_relay: Option<&nostr_relay_builder::LocalRelay>,
1278 purgatory: &Purgatory,
1279 git_data_path: &Path,
1280) -> ProcessResult {
1281 let mut result = ProcessResult::default();
1282
1283 // Extract owner pubkey from the source repo path
1284 let owner_pubkey = match extract_owner_from_repo_path(source_repo_path, git_data_path) {
1285 Some(npub) => npub,
1286 None => {
1287 debug!(
1288 identifier = %identifier,
1289 "Could not extract owner from repo path"
1290 );
1291 return result;
1292 }
1293 };
1294
1295 // Parse the npub back to PublicKey
1296 let owner = match nostr_sdk::PublicKey::parse(&owner_pubkey) {
1297 Ok(pk) => pk,
1298 Err(e) => {
1299 warn!(
1300 identifier = %identifier,
1301 owner_pubkey = %owner_pubkey,
1302 error = %e,
1303 "Failed to parse owner pubkey"
1304 );
1305 result.errors.push(format!("Failed to parse owner pubkey: {}", e));
1306 return result;
1307 }
1308 };
1309
1310 // Check if there's an announcement in purgatory for this owner and identifier
1311 let announcement_event = purgatory.promote_announcement(&owner, identifier);
1312
1313 if let Some(event) = announcement_event {
1314 // Save to database
1315 match database.save_event(&event).await {
1316 Ok(_) => {
1317 info!(
1318 identifier = %identifier,
1319 event_id = %event.id,
1320 "Promoted announcement from purgatory to database"
1321 );
1322
1323 // Notify WebSocket subscribers
1324 if let Some(relay) = local_relay {
1325 if relay.notify_event(event.clone()) {
1326 debug!(
1327 identifier = %identifier,
1328 event_id = %event.id,
1329 "Broadcast announcement event to WebSocket listeners"
1330 );
1331 }
1332 }
1333
1334 result.announcements_released += 1;
1335 }
1336 Err(e) => {
1337 warn!(
1338 identifier = %identifier,
1339 event_id = %event.id,
1340 error = %e,
1341 "Failed to save announcement to database"
1342 );
1343 result
1344 .errors
1345 .push(format!("Failed to save announcement: {}", e));
1346 }
1347 }
1348 }
1349
1350 result
1351}
1352
1253/// Extract owner pubkey from a repository path. 1353/// Extract owner pubkey from a repository path.
1254/// 1354///
1255/// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. 1355/// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub.
@@ -1271,6 +1371,7 @@ mod tests {
1271 #[test] 1371 #[test]
1272 fn test_process_result_default() { 1372 fn test_process_result_default() {
1273 let result = ProcessResult::default(); 1373 let result = ProcessResult::default();
1374 assert_eq!(result.announcements_released, 0);
1274 assert_eq!(result.states_released, 0); 1375 assert_eq!(result.states_released, 0);
1275 assert_eq!(result.prs_released, 0); 1376 assert_eq!(result.prs_released, 0);
1276 assert_eq!(result.repos_synced, 0); 1377 assert_eq!(result.repos_synced, 0);
@@ -1282,6 +1383,10 @@ mod tests {
1282 let mut result = ProcessResult::default(); 1383 let mut result = ProcessResult::default();
1283 assert!(!result.released_any()); 1384 assert!(!result.released_any());
1284 1385
1386 result.announcements_released = 1;
1387 assert!(result.released_any());
1388
1389 result.announcements_released = 0;
1285 result.states_released = 1; 1390 result.states_released = 1;
1286 assert!(result.released_any()); 1391 assert!(result.released_any());
1287 1392
@@ -1293,6 +1398,7 @@ mod tests {
1293 #[test] 1398 #[test]
1294 fn test_process_result_merge() { 1399 fn test_process_result_merge() {
1295 let mut result1 = ProcessResult { 1400 let mut result1 = ProcessResult {
1401 announcements_released: 0,
1296 states_released: 1, 1402 states_released: 1,
1297 prs_released: 2, 1403 prs_released: 2,
1298 repos_synced: 3, 1404 repos_synced: 3,
@@ -1303,6 +1409,7 @@ mod tests {
1303 }; 1409 };
1304 1410
1305 let result2 = ProcessResult { 1411 let result2 = ProcessResult {
1412 announcements_released: 5,
1306 states_released: 10, 1413 states_released: 10,
1307 prs_released: 20, 1414 prs_released: 20,
1308 repos_synced: 30, 1415 repos_synced: 30,
@@ -1314,6 +1421,7 @@ mod tests {
1314 1421
1315 result1.merge(result2); 1422 result1.merge(result2);
1316 1423
1424 assert_eq!(result1.announcements_released, 5);
1317 assert_eq!(result1.states_released, 11); 1425 assert_eq!(result1.states_released, 11);
1318 assert_eq!(result1.prs_released, 22); 1426 assert_eq!(result1.prs_released, 22);
1319 assert_eq!(result1.repos_synced, 33); 1427 assert_eq!(result1.repos_synced, 33);
diff --git a/src/main.rs b/src/main.rs
index 5e5b83a..ab6ede7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -142,11 +142,11 @@ async fn main() -> Result<()> {
142 let mut interval = tokio::time::interval(Duration::from_secs(60)); 142 let mut interval = tokio::time::interval(Duration::from_secs(60));
143 loop { 143 loop {
144 interval.tick().await; 144 interval.tick().await;
145 let (state_removed, pr_removed) = cleanup_purgatory.cleanup(); 145 let (announcement_removed, state_removed, pr_removed) = cleanup_purgatory.cleanup();
146 if state_removed > 0 || pr_removed > 0 { 146 if announcement_removed > 0 || state_removed > 0 || pr_removed > 0 {
147 info!( 147 info!(
148 "Purgatory cleanup: removed {} state events, {} PR events", 148 "Purgatory cleanup: removed {} announcements, {} state events, {} PR events",
149 state_removed, pr_removed 149 announcement_removed, state_removed, pr_removed
150 ); 150 );
151 } 151 }
152 } 152 }
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 34014db..aff12a6 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -138,6 +138,29 @@ impl Nip34WritePolicy {
138 } 138 }
139 } 139 }
140 } 140 }
141 AnnouncementResult::AcceptPurgatory => {
142 // New announcement - add to purgatory
143 match self.announcement_policy.add_to_purgatory(event) {
144 Ok(()) => {
145 tracing::info!(
146 "Accepted announcement to purgatory: {} (waiting for git data)",
147 event_id_str
148 );
149 WritePolicyResult::Reject {
150 status: true, // Client sees OK
151 message: "purgatory: won't be served until git data arrives".into(),
152 }
153 }
154 Err(e) => {
155 tracing::warn!(
156 "Failed to add announcement to purgatory {}: {}",
157 event_id_str,
158 e
159 );
160 WritePolicyResult::reject(e)
161 }
162 }
163 }
141 AnnouncementResult::AcceptMaintainer => { 164 AnnouncementResult::AcceptMaintainer => {
142 // Parse announcement to get details for logging 165 // Parse announcement to get details for logging
143 match RepositoryAnnouncement::from_event(event.clone()) { 166 match RepositoryAnnouncement::from_event(event.clone()) {
diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs
index 15a6e58..1118497 100644
--- a/src/nostr/policy/announcement.rs
+++ b/src/nostr/policy/announcement.rs
@@ -3,6 +3,7 @@
3/// Handles validation of NIP-34 repository announcements (kind 30617) 3/// Handles validation of NIP-34 repository announcements (kind 30617)
4/// according to GRASP-01 specification. 4/// according to GRASP-01 specification.
5use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; 5use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag};
6use std::collections::HashSet;
6 7
7use super::PolicyContext; 8use super::PolicyContext;
8use crate::config::Config; 9use crate::config::Config;
@@ -11,12 +12,14 @@ use crate::nostr::events::{validate_announcement, RepositoryAnnouncement};
11/// Result of announcement policy evaluation 12/// Result of announcement policy evaluation
12#[derive(Debug, Clone, PartialEq)] 13#[derive(Debug, Clone, PartialEq)]
13pub enum AnnouncementResult { 14pub enum AnnouncementResult {
14 /// Accept: Event lists our service (GRASP-01 compliant) 15 /// Accept: Event lists our service (GRASP-01 compliant) - replacement announcement
15 Accept, 16 Accept,
16 /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer) 17 /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer)
17 AcceptMaintainer, 18 AcceptMaintainer,
18 /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only) 19 /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only)
19 AcceptArchive, 20 AcceptArchive,
21 /// Accept to purgatory: New announcement, waiting for git data
22 AcceptPurgatory,
20 /// Reject: Event fails validation with reason 23 /// Reject: Event fails validation with reason
21 Reject(String), 24 Reject(String),
22} 25}
@@ -35,10 +38,12 @@ impl AnnouncementPolicy {
35 38
36 /// Validate a repository announcement event 39 /// Validate a repository announcement event
37 /// 40 ///
38 /// Returns `Accept` if the announcement lists the service properly, 41 /// Returns:
39 /// `AcceptMaintainer` if accepted via maintainer exception, 42 /// - `Accept` if this is a replacement announcement (active announcement exists)
40 /// `AcceptArchive` if accepted via GRASP-05 archive config, 43 /// - `AcceptPurgatory` if this is a new announcement (no active announcement exists)
41 /// or `Reject` with reason. 44 /// - `AcceptMaintainer` if accepted via maintainer exception
45 /// - `AcceptArchive` if accepted via GRASP-05 archive config
46 /// - `Reject` with reason if validation fails
42 pub async fn validate(&self, event: &Event) -> AnnouncementResult { 47 pub async fn validate(&self, event: &Event) -> AnnouncementResult {
43 // First, try validation (GRASP-01 + GRASP-05) 48 // First, try validation (GRASP-01 + GRASP-05)
44 let validation_result = validate_announcement(event, &self.config); 49 let validation_result = validate_announcement(event, &self.config);
@@ -67,11 +72,111 @@ impl AnnouncementPolicy {
67 Err(_) => AnnouncementResult::Reject(reason), 72 Err(_) => AnnouncementResult::Reject(reason),
68 } 73 }
69 } 74 }
70 // Accept, AcceptArchive, or AcceptMaintainer - return as-is 75 AnnouncementResult::Accept | AnnouncementResult::AcceptArchive => {
76 // Parse announcement to check for existing active announcement
77 match RepositoryAnnouncement::from_event(event.clone()) {
78 Ok(announcement) => {
79 // Check if there's already an active announcement for this (pubkey, identifier)
80 match self
81 .has_active_announcement(&event.pubkey, &announcement.identifier)
82 .await
83 {
84 Ok(true) => {
85 // Replacement announcement - accept immediately
86 tracing::debug!(
87 identifier = %announcement.identifier,
88 "Replacement announcement - accepting immediately"
89 );
90 validation_result
91 }
92 Ok(false) => {
93 // New announcement - route to purgatory
94 tracing::debug!(
95 identifier = %announcement.identifier,
96 "New announcement - routing to purgatory"
97 );
98 AnnouncementResult::AcceptPurgatory
99 }
100 Err(e) => {
101 tracing::warn!(
102 error = %e,
103 "Failed to check for existing announcement - rejecting"
104 );
105 AnnouncementResult::Reject(format!(
106 "Database error checking existing announcement: {}",
107 e
108 ))
109 }
110 }
111 }
112 Err(e) => AnnouncementResult::Reject(format!(
113 "Failed to parse announcement: {}",
114 e
115 )),
116 }
117 }
118 // AcceptPurgatory shouldn't come from validate_announcement, but handle it
71 result => result, 119 result => result,
72 } 120 }
73 } 121 }
74 122
123 /// Check if there's an active announcement in the database for this (pubkey, identifier)
124 async fn has_active_announcement(
125 &self,
126 pubkey: &PublicKey,
127 identifier: &str,
128 ) -> Result<bool, String> {
129 let filter = Filter::new()
130 .kind(Kind::GitRepoAnnouncement)
131 .author(*pubkey)
132 .custom_tag(
133 SingleLetterTag::lowercase(Alphabet::D),
134 identifier.to_string(),
135 );
136
137 let events: Vec<Event> = match self.ctx.database.query(filter).await {
138 Ok(events) => events.into_iter().collect(),
139 Err(e) => return Err(format!("Database query failed: {}", e)),
140 };
141
142 Ok(!events.is_empty())
143 }
144
145 /// Add an announcement to purgatory
146 ///
147 /// Creates the bare repository and stores the announcement in purgatory
148 /// until git data arrives.
149 pub fn add_to_purgatory(&self, event: &Event) -> Result<(), String> {
150 let announcement = RepositoryAnnouncement::from_event(event.clone())
151 .map_err(|e| format!("Failed to parse announcement: {}", e))?;
152
153 // Create bare repository
154 self.ensure_bare_repository(&announcement)?;
155
156 // Build repo path
157 let repo_path = self.ctx.git_data_path.join(announcement.repo_path());
158
159 // Extract relays from announcement
160 let relays: HashSet<String> = announcement.relays.iter().cloned().collect();
161
162 // Add to purgatory
163 self.ctx.purgatory.add_announcement(
164 event.clone(),
165 announcement.identifier.clone(),
166 event.pubkey,
167 repo_path,
168 relays,
169 );
170
171 tracing::info!(
172 identifier = %announcement.identifier,
173 event_id = %event.id,
174 "Added announcement to purgatory"
175 );
176
177 Ok(())
178 }
179
75 /// Create a bare git repository if it doesn't exist 180 /// Create a bare git repository if it doesn't exist
76 /// Path format: <git_data_path>/<npub>/<identifier>.git 181 /// Path format: <git_data_path>/<npub>/<identifier>.git
77 pub fn ensure_bare_repository( 182 pub fn ensure_bare_repository(
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index f94f004..4bfb513 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -10,7 +10,7 @@ use nostr_relay_builder::prelude::Event;
10 10
11use super::PolicyContext; 11use super::PolicyContext;
12use crate::git; 12use crate::git;
13use crate::git::authorization::fetch_repository_data; 13use crate::git::authorization::fetch_repository_data_with_purgatory;
14use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; 14use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState};
15 15
16/// Result of state policy evaluation 16/// Result of state policy evaluation
@@ -76,7 +76,13 @@ impl StatePolicy {
76 } 76 }
77 77
78 // Get all repositories and state events from db with identifier 78 // Get all repositories and state events from db with identifier
79 let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; 79 // Include purgatory announcements for authorization
80 let db_repo_data = fetch_repository_data_with_purgatory(
81 &self.ctx.database,
82 &self.ctx.purgatory,
83 &state.identifier,
84 )
85 .await?;
80 86
81 // CRITICAL: Check if author is authorized via maintainer set 87 // CRITICAL: Check if author is authorized via maintainer set
82 // State events MUST be rejected if author is not in maintainer set of any accepted announcement 88 // State events MUST be rejected if author is not in maintainer set of any accepted announcement
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 47798a6..3b5514b 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -17,7 +17,7 @@ pub mod sync;
17mod types; 17mod types;
18 18
19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs};
20pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 20pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 21
22use dashmap::DashMap; 22use dashmap::DashMap;
23use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
@@ -100,7 +100,8 @@ struct PurgatoryState {
100 100
101/// Main purgatory structure holding events awaiting git data. 101/// Main purgatory structure holding events awaiting git data.
102/// 102///
103/// Provides thread-safe concurrent access to two separate stores: 103/// Provides thread-safe concurrent access to three separate stores:
104/// - Announcements indexed by (pubkey, identifier)
104/// - State events indexed by repository identifier 105/// - State events indexed by repository identifier
105/// - PR events indexed by event ID 106/// - PR events indexed by event ID
106/// 107///
@@ -121,6 +122,10 @@ struct PurgatoryState {
121/// that we've already determined have no git data available. 122/// that we've already determined have no git data available.
122#[derive(Clone)] 123#[derive(Clone)]
123pub struct Purgatory { 124pub struct Purgatory {
125 /// Repository announcements (kind 30617) indexed by (owner pubkey, identifier).
126 /// Key: (PublicKey, String) where String is the repository identifier.
127 announcement_purgatory: Arc<DashMap<(PublicKey, String), AnnouncementPurgatoryEntry>>,
128
124 /// State events (kind 30618) indexed by repository identifier. 129 /// State events (kind 30618) indexed by repository identifier.
125 /// Multiple state events can wait for the same identifier (different maintainers). 130 /// Multiple state events can wait for the same identifier (different maintainers).
126 state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>, 131 state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>,
@@ -145,6 +150,7 @@ impl Purgatory {
145 /// Create a new empty purgatory. 150 /// Create a new empty purgatory.
146 pub fn new(git_data_path: impl Into<PathBuf>) -> Self { 151 pub fn new(git_data_path: impl Into<PathBuf>) -> Self {
147 Self { 152 Self {
153 announcement_purgatory: Arc::new(DashMap::new()),
148 state_events: Arc::new(DashMap::new()), 154 state_events: Arc::new(DashMap::new()),
149 pr_events: Arc::new(DashMap::new()), 155 pr_events: Arc::new(DashMap::new()),
150 sync_queue: Arc::new(DashMap::new()), 156 sync_queue: Arc::new(DashMap::new()),
@@ -513,9 +519,171 @@ impl Purgatory {
513 self.pr_events.remove(event_id); 519 self.pr_events.remove(event_id);
514 } 520 }
515 521
522 // =========================================================================
523 // Announcement Purgatory Methods
524 // =========================================================================
525
526 /// Add a repository announcement to purgatory.
527 ///
528 /// The announcement will be held until git data arrives, at which point
529 /// it will be promoted to the database and served to clients.
530 ///
531 /// # Arguments
532 /// * `event` - The announcement event (kind 30617)
533 /// * `identifier` - The repository identifier from the 'd' tag
534 /// * `owner` - The owner pubkey (event author)
535 /// * `repo_path` - Path to the bare git repository
536 /// * `relays` - Relay URLs from the announcement (for sync registration)
537 pub fn add_announcement(
538 &self,
539 event: Event,
540 identifier: String,
541 owner: PublicKey,
542 repo_path: PathBuf,
543 relays: HashSet<String>,
544 ) {
545 let now = Instant::now();
546 let entry = AnnouncementPurgatoryEntry {
547 event,
548 identifier: identifier.clone(),
549 owner,
550 repo_path,
551 relays,
552 created_at: now,
553 expires_at: now + DEFAULT_EXPIRY,
554 soft_expired: false,
555 };
556
557 let key = (owner, identifier);
558 self.announcement_purgatory.insert(key.clone(), entry);
559
560 tracing::debug!(
561 owner = %key.0,
562 identifier = %key.1,
563 "Added announcement to purgatory"
564 );
565 }
566
567 /// Find an announcement in purgatory by owner and identifier.
568 ///
569 /// # Arguments
570 /// * `owner` - The owner pubkey
571 /// * `identifier` - The repository identifier
572 ///
573 /// # Returns
574 /// The announcement entry if found, None otherwise
575 pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option<AnnouncementPurgatoryEntry> {
576 let key = (*owner, identifier.to_string());
577 self.announcement_purgatory.get(&key).map(|entry| entry.clone())
578 }
579
580 /// Get all announcements in purgatory for a given identifier.
581 ///
582 /// This is used for authorization - state events and git pushes need to
583 /// check purgatory announcements for maintainer validation.
584 ///
585 /// # Arguments
586 /// * `identifier` - The repository identifier
587 ///
588 /// # Returns
589 /// Vector of announcement entries for this identifier
590 pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec<AnnouncementPurgatoryEntry> {
591 self.announcement_purgatory
592 .iter()
593 .filter(|entry| entry.key().1 == identifier)
594 .map(|entry| entry.value().clone())
595 .collect()
596 }
597
598 /// Remove an announcement from purgatory.
599 ///
600 /// # Arguments
601 /// * `owner` - The owner pubkey
602 /// * `identifier` - The repository identifier
603 pub fn remove_announcement(&self, owner: &PublicKey, identifier: &str) {
604 let key = (*owner, identifier.to_string());
605 self.announcement_purgatory.remove(&key);
606 tracing::debug!(
607 owner = %owner,
608 identifier = %identifier,
609 "Removed announcement from purgatory"
610 );
611 }
612
613 /// Promote an announcement from purgatory to active status.
614 ///
615 /// This is called when git data arrives. The announcement event is returned
616 /// so it can be saved to the database.
617 ///
618 /// # Arguments
619 /// * `owner` - The owner pubkey
620 /// * `identifier` - The repository identifier
621 ///
622 /// # Returns
623 /// The announcement event if found, None otherwise
624 pub fn promote_announcement(&self, owner: &PublicKey, identifier: &str) -> Option<Event> {
625 let key = (*owner, identifier.to_string());
626 self.announcement_purgatory.remove(&key).map(|(_, entry)| {
627 tracing::info!(
628 owner = %owner,
629 identifier = %identifier,
630 "Promoted announcement from purgatory to database"
631 );
632 entry.event
633 })
634 }
635
636 /// Check if there's an announcement in purgatory for the given owner and identifier.
637 ///
638 /// # Arguments
639 /// * `owner` - The owner pubkey
640 /// * `identifier` - The repository identifier
641 ///
642 /// # Returns
643 /// true if an announcement exists in purgatory, false otherwise
644 pub fn has_purgatory_announcement(&self, owner: &PublicKey, identifier: &str) -> bool {
645 let key = (*owner, identifier.to_string());
646 self.announcement_purgatory.contains_key(&key)
647 }
648
649 /// Extend the expiry for an announcement in purgatory.
650 ///
651 /// This is called when state events arrive for a purgatory announcement,
652 /// indicating the repository is actively receiving metadata.
653 ///
654 /// # Arguments
655 /// * `owner` - The owner pubkey
656 /// * `identifier` - The repository identifier
657 /// * `duration` - Minimum duration to guarantee from now
658 pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) {
659 let key = (*owner, identifier.to_string());
660 if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) {
661 let now = Instant::now();
662 let new_expiry = now + duration;
663 if entry.expires_at < new_expiry {
664 entry.expires_at = new_expiry;
665 // If soft-expired, revive it
666 if entry.soft_expired {
667 entry.soft_expired = false;
668 tracing::debug!(
669 owner = %owner,
670 identifier = %identifier,
671 "Revived soft-expired announcement"
672 );
673 }
674 }
675 }
676 }
677
678 /// Get count of announcements in purgatory.
679 pub fn announcement_count(&self) -> usize {
680 self.announcement_purgatory.len()
681 }
682
516 /// Get all event IDs currently stored in purgatory AND previously expired events. 683 /// Get all event IDs currently stored in purgatory AND previously expired events.
517 /// 684 ///
518 /// Returns a HashSet of all event IDs for: 685 /// Returns a HashSet of all event IDs for:
686 /// - Announcements currently held in purgatory
519 /// - State events currently held in purgatory 687 /// - State events currently held in purgatory
520 /// - PR events currently held in purgatory 688 /// - PR events currently held in purgatory
521 /// - Events that previously expired from purgatory without finding git data 689 /// - Events that previously expired from purgatory without finding git data
@@ -530,6 +698,11 @@ impl Purgatory {
530 pub fn event_ids(&self) -> HashSet<EventId> { 698 pub fn event_ids(&self) -> HashSet<EventId> {
531 let mut ids = HashSet::new(); 699 let mut ids = HashSet::new();
532 700
701 // Collect announcement event IDs
702 for entry in self.announcement_purgatory.iter() {
703 ids.insert(entry.value().event.id);
704 }
705
533 // Collect state event IDs 706 // Collect state event IDs
534 for entry in self.state_events.iter() { 707 for entry in self.state_events.iter() {
535 for state_entry in entry.value().iter() { 708 for state_entry in entry.value().iter() {
@@ -609,9 +782,28 @@ impl Purgatory {
609 /// will be filtered out during future negentropy/REQ sync operations. 782 /// will be filtered out during future negentropy/REQ sync operations.
610 /// 783 ///
611 /// # Returns 784 /// # Returns
612 /// Tuple of (num_state_removed, num_pr_removed) 785 /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed)
613 pub fn cleanup(&self) -> (usize, usize) { 786 pub fn cleanup(&self) -> (usize, usize, usize) {
614 let now = Instant::now(); 787 let now = Instant::now();
788
789 // Remove expired announcements and mark them as expired
790 let expired_announcements: Vec<(PublicKey, String, EventId)> = self
791 .announcement_purgatory
792 .iter()
793 .filter(|entry| entry.value().expires_at <= now)
794 .map(|entry| {
795 let key = entry.key();
796 let event_id = entry.value().event.id;
797 (key.0.clone(), key.1.clone(), event_id)
798 })
799 .collect();
800
801 let announcement_removed = expired_announcements.len();
802 for (owner, identifier, event_id) in expired_announcements {
803 self.mark_expired(event_id);
804 self.announcement_purgatory.remove(&(owner, identifier));
805 }
806
615 let mut state_removed = 0; 807 let mut state_removed = 0;
616 808
617 // Remove expired state events and mark them as expired 809 // Remove expired state events and mark them as expired
@@ -655,17 +847,17 @@ impl Purgatory {
655 self.pr_events.remove(&event_id_str); 847 self.pr_events.remove(&event_id_str);
656 } 848 }
657 849
658 (state_removed, pr_removed) 850 (announcement_removed, state_removed, pr_removed)
659 } 851 }
660 852
661 /// Remove expired entries from purgatory (legacy method). 853 /// Remove expired entries from purgatory (legacy method).
662 /// 854 ///
663 /// # Returns 855 /// # Returns
664 /// Total number of entries removed (state + PR events) 856 /// Total number of entries removed (announcement + state + PR events)
665 #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] 857 #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")]
666 pub fn remove_expired(&self) -> usize { 858 pub fn remove_expired(&self) -> usize {
667 let (state, pr) = self.cleanup(); 859 let (announcement, state, pr) = self.cleanup();
668 state + pr 860 announcement + state + pr
669 } 861 }
670 862
671 /// Remove old expired event records. 863 /// Remove old expired event records.
@@ -699,11 +891,12 @@ impl Purgatory {
699 /// Get current count of entries in purgatory. 891 /// Get current count of entries in purgatory.
700 /// 892 ///
701 /// # Returns 893 /// # Returns
702 /// Tuple of (state_event_count, pr_event_count) 894 /// Tuple of (announcement_count, state_event_count, pr_event_count)
703 pub fn count(&self) -> (usize, usize) { 895 pub fn count(&self) -> (usize, usize, usize) {
896 let announcement_count = self.announcement_purgatory.len();
704 let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); 897 let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum();
705 let pr_count = self.pr_events.len(); 898 let pr_count = self.pr_events.len();
706 (state_count, pr_count) 899 (announcement_count, state_count, pr_count)
707 } 900 }
708 901
709 /// Get count of expired events being tracked. 902 /// Get count of expired events being tracked.
@@ -717,6 +910,7 @@ impl Purgatory {
717 /// Clear all entries from purgatory (for testing). 910 /// Clear all entries from purgatory (for testing).
718 #[cfg(test)] 911 #[cfg(test)]
719 pub fn clear(&self) { 912 pub fn clear(&self) {
913 self.announcement_purgatory.clear();
720 self.state_events.clear(); 914 self.state_events.clear();
721 self.pr_events.clear(); 915 self.pr_events.clear();
722 self.sync_queue.clear(); 916 self.sync_queue.clear();
@@ -990,7 +1184,8 @@ mod tests {
990 #[test] 1184 #[test]
991 fn test_purgatory_creation() { 1185 fn test_purgatory_creation() {
992 let purgatory = Purgatory::new(PathBuf::new()); 1186 let purgatory = Purgatory::new(PathBuf::new());
993 let (state_count, pr_count) = purgatory.count(); 1187 let (announcement_count, state_count, pr_count) = purgatory.count();
1188 assert_eq!(announcement_count, 0);
994 assert_eq!(state_count, 0); 1189 assert_eq!(state_count, 0);
995 assert_eq!(pr_count, 0); 1190 assert_eq!(pr_count, 0);
996 } 1191 }
@@ -1008,7 +1203,8 @@ mod tests {
1008 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); 1203 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key());
1009 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); 1204 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string());
1010 1205
1011 let (state_count, pr_count) = purgatory.count(); 1206 let (announcement_count, state_count, pr_count) = purgatory.count();
1207 assert_eq!(announcement_count, 0);
1012 assert_eq!(state_count, 1); 1208 assert_eq!(state_count, 1);
1013 assert_eq!(pr_count, 1); 1209 assert_eq!(pr_count, 1);
1014 } 1210 }
@@ -1213,7 +1409,7 @@ fn test_cleanup_removes_expired_entries() {
1213 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); 1409 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string());
1214 1410
1215 // Verify entries are there 1411 // Verify entries are there
1216 let (state_count, pr_count) = purgatory.count(); 1412 let (_, state_count, pr_count) = purgatory.count();
1217 assert_eq!(state_count, 1); 1413 assert_eq!(state_count, 1);
1218 assert_eq!(pr_count, 2); 1414 assert_eq!(pr_count, 2);
1219 1415
@@ -1231,14 +1427,14 @@ fn test_cleanup_removes_expired_entries() {
1231 } 1427 }
1232 1428
1233 // Run cleanup 1429 // Run cleanup
1234 let (state_removed, pr_removed) = purgatory.cleanup(); 1430 let (_, state_removed, pr_removed) = purgatory.cleanup();
1235 1431
1236 // Verify counts 1432 // Verify counts
1237 assert_eq!(state_removed, 1); 1433 assert_eq!(state_removed, 1);
1238 assert_eq!(pr_removed, 2); 1434 assert_eq!(pr_removed, 2);
1239 1435
1240 // Verify entries are gone 1436 // Verify entries are gone
1241 let (state_count, pr_count) = purgatory.count(); 1437 let (_, state_count, pr_count) = purgatory.count();
1242 assert_eq!(state_count, 0); 1438 assert_eq!(state_count, 0);
1243 assert_eq!(pr_count, 0); 1439 assert_eq!(pr_count, 0);
1244} 1440}
@@ -1260,14 +1456,14 @@ fn test_cleanup_preserves_non_expired_entries() {
1260 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); 1456 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string());
1261 1457
1262 // Run cleanup 1458 // Run cleanup
1263 let (state_removed, pr_removed) = purgatory.cleanup(); 1459 let (_, state_removed, pr_removed) = purgatory.cleanup();
1264 1460
1265 // Nothing should be removed 1461 // Nothing should be removed
1266 assert_eq!(state_removed, 0); 1462 assert_eq!(state_removed, 0);
1267 assert_eq!(pr_removed, 0); 1463 assert_eq!(pr_removed, 0);
1268 1464
1269 // Verify entries are still there 1465 // Verify entries are still there
1270 let (state_count, pr_count) = purgatory.count(); 1466 let (_, state_count, pr_count) = purgatory.count();
1271 assert_eq!(state_count, 1); 1467 assert_eq!(state_count, 1);
1272 assert_eq!(pr_count, 1); 1468 assert_eq!(pr_count, 1);
1273} 1469}
@@ -1314,14 +1510,14 @@ fn test_cleanup_mixed_expired_and_fresh() {
1314 } 1510 }
1315 1511
1316 // Run cleanup 1512 // Run cleanup
1317 let (state_removed, pr_removed) = purgatory.cleanup(); 1513 let (_, state_removed, pr_removed) = purgatory.cleanup();
1318 1514
1319 // One of each should be removed 1515 // One of each should be removed
1320 assert_eq!(state_removed, 1); 1516 assert_eq!(state_removed, 1);
1321 assert_eq!(pr_removed, 1); 1517 assert_eq!(pr_removed, 1);
1322 1518
1323 // Verify remaining counts 1519 // Verify remaining counts
1324 let (state_count, pr_count) = purgatory.count(); 1520 let (_, state_count, pr_count) = purgatory.count();
1325 assert_eq!(state_count, 1); // One state event remains 1521 assert_eq!(state_count, 1); // One state event remains
1326 assert_eq!(pr_count, 1); // One PR event remains 1522 assert_eq!(pr_count, 1); // One PR event remains
1327} 1523}
@@ -1391,7 +1587,7 @@ fn test_expired_event_tracking() {
1391 } 1587 }
1392 1588
1393 // Run cleanup 1589 // Run cleanup
1394 let (state_removed, pr_removed) = purgatory.cleanup(); 1590 let (_, state_removed, pr_removed) = purgatory.cleanup();
1395 assert_eq!(state_removed, 1); 1591 assert_eq!(state_removed, 1);
1396 assert_eq!(pr_removed, 1); 1592 assert_eq!(pr_removed, 1);
1397 1593
@@ -1501,7 +1697,7 @@ fn test_expired_events_prevent_readdition() {
1501 } 1697 }
1502 1698
1503 // Event should NOT be re-added 1699 // Event should NOT be re-added
1504 let (state_count, _) = purgatory.count(); 1700 let (_, state_count, _) = purgatory.count();
1505 assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); 1701 assert_eq!(state_count, 0, "Event should not be re-added to purgatory");
1506} 1702}
1507 1703
@@ -1520,7 +1716,7 @@ fn test_pr_placeholder_not_marked_expired() {
1520 } 1716 }
1521 1717
1522 // Run cleanup 1718 // Run cleanup
1523 let (_, pr_removed) = purgatory.cleanup(); 1719 let (_, _, pr_removed) = purgatory.cleanup();
1524 assert_eq!(pr_removed, 1); 1720 assert_eq!(pr_removed, 1);
1525 1721
1526 // Expired count should be 0 (placeholders don't have event IDs to track) 1722 // Expired count should be 0 (placeholders don't have event IDs to track)
@@ -1606,7 +1802,7 @@ async fn test_save_and_restore_state_events() {
1606 assert!(!state_file.exists()); 1802 assert!(!state_file.exists());
1607 1803
1608 // Verify state events were restored 1804 // Verify state events were restored
1609 let (state_count, _) = purgatory2.count(); 1805 let (_, state_count, _) = purgatory2.count();
1610 assert_eq!(state_count, 2); 1806 assert_eq!(state_count, 2);
1611 1807
1612 let restored_entries = purgatory2.find_state("test-repo"); 1808 let restored_entries = purgatory2.find_state("test-repo");
@@ -1662,7 +1858,7 @@ async fn test_save_and_restore_pr_events() {
1662 purgatory2.restore_from_disk(&state_file).unwrap(); 1858 purgatory2.restore_from_disk(&state_file).unwrap();
1663 1859
1664 // Verify PR event was restored 1860 // Verify PR event was restored
1665 let (_, pr_count) = purgatory2.count(); 1861 let (_, _, pr_count) = purgatory2.count();
1666 assert_eq!(pr_count, 1); 1862 assert_eq!(pr_count, 1);
1667 1863
1668 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); 1864 let restored_entry = purgatory2.find_pr("pr-event-id").unwrap();
@@ -1691,7 +1887,7 @@ async fn test_save_and_restore_pr_placeholders() {
1691 purgatory2.restore_from_disk(&state_file).unwrap(); 1887 purgatory2.restore_from_disk(&state_file).unwrap();
1692 1888
1693 // Verify placeholder was restored 1889 // Verify placeholder was restored
1694 let (_, pr_count) = purgatory2.count(); 1890 let (_, _, pr_count) = purgatory2.count();
1695 assert_eq!(pr_count, 1); 1891 assert_eq!(pr_count, 1);
1696 1892
1697 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); 1893 let restored_entry = purgatory2.find_pr("placeholder-id").unwrap();
@@ -1769,7 +1965,7 @@ async fn test_save_and_restore_empty_purgatory() {
1769 purgatory2.restore_from_disk(&state_file).unwrap(); 1965 purgatory2.restore_from_disk(&state_file).unwrap();
1770 1966
1771 // Verify purgatory is still empty 1967 // Verify purgatory is still empty
1772 let (state_count, pr_count) = purgatory2.count(); 1968 let (_, state_count, pr_count) = purgatory2.count();
1773 assert_eq!(state_count, 0); 1969 assert_eq!(state_count, 0);
1774 assert_eq!(pr_count, 0); 1970 assert_eq!(pr_count, 0);
1775 assert_eq!(purgatory2.expired_count(), 0); 1971 assert_eq!(purgatory2.expired_count(), 0);
@@ -1789,7 +1985,7 @@ async fn test_restore_missing_file() {
1789 assert!(result.is_err()); 1985 assert!(result.is_err());
1790 1986
1791 // Purgatory should remain empty 1987 // Purgatory should remain empty
1792 let (state_count, pr_count) = purgatory.count(); 1988 let (_, state_count, pr_count) = purgatory.count();
1793 assert_eq!(state_count, 0); 1989 assert_eq!(state_count, 0);
1794 assert_eq!(pr_count, 0); 1990 assert_eq!(pr_count, 0);
1795} 1991}
@@ -1811,7 +2007,7 @@ async fn test_restore_corrupted_json() {
1811 assert!(result.is_err()); 2007 assert!(result.is_err());
1812 2008
1813 // Purgatory should remain empty 2009 // Purgatory should remain empty
1814 let (state_count, pr_count) = purgatory.count(); 2010 let (_, state_count, pr_count) = purgatory.count();
1815 assert_eq!(state_count, 0); 2011 assert_eq!(state_count, 0);
1816 assert_eq!(pr_count, 0); 2012 assert_eq!(pr_count, 0);
1817} 2013}
@@ -2044,7 +2240,7 @@ async fn test_mixed_pr_events_and_placeholders() {
2044 purgatory2.restore_from_disk(&state_file).unwrap(); 2240 purgatory2.restore_from_disk(&state_file).unwrap();
2045 2241
2046 // Verify both were restored correctly 2242 // Verify both were restored correctly
2047 let (_, pr_count) = purgatory2.count(); 2243 let (_, _, pr_count) = purgatory2.count();
2048 assert_eq!(pr_count, 2); 2244 assert_eq!(pr_count, 2);
2049 2245
2050 // Verify PR event 2246 // Verify PR event
@@ -2141,7 +2337,7 @@ async fn test_comprehensive_roundtrip() {
2141 purgatory.cleanup(); 2337 purgatory.cleanup();
2142 2338
2143 // Verify initial state 2339 // Verify initial state
2144 let (state_count, pr_count) = purgatory.count(); 2340 let (_, state_count, pr_count) = purgatory.count();
2145 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) 2341 assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up)
2146 assert_eq!(pr_count, 2); // pr-1, pr-2 2342 assert_eq!(pr_count, 2); // pr-1, pr-2
2147 assert_eq!(purgatory.expired_count(), 1); // expired_event 2343 assert_eq!(purgatory.expired_count(), 1); // expired_event
@@ -2154,7 +2350,7 @@ async fn test_comprehensive_roundtrip() {
2154 purgatory2.restore_from_disk(&state_file).unwrap(); 2350 purgatory2.restore_from_disk(&state_file).unwrap();
2155 2351
2156 // Verify all data was restored correctly 2352 // Verify all data was restored correctly
2157 let (state_count2, pr_count2) = purgatory2.count(); 2353 let (_, state_count2, pr_count2) = purgatory2.count();
2158 assert_eq!(state_count2, 2); 2354 assert_eq!(state_count2, 2);
2159 assert_eq!(pr_count2, 2); 2355 assert_eq!(pr_count2, 2);
2160 assert_eq!(purgatory2.expired_count(), 1); 2356 assert_eq!(purgatory2.expired_count(), 1);
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 33c2d12..778cdb8 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -279,7 +279,12 @@ impl SyncContext for RealSyncContext {
279 } 279 }
280 280
281 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> { 281 async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> {
282 crate::git::authorization::fetch_repository_data(&self.database, identifier).await 282 crate::git::authorization::fetch_repository_data_with_purgatory(
283 &self.database,
284 &self.purgatory,
285 identifier,
286 )
287 .await
283 } 288 }
284 289
285 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { 290 fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> {
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 919504b..d891bc9 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -6,6 +6,8 @@
6 6
7use nostr_sdk::prelude::*; 7use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize}; 8use serde::{Deserialize, Serialize};
9use std::collections::HashSet;
10use std::path::PathBuf;
9use std::time::Instant; 11use std::time::Instant;
10 12
11/// Default value for Instant fields during deserialization 13/// Default value for Instant fields during deserialization
@@ -113,3 +115,40 @@ pub struct PrPurgatoryEntry {
113 #[serde(skip, default = "instant_now")] 115 #[serde(skip, default = "instant_now")]
114 pub expires_at: Instant, 116 pub expires_at: Instant,
115} 117}
118
119/// Entry for a repository announcement (kind 30617) waiting in purgatory.
120///
121/// Announcements are held in purgatory until git data arrives, proving
122/// the repository has actual content. This prevents serving announcements
123/// for empty repositories.
124///
125/// Note: `Instant` fields cannot be serialized directly. Use the `persistence`
126/// module to convert to/from serializable wrapper types.
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct AnnouncementPurgatoryEntry {
129 /// The nostr announcement event (kind 30617)
130 pub event: Event,
131
132 /// The repository identifier from the event's 'd' tag
133 pub identifier: String,
134
135 /// The owner pubkey (event author)
136 pub owner: PublicKey,
137
138 /// Path to the bare git repository
139 pub repo_path: PathBuf,
140
141 /// Relay URLs from the announcement (for sync registration)
142 pub relays: HashSet<String>,
143
144 /// When this entry was added to purgatory
145 #[serde(skip, default = "instant_now")]
146 pub created_at: Instant,
147
148 /// Expiry deadline (30 min from creation, may be extended)
149 #[serde(skip, default = "instant_now")]
150 pub expires_at: Instant,
151
152 /// Whether the bare repo has been deleted (soft expiry)
153 pub soft_expired: bool,
154}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 1ee1872..872df66 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1719,8 +1719,50 @@ impl SyncManager {
1719 // For sync-triggered events that go to purgatory, trigger immediate sync 1719 // For sync-triggered events that go to purgatory, trigger immediate sync
1720 // (instead of the default 3-minute delay for user-submitted events) 1720 // (instead of the default 3-minute delay for user-submitted events)
1721 if result == ProcessResult::Purgatory { 1721 if result == ProcessResult::Purgatory {
1722 // Announcements (kind 30617) - re-process rejected state events
1723 // When an announcement goes to purgatory, state events that were
1724 // previously rejected ("no announcement exists") can now be authorized
1725 // via fetch_repository_data_with_purgatory.
1726 if event.kind == Kind::GitRepoAnnouncement {
1727 use crate::nostr::events::RepositoryAnnouncement;
1728
1729 if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) {
1730 // Re-process rejected state events for this announcement
1731 let (removed, hot_events) = rejected_events_index.invalidate_and_get(
1732 &event.pubkey,
1733 &announcement.identifier,
1734 Some(rejected_index::EventType::State),
1735 );
1736
1737 if removed > 0 {
1738 tracing::info!(
1739 pubkey = %event.pubkey,
1740 identifier = %announcement.identifier,
1741 removed_from_cold_index = removed,
1742 hot_cache_events = hot_events.len(),
1743 "Invalidated rejected state events (announcement now in purgatory)"
1744 );
1745 }
1746
1747 // Re-process state events from hot cache immediately
1748 if !hot_events.is_empty() {
1749 let _stats = Self::reprocess_events_from_hot_cache(
1750 hot_events,
1751 "state event (announcement in purgatory)",
1752 &event.pubkey,
1753 &announcement.identifier,
1754 &relay_url_clone,
1755 &database,
1756 &write_policy,
1757 &local_relay,
1758 &rejected_events_index,
1759 )
1760 .await;
1761 }
1762 }
1763 }
1722 // State events (kind 30618) - extract identifier and trigger immediate sync 1764 // State events (kind 30618) - extract identifier and trigger immediate sync
1723 if event.kind.as_u16() == 30618 { 1765 else if event.kind.as_u16() == 30618 {
1724 if let Some(identifier) = event.tags.iter().find_map(|tag| { 1766 if let Some(identifier) = event.tags.iter().find_map(|tag| {
1725 let tag_vec = tag.clone().to_vec(); 1767 let tag_vec = tag.clone().to_vec();
1726 if tag_vec.len() >= 2 && tag_vec[0] == "d" { 1768 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
@@ -1754,7 +1796,9 @@ impl SyncManager {
1754 1796
1755 // Track pagination state for this subscription (REQ+EOSE) 1797 // Track pagination state for this subscription (REQ+EOSE)
1756 // and received event IDs for negentropy batches 1798 // and received event IDs for negentropy batches
1757 if result == ProcessResult::Saved || result == ProcessResult::Duplicate { 1799 // Include Purgatory results so announcements in purgatory still trigger
1800 // per-repo sync (state events, PR events) from the source relay.
1801 if result == ProcessResult::Saved || result == ProcessResult::Duplicate || result == ProcessResult::Purgatory {
1758 let mut pending = pending_sync_index.write().await; 1802 let mut pending = pending_sync_index.write().await;
1759 if let Some(batches) = pending.get_mut(&relay_url_clone) { 1803 if let Some(batches) = pending.get_mut(&relay_url_clone) {
1760 for batch in batches.iter_mut() { 1804 for batch in batches.iter_mut() {
@@ -2506,6 +2550,26 @@ impl SyncManager {
2506 "{} added to purgatory (waiting for git data)", 2550 "{} added to purgatory (waiting for git data)",
2507 context 2551 context
2508 ); 2552 );
2553 // Trigger immediate sync for re-processed events that go to purgatory
2554 // (same as sync-triggered events in the main event loop)
2555 if event.kind.as_u16() == 30618 {
2556 // State event - extract identifier from 'd' tag
2557 if let Some(id) = event.tags.iter().find_map(|tag| {
2558 let tag_vec = tag.clone().to_vec();
2559 if tag_vec.len() >= 2 && tag_vec[0] == "d" {
2560 Some(tag_vec[1].clone())
2561 } else {
2562 None
2563 }
2564 }) {
2565 write_policy.purgatory().enqueue_sync_immediate(&id);
2566 }
2567 } else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 {
2568 // PR event - extract identifier from 'a' tag
2569 if let Some(id) = crate::git::sync::extract_identifier_from_pr_event(&event) {
2570 write_policy.purgatory().enqueue_sync_immediate(&id);
2571 }
2572 }
2509 } 2573 }
2510 ProcessResult::Rejected => { 2574 ProcessResult::Rejected => {
2511 stats.rejected += 1; 2575 stats.rejected += 1;
diff --git a/tests/archive_read_only.rs b/tests/archive_read_only.rs
index be6959b..e39b4b2 100644
--- a/tests/archive_read_only.rs
+++ b/tests/archive_read_only.rs
@@ -165,6 +165,7 @@ async fn test_archive_read_only_creates_bare_repo() {
165 // c) Put state event in purgatory (git data missing on archive relay) 165 // c) Put state event in purgatory (git data missing on archive relay)
166 // d) Fetch git data from source relay's clone URL 166 // d) Fetch git data from source relay's clone URL
167 // e) Release the state event from purgatory 167 // e) Release the state event from purgatory
168
168 let found = wait_for_event_served( 169 let found = wait_for_event_served(
169 archive_relay.url(), 170 archive_relay.url(),
170 &state_event_id, 171 &state_event_id,
@@ -267,11 +268,13 @@ async fn test_archive_read_only_creates_bare_repo() {
267/// This verifies the security model: archive mode only syncs git data 268/// This verifies the security model: archive mode only syncs git data
268/// when there are state events to validate against. 269/// when there are state events to validate against.
269/// 270///
270/// Scenario: 271/// With announcement purgatory, the flow is:
271/// 1. Start source relay with announcement only (no state events) 272/// 1. Send announcement to source relay (goes to purgatory)
272/// 2. Start archive relay syncing from source 273/// 2. Send state event to source relay (goes to purgatory)
273/// 3. Archive relay syncs announcement (creates bare repo) 274/// 3. Push git data to source relay (promotes announcement and state event)
274/// 4. Verify git data is NOT synced (no state events to trigger purgatory sync) 275/// 4. Start archive relay with sync from source
276/// 5. Archive relay syncs the promoted announcement
277/// 6. Verify git data is NOT synced (archive has no state event to authorize git fetch)
275#[tokio::test] 278#[tokio::test]
276async fn test_archive_without_state_events_does_not_sync_git() { 279async fn test_archive_without_state_events_does_not_sync_git() {
277 // 1. Start source relay 280 // 1. Start source relay
@@ -290,7 +293,7 @@ async fn test_archive_without_state_events_does_not_sync_git() {
290 293
291 let npub = keys.public_key().to_bech32().expect("Failed to get npub"); 294 let npub = keys.public_key().to_bech32().expect("Failed to get npub");
292 295
293 // 3. Create and send announcement listing BOTH relays (but NO state event) 296 // 3. Create and send announcement listing BOTH relays
294 let announcement = create_repo_announcement( 297 let announcement = create_repo_announcement(
295 &keys, 298 &keys,
296 &[&source_relay.domain(), &archive_domain], 299 &[&source_relay.domain(), &archive_domain],
@@ -306,7 +309,7 @@ async fn test_archive_without_state_events_does_not_sync_git() {
306 309
307 tokio::time::sleep(Duration::from_millis(500)).await; 310 tokio::time::sleep(Duration::from_millis(500)).await;
308 311
309 // Send announcement to source relay 312 // Send announcement to source relay (goes to purgatory)
310 source_client 313 source_client
311 .send_event(&announcement) 314 .send_event(&announcement)
312 .await 315 .await
@@ -314,11 +317,39 @@ async fn test_archive_without_state_events_does_not_sync_git() {
314 317
315 tokio::time::sleep(Duration::from_millis(200)).await; 318 tokio::time::sleep(Duration::from_millis(200)).await;
316 319
317 // 4. Push git data to source relay (but no state event to authorize it) 320 // 4. Create and send state event to source relay (goes to purgatory)
318 // This push will fail because there's no state event in purgatory 321 let clone_url = format!(
319 // That's expected - we're testing that archive mode doesn't blindly fetch git data 322 "http://{}/{}/{}.git",
323 source_relay.domain(),
324 npub,
325 identifier
326 );
327 let relay_url = source_relay.url().to_string();
328
329 let state_event = create_state_event(
330 &keys,
331 identifier,
332 &[("main", &commit_hash)],
333 &[],
334 &[&clone_url],
335 &[&relay_url],
336 )
337 .expect("Failed to create state event");
338
339 source_client
340 .send_event(&state_event)
341 .await
342 .expect("Failed to send state event to source");
343
344 tokio::time::sleep(Duration::from_millis(200)).await;
345
346 // 5. Push git data to source relay (promotes announcement and state event)
347 push_to_relay(temp_dir.path(), &source_relay.domain(), &npub, identifier)
348 .expect("Push to source should succeed");
349
350 tokio::time::sleep(Duration::from_millis(500)).await;
320 351
321 // 5. Start archive relay 352 // 6. Start archive relay (without state event - we don't send state event to archive)
322 let archive_relay = TestRelay::start_with_archive_and_sync( 353 let archive_relay = TestRelay::start_with_archive_and_sync(
323 archive_port, 354 archive_port,
324 Some(source_relay.url().to_string()), 355 Some(source_relay.url().to_string()),
@@ -333,10 +364,10 @@ async fn test_archive_without_state_events_does_not_sync_git() {
333 .await 364 .await
334 .expect("Sync connection should establish"); 365 .expect("Sync connection should establish");
335 366
336 // Give time for any potential git sync to happen 367 // Give time for sync to fetch announcement
337 tokio::time::sleep(Duration::from_secs(3)).await; 368 tokio::time::sleep(Duration::from_secs(3)).await;
338 369
339 // 6. Verify bare repository was created (announcement was accepted) 370 // 7. Verify bare repository was created (announcement was synced and accepted to purgatory)
340 let repo_path = archive_relay 371 let repo_path = archive_relay
341 .git_data_path() 372 .git_data_path()
342 .join(format!("{}/{}.git", npub, identifier)); 373 .join(format!("{}/{}.git", npub, identifier));
@@ -346,7 +377,7 @@ async fn test_archive_without_state_events_does_not_sync_git() {
346 "Bare repository should be created for archive announcement" 377 "Bare repository should be created for archive announcement"
347 ); 378 );
348 379
349 // 7. Verify git data was NOT synced (no state events to trigger purgatory sync) 380 // 8. Verify git data was NOT synced (no state events on archive to trigger git fetch)
350 // Check that the commit does NOT exist in the archive relay's repo 381 // Check that the commit does NOT exist in the archive relay's repo
351 let output = tokio::process::Command::new("git") 382 let output = tokio::process::Command::new("git")
352 .args(["cat-file", "-t", &commit_hash]) 383 .args(["cat-file", "-t", &commit_hash])
diff --git a/tests/purgatory.rs b/tests/purgatory.rs
index e99540b..efc28c9 100644
--- a/tests/purgatory.rs
+++ b/tests/purgatory.rs
@@ -58,10 +58,10 @@ macro_rules! isolated_purgatory_test {
58} 58}
59 59
60// ============================================================ 60// ============================================================
61// Announcement Purgatory Tests (commented out - feature not yet implemented) 61// Announcement Purgatory Tests
62// ============================================================ 62// ============================================================
63 63
64// isolated_purgatory_test!(test_announcement_not_served_before_git_data); 64isolated_purgatory_test!(test_announcement_not_served_before_git_data);
65isolated_purgatory_test!(test_announcement_served_after_git_push); 65isolated_purgatory_test!(test_announcement_served_after_git_push);
66isolated_purgatory_test!(test_bare_repo_exists_for_purgatory_announcement); 66isolated_purgatory_test!(test_bare_repo_exists_for_purgatory_announcement);
67isolated_purgatory_test!(test_state_event_accepted_for_purgatory_announcement); 67isolated_purgatory_test!(test_state_event_accepted_for_purgatory_announcement);
diff --git a/tests/purgatory_persistence.rs b/tests/purgatory_persistence.rs
index fe37c33..5abbf15 100644
--- a/tests/purgatory_persistence.rs
+++ b/tests/purgatory_persistence.rs
@@ -120,7 +120,8 @@ async fn test_full_purgatory_save_restore_cycle() {
120 // so we'll focus on testing state and PR events persistence 120 // so we'll focus on testing state and PR events persistence
121 121
122 // Verify initial counts 122 // Verify initial counts
123 let (state_count, pr_count) = purgatory.count(); 123 let (announcement_count, state_count, pr_count) = purgatory.count();
124 assert_eq!(announcement_count, 0, "Should have 0 announcements");
124 assert_eq!(state_count, 2, "Should have 2 state events"); 125 assert_eq!(state_count, 2, "Should have 2 state events");
125 assert_eq!( 126 assert_eq!(
126 pr_count, 3, 127 pr_count, 3,
@@ -142,7 +143,8 @@ async fn test_full_purgatory_save_restore_cycle() {
142 ); 143 );
143 144
144 // Verify all data was restored 145 // Verify all data was restored
145 let (state_count2, pr_count2) = purgatory2.count(); 146 let (announcement_count2, state_count2, pr_count2) = purgatory2.count();
147 assert_eq!(announcement_count2, 0, "Should have 0 announcements after restore");
146 assert_eq!(state_count2, 2, "Should have 2 state events after restore"); 148 assert_eq!(state_count2, 2, "Should have 2 state events after restore");
147 assert_eq!( 149 assert_eq!(
148 pr_count2, 3, 150 pr_count2, 3,
@@ -275,7 +277,7 @@ async fn test_purgatory_downtime_adjustment() {
275 purgatory2.restore_from_disk(&state_path).unwrap(); 277 purgatory2.restore_from_disk(&state_path).unwrap();
276 278
277 // Verify event is still there (downtime was accounted for) 279 // Verify event is still there (downtime was accounted for)
278 let (state_count, _) = purgatory2.count(); 280 let (_, state_count, _) = purgatory2.count();
279 assert_eq!(state_count, 1); 281 assert_eq!(state_count, 1);
280 282
281 let repo1_states = purgatory2.find_state("repo1"); 283 let repo1_states = purgatory2.find_state("repo1");
@@ -401,7 +403,7 @@ async fn test_purgatory_restore_missing_file() {
401 assert!(result.is_err(), "Should error on missing file"); 403 assert!(result.is_err(), "Should error on missing file");
402 404
403 // Purgatory should still be usable (empty state) 405 // Purgatory should still be usable (empty state)
404 let (state_count, pr_count) = purgatory.count(); 406 let (_, state_count, pr_count) = purgatory.count();
405 assert_eq!(state_count, 0); 407 assert_eq!(state_count, 0);
406 assert_eq!(pr_count, 0); 408 assert_eq!(pr_count, 0);
407 409
@@ -410,7 +412,7 @@ async fn test_purgatory_restore_missing_file() {
410 let event = create_test_event(&keys, "test").await; 412 let event = create_test_event(&keys, "test").await;
411 purgatory.add_state(event, "repo1".to_string(), keys.public_key()); 413 purgatory.add_state(event, "repo1".to_string(), keys.public_key());
412 414
413 let (state_count, _) = purgatory.count(); 415 let (_, state_count, _) = purgatory.count();
414 assert_eq!(state_count, 1); 416 assert_eq!(state_count, 1);
415} 417}
416 418
@@ -461,7 +463,7 @@ async fn test_purgatory_restore_corrupted_file() {
461 assert!(result.is_err(), "Should error on corrupted file"); 463 assert!(result.is_err(), "Should error on corrupted file");
462 464
463 // Purgatory should still be usable 465 // Purgatory should still be usable
464 let (state_count, pr_count) = purgatory.count(); 466 let (_, state_count, pr_count) = purgatory.count();
465 assert_eq!(state_count, 0); 467 assert_eq!(state_count, 0);
466 assert_eq!(pr_count, 0); 468 assert_eq!(pr_count, 0);
467} 469}
@@ -504,7 +506,7 @@ async fn test_empty_purgatory_save_restore() {
504 purgatory2.restore_from_disk(&state_path).unwrap(); 506 purgatory2.restore_from_disk(&state_path).unwrap();
505 507
506 // Verify empty state 508 // Verify empty state
507 let (state_count, pr_count) = purgatory2.count(); 509 let (_, state_count, pr_count) = purgatory2.count();
508 assert_eq!(state_count, 0); 510 assert_eq!(state_count, 0);
509 assert_eq!(pr_count, 0); 511 assert_eq!(pr_count, 0);
510 assert_eq!(purgatory2.expired_count(), 0); 512 assert_eq!(purgatory2.expired_count(), 0);
@@ -591,7 +593,7 @@ async fn test_purgatory_continues_working_after_restore() {
591 purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key()); 593 purgatory2.add_state(event2.clone(), "repo2".to_string(), keys.public_key());
592 594
593 // Verify both old and new events work 595 // Verify both old and new events work
594 let (state_count, _) = purgatory2.count(); 596 let (_, state_count, _) = purgatory2.count();
595 assert_eq!(state_count, 2); 597 assert_eq!(state_count, 2);
596 598
597 let repo1_states = purgatory2.find_state("repo1"); 599 let repo1_states = purgatory2.find_state("repo1");
@@ -603,7 +605,7 @@ async fn test_purgatory_continues_working_after_restore() {
603 assert_eq!(repo2_states[0].event.id, event2.id); 605 assert_eq!(repo2_states[0].event.id, event2.id);
604 606
605 // Verify cleanup still works 607 // Verify cleanup still works
606 let (state_removed, pr_removed) = purgatory2.cleanup(); 608 let (_, state_removed, pr_removed) = purgatory2.cleanup();
607 // Nothing should be expired yet 609 // Nothing should be expired yet
608 assert_eq!(state_removed, 0); 610 assert_eq!(state_removed, 0);
609 assert_eq!(pr_removed, 0); 611 assert_eq!(pr_removed, 0);
@@ -684,15 +686,15 @@ async fn test_purgatory_entries_expired_during_downtime() {
684 purgatory2.restore_from_disk(&state_path).unwrap(); 686 purgatory2.restore_from_disk(&state_path).unwrap();
685 687
686 // Event should be restored 688 // Event should be restored
687 let (state_count, _) = purgatory2.count(); 689 let (_, state_count, _) = purgatory2.count();
688 assert_eq!(state_count, 1); 690 assert_eq!(state_count, 1);
689 691
690 // Cleanup should work (even if nothing is expired yet) 692 // Cleanup should work (even if nothing is expired yet)
691 let (state_removed, _) = purgatory2.cleanup(); 693 let (_, state_removed, _) = purgatory2.cleanup();
692 // Nothing expired yet since we didn't wait 30 minutes 694 // Nothing expired yet since we didn't wait 30 minutes
693 assert_eq!(state_removed, 0); 695 assert_eq!(state_removed, 0);
694 696
695 let (state_count, _) = purgatory2.count(); 697 let (_, state_count, _) = purgatory2.count();
696 assert_eq!(state_count, 1); 698 assert_eq!(state_count, 1);
697} 699}
698 700