diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/git/authorization.rs | 59 | ||||
| -rw-r--r-- | src/git/handlers.rs | 7 | ||||
| -rw-r--r-- | src/git/sync.rs | 234 | ||||
| -rw-r--r-- | src/http/mod.rs | 22 | ||||
| -rw-r--r-- | src/main.rs | 21 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 33 | ||||
| -rw-r--r-- | src/nostr/policy/announcement.rs | 273 | ||||
| -rw-r--r-- | src/nostr/policy/deletion.rs | 498 | ||||
| -rw-r--r-- | src/nostr/policy/mod.rs | 2 | ||||
| -rw-r--r-- | src/nostr/policy/pr_event.rs | 8 | ||||
| -rw-r--r-- | src/nostr/policy/related.rs | 5 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 75 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 659 | ||||
| -rw-r--r-- | src/purgatory/sync/context.rs | 24 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 39 | ||||
| -rw-r--r-- | src/sync/algorithms.rs | 58 | ||||
| -rw-r--r-- | src/sync/filters.rs | 31 | ||||
| -rw-r--r-- | src/sync/mod.rs | 167 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 34 |
19 files changed, 2161 insertions, 88 deletions
diff --git a/src/git/authorization.rs b/src/git/authorization.rs index 27107db..df780bb 100644 --- a/src/git/authorization.rs +++ b/src/git/authorization.rs | |||
| @@ -287,6 +287,39 @@ pub async fn fetch_repository_data( | |||
| 287 | }) | 287 | }) |
| 288 | } | 288 | } |
| 289 | 289 | ||
| 290 | /// Fetch repository data including announcements from purgatory | ||
| 291 | /// | ||
| 292 | /// This combines database announcements with purgatory announcements, | ||
| 293 | /// which is needed for authorization when the announcement hasn't been | ||
| 294 | /// promoted yet (no git data has arrived). | ||
| 295 | pub async fn fetch_repository_data_with_purgatory( | ||
| 296 | database: &SharedDatabase, | ||
| 297 | purgatory: &crate::purgatory::Purgatory, | ||
| 298 | identifier: &str, | ||
| 299 | ) -> Result<RepositoryData> { | ||
| 300 | // First, fetch from database | ||
| 301 | let mut repo_data = fetch_repository_data(database, identifier).await?; | ||
| 302 | |||
| 303 | // Then, add announcements from purgatory | ||
| 304 | let purgatory_announcements = purgatory.get_announcements_by_identifier(identifier); | ||
| 305 | let purgatory_count = purgatory_announcements.len(); | ||
| 306 | |||
| 307 | for entry in purgatory_announcements { | ||
| 308 | if let Ok(announcement) = RepositoryAnnouncement::from_event(entry.event) { | ||
| 309 | repo_data.announcements.push(announcement); | ||
| 310 | } | ||
| 311 | } | ||
| 312 | |||
| 313 | debug!( | ||
| 314 | "Fetched repository data with purgatory: {} announcements ({} from purgatory), {} states", | ||
| 315 | repo_data.announcements.len(), | ||
| 316 | purgatory_count, | ||
| 317 | repo_data.states.len() | ||
| 318 | ); | ||
| 319 | |||
| 320 | Ok(repo_data) | ||
| 321 | } | ||
| 322 | |||
| 290 | pub fn pubkey_authorised_for_repo_owners( | 323 | pub fn pubkey_authorised_for_repo_owners( |
| 291 | pubkey: &PublicKey, | 324 | pubkey: &PublicKey, |
| 292 | db_repo_data: &RepositoryData, | 325 | db_repo_data: &RepositoryData, |
| @@ -539,8 +572,9 @@ pub async fn get_state_authorization_for_specific_owner_repo( | |||
| 539 | use crate::git::list_refs; | 572 | use crate::git::list_refs; |
| 540 | use crate::purgatory::RefUpdate; | 573 | use crate::purgatory::RefUpdate; |
| 541 | 574 | ||
| 542 | // Fetch announcements only - we don't need database states | 575 | // Fetch announcements from database AND purgatory - needed for authorization |
| 543 | let repo_data = fetch_repository_data(database, identifier).await?; | 576 | // when the announcement hasn't been promoted yet (no git data has arrived) |
| 577 | let repo_data = fetch_repository_data_with_purgatory(database, purgatory, identifier).await?; | ||
| 544 | 578 | ||
| 545 | if repo_data.announcements.is_empty() { | 579 | if repo_data.announcements.is_empty() { |
| 546 | return Ok(AuthorizationResult::denied( | 580 | return Ok(AuthorizationResult::denied( |
| @@ -649,6 +683,27 @@ pub async fn get_state_authorization_for_specific_owner_repo( | |||
| 649 | .unwrap_or_else(|_| latest_authorized.pubkey.to_hex()) | 683 | .unwrap_or_else(|_| latest_authorized.pubkey.to_hex()) |
| 650 | ); | 684 | ); |
| 651 | 685 | ||
| 686 | // Extend purgatory announcement expiry for the owner. | ||
| 687 | // | ||
| 688 | // Per design doc decision #4: git auth extending a state event's expiry | ||
| 689 | // also extends the announcement's expiry. The repo is actively receiving | ||
| 690 | // git data, so the announcement should not expire prematurely. | ||
| 691 | // This also revives soft-expired announcements (recreates bare repo). | ||
| 692 | if let Ok(owner_pk) = PublicKey::parse(owner_pubkey) { | ||
| 693 | if purgatory.has_purgatory_announcement(&owner_pk, identifier) { | ||
| 694 | purgatory.extend_announcement_expiry( | ||
| 695 | &owner_pk, | ||
| 696 | identifier, | ||
| 697 | std::time::Duration::from_secs(1800), | ||
| 698 | ); | ||
| 699 | debug!( | ||
| 700 | identifier = %identifier, | ||
| 701 | owner = %owner_pubkey, | ||
| 702 | "Extended purgatory announcement expiry due to git push authorization" | ||
| 703 | ); | ||
| 704 | } | ||
| 705 | } | ||
| 706 | |||
| 652 | return Ok(AuthorizationResult { | 707 | return Ok(AuthorizationResult { |
| 653 | authorized: true, | 708 | authorized: true, |
| 654 | reason: "Authorized by state event in purgatory".to_string(), | 709 | reason: "Authorized by state event in purgatory".to_string(), |
diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 28cb47f..f43cbb6 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs | |||
| @@ -17,8 +17,9 @@ use super::subprocess::GitSubprocess; | |||
| 17 | 17 | ||
| 18 | use crate::git::authorization::{authorize_push, parse_pushed_refs}; | 18 | use crate::git::authorization::{authorize_push, parse_pushed_refs}; |
| 19 | use crate::git::sync::process_newly_available_git_data; | 19 | use crate::git::sync::process_newly_available_git_data; |
| 20 | use crate::nostr::builder::SharedDatabase; | 20 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 21 | use crate::purgatory::Purgatory; | 21 | use crate::purgatory::Purgatory; |
| 22 | use crate::sync::rejected_index::RejectedEventsIndex; | ||
| 22 | 23 | ||
| 23 | /// Handle GET /info/refs?service=git-{upload,receive}-pack | 24 | /// Handle GET /info/refs?service=git-{upload,receive}-pack |
| 24 | /// | 25 | /// |
| @@ -258,6 +259,8 @@ pub async fn handle_receive_pack( | |||
| 258 | purgatory: Arc<Purgatory>, | 259 | purgatory: Arc<Purgatory>, |
| 259 | git_data_path: &str, | 260 | git_data_path: &str, |
| 260 | git_protocol: Option<&str>, | 261 | git_protocol: Option<&str>, |
| 262 | write_policy: Arc<Nip34WritePolicy>, | ||
| 263 | rejected_events_index: Arc<RejectedEventsIndex>, | ||
| 261 | ) -> Result<Response<Full<Bytes>>, GitError> { | 264 | ) -> Result<Response<Full<Bytes>>, GitError> { |
| 262 | debug!("Handling receive-pack for {:?}", repo_path); | 265 | debug!("Handling receive-pack for {:?}", repo_path); |
| 263 | 266 | ||
| @@ -397,6 +400,8 @@ pub async fn handle_receive_pack( | |||
| 397 | Some(&relay), | 400 | Some(&relay), |
| 398 | &purgatory, | 401 | &purgatory, |
| 399 | git_data_path_buf, | 402 | git_data_path_buf, |
| 403 | Some(&write_policy), | ||
| 404 | Some(&rejected_events_index), | ||
| 400 | ) | 405 | ) |
| 401 | .await | 406 | .await |
| 402 | { | 407 | { |
diff --git a/src/git/sync.rs b/src/git/sync.rs index b1a9b49..c24d16b 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs | |||
| @@ -32,17 +32,20 @@ | |||
| 32 | use std::collections::{HashMap, HashSet}; | 32 | use std::collections::{HashMap, HashSet}; |
| 33 | use std::path::Path; | 33 | use std::path::Path; |
| 34 | use std::process::Command; | 34 | use std::process::Command; |
| 35 | use std::sync::Arc; | ||
| 35 | use tracing::{debug, info, warn}; | 36 | use tracing::{debug, info, warn}; |
| 36 | 37 | ||
| 37 | use nostr_sdk::Event; | 38 | use nostr_sdk::Event; |
| 38 | 39 | ||
| 39 | use crate::git::authorization::{ | 40 | use crate::git::authorization::{ |
| 40 | collect_authorized_maintainers, fetch_repository_data, RepositoryData, | 41 | collect_authorized_maintainers, fetch_repository_data, fetch_repository_data_with_purgatory, |
| 42 | RepositoryData, | ||
| 41 | }; | 43 | }; |
| 42 | use crate::git::{self, oid_exists}; | 44 | use crate::git::{self, oid_exists}; |
| 43 | use crate::nostr::builder::SharedDatabase; | 45 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 44 | use crate::nostr::events::RepositoryState; | 46 | use crate::nostr::events::RepositoryState; |
| 45 | use crate::purgatory::{can_apply_state, Purgatory}; | 47 | use crate::purgatory::{can_apply_state, Purgatory}; |
| 48 | use crate::sync::rejected_index::RejectedEventsIndex; | ||
| 46 | 49 | ||
| 47 | /// Result of processing newly available git data. | 50 | /// Result of processing newly available git data. |
| 48 | /// | 51 | /// |
| @@ -51,6 +54,8 @@ use crate::purgatory::{can_apply_state, Purgatory}; | |||
| 51 | /// or from purgatory sync fetching OIDs from remote servers). | 54 | /// or from purgatory sync fetching OIDs from remote servers). |
| 52 | #[derive(Debug, Default, Clone)] | 55 | #[derive(Debug, Default, Clone)] |
| 53 | pub struct ProcessResult { | 56 | pub struct ProcessResult { |
| 57 | /// Number of announcements released from purgatory | ||
| 58 | pub announcements_released: usize, | ||
| 54 | /// Number of state events released from purgatory | 59 | /// Number of state events released from purgatory |
| 55 | pub states_released: usize, | 60 | pub states_released: usize, |
| 56 | /// Number of PR events released from purgatory | 61 | /// Number of PR events released from purgatory |
| @@ -70,11 +75,12 @@ pub struct ProcessResult { | |||
| 70 | impl ProcessResult { | 75 | impl ProcessResult { |
| 71 | /// Check if any events were released | 76 | /// Check if any events were released |
| 72 | pub fn released_any(&self) -> bool { | 77 | pub fn released_any(&self) -> bool { |
| 73 | self.states_released > 0 || self.prs_released > 0 | 78 | self.announcements_released > 0 || self.states_released > 0 || self.prs_released > 0 |
| 74 | } | 79 | } |
| 75 | 80 | ||
| 76 | /// Merge another ProcessResult into this one | 81 | /// Merge another ProcessResult into this one |
| 77 | pub fn merge(&mut self, other: ProcessResult) { | 82 | pub fn merge(&mut self, other: ProcessResult) { |
| 83 | self.announcements_released += other.announcements_released; | ||
| 78 | self.states_released += other.states_released; | 84 | self.states_released += other.states_released; |
| 79 | self.prs_released += other.prs_released; | 85 | self.prs_released += other.prs_released; |
| 80 | self.repos_synced += other.repos_synced; | 86 | self.repos_synced += other.repos_synced; |
| @@ -815,6 +821,8 @@ pub async fn process_newly_available_git_data( | |||
| 815 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | 821 | local_relay: Option<&nostr_relay_builder::LocalRelay>, |
| 816 | purgatory: &Purgatory, | 822 | purgatory: &Purgatory, |
| 817 | git_data_path: &Path, | 823 | git_data_path: &Path, |
| 824 | write_policy: Option<&Nip34WritePolicy>, | ||
| 825 | rejected_events_index: Option<&Arc<RejectedEventsIndex>>, | ||
| 818 | ) -> anyhow::Result<ProcessResult> { | 826 | ) -> anyhow::Result<ProcessResult> { |
| 819 | let mut result = ProcessResult::default(); | 827 | let mut result = ProcessResult::default(); |
| 820 | 828 | ||
| @@ -836,6 +844,20 @@ pub async fn process_newly_available_git_data( | |||
| 836 | "Processing newly available git data" | 844 | "Processing newly available git data" |
| 837 | ); | 845 | ); |
| 838 | 846 | ||
| 847 | // Process announcements from purgatory | ||
| 848 | let announcement_result = process_purgatory_announcements( | ||
| 849 | &identifier, | ||
| 850 | source_repo_path, | ||
| 851 | database, | ||
| 852 | local_relay, | ||
| 853 | purgatory, | ||
| 854 | git_data_path, | ||
| 855 | write_policy, | ||
| 856 | rejected_events_index, | ||
| 857 | ) | ||
| 858 | .await; | ||
| 859 | result.merge(announcement_result); | ||
| 860 | |||
| 839 | // Process state events from purgatory | 861 | // Process state events from purgatory |
| 840 | let state_result = process_purgatory_state_events( | 862 | let state_result = process_purgatory_state_events( |
| 841 | &identifier, | 863 | &identifier, |
| @@ -863,6 +885,7 @@ pub async fn process_newly_available_git_data( | |||
| 863 | if result.released_any() { | 885 | if result.released_any() { |
| 864 | info!( | 886 | info!( |
| 865 | identifier = %identifier, | 887 | identifier = %identifier, |
| 888 | announcements_released = result.announcements_released, | ||
| 866 | states_released = result.states_released, | 889 | states_released = result.states_released, |
| 867 | prs_released = result.prs_released, | 890 | prs_released = result.prs_released, |
| 868 | repos_synced = result.repos_synced, | 891 | repos_synced = result.repos_synced, |
| @@ -907,7 +930,10 @@ async fn process_purgatory_state_events( | |||
| 907 | ); | 930 | ); |
| 908 | 931 | ||
| 909 | // Fetch repository data once for all state events | 932 | // Fetch repository data once for all state events |
| 910 | let mut db_repo_data = match fetch_repository_data(database, identifier).await { | 933 | // IMPORTANT: Use fetch_repository_data_with_purgatory to include announcements |
| 934 | // that may still be in purgatory (not yet promoted). This ensures authorization | ||
| 935 | // works correctly even if the announcement promotion happens in the same batch. | ||
| 936 | let mut db_repo_data = match fetch_repository_data_with_purgatory(database, purgatory, identifier).await { | ||
| 911 | Ok(data) => data, | 937 | Ok(data) => data, |
| 912 | Err(e) => { | 938 | Err(e) => { |
| 913 | warn!( | 939 | warn!( |
| @@ -1151,6 +1177,9 @@ async fn process_purgatory_pr_events( | |||
| 1151 | ); | 1177 | ); |
| 1152 | 1178 | ||
| 1153 | // Fetch repository data for syncing | 1179 | // Fetch repository data for syncing |
| 1180 | // NOTE: Only fetch from database, NOT purgatory. PR events should only be | ||
| 1181 | // released from purgatory when the announcement has been promoted (validated). | ||
| 1182 | // This ensures we don't accept PR events for announcements that fail validation. | ||
| 1154 | let db_repo_data = match fetch_repository_data(database, identifier).await { | 1183 | let db_repo_data = match fetch_repository_data(database, identifier).await { |
| 1155 | Ok(data) => data, | 1184 | Ok(data) => data, |
| 1156 | Err(e) => { | 1185 | Err(e) => { |
| @@ -1250,6 +1279,195 @@ async fn process_purgatory_pr_events( | |||
| 1250 | result | 1279 | result |
| 1251 | } | 1280 | } |
| 1252 | 1281 | ||
| 1282 | /// Process announcements from purgatory that can now be promoted. | ||
| 1283 | /// | ||
| 1284 | /// When git data arrives for a repository, any announcements in purgatory | ||
| 1285 | /// for that repository should be promoted to the database and served to clients. | ||
| 1286 | /// | ||
| 1287 | /// When `write_policy` and `rejected_events_index` are provided (git push path), | ||
| 1288 | /// any maintainer announcements sitting in the hot cache are re-processed immediately | ||
| 1289 | /// after the owner announcement is promoted, so they don't wait for the next sync cycle. | ||
| 1290 | async fn process_purgatory_announcements( | ||
| 1291 | identifier: &str, | ||
| 1292 | source_repo_path: &Path, | ||
| 1293 | database: &SharedDatabase, | ||
| 1294 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 1295 | purgatory: &Purgatory, | ||
| 1296 | git_data_path: &Path, | ||
| 1297 | write_policy: Option<&Nip34WritePolicy>, | ||
| 1298 | rejected_events_index: Option<&Arc<RejectedEventsIndex>>, | ||
| 1299 | ) -> ProcessResult { | ||
| 1300 | let mut result = ProcessResult::default(); | ||
| 1301 | |||
| 1302 | // Extract owner pubkey from the source repo path | ||
| 1303 | let owner_pubkey = match extract_owner_from_repo_path(source_repo_path, git_data_path) { | ||
| 1304 | Some(npub) => npub, | ||
| 1305 | None => { | ||
| 1306 | debug!( | ||
| 1307 | identifier = %identifier, | ||
| 1308 | "Could not extract owner from repo path" | ||
| 1309 | ); | ||
| 1310 | return result; | ||
| 1311 | } | ||
| 1312 | }; | ||
| 1313 | |||
| 1314 | // Parse the npub back to PublicKey | ||
| 1315 | let owner = match nostr_sdk::PublicKey::parse(&owner_pubkey) { | ||
| 1316 | Ok(pk) => pk, | ||
| 1317 | Err(e) => { | ||
| 1318 | warn!( | ||
| 1319 | identifier = %identifier, | ||
| 1320 | owner_pubkey = %owner_pubkey, | ||
| 1321 | error = %e, | ||
| 1322 | "Failed to parse owner pubkey" | ||
| 1323 | ); | ||
| 1324 | result.errors.push(format!("Failed to parse owner pubkey: {}", e)); | ||
| 1325 | return result; | ||
| 1326 | } | ||
| 1327 | }; | ||
| 1328 | |||
| 1329 | // Check if there's an announcement in purgatory for this owner and identifier | ||
| 1330 | let announcement_event = purgatory.promote_announcement(&owner, identifier); | ||
| 1331 | |||
| 1332 | if let Some(event) = announcement_event { | ||
| 1333 | // Save to database | ||
| 1334 | match database.save_event(&event).await { | ||
| 1335 | Ok(_) => { | ||
| 1336 | info!( | ||
| 1337 | identifier = %identifier, | ||
| 1338 | event_id = %event.id, | ||
| 1339 | "Promoted announcement from purgatory to database" | ||
| 1340 | ); | ||
| 1341 | |||
| 1342 | // Notify WebSocket subscribers | ||
| 1343 | if let Some(relay) = local_relay { | ||
| 1344 | if relay.notify_event(event.clone()) { | ||
| 1345 | debug!( | ||
| 1346 | identifier = %identifier, | ||
| 1347 | event_id = %event.id, | ||
| 1348 | "Broadcast announcement event to WebSocket listeners" | ||
| 1349 | ); | ||
| 1350 | } | ||
| 1351 | } | ||
| 1352 | |||
| 1353 | result.announcements_released += 1; | ||
| 1354 | |||
| 1355 | // Re-process any maintainer announcements sitting in the hot cache. | ||
| 1356 | // | ||
| 1357 | // When an owner announcement is promoted from purgatory via a git push, | ||
| 1358 | // maintainer announcements that arrived earlier (via relay sync) may have | ||
| 1359 | // been rejected and stored in the hot cache because the owner announcement | ||
| 1360 | // didn't exist in the DB yet. Now that the owner announcement is saved, | ||
| 1361 | // we must invalidate and re-process those cached events immediately. | ||
| 1362 | // | ||
| 1363 | // This only applies on the git push path (write_policy + rejected_events_index | ||
| 1364 | // are Some). The purgatory sync path already handles this via | ||
| 1365 | // SyncManager::process_event_static. | ||
| 1366 | if let (Some(wp), Some(rei), Some(relay)) = | ||
| 1367 | (write_policy, rejected_events_index, local_relay) | ||
| 1368 | { | ||
| 1369 | use crate::nostr::events::RepositoryAnnouncement; | ||
| 1370 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; | ||
| 1371 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
| 1372 | |||
| 1373 | if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { | ||
| 1374 | if !announcement.maintainers.is_empty() { | ||
| 1375 | debug!( | ||
| 1376 | identifier = %identifier, | ||
| 1377 | event_id = %event.id, | ||
| 1378 | maintainer_count = announcement.maintainers.len(), | ||
| 1379 | "Owner announcement promoted via git push, checking hot cache for rejected maintainer announcements" | ||
| 1380 | ); | ||
| 1381 | |||
| 1382 | for maintainer_hex in &announcement.maintainers { | ||
| 1383 | match nostr_sdk::PublicKey::from_hex(maintainer_hex) { | ||
| 1384 | Ok(maintainer_pubkey) => { | ||
| 1385 | let (removed, hot_events) = rei.invalidate_and_get( | ||
| 1386 | &maintainer_pubkey, | ||
| 1387 | &announcement.identifier, | ||
| 1388 | Some(crate::sync::rejected_index::EventType::Announcement), | ||
| 1389 | ); | ||
| 1390 | |||
| 1391 | if removed > 0 { | ||
| 1392 | info!( | ||
| 1393 | maintainer = %maintainer_hex, | ||
| 1394 | identifier = %announcement.identifier, | ||
| 1395 | removed_from_cold_index = removed, | ||
| 1396 | hot_cache_events = hot_events.len(), | ||
| 1397 | "Invalidated rejected maintainer announcements after git push promotion" | ||
| 1398 | ); | ||
| 1399 | } | ||
| 1400 | |||
| 1401 | // Re-process events from hot cache | ||
| 1402 | let dummy_addr = SocketAddr::new( | ||
| 1403 | IpAddr::V4(Ipv4Addr::LOCALHOST), | ||
| 1404 | 0, | ||
| 1405 | ); | ||
| 1406 | for hot_event in hot_events { | ||
| 1407 | info!( | ||
| 1408 | event_id = %hot_event.id, | ||
| 1409 | maintainer = %maintainer_hex, | ||
| 1410 | identifier = %announcement.identifier, | ||
| 1411 | "Re-processing maintainer announcement from hot cache after git push promotion" | ||
| 1412 | ); | ||
| 1413 | match wp.admit_event(&hot_event, &dummy_addr).await { | ||
| 1414 | WritePolicyResult::Accept => { | ||
| 1415 | match database.save_event(&hot_event).await { | ||
| 1416 | Ok(_) => { | ||
| 1417 | relay.notify_event(hot_event.clone()); | ||
| 1418 | info!( | ||
| 1419 | event_id = %hot_event.id, | ||
| 1420 | "Maintainer announcement accepted and saved on re-processing" | ||
| 1421 | ); | ||
| 1422 | } | ||
| 1423 | Err(e) => { | ||
| 1424 | warn!( | ||
| 1425 | event_id = %hot_event.id, | ||
| 1426 | error = %e, | ||
| 1427 | "Failed to save re-processed maintainer announcement" | ||
| 1428 | ); | ||
| 1429 | } | ||
| 1430 | } | ||
| 1431 | } | ||
| 1432 | _ => { | ||
| 1433 | warn!( | ||
| 1434 | event_id = %hot_event.id, | ||
| 1435 | "Maintainer announcement still rejected on re-processing" | ||
| 1436 | ); | ||
| 1437 | } | ||
| 1438 | } | ||
| 1439 | } | ||
| 1440 | } | ||
| 1441 | Err(e) => { | ||
| 1442 | warn!( | ||
| 1443 | maintainer_hex = %maintainer_hex, | ||
| 1444 | error = %e, | ||
| 1445 | "Invalid maintainer public key in promoted announcement" | ||
| 1446 | ); | ||
| 1447 | } | ||
| 1448 | } | ||
| 1449 | } | ||
| 1450 | } | ||
| 1451 | } | ||
| 1452 | } | ||
| 1453 | } | ||
| 1454 | Err(e) => { | ||
| 1455 | warn!( | ||
| 1456 | identifier = %identifier, | ||
| 1457 | event_id = %event.id, | ||
| 1458 | error = %e, | ||
| 1459 | "Failed to save announcement to database" | ||
| 1460 | ); | ||
| 1461 | result | ||
| 1462 | .errors | ||
| 1463 | .push(format!("Failed to save announcement: {}", e)); | ||
| 1464 | } | ||
| 1465 | } | ||
| 1466 | } | ||
| 1467 | |||
| 1468 | result | ||
| 1469 | } | ||
| 1470 | |||
| 1253 | /// Extract owner pubkey from a repository path. | 1471 | /// Extract owner pubkey from a repository path. |
| 1254 | /// | 1472 | /// |
| 1255 | /// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. | 1473 | /// Given a path like `{git_data_path}/{npub}/{identifier}.git`, extracts the npub. |
| @@ -1271,6 +1489,7 @@ mod tests { | |||
| 1271 | #[test] | 1489 | #[test] |
| 1272 | fn test_process_result_default() { | 1490 | fn test_process_result_default() { |
| 1273 | let result = ProcessResult::default(); | 1491 | let result = ProcessResult::default(); |
| 1492 | assert_eq!(result.announcements_released, 0); | ||
| 1274 | assert_eq!(result.states_released, 0); | 1493 | assert_eq!(result.states_released, 0); |
| 1275 | assert_eq!(result.prs_released, 0); | 1494 | assert_eq!(result.prs_released, 0); |
| 1276 | assert_eq!(result.repos_synced, 0); | 1495 | assert_eq!(result.repos_synced, 0); |
| @@ -1282,6 +1501,10 @@ mod tests { | |||
| 1282 | let mut result = ProcessResult::default(); | 1501 | let mut result = ProcessResult::default(); |
| 1283 | assert!(!result.released_any()); | 1502 | assert!(!result.released_any()); |
| 1284 | 1503 | ||
| 1504 | result.announcements_released = 1; | ||
| 1505 | assert!(result.released_any()); | ||
| 1506 | |||
| 1507 | result.announcements_released = 0; | ||
| 1285 | result.states_released = 1; | 1508 | result.states_released = 1; |
| 1286 | assert!(result.released_any()); | 1509 | assert!(result.released_any()); |
| 1287 | 1510 | ||
| @@ -1293,6 +1516,7 @@ mod tests { | |||
| 1293 | #[test] | 1516 | #[test] |
| 1294 | fn test_process_result_merge() { | 1517 | fn test_process_result_merge() { |
| 1295 | let mut result1 = ProcessResult { | 1518 | let mut result1 = ProcessResult { |
| 1519 | announcements_released: 0, | ||
| 1296 | states_released: 1, | 1520 | states_released: 1, |
| 1297 | prs_released: 2, | 1521 | prs_released: 2, |
| 1298 | repos_synced: 3, | 1522 | repos_synced: 3, |
| @@ -1303,6 +1527,7 @@ mod tests { | |||
| 1303 | }; | 1527 | }; |
| 1304 | 1528 | ||
| 1305 | let result2 = ProcessResult { | 1529 | let result2 = ProcessResult { |
| 1530 | announcements_released: 5, | ||
| 1306 | states_released: 10, | 1531 | states_released: 10, |
| 1307 | prs_released: 20, | 1532 | prs_released: 20, |
| 1308 | repos_synced: 30, | 1533 | repos_synced: 30, |
| @@ -1314,6 +1539,7 @@ mod tests { | |||
| 1314 | 1539 | ||
| 1315 | result1.merge(result2); | 1540 | result1.merge(result2); |
| 1316 | 1541 | ||
| 1542 | assert_eq!(result1.announcements_released, 5); | ||
| 1317 | assert_eq!(result1.states_released, 11); | 1543 | assert_eq!(result1.states_released, 11); |
| 1318 | assert_eq!(result1.prs_released, 22); | 1544 | assert_eq!(result1.prs_released, 22); |
| 1319 | assert_eq!(result1.repos_synced, 33); | 1545 | assert_eq!(result1.repos_synced, 33); |
diff --git a/src/http/mod.rs b/src/http/mod.rs index edc28a3..76ffef3 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs | |||
| @@ -26,8 +26,9 @@ use tokio::net::TcpListener; | |||
| 26 | use crate::config::Config; | 26 | use crate::config::Config; |
| 27 | use crate::git; | 27 | use crate::git; |
| 28 | use crate::metrics::Metrics; | 28 | use crate::metrics::Metrics; |
| 29 | use crate::nostr::builder::SharedDatabase; | 29 | use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; |
| 30 | use crate::purgatory::Purgatory; | 30 | use crate::purgatory::Purgatory; |
| 31 | use crate::sync::rejected_index::RejectedEventsIndex; | ||
| 31 | 32 | ||
| 32 | /// CORS headers required by GRASP-01 specification (lines 40-47) | 33 | /// CORS headers required by GRASP-01 specification (lines 40-47) |
| 33 | const CORS_ALLOW_ORIGIN: &str = "*"; | 34 | const CORS_ALLOW_ORIGIN: &str = "*"; |
| @@ -97,6 +98,10 @@ struct HttpService { | |||
| 97 | metrics: Option<Arc<Metrics>>, | 98 | metrics: Option<Arc<Metrics>>, |
| 98 | /// Purgatory for event/git coordination | 99 | /// Purgatory for event/git coordination |
| 99 | purgatory: Arc<Purgatory>, | 100 | purgatory: Arc<Purgatory>, |
| 101 | /// Write policy for re-processing hot-cache events after git push promotion | ||
| 102 | write_policy: Arc<Nip34WritePolicy>, | ||
| 103 | /// Rejected events index for hot-cache re-processing after git push promotion | ||
| 104 | rejected_events_index: Arc<RejectedEventsIndex>, | ||
| 100 | } | 105 | } |
| 101 | 106 | ||
| 102 | impl HttpService { | 107 | impl HttpService { |
| @@ -107,6 +112,8 @@ impl HttpService { | |||
| 107 | database: SharedDatabase, | 112 | database: SharedDatabase, |
| 108 | metrics: Option<Arc<Metrics>>, | 113 | metrics: Option<Arc<Metrics>>, |
| 109 | purgatory: Arc<Purgatory>, | 114 | purgatory: Arc<Purgatory>, |
| 115 | write_policy: Arc<Nip34WritePolicy>, | ||
| 116 | rejected_events_index: Arc<RejectedEventsIndex>, | ||
| 110 | ) -> Self { | 117 | ) -> Self { |
| 111 | Self { | 118 | Self { |
| 112 | relay, | 119 | relay, |
| @@ -115,6 +122,8 @@ impl HttpService { | |||
| 115 | database, | 122 | database, |
| 116 | metrics, | 123 | metrics, |
| 117 | purgatory, | 124 | purgatory, |
| 125 | write_policy, | ||
| 126 | rejected_events_index, | ||
| 118 | } | 127 | } |
| 119 | } | 128 | } |
| 120 | } | 129 | } |
| @@ -132,6 +141,8 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 132 | let git_data_path = self.config.effective_git_data_path(); | 141 | let git_data_path = self.config.effective_git_data_path(); |
| 133 | let database = self.database.clone(); | 142 | let database = self.database.clone(); |
| 134 | let purgatory = self.purgatory.clone(); | 143 | let purgatory = self.purgatory.clone(); |
| 144 | let write_policy = self.write_policy.clone(); | ||
| 145 | let rejected_events_index = self.rejected_events_index.clone(); | ||
| 135 | 146 | ||
| 136 | // Handle OPTIONS preflight requests (CORS) | 147 | // Handle OPTIONS preflight requests (CORS) |
| 137 | // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content | 148 | // GRASP-01 spec line 47: Respond to OPTIONS with 204 No Content |
| @@ -293,6 +304,8 @@ impl Service<Request<Incoming>> for HttpService { | |||
| 293 | purgatory.clone(), | 304 | purgatory.clone(), |
| 294 | &git_data_path, | 305 | &git_data_path, |
| 295 | git_protocol.as_deref(), | 306 | git_protocol.as_deref(), |
| 307 | write_policy.clone(), | ||
| 308 | rejected_events_index.clone(), | ||
| 296 | ) | 309 | ) |
| 297 | .await; | 310 | .await; |
| 298 | 311 | ||
| @@ -557,12 +570,17 @@ fn derive_accept_key(request_key: &[u8]) -> String { | |||
| 557 | /// * `relay` - The LocalRelay for WebSocket connections | 570 | /// * `relay` - The LocalRelay for WebSocket connections |
| 558 | /// * `database` - The database for direct queries (e.g., push authorization) | 571 | /// * `database` - The database for direct queries (e.g., push authorization) |
| 559 | /// * `metrics` - Optional metrics for Prometheus endpoint | 572 | /// * `metrics` - Optional metrics for Prometheus endpoint |
| 573 | /// * `purgatory` - Purgatory for event/git coordination | ||
| 574 | /// * `write_policy` - Write policy for re-processing hot-cache events after git push promotion | ||
| 575 | /// * `rejected_events_index` - Rejected events index for hot-cache re-processing | ||
| 560 | pub async fn run_server( | 576 | pub async fn run_server( |
| 561 | config: Config, | 577 | config: Config, |
| 562 | relay: LocalRelay, | 578 | relay: LocalRelay, |
| 563 | database: SharedDatabase, | 579 | database: SharedDatabase, |
| 564 | metrics: Option<Arc<Metrics>>, | 580 | metrics: Option<Arc<Metrics>>, |
| 565 | purgatory: Arc<Purgatory>, | 581 | purgatory: Arc<Purgatory>, |
| 582 | write_policy: Arc<Nip34WritePolicy>, | ||
| 583 | rejected_events_index: Arc<RejectedEventsIndex>, | ||
| 566 | ) -> anyhow::Result<()> { | 584 | ) -> anyhow::Result<()> { |
| 567 | let bind_addr: SocketAddr = config.bind_address.parse()?; | 585 | let bind_addr: SocketAddr = config.bind_address.parse()?; |
| 568 | 586 | ||
| @@ -582,6 +600,8 @@ pub async fn run_server( | |||
| 582 | database.clone(), | 600 | database.clone(), |
| 583 | metrics.clone(), | 601 | metrics.clone(), |
| 584 | purgatory.clone(), | 602 | purgatory.clone(), |
| 603 | write_policy.clone(), | ||
| 604 | rejected_events_index.clone(), | ||
| 585 | ); | 605 | ); |
| 586 | 606 | ||
| 587 | tokio::spawn(async move { | 607 | tokio::spawn(async move { |
diff --git a/src/main.rs b/src/main.rs index dd2c903..bf3aefb 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -130,7 +130,9 @@ async fn main() -> Result<()> { | |||
| 130 | } | 130 | } |
| 131 | 131 | ||
| 132 | // Get a reference to the rejected events index for shutdown persistence | 132 | // Get a reference to the rejected events index for shutdown persistence |
| 133 | // and for the HTTP server's git push path (hot-cache re-processing) | ||
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | 134 | let shutdown_rejected_index = sync_manager.rejected_events_index(); |
| 135 | let http_rejected_index = shutdown_rejected_index.clone(); | ||
| 134 | 136 | ||
| 135 | tokio::spawn(async move { | 137 | tokio::spawn(async move { |
| 136 | sync_manager.run().await; | 138 | sync_manager.run().await; |
| @@ -142,11 +144,11 @@ async fn main() -> Result<()> { | |||
| 142 | let mut interval = tokio::time::interval(Duration::from_secs(60)); | 144 | let mut interval = tokio::time::interval(Duration::from_secs(60)); |
| 143 | loop { | 145 | loop { |
| 144 | interval.tick().await; | 146 | interval.tick().await; |
| 145 | let (state_removed, pr_removed) = cleanup_purgatory.cleanup(); | 147 | let (announcement_removed, state_removed, pr_removed) = cleanup_purgatory.cleanup(); |
| 146 | if state_removed > 0 || pr_removed > 0 { | 148 | if announcement_removed > 0 || state_removed > 0 || pr_removed > 0 { |
| 147 | info!( | 149 | info!( |
| 148 | "Purgatory cleanup: removed {} state events, {} PR events", | 150 | "Purgatory cleanup: removed {} announcements, {} state events, {} PR events", |
| 149 | state_removed, pr_removed | 151 | announcement_removed, state_removed, pr_removed |
| 150 | ); | 152 | ); |
| 151 | } | 153 | } |
| 152 | } | 154 | } |
| @@ -206,12 +208,15 @@ async fn main() -> Result<()> { | |||
| 206 | // Start HTTP server with integrated relay and database | 208 | // Start HTTP server with integrated relay and database |
| 207 | info!("Starting HTTP server on {}", config.bind_address); | 209 | info!("Starting HTTP server on {}", config.bind_address); |
| 208 | 210 | ||
| 211 | // Wrap write_policy in Arc for sharing between HTTP server connections | ||
| 212 | let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); | ||
| 213 | |||
| 209 | // Run server until shutdown signal, then cleanup | 214 | // Run server until shutdown signal, then cleanup |
| 210 | #[cfg(unix)] | 215 | #[cfg(unix)] |
| 211 | { | 216 | { |
| 212 | use tokio::signal::unix::{signal, SignalKind}; | 217 | use tokio::signal::unix::{signal, SignalKind}; |
| 213 | let mut sigterm = signal(SignalKind::terminate())?; | 218 | let mut sigterm = signal(SignalKind::terminate())?; |
| 214 | 219 | ||
| 215 | tokio::select! { | 220 | tokio::select! { |
| 216 | result = http::run_server( | 221 | result = http::run_server( |
| 217 | config, | 222 | config, |
| @@ -219,6 +224,8 @@ async fn main() -> Result<()> { | |||
| 219 | relay_with_db.database, | 224 | relay_with_db.database, |
| 220 | metrics, | 225 | metrics, |
| 221 | purgatory, | 226 | purgatory, |
| 227 | http_write_policy, | ||
| 228 | http_rejected_index, | ||
| 222 | ) => { | 229 | ) => { |
| 223 | result? | 230 | result? |
| 224 | } | 231 | } |
| @@ -230,7 +237,7 @@ async fn main() -> Result<()> { | |||
| 230 | } | 237 | } |
| 231 | } | 238 | } |
| 232 | } | 239 | } |
| 233 | 240 | ||
| 234 | #[cfg(not(unix))] | 241 | #[cfg(not(unix))] |
| 235 | { | 242 | { |
| 236 | tokio::select! { | 243 | tokio::select! { |
| @@ -240,6 +247,8 @@ async fn main() -> Result<()> { | |||
| 240 | relay_with_db.database, | 247 | relay_with_db.database, |
| 241 | metrics, | 248 | metrics, |
| 242 | purgatory, | 249 | purgatory, |
| 250 | http_write_policy, | ||
| 251 | http_rejected_index, | ||
| 243 | ) => { | 252 | ) => { |
| 244 | result? | 253 | result? |
| 245 | } | 254 | } |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 713c129..7a05348 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -14,10 +14,11 @@ use nostr_relay_builder::prelude::*; | |||
| 14 | use crate::config::{Config, DatabaseBackend}; | 14 | use crate::config::{Config, DatabaseBackend}; |
| 15 | use crate::nostr::events::RepositoryAnnouncement; | 15 | use crate::nostr::events::RepositoryAnnouncement; |
| 16 | use crate::nostr::policy::{ | 16 | use crate::nostr::policy::{ |
| 17 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, | 17 | AnnouncementPolicy, AnnouncementResult, DeletionPolicy, PolicyContext, PrEventPolicy, |
| 18 | RelatedEventPolicy, StatePolicy, StateResult, | 18 | ReferenceResult, RelatedEventPolicy, StatePolicy, StateResult, |
| 19 | }; | 19 | }; |
| 20 | 20 | ||
| 21 | |||
| 21 | /// Type alias for the shared database used by the relay | 22 | /// Type alias for the shared database used by the relay |
| 22 | pub type SharedDatabase = Arc<dyn NostrDatabase>; | 23 | pub type SharedDatabase = Arc<dyn NostrDatabase>; |
| 23 | 24 | ||
| @@ -28,6 +29,7 @@ pub type SharedDatabase = Arc<dyn NostrDatabase>; | |||
| 28 | /// - `StatePolicy` - State event validation + ref alignment | 29 | /// - `StatePolicy` - State event validation + ref alignment |
| 29 | /// - `PrEventPolicy` - PR/PR Update validation | 30 | /// - `PrEventPolicy` - PR/PR Update validation |
| 30 | /// - `RelatedEventPolicy` - Forward/backward reference checking | 31 | /// - `RelatedEventPolicy` - Forward/backward reference checking |
| 32 | /// - `DeletionPolicy` - NIP-09 event deletion request handling | ||
| 31 | /// | 33 | /// |
| 32 | /// Uses stateful database queries to check event relationships. | 34 | /// Uses stateful database queries to check event relationships. |
| 33 | #[derive(Clone)] | 35 | #[derive(Clone)] |
| @@ -37,6 +39,7 @@ pub struct Nip34WritePolicy { | |||
| 37 | state_policy: StatePolicy, | 39 | state_policy: StatePolicy, |
| 38 | pr_event_policy: PrEventPolicy, | 40 | pr_event_policy: PrEventPolicy, |
| 39 | related_event_policy: RelatedEventPolicy, | 41 | related_event_policy: RelatedEventPolicy, |
| 42 | deletion_policy: DeletionPolicy, | ||
| 40 | } | 43 | } |
| 41 | 44 | ||
| 42 | impl std::fmt::Debug for Nip34WritePolicy { | 45 | impl std::fmt::Debug for Nip34WritePolicy { |
| @@ -68,6 +71,7 @@ impl Nip34WritePolicy { | |||
| 68 | state_policy: StatePolicy::new(ctx.clone()), | 71 | state_policy: StatePolicy::new(ctx.clone()), |
| 69 | pr_event_policy: PrEventPolicy::new(ctx.clone()), | 72 | pr_event_policy: PrEventPolicy::new(ctx.clone()), |
| 70 | related_event_policy: RelatedEventPolicy::new(ctx.clone()), | 73 | related_event_policy: RelatedEventPolicy::new(ctx.clone()), |
| 74 | deletion_policy: DeletionPolicy::new(ctx.clone()), | ||
| 71 | ctx, | 75 | ctx, |
| 72 | } | 76 | } |
| 73 | } | 77 | } |
| @@ -205,6 +209,30 @@ impl Nip34WritePolicy { | |||
| 205 | } | 209 | } |
| 206 | } | 210 | } |
| 207 | } | 211 | } |
| 212 | AnnouncementResult::AcceptPurgatory => { | ||
| 213 | // New announcement - add to purgatory | ||
| 214 | match self.announcement_policy.add_to_purgatory(event) { | ||
| 215 | Ok(()) => { | ||
| 216 | tracing::info!( | ||
| 217 | "Accepted announcement to purgatory: {} (waiting for git data)", | ||
| 218 | event_id_str | ||
| 219 | ); | ||
| 220 | |||
| 221 | WritePolicyResult::Reject { | ||
| 222 | status: true, // Client sees OK | ||
| 223 | message: "purgatory: won't be served until git data arrives".into(), | ||
| 224 | } | ||
| 225 | } | ||
| 226 | Err(e) => { | ||
| 227 | tracing::warn!( | ||
| 228 | "Failed to add announcement to purgatory {}: {}", | ||
| 229 | event_id_str, | ||
| 230 | e | ||
| 231 | ); | ||
| 232 | WritePolicyResult::reject(e) | ||
| 233 | } | ||
| 234 | } | ||
| 235 | } | ||
| 208 | AnnouncementResult::AcceptMaintainer => { | 236 | AnnouncementResult::AcceptMaintainer => { |
| 209 | // Parse announcement to get details for logging | 237 | // Parse announcement to get details for logging |
| 210 | match RepositoryAnnouncement::from_event(event.clone()) { | 238 | match RepositoryAnnouncement::from_event(event.clone()) { |
| @@ -621,6 +649,7 @@ impl WritePolicy for Nip34WritePolicy { | |||
| 621 | ); | 649 | ); |
| 622 | WritePolicyResult::Accept | 650 | WritePolicyResult::Accept |
| 623 | } | 651 | } |
| 652 | Kind::EventDeletion => self.deletion_policy.handle(event).await, | ||
| 624 | _ => self.handle_related_event(event, "Event").await, | 653 | _ => self.handle_related_event(event, "Event").await, |
| 625 | } | 654 | } |
| 626 | }) | 655 | }) |
diff --git a/src/nostr/policy/announcement.rs b/src/nostr/policy/announcement.rs index 15a6e58..b366f0b 100644 --- a/src/nostr/policy/announcement.rs +++ b/src/nostr/policy/announcement.rs | |||
| @@ -3,6 +3,8 @@ | |||
| 3 | /// Handles validation of NIP-34 repository announcements (kind 30617) | 3 | /// Handles validation of NIP-34 repository announcements (kind 30617) |
| 4 | /// according to GRASP-01 specification. | 4 | /// according to GRASP-01 specification. |
| 5 | use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; | 5 | use nostr_relay_builder::prelude::{Alphabet, Event, Filter, Kind, PublicKey, SingleLetterTag}; |
| 6 | use std::collections::HashSet; | ||
| 7 | use std::time::Duration; | ||
| 6 | 8 | ||
| 7 | use super::PolicyContext; | 9 | use super::PolicyContext; |
| 8 | use crate::config::Config; | 10 | use crate::config::Config; |
| @@ -11,12 +13,14 @@ use crate::nostr::events::{validate_announcement, RepositoryAnnouncement}; | |||
| 11 | /// Result of announcement policy evaluation | 13 | /// Result of announcement policy evaluation |
| 12 | #[derive(Debug, Clone, PartialEq)] | 14 | #[derive(Debug, Clone, PartialEq)] |
| 13 | pub enum AnnouncementResult { | 15 | pub enum AnnouncementResult { |
| 14 | /// Accept: Event lists our service (GRASP-01 compliant) | 16 | /// Accept: Event lists our service (GRASP-01 compliant) - replacement announcement |
| 15 | Accept, | 17 | Accept, |
| 16 | /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer) | 18 | /// Accept as maintainer: Event accepted via maintainer exception (multi-maintainer) |
| 17 | AcceptMaintainer, | 19 | AcceptMaintainer, |
| 18 | /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only) | 20 | /// Accept as archive: Event accepted via GRASP-05 archive whitelist (read-only) |
| 19 | AcceptArchive, | 21 | AcceptArchive, |
| 22 | /// Accept to purgatory: New announcement, waiting for git data | ||
| 23 | AcceptPurgatory, | ||
| 20 | /// Reject: Event fails validation with reason | 24 | /// Reject: Event fails validation with reason |
| 21 | Reject(String), | 25 | Reject(String), |
| 22 | } | 26 | } |
| @@ -35,10 +39,13 @@ impl AnnouncementPolicy { | |||
| 35 | 39 | ||
| 36 | /// Validate a repository announcement event | 40 | /// Validate a repository announcement event |
| 37 | /// | 41 | /// |
| 38 | /// Returns `Accept` if the announcement lists the service properly, | 42 | /// Returns: |
| 39 | /// `AcceptMaintainer` if accepted via maintainer exception, | 43 | /// - `Accept` if this is a replacement announcement (active announcement exists in DB or |
| 40 | /// `AcceptArchive` if accepted via GRASP-05 archive config, | 44 | /// purgatory) |
| 41 | /// or `Reject` with reason. | 45 | /// - `AcceptPurgatory` if this is a new announcement (no active announcement exists) |
| 46 | /// - `AcceptMaintainer` if accepted via maintainer exception | ||
| 47 | /// - `AcceptArchive` if accepted via GRASP-05 archive config | ||
| 48 | /// - `Reject` with reason if validation fails | ||
| 42 | pub async fn validate(&self, event: &Event) -> AnnouncementResult { | 49 | pub async fn validate(&self, event: &Event) -> AnnouncementResult { |
| 43 | // First, try validation (GRASP-01 + GRASP-05) | 50 | // First, try validation (GRASP-01 + GRASP-05) |
| 44 | let validation_result = validate_announcement(event, &self.config); | 51 | let validation_result = validate_announcement(event, &self.config); |
| @@ -49,6 +56,23 @@ impl AnnouncementPolicy { | |||
| 49 | // GRASP-01 Exception: Accept announcements from recursive maintainers | 56 | // GRASP-01 Exception: Accept announcements from recursive maintainers |
| 50 | match RepositoryAnnouncement::from_event(event.clone()) { | 57 | match RepositoryAnnouncement::from_event(event.clone()) { |
| 51 | Ok(announcement) => { | 58 | Ok(announcement) => { |
| 59 | // If this pubkey+identifier has a purgatory entry AND the incoming | ||
| 60 | // event is strictly newer, the owner is sending a replacement that | ||
| 61 | // removes our service. Clear the purgatory entry and its bare repo. | ||
| 62 | // | ||
| 63 | // If the incoming event is older than the purgatory entry (e.g. a | ||
| 64 | // relay replay of a superseded announcement), ignore it — the newer | ||
| 65 | // purgatory entry takes precedence and must not be evicted. | ||
| 66 | let should_evict = self | ||
| 67 | .ctx | ||
| 68 | .purgatory | ||
| 69 | .find_announcement(&event.pubkey, &announcement.identifier) | ||
| 70 | .is_some_and(|entry| event.created_at > entry.event.created_at); | ||
| 71 | |||
| 72 | if should_evict { | ||
| 73 | self.remove_purgatory_announcement(&event.pubkey, &announcement.identifier); | ||
| 74 | } | ||
| 75 | |||
| 52 | match self | 76 | match self |
| 53 | .is_maintainer_in_any_announcement( | 77 | .is_maintainer_in_any_announcement( |
| 54 | &announcement.identifier, | 78 | &announcement.identifier, |
| @@ -67,11 +91,221 @@ impl AnnouncementPolicy { | |||
| 67 | Err(_) => AnnouncementResult::Reject(reason), | 91 | Err(_) => AnnouncementResult::Reject(reason), |
| 68 | } | 92 | } |
| 69 | } | 93 | } |
| 70 | // Accept, AcceptArchive, or AcceptMaintainer - return as-is | 94 | AnnouncementResult::Accept | AnnouncementResult::AcceptArchive => { |
| 95 | // Parse announcement to check for existing active announcement | ||
| 96 | match RepositoryAnnouncement::from_event(event.clone()) { | ||
| 97 | Ok(announcement) => { | ||
| 98 | let in_db = match self | ||
| 99 | .has_db_announcement(&event.pubkey, &announcement.identifier) | ||
| 100 | .await | ||
| 101 | { | ||
| 102 | Ok(v) => v, | ||
| 103 | Err(e) => { | ||
| 104 | tracing::warn!( | ||
| 105 | error = %e, | ||
| 106 | "Failed to check for existing DB announcement - rejecting" | ||
| 107 | ); | ||
| 108 | return AnnouncementResult::Reject(format!( | ||
| 109 | "Database error checking existing announcement: {}", | ||
| 110 | e | ||
| 111 | )); | ||
| 112 | } | ||
| 113 | }; | ||
| 114 | |||
| 115 | if in_db { | ||
| 116 | // Replacement announcement with DB entry - accept immediately | ||
| 117 | tracing::debug!( | ||
| 118 | identifier = %announcement.identifier, | ||
| 119 | "Replacement announcement (DB) - accepting immediately" | ||
| 120 | ); | ||
| 121 | return validation_result; | ||
| 122 | } | ||
| 123 | |||
| 124 | let in_purgatory = self | ||
| 125 | .ctx | ||
| 126 | .purgatory | ||
| 127 | .has_purgatory_announcement(&event.pubkey, &announcement.identifier); | ||
| 128 | |||
| 129 | if in_purgatory { | ||
| 130 | // Replacement announcement with purgatory entry - replace it and | ||
| 131 | // extend expiry so the new announcement gets a fresh 30-minute window. | ||
| 132 | tracing::debug!( | ||
| 133 | identifier = %announcement.identifier, | ||
| 134 | "Replacement announcement (purgatory) - replacing purgatory entry" | ||
| 135 | ); | ||
| 136 | self.replace_purgatory_announcement(event, &announcement); | ||
| 137 | // Return Accept (not AcceptPurgatory) - this is a replacement, not new | ||
| 138 | return validation_result; | ||
| 139 | } | ||
| 140 | |||
| 141 | // No existing announcement - route to purgatory | ||
| 142 | tracing::debug!( | ||
| 143 | identifier = %announcement.identifier, | ||
| 144 | "New announcement - routing to purgatory" | ||
| 145 | ); | ||
| 146 | AnnouncementResult::AcceptPurgatory | ||
| 147 | } | ||
| 148 | Err(e) => AnnouncementResult::Reject(format!( | ||
| 149 | "Failed to parse announcement: {}", | ||
| 150 | e | ||
| 151 | )), | ||
| 152 | } | ||
| 153 | } | ||
| 154 | // AcceptPurgatory shouldn't come from validate_announcement, but handle it | ||
| 71 | result => result, | 155 | result => result, |
| 72 | } | 156 | } |
| 73 | } | 157 | } |
| 74 | 158 | ||
| 159 | /// Replace a purgatory announcement entry with a newer event. | ||
| 160 | /// | ||
| 161 | /// Called when a replacement announcement arrives for a (pubkey, identifier) pair | ||
| 162 | /// that is currently in purgatory. Updates the purgatory entry and extends the | ||
| 163 | /// expiry so the new announcement has a fresh waiting window. | ||
| 164 | fn replace_purgatory_announcement( | ||
| 165 | &self, | ||
| 166 | event: &Event, | ||
| 167 | announcement: &RepositoryAnnouncement, | ||
| 168 | ) { | ||
| 169 | let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); | ||
| 170 | let relays: HashSet<String> = announcement.relays.iter().cloned().collect(); | ||
| 171 | |||
| 172 | // add_announcement uses the (owner, identifier) key so it overwrites the old entry | ||
| 173 | self.ctx.purgatory.add_announcement( | ||
| 174 | event.clone(), | ||
| 175 | announcement.identifier.clone(), | ||
| 176 | event.pubkey, | ||
| 177 | repo_path, | ||
| 178 | relays, | ||
| 179 | ); | ||
| 180 | |||
| 181 | // Extend the announcement's expiry (reset to full 30 min window) | ||
| 182 | self.ctx.purgatory.extend_announcement_expiry( | ||
| 183 | &event.pubkey, | ||
| 184 | &announcement.identifier, | ||
| 185 | Duration::from_secs(1800), | ||
| 186 | ); | ||
| 187 | |||
| 188 | // Also extend any state events waiting for this identifier | ||
| 189 | let state_entries = self.ctx.purgatory.find_state(&announcement.identifier); | ||
| 190 | if !state_entries.is_empty() { | ||
| 191 | let state_ids: Vec<_> = state_entries.iter().map(|e| e.event.id).collect(); | ||
| 192 | self.ctx.purgatory.extend_expiry( | ||
| 193 | &announcement.identifier, | ||
| 194 | &state_ids, | ||
| 195 | Duration::from_secs(1800), | ||
| 196 | ); | ||
| 197 | } | ||
| 198 | } | ||
| 199 | |||
| 200 | /// Remove a purgatory announcement and clean up associated resources. | ||
| 201 | /// | ||
| 202 | /// Called when a replacement announcement is rejected (owner removed our service). | ||
| 203 | /// Deletes the bare repository from disk and removes any state events waiting for | ||
| 204 | /// this identifier. | ||
| 205 | fn remove_purgatory_announcement(&self, pubkey: &PublicKey, identifier: &str) { | ||
| 206 | // Get the repo path before removing from purgatory | ||
| 207 | if let Some(entry) = self.ctx.purgatory.find_announcement(pubkey, identifier) { | ||
| 208 | // Delete the bare repository from disk | ||
| 209 | if entry.repo_path.exists() { | ||
| 210 | if let Err(e) = std::fs::remove_dir_all(&entry.repo_path) { | ||
| 211 | tracing::warn!( | ||
| 212 | path = %entry.repo_path.display(), | ||
| 213 | error = %e, | ||
| 214 | "Failed to delete bare repository during purgatory cleanup" | ||
| 215 | ); | ||
| 216 | } else { | ||
| 217 | tracing::info!( | ||
| 218 | path = %entry.repo_path.display(), | ||
| 219 | "Deleted bare repository for rejected purgatory announcement" | ||
| 220 | ); | ||
| 221 | } | ||
| 222 | } | ||
| 223 | } | ||
| 224 | |||
| 225 | // Remove the announcement from purgatory | ||
| 226 | self.ctx.purgatory.remove_announcement(pubkey, identifier); | ||
| 227 | |||
| 228 | // Only remove state events if no other owner still has an announcement in purgatory | ||
| 229 | // for this identifier. State events are keyed by identifier alone, so blindly removing | ||
| 230 | // them would also discard state events legitimately belonging to a different owner's | ||
| 231 | // repository that happens to share the same identifier string. | ||
| 232 | let other_owners_remain = !self | ||
| 233 | .ctx | ||
| 234 | .purgatory | ||
| 235 | .get_announcements_by_identifier(identifier) | ||
| 236 | .is_empty(); | ||
| 237 | |||
| 238 | if !other_owners_remain { | ||
| 239 | self.ctx.purgatory.remove_state(identifier); | ||
| 240 | } | ||
| 241 | |||
| 242 | tracing::info!( | ||
| 243 | identifier = %identifier, | ||
| 244 | other_owners_remain = %other_owners_remain, | ||
| 245 | "Cleared purgatory entry: owner removed our service from announcement" | ||
| 246 | ); | ||
| 247 | } | ||
| 248 | |||
| 249 | /// Check if there's an announcement in the database for this (pubkey, identifier). | ||
| 250 | /// | ||
| 251 | /// Only checks the database (promoted announcements). For purgatory checks use | ||
| 252 | /// `purgatory.has_purgatory_announcement()` directly. | ||
| 253 | async fn has_db_announcement( | ||
| 254 | &self, | ||
| 255 | pubkey: &PublicKey, | ||
| 256 | identifier: &str, | ||
| 257 | ) -> Result<bool, String> { | ||
| 258 | let filter = Filter::new() | ||
| 259 | .kind(Kind::GitRepoAnnouncement) | ||
| 260 | .author(*pubkey) | ||
| 261 | .custom_tag( | ||
| 262 | SingleLetterTag::lowercase(Alphabet::D), | ||
| 263 | identifier.to_string(), | ||
| 264 | ); | ||
| 265 | |||
| 266 | let events: Vec<Event> = match self.ctx.database.query(filter).await { | ||
| 267 | Ok(events) => events.into_iter().collect(), | ||
| 268 | Err(e) => return Err(format!("Database query failed: {}", e)), | ||
| 269 | }; | ||
| 270 | |||
| 271 | Ok(!events.is_empty()) | ||
| 272 | } | ||
| 273 | |||
| 274 | /// Add an announcement to purgatory | ||
| 275 | /// | ||
| 276 | /// Creates the bare repository and stores the announcement in purgatory | ||
| 277 | /// until git data arrives. | ||
| 278 | pub fn add_to_purgatory(&self, event: &Event) -> Result<(), String> { | ||
| 279 | let announcement = RepositoryAnnouncement::from_event(event.clone()) | ||
| 280 | .map_err(|e| format!("Failed to parse announcement: {}", e))?; | ||
| 281 | |||
| 282 | // Create bare repository | ||
| 283 | self.ensure_bare_repository(&announcement)?; | ||
| 284 | |||
| 285 | // Build repo path | ||
| 286 | let repo_path = self.ctx.git_data_path.join(announcement.repo_path()); | ||
| 287 | |||
| 288 | // Extract relays from announcement | ||
| 289 | let relays: HashSet<String> = announcement.relays.iter().cloned().collect(); | ||
| 290 | |||
| 291 | // Add to purgatory | ||
| 292 | self.ctx.purgatory.add_announcement( | ||
| 293 | event.clone(), | ||
| 294 | announcement.identifier.clone(), | ||
| 295 | event.pubkey, | ||
| 296 | repo_path, | ||
| 297 | relays, | ||
| 298 | ); | ||
| 299 | |||
| 300 | tracing::info!( | ||
| 301 | identifier = %announcement.identifier, | ||
| 302 | event_id = %event.id, | ||
| 303 | "Added announcement to purgatory" | ||
| 304 | ); | ||
| 305 | |||
| 306 | Ok(()) | ||
| 307 | } | ||
| 308 | |||
| 75 | /// Create a bare git repository if it doesn't exist | 309 | /// Create a bare git repository if it doesn't exist |
| 76 | /// Path format: <git_data_path>/<npub>/<identifier>.git | 310 | /// Path format: <git_data_path>/<npub>/<identifier>.git |
| 77 | pub fn ensure_bare_repository( | 311 | pub fn ensure_bare_repository( |
| @@ -117,6 +351,11 @@ impl AnnouncementPolicy { | |||
| 117 | /// | 351 | /// |
| 118 | /// This enables accepting announcements from maintainers even when they don't list | 352 | /// This enables accepting announcements from maintainers even when they don't list |
| 119 | /// this GRASP server, for maintainer chain discovery and GRASP-02 sync. | 353 | /// this GRASP server, for maintainer chain discovery and GRASP-02 sync. |
| 354 | /// | ||
| 355 | /// Checks both the database (promoted announcements) and purgatory (announcements | ||
| 356 | /// waiting for git data). This is necessary because a maintainer's announcement | ||
| 357 | /// (which lists the recursive maintainer) may still be in purgatory when the | ||
| 358 | /// recursive maintainer's announcement arrives. | ||
| 120 | async fn is_maintainer_in_any_announcement( | 359 | async fn is_maintainer_in_any_announcement( |
| 121 | &self, | 360 | &self, |
| 122 | identifier: &str, | 361 | identifier: &str, |
| @@ -128,12 +367,26 @@ impl AnnouncementPolicy { | |||
| 128 | identifier.to_string(), | 367 | identifier.to_string(), |
| 129 | ); | 368 | ); |
| 130 | 369 | ||
| 131 | let announcements: Vec<Event> = match self.ctx.database.query(filter).await { | 370 | let db_announcements: Vec<Event> = match self.ctx.database.query(filter).await { |
| 132 | Ok(events) => events.into_iter().collect(), | 371 | Ok(events) => events.into_iter().collect(), |
| 133 | Err(e) => return Err(format!("Database query failed: {}", e)), | 372 | Err(e) => return Err(format!("Database query failed: {}", e)), |
| 134 | }; | 373 | }; |
| 135 | 374 | ||
| 136 | if announcements.is_empty() { | 375 | // Also collect purgatory announcements for this identifier |
| 376 | let purgatory_announcements: Vec<Event> = self | ||
| 377 | .ctx | ||
| 378 | .purgatory | ||
| 379 | .get_announcements_by_identifier(identifier) | ||
| 380 | .into_iter() | ||
| 381 | .map(|entry| entry.event) | ||
| 382 | .collect(); | ||
| 383 | |||
| 384 | let all_announcements: Vec<&Event> = db_announcements | ||
| 385 | .iter() | ||
| 386 | .chain(purgatory_announcements.iter()) | ||
| 387 | .collect(); | ||
| 388 | |||
| 389 | if all_announcements.is_empty() { | ||
| 137 | // No existing announcements for this identifier - author cannot be a maintainer | 390 | // No existing announcements for this identifier - author cannot be a maintainer |
| 138 | return Ok(false); | 391 | return Ok(false); |
| 139 | } | 392 | } |
| @@ -141,14 +394,14 @@ impl AnnouncementPolicy { | |||
| 141 | let author_hex = author.to_hex(); | 394 | let author_hex = author.to_hex(); |
| 142 | 395 | ||
| 143 | // Check each announcement to see if author is listed as a maintainer | 396 | // Check each announcement to see if author is listed as a maintainer |
| 144 | for event in &announcements { | 397 | for event in &all_announcements { |
| 145 | // Check if author is the owner of this announcement | 398 | // Check if author is the owner of this announcement |
| 146 | if event.pubkey == *author { | 399 | if event.pubkey == *author { |
| 147 | return Ok(true); | 400 | return Ok(true); |
| 148 | } | 401 | } |
| 149 | 402 | ||
| 150 | // Check if author is listed in the maintainers tag | 403 | // Check if author is listed in the maintainers tag |
| 151 | if let Ok(announcement) = RepositoryAnnouncement::from_event(event.clone()) { | 404 | if let Ok(announcement) = RepositoryAnnouncement::from_event((*event).clone()) { |
| 152 | if announcement.maintainers.contains(&author_hex) { | 405 | if announcement.maintainers.contains(&author_hex) { |
| 153 | return Ok(true); | 406 | return Ok(true); |
| 154 | } | 407 | } |
diff --git a/src/nostr/policy/deletion.rs b/src/nostr/policy/deletion.rs new file mode 100644 index 0000000..6457c90 --- /dev/null +++ b/src/nostr/policy/deletion.rs | |||
| @@ -0,0 +1,498 @@ | |||
| 1 | /// Deletion Policy - NIP-09 event deletion request handling | ||
| 2 | /// | ||
| 3 | /// Handles kind 5 (EventDeletion) events that request removal of purgatory entries | ||
| 4 | /// for repository announcements (kind 30617) and state events (kind 30618). | ||
| 5 | /// | ||
| 6 | /// ## NIP-09 Rules Enforced | ||
| 7 | /// | ||
| 8 | /// - Only the event author can delete their own events (pubkey must match) | ||
| 9 | /// - `e` tags reference specific event IDs to delete | ||
| 10 | /// - `a` tags reference addressable events by coordinate (`<kind>:<pubkey>:<d-identifier>`) | ||
| 11 | /// - When an `a` tag is used, all versions up to `created_at` of the deletion request | ||
| 12 | /// are considered deleted | ||
| 13 | /// | ||
| 14 | /// ## Purgatory Interaction | ||
| 15 | /// | ||
| 16 | /// - Kind 30617 (announcement) in purgatory: entry removed, bare repo deleted from disk | ||
| 17 | /// - Kind 30618 (state event) in purgatory: matching state event(s) removed by event ID | ||
| 18 | /// or by (author, identifier) coordinate | ||
| 19 | use nostr_relay_builder::prelude::{Event, WritePolicyResult}; | ||
| 20 | |||
| 21 | use super::PolicyContext; | ||
| 22 | |||
| 23 | /// Policy for handling NIP-09 event deletion requests | ||
| 24 | #[derive(Clone)] | ||
| 25 | pub struct DeletionPolicy { | ||
| 26 | ctx: PolicyContext, | ||
| 27 | } | ||
| 28 | |||
| 29 | impl DeletionPolicy { | ||
| 30 | pub fn new(ctx: PolicyContext) -> Self { | ||
| 31 | Self { ctx } | ||
| 32 | } | ||
| 33 | |||
| 34 | /// Process a kind 5 (EventDeletion) event. | ||
| 35 | /// | ||
| 36 | /// Checks whether the deletion request targets any purgatory announcements | ||
| 37 | /// and removes them if so. The deletion event itself is always accepted | ||
| 38 | /// (relays should store deletion requests per NIP-09). | ||
| 39 | /// | ||
| 40 | /// Only the event author can delete their own events — this is enforced by | ||
| 41 | /// checking that the purgatory entry's owner matches `event.pubkey`. | ||
| 42 | pub async fn handle(&self, event: &Event) -> WritePolicyResult { | ||
| 43 | // Process purgatory removals synchronously (no async needed) | ||
| 44 | self.remove_purgatory_targets(event); | ||
| 45 | |||
| 46 | // Always accept the deletion event itself so it is stored and | ||
| 47 | // can prevent re-acceptance of the deleted event in the future. | ||
| 48 | WritePolicyResult::Accept | ||
| 49 | } | ||
| 50 | |||
| 51 | /// Remove any purgatory entries targeted by this deletion event. | ||
| 52 | /// | ||
| 53 | /// Handles both reference styles from NIP-09: | ||
| 54 | /// - `e` tags: event ID references — match against announcement or state event IDs | ||
| 55 | /// - `a` tags: addressable coordinate references — `30617:…` or `30618:…` | ||
| 56 | /// | ||
| 57 | /// Only removes entries where the purgatory entry's author matches the deletion | ||
| 58 | /// event's pubkey (enforces author-only deletion). | ||
| 59 | fn remove_purgatory_targets(&self, event: &Event) { | ||
| 60 | let author = &event.pubkey; | ||
| 61 | |||
| 62 | for tag in event.tags.iter() { | ||
| 63 | let tag_vec = tag.as_slice(); | ||
| 64 | if tag_vec.len() < 2 { | ||
| 65 | continue; | ||
| 66 | } | ||
| 67 | |||
| 68 | match tag_vec[0].as_str() { | ||
| 69 | "e" => { | ||
| 70 | // Event ID reference: find purgatory announcement with this event ID | ||
| 71 | let target_id = &tag_vec[1]; | ||
| 72 | self.remove_by_event_id(author, target_id, event.created_at.as_secs()); | ||
| 73 | } | ||
| 74 | "a" => { | ||
| 75 | // Addressable coordinate reference: `<kind>:<pubkey>:<d-identifier>` | ||
| 76 | let coord = &tag_vec[1]; | ||
| 77 | self.remove_by_coordinate(author, coord, event.created_at.as_secs()); | ||
| 78 | } | ||
| 79 | _ => {} | ||
| 80 | } | ||
| 81 | } | ||
| 82 | } | ||
| 83 | |||
| 84 | /// Remove a purgatory entry (announcement, state event, or PR event) matched by event ID. | ||
| 85 | /// | ||
| 86 | /// Checks in order: announcements (30617), state events (30618), PR/PR-update events. | ||
| 87 | /// Only removes entries whose author matches `author`. | ||
| 88 | fn remove_by_event_id( | ||
| 89 | &self, | ||
| 90 | author: &nostr_relay_builder::prelude::PublicKey, | ||
| 91 | target_id_hex: &str, | ||
| 92 | _deletion_created_at: u64, | ||
| 93 | ) { | ||
| 94 | // --- Check PR events (kind 1617/1618) first — O(1) direct lookup --- | ||
| 95 | // PR purgatory is keyed by event ID hex, so this is the cheapest check. | ||
| 96 | // Only remove if the entry has an actual event (not a placeholder) and the | ||
| 97 | // event's author matches the deletion request author. | ||
| 98 | if let Some(entry) = self.ctx.purgatory.find_pr(target_id_hex) { | ||
| 99 | if let Some(ref event) = entry.event { | ||
| 100 | if event.pubkey == *author { | ||
| 101 | tracing::info!( | ||
| 102 | event_id = %target_id_hex, | ||
| 103 | author = %author.to_hex(), | ||
| 104 | "Deletion request: removing purgatory PR event by event ID" | ||
| 105 | ); | ||
| 106 | self.ctx.purgatory.remove_pr(target_id_hex); | ||
| 107 | return; | ||
| 108 | } | ||
| 109 | } | ||
| 110 | // Entry exists but is a placeholder or wrong author — don't remove | ||
| 111 | return; | ||
| 112 | } | ||
| 113 | |||
| 114 | // --- Check announcements (kind 30617) --- | ||
| 115 | // The DashMap doesn't expose a direct "find by event ID" method, so we use | ||
| 116 | // the announcements_for_sync snapshot to enumerate all (repo_id, _) pairs. | ||
| 117 | let all = self.ctx.purgatory.announcements_for_sync(); | ||
| 118 | for (repo_id, _) in all { | ||
| 119 | // repo_id format: "30617:{pubkey_hex}:{identifier}" | ||
| 120 | let parts: Vec<&str> = repo_id.splitn(3, ':').collect(); | ||
| 121 | if parts.len() != 3 { | ||
| 122 | continue; | ||
| 123 | } | ||
| 124 | let entry_pubkey_hex = parts[1]; | ||
| 125 | let identifier = parts[2]; | ||
| 126 | |||
| 127 | if entry_pubkey_hex != author.to_hex() { | ||
| 128 | continue; | ||
| 129 | } | ||
| 130 | |||
| 131 | if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { | ||
| 132 | if entry.event.id.to_hex() == target_id_hex { | ||
| 133 | tracing::info!( | ||
| 134 | event_id = %target_id_hex, | ||
| 135 | identifier = %identifier, | ||
| 136 | author = %author.to_hex(), | ||
| 137 | "Deletion request: removing purgatory announcement by event ID" | ||
| 138 | ); | ||
| 139 | self.evict_purgatory_entry(author, identifier); | ||
| 140 | return; // event IDs are unique | ||
| 141 | } | ||
| 142 | } | ||
| 143 | } | ||
| 144 | |||
| 145 | // --- Check state events (kind 30618) --- | ||
| 146 | // State events are keyed by identifier; scan all identifiers for a match. | ||
| 147 | let state_identifiers = self.ctx.purgatory.get_all_identifiers(); | ||
| 148 | for identifier in state_identifiers { | ||
| 149 | let entries = self.ctx.purgatory.find_state(&identifier); | ||
| 150 | for entry in entries { | ||
| 151 | if entry.author == *author && entry.event.id.to_hex() == target_id_hex { | ||
| 152 | tracing::info!( | ||
| 153 | event_id = %target_id_hex, | ||
| 154 | identifier = %identifier, | ||
| 155 | author = %author.to_hex(), | ||
| 156 | "Deletion request: removing purgatory state event by event ID" | ||
| 157 | ); | ||
| 158 | self.ctx.purgatory.remove_state_event(&identifier, &entry.event.id); | ||
| 159 | return; // event IDs are unique | ||
| 160 | } | ||
| 161 | } | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | /// Remove a purgatory entry matched by addressable coordinate. | ||
| 166 | /// | ||
| 167 | /// The coordinate format is `<kind>:<pubkey>:<d-identifier>`. | ||
| 168 | /// Handles kind 30617 (announcements) and kind 30618 (state events). | ||
| 169 | /// | ||
| 170 | /// Per NIP-09, all versions up to `deletion_created_at` are considered deleted. | ||
| 171 | fn remove_by_coordinate( | ||
| 172 | &self, | ||
| 173 | author: &nostr_relay_builder::prelude::PublicKey, | ||
| 174 | coordinate: &str, | ||
| 175 | deletion_created_at: u64, | ||
| 176 | ) { | ||
| 177 | // Parse coordinate: `<kind>:<pubkey>:<d-identifier>` | ||
| 178 | let parts: Vec<&str> = coordinate.splitn(3, ':').collect(); | ||
| 179 | if parts.len() != 3 { | ||
| 180 | return; | ||
| 181 | } | ||
| 182 | |||
| 183 | let kind_str = parts[0]; | ||
| 184 | let coord_pubkey_hex = parts[1]; | ||
| 185 | let identifier = parts[2]; | ||
| 186 | |||
| 187 | // The coordinate pubkey must match the deletion event author | ||
| 188 | if coord_pubkey_hex != author.to_hex() { | ||
| 189 | tracing::debug!( | ||
| 190 | coord_pubkey = %coord_pubkey_hex, | ||
| 191 | deletion_author = %author.to_hex(), | ||
| 192 | "Ignoring deletion: coordinate pubkey does not match deletion author" | ||
| 193 | ); | ||
| 194 | return; | ||
| 195 | } | ||
| 196 | |||
| 197 | match kind_str { | ||
| 198 | "30617" => { | ||
| 199 | // Announcement purgatory entry | ||
| 200 | if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { | ||
| 201 | if entry.event.created_at.as_secs() <= deletion_created_at { | ||
| 202 | tracing::info!( | ||
| 203 | identifier = %identifier, | ||
| 204 | author = %author.to_hex(), | ||
| 205 | "Deletion request: removing purgatory announcement by coordinate" | ||
| 206 | ); | ||
| 207 | self.evict_purgatory_entry(author, identifier); | ||
| 208 | } else { | ||
| 209 | tracing::debug!( | ||
| 210 | identifier = %identifier, | ||
| 211 | author = %author.to_hex(), | ||
| 212 | "Ignoring deletion: purgatory announcement is newer than deletion request" | ||
| 213 | ); | ||
| 214 | } | ||
| 215 | } | ||
| 216 | } | ||
| 217 | "30618" => { | ||
| 218 | // State event purgatory entries for this (author, identifier). | ||
| 219 | // Remove all entries authored by `author` with created_at ≤ deletion_created_at. | ||
| 220 | let entries = self.ctx.purgatory.find_state(identifier); | ||
| 221 | let mut removed = 0usize; | ||
| 222 | for entry in entries { | ||
| 223 | if entry.author == *author | ||
| 224 | && entry.event.created_at.as_secs() <= deletion_created_at | ||
| 225 | { | ||
| 226 | self.ctx.purgatory.remove_state_event(identifier, &entry.event.id); | ||
| 227 | removed += 1; | ||
| 228 | } | ||
| 229 | } | ||
| 230 | if removed > 0 { | ||
| 231 | tracing::info!( | ||
| 232 | identifier = %identifier, | ||
| 233 | author = %author.to_hex(), | ||
| 234 | removed = %removed, | ||
| 235 | "Deletion request: removed purgatory state event(s) by coordinate" | ||
| 236 | ); | ||
| 237 | } | ||
| 238 | } | ||
| 239 | _ => { | ||
| 240 | // Other kinds not handled | ||
| 241 | } | ||
| 242 | } | ||
| 243 | } | ||
| 244 | |||
| 245 | /// Remove a purgatory announcement and delete its bare repository from disk. | ||
| 246 | fn evict_purgatory_entry( | ||
| 247 | &self, | ||
| 248 | author: &nostr_relay_builder::prelude::PublicKey, | ||
| 249 | identifier: &str, | ||
| 250 | ) { | ||
| 251 | // Get repo path before removing | ||
| 252 | if let Some(entry) = self.ctx.purgatory.find_announcement(author, identifier) { | ||
| 253 | if entry.repo_path.exists() { | ||
| 254 | if let Err(e) = std::fs::remove_dir_all(&entry.repo_path) { | ||
| 255 | tracing::warn!( | ||
| 256 | path = %entry.repo_path.display(), | ||
| 257 | error = %e, | ||
| 258 | "Failed to delete bare repository during deletion request processing" | ||
| 259 | ); | ||
| 260 | } else { | ||
| 261 | tracing::info!( | ||
| 262 | path = %entry.repo_path.display(), | ||
| 263 | "Deleted bare repository for deletion-requested purgatory announcement" | ||
| 264 | ); | ||
| 265 | } | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | self.ctx.purgatory.remove_announcement(author, identifier); | ||
| 270 | |||
| 271 | // Remove state events for this identifier only if no other owner's | ||
| 272 | // announcement remains in purgatory (state events are keyed by identifier alone) | ||
| 273 | let other_owners_remain = !self | ||
| 274 | .ctx | ||
| 275 | .purgatory | ||
| 276 | .get_announcements_by_identifier(identifier) | ||
| 277 | .is_empty(); | ||
| 278 | |||
| 279 | if !other_owners_remain { | ||
| 280 | self.ctx.purgatory.remove_state(identifier); | ||
| 281 | } | ||
| 282 | } | ||
| 283 | } | ||
| 284 | |||
| 285 | #[cfg(test)] | ||
| 286 | mod tests { | ||
| 287 | use super::*; | ||
| 288 | use crate::nostr::policy::PolicyContext; | ||
| 289 | use crate::purgatory::Purgatory; | ||
| 290 | use nostr_relay_builder::prelude::*; | ||
| 291 | use std::collections::HashSet; | ||
| 292 | use std::path::PathBuf; | ||
| 293 | use std::sync::Arc; | ||
| 294 | |||
| 295 | fn make_context() -> PolicyContext { | ||
| 296 | let db = Arc::new(MemoryDatabase::with_opts(MemoryDatabaseOptions { | ||
| 297 | events: true, | ||
| 298 | max_events: None, | ||
| 299 | })); | ||
| 300 | let purgatory = Arc::new(Purgatory::new(PathBuf::new())); | ||
| 301 | let config = crate::config::Config::for_testing(); | ||
| 302 | PolicyContext::new("test.example.com", db, PathBuf::new(), purgatory, config) | ||
| 303 | } | ||
| 304 | |||
| 305 | fn make_announcement_event(keys: &Keys, identifier: &str) -> Event { | ||
| 306 | EventBuilder::new(Kind::GitRepoAnnouncement, "") | ||
| 307 | .tags(vec![ | ||
| 308 | Tag::identifier(identifier), | ||
| 309 | Tag::custom(TagKind::custom("clone"), vec!["https://example.com/repo.git"]), | ||
| 310 | ]) | ||
| 311 | .sign_with_keys(keys) | ||
| 312 | .unwrap() | ||
| 313 | } | ||
| 314 | |||
| 315 | fn add_to_purgatory(ctx: &PolicyContext, event: &Event, identifier: &str) { | ||
| 316 | ctx.purgatory.add_announcement( | ||
| 317 | event.clone(), | ||
| 318 | identifier.to_string(), | ||
| 319 | event.pubkey, | ||
| 320 | PathBuf::new(), | ||
| 321 | HashSet::new(), | ||
| 322 | ); | ||
| 323 | } | ||
| 324 | |||
| 325 | #[tokio::test] | ||
| 326 | async fn test_deletion_by_event_id_removes_purgatory_entry() { | ||
| 327 | let ctx = make_context(); | ||
| 328 | let keys = Keys::generate(); | ||
| 329 | let identifier = "my-repo"; | ||
| 330 | |||
| 331 | let announcement = make_announcement_event(&keys, identifier); | ||
| 332 | add_to_purgatory(&ctx, &announcement, identifier); | ||
| 333 | |||
| 334 | assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier)); | ||
| 335 | |||
| 336 | // Build kind 5 deletion event referencing the announcement by event ID | ||
| 337 | let deletion = EventBuilder::new(Kind::EventDeletion, "") | ||
| 338 | .tags(vec![ | ||
| 339 | Tag::event(announcement.id), | ||
| 340 | Tag::custom(TagKind::custom("k"), vec!["30617"]), | ||
| 341 | ]) | ||
| 342 | .sign_with_keys(&keys) | ||
| 343 | .unwrap(); | ||
| 344 | |||
| 345 | let policy = DeletionPolicy::new(ctx.clone()); | ||
| 346 | let result = policy.handle(&deletion).await; | ||
| 347 | |||
| 348 | assert!(matches!(result, WritePolicyResult::Accept)); | ||
| 349 | assert!( | ||
| 350 | !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), | ||
| 351 | "Purgatory entry should have been removed" | ||
| 352 | ); | ||
| 353 | } | ||
| 354 | |||
| 355 | #[tokio::test] | ||
| 356 | async fn test_deletion_by_coordinate_removes_purgatory_entry() { | ||
| 357 | let ctx = make_context(); | ||
| 358 | let keys = Keys::generate(); | ||
| 359 | let identifier = "my-repo"; | ||
| 360 | |||
| 361 | let announcement = make_announcement_event(&keys, identifier); | ||
| 362 | add_to_purgatory(&ctx, &announcement, identifier); | ||
| 363 | |||
| 364 | assert!(ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier)); | ||
| 365 | |||
| 366 | // Build kind 5 deletion event referencing the announcement by coordinate | ||
| 367 | let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier); | ||
| 368 | let deletion = EventBuilder::new(Kind::EventDeletion, "") | ||
| 369 | .tags(vec![ | ||
| 370 | Tag::custom(TagKind::custom("a"), vec![coord]), | ||
| 371 | Tag::custom(TagKind::custom("k"), vec!["30617"]), | ||
| 372 | ]) | ||
| 373 | .sign_with_keys(&keys) | ||
| 374 | .unwrap(); | ||
| 375 | |||
| 376 | let policy = DeletionPolicy::new(ctx.clone()); | ||
| 377 | let result = policy.handle(&deletion).await; | ||
| 378 | |||
| 379 | assert!(matches!(result, WritePolicyResult::Accept)); | ||
| 380 | assert!( | ||
| 381 | !ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), | ||
| 382 | "Purgatory entry should have been removed" | ||
| 383 | ); | ||
| 384 | } | ||
| 385 | |||
| 386 | #[tokio::test] | ||
| 387 | async fn test_deletion_by_wrong_author_does_not_remove() { | ||
| 388 | let ctx = make_context(); | ||
| 389 | let owner_keys = Keys::generate(); | ||
| 390 | let attacker_keys = Keys::generate(); | ||
| 391 | let identifier = "my-repo"; | ||
| 392 | |||
| 393 | let announcement = make_announcement_event(&owner_keys, identifier); | ||
| 394 | add_to_purgatory(&ctx, &announcement, identifier); | ||
| 395 | |||
| 396 | // Attacker tries to delete by event ID | ||
| 397 | let deletion = EventBuilder::new(Kind::EventDeletion, "") | ||
| 398 | .tags(vec![ | ||
| 399 | Tag::event(announcement.id), | ||
| 400 | Tag::custom(TagKind::custom("k"), vec!["30617"]), | ||
| 401 | ]) | ||
| 402 | .sign_with_keys(&attacker_keys) | ||
| 403 | .unwrap(); | ||
| 404 | |||
| 405 | let policy = DeletionPolicy::new(ctx.clone()); | ||
| 406 | let result = policy.handle(&deletion).await; | ||
| 407 | |||
| 408 | assert!(matches!(result, WritePolicyResult::Accept)); | ||
| 409 | assert!( | ||
| 410 | ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier), | ||
| 411 | "Purgatory entry should NOT have been removed by wrong author" | ||
| 412 | ); | ||
| 413 | } | ||
| 414 | |||
| 415 | #[tokio::test] | ||
| 416 | async fn test_deletion_by_coordinate_wrong_author_does_not_remove() { | ||
| 417 | let ctx = make_context(); | ||
| 418 | let owner_keys = Keys::generate(); | ||
| 419 | let attacker_keys = Keys::generate(); | ||
| 420 | let identifier = "my-repo"; | ||
| 421 | |||
| 422 | let announcement = make_announcement_event(&owner_keys, identifier); | ||
| 423 | add_to_purgatory(&ctx, &announcement, identifier); | ||
| 424 | |||
| 425 | // Attacker tries to delete by coordinate using owner's pubkey in coord | ||
| 426 | // but signs with their own key — coord pubkey != deletion author | ||
| 427 | let coord = format!("30617:{}:{}", owner_keys.public_key().to_hex(), identifier); | ||
| 428 | let deletion = EventBuilder::new(Kind::EventDeletion, "") | ||
| 429 | .tags(vec![ | ||
| 430 | Tag::custom(TagKind::custom("a"), vec![coord]), | ||
| 431 | Tag::custom(TagKind::custom("k"), vec!["30617"]), | ||
| 432 | ]) | ||
| 433 | .sign_with_keys(&attacker_keys) | ||
| 434 | .unwrap(); | ||
| 435 | |||
| 436 | let policy = DeletionPolicy::new(ctx.clone()); | ||
| 437 | let result = policy.handle(&deletion).await; | ||
| 438 | |||
| 439 | assert!(matches!(result, WritePolicyResult::Accept)); | ||
| 440 | assert!( | ||
| 441 | ctx.purgatory.has_purgatory_announcement(&owner_keys.public_key(), identifier), | ||
| 442 | "Purgatory entry should NOT have been removed by wrong author" | ||
| 443 | ); | ||
| 444 | } | ||
| 445 | |||
| 446 | #[tokio::test] | ||
| 447 | async fn test_deletion_of_nonexistent_entry_is_accepted() { | ||
| 448 | let ctx = make_context(); | ||
| 449 | let keys = Keys::generate(); | ||
| 450 | |||
| 451 | // No purgatory entry exists — deletion should still be accepted | ||
| 452 | let deletion = EventBuilder::new(Kind::EventDeletion, "") | ||
| 453 | .tags(vec![ | ||
| 454 | Tag::custom(TagKind::custom("a"), vec![ | ||
| 455 | format!("30617:{}:nonexistent", keys.public_key().to_hex()) | ||
| 456 | ]), | ||
| 457 | ]) | ||
| 458 | .sign_with_keys(&keys) | ||
| 459 | .unwrap(); | ||
| 460 | |||
| 461 | let policy = DeletionPolicy::new(ctx.clone()); | ||
| 462 | let result = policy.handle(&deletion).await; | ||
| 463 | |||
| 464 | assert!(matches!(result, WritePolicyResult::Accept)); | ||
| 465 | } | ||
| 466 | |||
| 467 | #[tokio::test] | ||
| 468 | async fn test_deletion_by_coordinate_respects_created_at() { | ||
| 469 | let ctx = make_context(); | ||
| 470 | let keys = Keys::generate(); | ||
| 471 | let identifier = "my-repo"; | ||
| 472 | |||
| 473 | // Create announcement with a future timestamp | ||
| 474 | let future_ts = Timestamp::now().as_secs() + 3600; // 1 hour in the future | ||
| 475 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "") | ||
| 476 | .tags(vec![Tag::identifier(identifier)]) | ||
| 477 | .custom_created_at(Timestamp::from(future_ts)) | ||
| 478 | .sign_with_keys(&keys) | ||
| 479 | .unwrap(); | ||
| 480 | add_to_purgatory(&ctx, &announcement, identifier); | ||
| 481 | |||
| 482 | // Deletion event with current timestamp (older than announcement) | ||
| 483 | let coord = format!("30617:{}:{}", keys.public_key().to_hex(), identifier); | ||
| 484 | let deletion = EventBuilder::new(Kind::EventDeletion, "") | ||
| 485 | .tags(vec![Tag::custom(TagKind::custom("a"), vec![coord])]) | ||
| 486 | .sign_with_keys(&keys) | ||
| 487 | .unwrap(); | ||
| 488 | |||
| 489 | let policy = DeletionPolicy::new(ctx.clone()); | ||
| 490 | let result = policy.handle(&deletion).await; | ||
| 491 | |||
| 492 | assert!(matches!(result, WritePolicyResult::Accept)); | ||
| 493 | assert!( | ||
| 494 | ctx.purgatory.has_purgatory_announcement(&keys.public_key(), identifier), | ||
| 495 | "Purgatory entry should NOT be removed: entry is newer than deletion request" | ||
| 496 | ); | ||
| 497 | } | ||
| 498 | } | ||
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..f5b981a 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs | |||
| @@ -6,11 +6,13 @@ | |||
| 6 | /// - `PrEventPolicy` - PR/PR Update validation | 6 | /// - `PrEventPolicy` - PR/PR Update validation |
| 7 | /// - `RelatedEventPolicy` - Forward/backward reference checking | 7 | /// - `RelatedEventPolicy` - Forward/backward reference checking |
| 8 | mod announcement; | 8 | mod announcement; |
| 9 | mod deletion; | ||
| 9 | mod pr_event; | 10 | mod pr_event; |
| 10 | mod related; | 11 | mod related; |
| 11 | mod state; | 12 | mod state; |
| 12 | 13 | ||
| 13 | pub use announcement::{AnnouncementPolicy, AnnouncementResult}; | 14 | pub use announcement::{AnnouncementPolicy, AnnouncementResult}; |
| 15 | pub use deletion::DeletionPolicy; | ||
| 14 | pub use pr_event::PrEventPolicy; | 16 | pub use pr_event::PrEventPolicy; |
| 15 | pub use related::{ReferenceResult, RelatedEventPolicy}; | 17 | pub use related::{ReferenceResult, RelatedEventPolicy}; |
| 16 | pub use state::{StatePolicy, StateResult}; | 18 | pub use state::{StatePolicy, StateResult}; |
diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs index 00e09c3..072e445 100644 --- a/src/nostr/policy/pr_event.rs +++ b/src/nostr/policy/pr_event.rs | |||
| @@ -127,6 +127,10 @@ impl PrEventPolicy { | |||
| 127 | .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; | 127 | .ok_or_else(|| anyhow::anyhow!("No identifier in PR event"))?; |
| 128 | 128 | ||
| 129 | // Fetch repository data | 129 | // Fetch repository data |
| 130 | // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should | ||
| 131 | // only be accepted for announcements that have been promoted (validated). | ||
| 132 | // If the announcement is still in purgatory, the PR event should also go | ||
| 133 | // to purgatory and wait for the announcement to be promoted. | ||
| 130 | let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; | 134 | let db_repo_data = fetch_repository_data(&self.ctx.database, &identifier).await?; |
| 131 | 135 | ||
| 132 | // Extract owner pubkey from source repo path | 136 | // Extract owner pubkey from source repo path |
| @@ -203,6 +207,10 @@ impl PrEventPolicy { | |||
| 203 | let identifier = parts[2]; | 207 | let identifier = parts[2]; |
| 204 | 208 | ||
| 205 | // 2. Fetch repo data | 209 | // 2. Fetch repo data |
| 210 | // NOTE: Only fetch from database, NOT purgatory. Incoming PR events should | ||
| 211 | // only be accepted for announcements that have been promoted (validated). | ||
| 212 | // If the announcement is still in purgatory, the PR event should also go | ||
| 213 | // to purgatory and wait for the announcement to be promoted. | ||
| 206 | let db_repo_data = fetch_repository_data(&self.ctx.database, identifier).await?; | 214 | let db_repo_data = fetch_repository_data(&self.ctx.database, identifier).await?; |
| 207 | 215 | ||
| 208 | // 3. Extract list of maintainers from "a 30617:<maintainer>:<identifier>" tags | 216 | // 3. Extract list of maintainers from "a 30617:<maintainer>:<identifier>" tags |
diff --git a/src/nostr/policy/related.rs b/src/nostr/policy/related.rs index 7ce87db..cfe04a7 100644 --- a/src/nostr/policy/related.rs +++ b/src/nostr/policy/related.rs | |||
| @@ -139,6 +139,11 @@ impl RelatedEventPolicy { | |||
| 139 | .push((addr, pubkey, identifier)); | 139 | .push((addr, pubkey, identifier)); |
| 140 | } | 140 | } |
| 141 | 141 | ||
| 142 | // NOTE: Intentionally only checks the database (promoted announcements), not purgatory. | ||
| 143 | // Related events should only be accepted once the repository announcement has been | ||
| 144 | // validated (promoted via git data). Events referencing purgatory-only repositories | ||
| 145 | // are correctly rejected as orphans and can be re-submitted after promotion. | ||
| 146 | |||
| 142 | // Query each kind group | 147 | // Query each kind group |
| 143 | for (kind, refs) in by_kind { | 148 | for (kind, refs) in by_kind { |
| 144 | let authors: Vec<PublicKey> = refs.iter().map(|(_, pk, _)| *pk).collect(); | 149 | let authors: Vec<PublicKey> = refs.iter().map(|(_, pk, _)| *pk).collect(); |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index 3411077..df743ae 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -1,3 +1,4 @@ | |||
| 1 | use std::collections::HashSet; | ||
| 1 | use std::path::{Path, PathBuf}; | 2 | use std::path::{Path, PathBuf}; |
| 2 | 3 | ||
| 3 | use anyhow::{Context, Result}; | 4 | use anyhow::{Context, Result}; |
| @@ -10,7 +11,7 @@ use nostr_relay_builder::prelude::Event; | |||
| 10 | 11 | ||
| 11 | use super::PolicyContext; | 12 | use super::PolicyContext; |
| 12 | use crate::git; | 13 | use crate::git; |
| 13 | use crate::git::authorization::fetch_repository_data; | 14 | use crate::git::authorization::fetch_repository_data_with_purgatory; |
| 14 | use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; | 15 | use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; |
| 15 | 16 | ||
| 16 | /// Result of state policy evaluation | 17 | /// Result of state policy evaluation |
| @@ -76,7 +77,13 @@ impl StatePolicy { | |||
| 76 | } | 77 | } |
| 77 | 78 | ||
| 78 | // Get all repositories and state events from db with identifier | 79 | // Get all repositories and state events from db with identifier |
| 79 | let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; | 80 | // Include purgatory announcements for authorization |
| 81 | let db_repo_data = fetch_repository_data_with_purgatory( | ||
| 82 | &self.ctx.database, | ||
| 83 | &self.ctx.purgatory, | ||
| 84 | &state.identifier, | ||
| 85 | ) | ||
| 86 | .await?; | ||
| 80 | 87 | ||
| 81 | // CRITICAL: Check if author is authorized via maintainer set | 88 | // CRITICAL: Check if author is authorized via maintainer set |
| 82 | // State events MUST be rejected if author is not in maintainer set of any accepted announcement | 89 | // State events MUST be rejected if author is not in maintainer set of any accepted announcement |
| @@ -139,6 +146,34 @@ impl StatePolicy { | |||
| 139 | "State event author authorized via maintainer set" | 146 | "State event author authorized via maintainer set" |
| 140 | ); | 147 | ); |
| 141 | 148 | ||
| 149 | // Extend expiry for any purgatory announcements for this identifier. | ||
| 150 | // | ||
| 151 | // Per design doc decision #4: state event arrival extends the purgatory | ||
| 152 | // announcement's expiry (reset the 30-minute protocol timer). This prevents | ||
| 153 | // premature expiry during slow sync operations — the repo is actively receiving | ||
| 154 | // metadata so it should stay alive. | ||
| 155 | // | ||
| 156 | // We extend for all owners that authorized this state event, since the state | ||
| 157 | // event proves the repo is active regardless of which owner's announcement | ||
| 158 | // authorized it. | ||
| 159 | for owner_hex in &authorized_owners { | ||
| 160 | if let Ok(owner_pk) = nostr_sdk::PublicKey::from_hex(owner_hex) { | ||
| 161 | if self.ctx.purgatory.has_purgatory_announcement(&owner_pk, &state.identifier) { | ||
| 162 | self.ctx.purgatory.extend_announcement_expiry( | ||
| 163 | &owner_pk, | ||
| 164 | &state.identifier, | ||
| 165 | std::time::Duration::from_secs(1800), | ||
| 166 | ); | ||
| 167 | tracing::debug!( | ||
| 168 | event_id = %event.id, | ||
| 169 | identifier = %state.identifier, | ||
| 170 | owner = %owner_hex, | ||
| 171 | "Extended purgatory announcement expiry due to state event arrival" | ||
| 172 | ); | ||
| 173 | } | ||
| 174 | } | ||
| 175 | } | ||
| 176 | |||
| 142 | // Duplicate check in db | 177 | // Duplicate check in db |
| 143 | if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { | 178 | if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { |
| 144 | tracing::debug!("processed state event duplicate (in db): {}", event.id); | 179 | tracing::debug!("processed state event duplicate (in db): {}", event.id); |
| @@ -186,6 +221,42 @@ impl StatePolicy { | |||
| 186 | } | 221 | } |
| 187 | } | 222 | } |
| 188 | 223 | ||
| 224 | // After copying OIDs to other owner repos, promote any purgatory announcements | ||
| 225 | // for those repos. This handles the case where two maintainers push to the same | ||
| 226 | // identifier on the same relay with identical commit hashes: the second maintainer's | ||
| 227 | // announcement sits in purgatory, and when their state event arrives the relay copies | ||
| 228 | // commits from the first maintainer's repo — but without this call the announcement | ||
| 229 | // would stay in purgatory indefinitely. | ||
| 230 | let local_relay = self.ctx.get_local_relay(); | ||
| 231 | let empty_oids: HashSet<String> = HashSet::new(); | ||
| 232 | for announcement in &db_repo_data.announcements { | ||
| 233 | let target_repo_path = self.ctx.git_data_path.join(announcement.repo_path()); | ||
| 234 | if target_repo_path != repo_with_git_data { | ||
| 235 | // OIDs were copied to this repo by process_state_with_git_data; | ||
| 236 | // check if there's a purgatory announcement waiting for it. | ||
| 237 | if let Err(e) = crate::git::sync::process_newly_available_git_data( | ||
| 238 | &target_repo_path, | ||
| 239 | &empty_oids, | ||
| 240 | &self.ctx.database, | ||
| 241 | local_relay.as_ref(), | ||
| 242 | &self.ctx.purgatory, | ||
| 243 | &self.ctx.git_data_path, | ||
| 244 | None, | ||
| 245 | None, | ||
| 246 | ) | ||
| 247 | .await | ||
| 248 | { | ||
| 249 | tracing::warn!( | ||
| 250 | identifier = %state.identifier, | ||
| 251 | event_id = %event.id, | ||
| 252 | repo_path = %target_repo_path.display(), | ||
| 253 | error = %e, | ||
| 254 | "Failed to process purgatory announcements for target repo after git sync copy" | ||
| 255 | ); | ||
| 256 | } | ||
| 257 | } | ||
| 258 | } | ||
| 259 | |||
| 189 | // Event will be saved and broadcast by relay builder | 260 | // Event will be saved and broadcast by relay builder |
| 190 | Ok(WritePolicyResult::Accept) | 261 | Ok(WritePolicyResult::Accept) |
| 191 | } else { | 262 | } else { |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 2c278f6..bb6ff54 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -17,7 +17,7 @@ pub mod sync; | |||
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| @@ -34,6 +34,13 @@ pub use sync::SyncQueueEntry; | |||
| 34 | /// Default expiry duration for purgatory entries (30 minutes) | 34 | /// Default expiry duration for purgatory entries (30 minutes) |
| 35 | const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); | 35 | const DEFAULT_EXPIRY: Duration = Duration::from_secs(1800); |
| 36 | 36 | ||
| 37 | /// Extended expiry for soft-expired announcements (24 hours). | ||
| 38 | /// | ||
| 39 | /// After the initial 30-minute expiry, the bare repo is deleted but the event is | ||
| 40 | /// retained for this additional period. This allows revival if a state event arrives | ||
| 41 | /// late (e.g. slow sync), without permanently blocking the repository. | ||
| 42 | const SOFT_EXPIRY_EXTENDED: Duration = Duration::from_secs(86400); | ||
| 43 | |||
| 37 | /// Default delay before syncing user-submitted events (3 minutes). | 44 | /// Default delay before syncing user-submitted events (3 minutes). |
| 38 | /// This gives time for the git push to arrive after the nostr event. | 45 | /// This gives time for the git push to arrive after the nostr event. |
| 39 | const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); | 46 | const DEFAULT_SYNC_DELAY: Duration = Duration::from_secs(180); |
| @@ -83,9 +90,35 @@ struct SerializablePrPurgatoryEntry { | |||
| 83 | source: types::EventSource, | 90 | source: types::EventSource, |
| 84 | } | 91 | } |
| 85 | 92 | ||
| 93 | /// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. | ||
| 94 | /// | ||
| 95 | /// Stores `Instant` fields as `Duration` offsets from the `saved_at` timestamp | ||
| 96 | /// in `PurgatoryState`, allowing state to be persisted and restored across restarts. | ||
| 97 | /// | ||
| 98 | /// Note: soft-expired entries (bare repo deleted) are NOT persisted — they have | ||
| 99 | /// no git repo on disk and would be immediately cleaned up on restore anyway. | ||
| 100 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 101 | struct SerializableAnnouncementPurgatoryEntry { | ||
| 102 | /// The nostr announcement event (kind 30617) | ||
| 103 | event: Event, | ||
| 104 | /// The repository identifier from the event's 'd' tag | ||
| 105 | identifier: String, | ||
| 106 | /// The owner pubkey (event author) | ||
| 107 | owner: PublicKey, | ||
| 108 | /// Path to the bare git repository (must exist on disk) | ||
| 109 | repo_path: PathBuf, | ||
| 110 | /// Relay URLs from the announcement (for sync registration) | ||
| 111 | relays: HashSet<String>, | ||
| 112 | /// Duration offset from saved_at for created_at | ||
| 113 | created_at_offset_secs: u64, | ||
| 114 | /// Duration offset from saved_at for expires_at | ||
| 115 | expires_at_offset_secs: u64, | ||
| 116 | } | ||
| 117 | |||
| 86 | /// Serializable purgatory state for disk persistence. | 118 | /// Serializable purgatory state for disk persistence. |
| 87 | /// | 119 | /// |
| 88 | /// Contains all purgatory data needed to restore state across restarts: | 120 | /// Contains all purgatory data needed to restore state across restarts: |
| 121 | /// - Announcement events (indexed by (owner, identifier)) — non-soft-expired only | ||
| 89 | /// - State events (indexed by identifier) | 122 | /// - State events (indexed by identifier) |
| 90 | /// - PR events (indexed by event ID) | 123 | /// - PR events (indexed by event ID) |
| 91 | /// - Expired events (to prevent re-sync loops) | 124 | /// - Expired events (to prevent re-sync loops) |
| @@ -97,6 +130,10 @@ struct PurgatoryState { | |||
| 97 | version: u32, | 130 | version: u32, |
| 98 | /// When this state was saved to disk | 131 | /// When this state was saved to disk |
| 99 | saved_at: SystemTime, | 132 | saved_at: SystemTime, |
| 133 | /// Announcement events indexed by "owner_hex:identifier" | ||
| 134 | /// Only non-soft-expired entries are persisted (bare repo must exist). | ||
| 135 | #[serde(default)] | ||
| 136 | announcement_purgatory: HashMap<String, SerializableAnnouncementPurgatoryEntry>, | ||
| 100 | /// State events indexed by repository identifier | 137 | /// State events indexed by repository identifier |
| 101 | state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>, | 138 | state_events: HashMap<String, Vec<SerializableStatePurgatoryEntry>>, |
| 102 | /// PR events indexed by event ID (hex string) | 139 | /// PR events indexed by event ID (hex string) |
| @@ -107,7 +144,8 @@ struct PurgatoryState { | |||
| 107 | 144 | ||
| 108 | /// Main purgatory structure holding events awaiting git data. | 145 | /// Main purgatory structure holding events awaiting git data. |
| 109 | /// | 146 | /// |
| 110 | /// Provides thread-safe concurrent access to two separate stores: | 147 | /// Provides thread-safe concurrent access to three separate stores: |
| 148 | /// - Announcements indexed by (pubkey, identifier) | ||
| 111 | /// - State events indexed by repository identifier | 149 | /// - State events indexed by repository identifier |
| 112 | /// - PR events indexed by event ID | 150 | /// - PR events indexed by event ID |
| 113 | /// | 151 | /// |
| @@ -128,6 +166,10 @@ struct PurgatoryState { | |||
| 128 | /// that we've already determined have no git data available. | 166 | /// that we've already determined have no git data available. |
| 129 | #[derive(Clone)] | 167 | #[derive(Clone)] |
| 130 | pub struct Purgatory { | 168 | pub struct Purgatory { |
| 169 | /// Repository announcements (kind 30617) indexed by (owner pubkey, identifier). | ||
| 170 | /// Key: (PublicKey, String) where String is the repository identifier. | ||
| 171 | announcement_purgatory: Arc<DashMap<(PublicKey, String), AnnouncementPurgatoryEntry>>, | ||
| 172 | |||
| 131 | /// State events (kind 30618) indexed by repository identifier. | 173 | /// State events (kind 30618) indexed by repository identifier. |
| 132 | /// Multiple state events can wait for the same identifier (different maintainers). | 174 | /// Multiple state events can wait for the same identifier (different maintainers). |
| 133 | state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>, | 175 | state_events: Arc<DashMap<String, Vec<StatePurgatoryEntry>>>, |
| @@ -152,6 +194,7 @@ impl Purgatory { | |||
| 152 | /// Create a new empty purgatory. | 194 | /// Create a new empty purgatory. |
| 153 | pub fn new(git_data_path: impl Into<PathBuf>) -> Self { | 195 | pub fn new(git_data_path: impl Into<PathBuf>) -> Self { |
| 154 | Self { | 196 | Self { |
| 197 | announcement_purgatory: Arc::new(DashMap::new()), | ||
| 155 | state_events: Arc::new(DashMap::new()), | 198 | state_events: Arc::new(DashMap::new()), |
| 156 | pr_events: Arc::new(DashMap::new()), | 199 | pr_events: Arc::new(DashMap::new()), |
| 157 | sync_queue: Arc::new(DashMap::new()), | 200 | sync_queue: Arc::new(DashMap::new()), |
| @@ -576,9 +619,245 @@ impl Purgatory { | |||
| 576 | self.pr_events.remove(event_id); | 619 | self.pr_events.remove(event_id); |
| 577 | } | 620 | } |
| 578 | 621 | ||
| 622 | // ========================================================================= | ||
| 623 | // Announcement Purgatory Methods | ||
| 624 | // ========================================================================= | ||
| 625 | |||
| 626 | /// Add a repository announcement to purgatory. | ||
| 627 | /// | ||
| 628 | /// The announcement will be held until git data arrives, at which point | ||
| 629 | /// it will be promoted to the database and served to clients. | ||
| 630 | /// | ||
| 631 | /// # Arguments | ||
| 632 | /// * `event` - The announcement event (kind 30617) | ||
| 633 | /// * `identifier` - The repository identifier from the 'd' tag | ||
| 634 | /// * `owner` - The owner pubkey (event author) | ||
| 635 | /// * `repo_path` - Path to the bare git repository | ||
| 636 | /// * `relays` - Relay URLs from the announcement (for sync registration) | ||
| 637 | pub fn add_announcement( | ||
| 638 | &self, | ||
| 639 | event: Event, | ||
| 640 | identifier: String, | ||
| 641 | owner: PublicKey, | ||
| 642 | repo_path: PathBuf, | ||
| 643 | relays: HashSet<String>, | ||
| 644 | ) { | ||
| 645 | let now = Instant::now(); | ||
| 646 | let entry = AnnouncementPurgatoryEntry { | ||
| 647 | event, | ||
| 648 | identifier: identifier.clone(), | ||
| 649 | owner, | ||
| 650 | repo_path, | ||
| 651 | relays, | ||
| 652 | created_at: now, | ||
| 653 | expires_at: now + DEFAULT_EXPIRY, | ||
| 654 | soft_expired: false, | ||
| 655 | }; | ||
| 656 | |||
| 657 | let key = (owner, identifier); | ||
| 658 | self.announcement_purgatory.insert(key.clone(), entry); | ||
| 659 | |||
| 660 | tracing::debug!( | ||
| 661 | owner = %key.0, | ||
| 662 | identifier = %key.1, | ||
| 663 | "Added announcement to purgatory" | ||
| 664 | ); | ||
| 665 | } | ||
| 666 | |||
| 667 | /// Find an announcement in purgatory by owner and identifier. | ||
| 668 | /// | ||
| 669 | /// # Arguments | ||
| 670 | /// * `owner` - The owner pubkey | ||
| 671 | /// * `identifier` - The repository identifier | ||
| 672 | /// | ||
| 673 | /// # Returns | ||
| 674 | /// The announcement entry if found, None otherwise | ||
| 675 | pub fn find_announcement(&self, owner: &PublicKey, identifier: &str) -> Option<AnnouncementPurgatoryEntry> { | ||
| 676 | let key = (*owner, identifier.to_string()); | ||
| 677 | self.announcement_purgatory.get(&key).map(|entry| entry.clone()) | ||
| 678 | } | ||
| 679 | |||
| 680 | /// Get all announcements in purgatory for a given identifier. | ||
| 681 | /// | ||
| 682 | /// This is used for authorization - state events and git pushes need to | ||
| 683 | /// check purgatory announcements for maintainer validation. | ||
| 684 | /// | ||
| 685 | /// # Arguments | ||
| 686 | /// * `identifier` - The repository identifier | ||
| 687 | /// | ||
| 688 | /// # Returns | ||
| 689 | /// Vector of announcement entries for this identifier | ||
| 690 | pub fn get_announcements_by_identifier(&self, identifier: &str) -> Vec<AnnouncementPurgatoryEntry> { | ||
| 691 | self.announcement_purgatory | ||
| 692 | .iter() | ||
| 693 | .filter(|entry| entry.key().1 == identifier) | ||
| 694 | .map(|entry| entry.value().clone()) | ||
| 695 | .collect() | ||
| 696 | } | ||
| 697 | |||
| 698 | /// Remove an announcement from purgatory. | ||
| 699 | /// | ||
| 700 | /// # Arguments | ||
| 701 | /// * `owner` - The owner pubkey | ||
| 702 | /// * `identifier` - The repository identifier | ||
| 703 | pub fn remove_announcement(&self, owner: &PublicKey, identifier: &str) { | ||
| 704 | let key = (*owner, identifier.to_string()); | ||
| 705 | self.announcement_purgatory.remove(&key); | ||
| 706 | tracing::debug!( | ||
| 707 | owner = %owner, | ||
| 708 | identifier = %identifier, | ||
| 709 | "Removed announcement from purgatory" | ||
| 710 | ); | ||
| 711 | } | ||
| 712 | |||
| 713 | /// Promote an announcement from purgatory to active status. | ||
| 714 | /// | ||
| 715 | /// This is called when git data arrives. The announcement event is returned | ||
| 716 | /// so it can be saved to the database. | ||
| 717 | /// | ||
| 718 | /// # Arguments | ||
| 719 | /// * `owner` - The owner pubkey | ||
| 720 | /// * `identifier` - The repository identifier | ||
| 721 | /// | ||
| 722 | /// # Returns | ||
| 723 | /// The announcement event if found, None otherwise | ||
| 724 | pub fn promote_announcement(&self, owner: &PublicKey, identifier: &str) -> Option<Event> { | ||
| 725 | let key = (*owner, identifier.to_string()); | ||
| 726 | self.announcement_purgatory.remove(&key).map(|(_, entry)| { | ||
| 727 | tracing::info!( | ||
| 728 | owner = %owner, | ||
| 729 | identifier = %identifier, | ||
| 730 | "Promoted announcement from purgatory to database" | ||
| 731 | ); | ||
| 732 | entry.event | ||
| 733 | }) | ||
| 734 | } | ||
| 735 | |||
| 736 | /// Check if there's an announcement in purgatory for the given owner and identifier. | ||
| 737 | /// | ||
| 738 | /// # Arguments | ||
| 739 | /// * `owner` - The owner pubkey | ||
| 740 | /// * `identifier` - The repository identifier | ||
| 741 | /// | ||
| 742 | /// # Returns | ||
| 743 | /// true if an announcement exists in purgatory, false otherwise | ||
| 744 | pub fn has_purgatory_announcement(&self, owner: &PublicKey, identifier: &str) -> bool { | ||
| 745 | let key = (*owner, identifier.to_string()); | ||
| 746 | self.announcement_purgatory.contains_key(&key) | ||
| 747 | } | ||
| 748 | |||
| 749 | /// Extend the expiry for an announcement in purgatory. | ||
| 750 | /// | ||
| 751 | /// This is called when state events arrive for a purgatory announcement, | ||
| 752 | /// indicating the repository is actively receiving metadata. | ||
| 753 | /// | ||
| 754 | /// # Arguments | ||
| 755 | /// * `owner` - The owner pubkey | ||
| 756 | /// * `identifier` - The repository identifier | ||
| 757 | /// * `duration` - Minimum duration to guarantee from now | ||
| 758 | pub fn extend_announcement_expiry(&self, owner: &PublicKey, identifier: &str, duration: Duration) { | ||
| 759 | let key = (*owner, identifier.to_string()); | ||
| 760 | |||
| 761 | // Collect revival info before taking a mutable borrow | ||
| 762 | let revival_info: Option<(PathBuf, bool)> = self | ||
| 763 | .announcement_purgatory | ||
| 764 | .get(&key) | ||
| 765 | .map(|entry| (entry.repo_path.clone(), entry.soft_expired)); | ||
| 766 | |||
| 767 | if let Some(mut entry) = self.announcement_purgatory.get_mut(&key) { | ||
| 768 | let now = Instant::now(); | ||
| 769 | let new_expiry = now + duration; | ||
| 770 | if entry.expires_at < new_expiry { | ||
| 771 | entry.expires_at = new_expiry; | ||
| 772 | } | ||
| 773 | // Always reset soft_expired when expiry is extended — the caller | ||
| 774 | // (state event or git auth) signals the repo is still active. | ||
| 775 | if entry.soft_expired { | ||
| 776 | entry.soft_expired = false; | ||
| 777 | } | ||
| 778 | } | ||
| 779 | |||
| 780 | // If the entry was soft-expired, recreate the bare repo outside the | ||
| 781 | // mutable borrow so we don't hold the DashMap lock during I/O. | ||
| 782 | if let Some((repo_path, was_soft_expired)) = revival_info { | ||
| 783 | if was_soft_expired { | ||
| 784 | if !repo_path.exists() { | ||
| 785 | match std::fs::create_dir_all(&repo_path) { | ||
| 786 | Ok(()) => { | ||
| 787 | // Initialise as a bare git repository | ||
| 788 | let status = std::process::Command::new("git") | ||
| 789 | .args(["init", "--bare"]) | ||
| 790 | .arg(&repo_path) | ||
| 791 | .status(); | ||
| 792 | match status { | ||
| 793 | Ok(s) if s.success() => { | ||
| 794 | tracing::info!( | ||
| 795 | path = %repo_path.display(), | ||
| 796 | owner = %owner, | ||
| 797 | identifier = %identifier, | ||
| 798 | "Recreated bare repository for revived soft-expired announcement" | ||
| 799 | ); | ||
| 800 | } | ||
| 801 | Ok(s) => { | ||
| 802 | tracing::warn!( | ||
| 803 | path = %repo_path.display(), | ||
| 804 | exit_code = ?s.code(), | ||
| 805 | "git init --bare failed when reviving soft-expired announcement" | ||
| 806 | ); | ||
| 807 | } | ||
| 808 | Err(e) => { | ||
| 809 | tracing::warn!( | ||
| 810 | path = %repo_path.display(), | ||
| 811 | error = %e, | ||
| 812 | "Failed to run git init --bare when reviving soft-expired announcement" | ||
| 813 | ); | ||
| 814 | } | ||
| 815 | } | ||
| 816 | } | ||
| 817 | Err(e) => { | ||
| 818 | tracing::warn!( | ||
| 819 | path = %repo_path.display(), | ||
| 820 | error = %e, | ||
| 821 | "Failed to create directory when reviving soft-expired announcement" | ||
| 822 | ); | ||
| 823 | } | ||
| 824 | } | ||
| 825 | } | ||
| 826 | tracing::info!( | ||
| 827 | owner = %owner, | ||
| 828 | identifier = %identifier, | ||
| 829 | "Revived soft-expired announcement (bare repo recreated, expiry extended)" | ||
| 830 | ); | ||
| 831 | } | ||
| 832 | } | ||
| 833 | } | ||
| 834 | |||
| 835 | /// Get count of announcements in purgatory. | ||
| 836 | pub fn announcement_count(&self) -> usize { | ||
| 837 | self.announcement_purgatory.len() | ||
| 838 | } | ||
| 839 | |||
| 840 | /// Collect (repo_id, relay_urls) for all announcements currently in purgatory. | ||
| 841 | /// | ||
| 842 | /// Returns a vec of `(repo_id, relay_urls)` where `repo_id` is the addressable | ||
| 843 | /// coordinate string `"30617:{pubkey_hex}:{identifier}"`. Used by the purgatory | ||
| 844 | /// announcement sync timer to register StateOnly entries in `repo_sync_index`. | ||
| 845 | pub fn announcements_for_sync(&self) -> Vec<(String, HashSet<String>)> { | ||
| 846 | self.announcement_purgatory | ||
| 847 | .iter() | ||
| 848 | .map(|entry| { | ||
| 849 | let (owner, identifier) = entry.key(); | ||
| 850 | let repo_id = format!("30617:{}:{}", owner.to_hex(), identifier); | ||
| 851 | let relays = entry.value().relays.clone(); | ||
| 852 | (repo_id, relays) | ||
| 853 | }) | ||
| 854 | .collect() | ||
| 855 | } | ||
| 856 | |||
| 579 | /// Get all event IDs currently stored in purgatory AND previously expired events. | 857 | /// Get all event IDs currently stored in purgatory AND previously expired events. |
| 580 | /// | 858 | /// |
| 581 | /// Returns a HashSet of all event IDs for: | 859 | /// Returns a HashSet of all event IDs for: |
| 860 | /// - Announcements currently held in purgatory | ||
| 582 | /// - State events currently held in purgatory | 861 | /// - State events currently held in purgatory |
| 583 | /// - PR events currently held in purgatory | 862 | /// - PR events currently held in purgatory |
| 584 | /// - Events that previously expired from purgatory without finding git data | 863 | /// - Events that previously expired from purgatory without finding git data |
| @@ -593,6 +872,11 @@ impl Purgatory { | |||
| 593 | pub fn event_ids(&self) -> HashSet<EventId> { | 872 | pub fn event_ids(&self) -> HashSet<EventId> { |
| 594 | let mut ids = HashSet::new(); | 873 | let mut ids = HashSet::new(); |
| 595 | 874 | ||
| 875 | // Collect announcement event IDs | ||
| 876 | for entry in self.announcement_purgatory.iter() { | ||
| 877 | ids.insert(entry.value().event.id); | ||
| 878 | } | ||
| 879 | |||
| 596 | // Collect state event IDs | 880 | // Collect state event IDs |
| 597 | for entry in self.state_events.iter() { | 881 | for entry in self.state_events.iter() { |
| 598 | for state_entry in entry.value().iter() { | 882 | for state_entry in entry.value().iter() { |
| @@ -675,9 +959,86 @@ impl Purgatory { | |||
| 675 | /// to support migration scripts and operational monitoring. | 959 | /// to support migration scripts and operational monitoring. |
| 676 | /// | 960 | /// |
| 677 | /// # Returns | 961 | /// # Returns |
| 678 | /// Tuple of (num_state_removed, num_pr_removed) | 962 | /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) |
| 679 | pub fn cleanup(&self) -> (usize, usize) { | 963 | pub fn cleanup(&self) -> (usize, usize, usize) { |
| 680 | let now = Instant::now(); | 964 | let now = Instant::now(); |
| 965 | |||
| 966 | // Process expired announcements with two-phase soft expiry: | ||
| 967 | // | ||
| 968 | // Phase 1 (initial expiry, !soft_expired): Delete bare repo, set soft_expired=true, | ||
| 969 | // extend expiry by SOFT_EXPIRY_EXTENDED so the event is retained for revival. | ||
| 970 | // Phase 2 (extended expiry, soft_expired): Fully remove from purgatory. | ||
| 971 | // | ||
| 972 | // Collect entries that have passed their expires_at deadline. | ||
| 973 | let expired_announcements: Vec<(PublicKey, String, PathBuf, EventId, bool)> = self | ||
| 974 | .announcement_purgatory | ||
| 975 | .iter() | ||
| 976 | .filter(|entry| entry.value().expires_at <= now) | ||
| 977 | .map(|entry| { | ||
| 978 | let key = entry.key(); | ||
| 979 | let v = entry.value(); | ||
| 980 | (key.0.clone(), key.1.clone(), v.repo_path.clone(), v.event.id, v.soft_expired) | ||
| 981 | }) | ||
| 982 | .collect(); | ||
| 983 | |||
| 984 | let mut announcement_removed = 0; | ||
| 985 | for (owner, identifier, repo_path, event_id, already_soft_expired) in expired_announcements { | ||
| 986 | if already_soft_expired { | ||
| 987 | // Phase 2: fully remove | ||
| 988 | self.mark_expired(event_id); | ||
| 989 | self.announcement_purgatory.remove(&(owner.clone(), identifier.clone())); | ||
| 990 | announcement_removed += 1; | ||
| 991 | tracing::info!( | ||
| 992 | owner = %owner, | ||
| 993 | identifier = %identifier, | ||
| 994 | "Announcement fully expired from purgatory (soft expiry period elapsed)" | ||
| 995 | ); | ||
| 996 | } else { | ||
| 997 | // Phase 1: soft expiry — delete bare repo, retain event. | ||
| 998 | // | ||
| 999 | // Only transition to soft_expired if the directory is gone (or never | ||
| 1000 | // existed). If removal fails we leave the entry untouched so the next | ||
| 1001 | // cleanup cycle retries the deletion automatically. | ||
| 1002 | let repo_gone = if repo_path.exists() { | ||
| 1003 | match std::fs::remove_dir_all(&repo_path) { | ||
| 1004 | Ok(()) => { | ||
| 1005 | tracing::info!( | ||
| 1006 | path = %repo_path.display(), | ||
| 1007 | owner = %owner, | ||
| 1008 | identifier = %identifier, | ||
| 1009 | "Deleted bare repository during soft expiry (event retained for revival)" | ||
| 1010 | ); | ||
| 1011 | true | ||
| 1012 | } | ||
| 1013 | Err(e) => { | ||
| 1014 | tracing::warn!( | ||
| 1015 | path = %repo_path.display(), | ||
| 1016 | error = %e, | ||
| 1017 | "Failed to delete bare repository during soft expiry; will retry next cleanup cycle" | ||
| 1018 | ); | ||
| 1019 | false | ||
| 1020 | } | ||
| 1021 | } | ||
| 1022 | } else { | ||
| 1023 | // Already gone (e.g. deleted externally) | ||
| 1024 | true | ||
| 1025 | }; | ||
| 1026 | |||
| 1027 | if repo_gone { | ||
| 1028 | // Mark soft_expired and extend expiry | ||
| 1029 | if let Some(mut entry) = self.announcement_purgatory.get_mut(&(owner.clone(), identifier.clone())) { | ||
| 1030 | entry.soft_expired = true; | ||
| 1031 | entry.expires_at = now + SOFT_EXPIRY_EXTENDED; | ||
| 1032 | } | ||
| 1033 | tracing::debug!( | ||
| 1034 | owner = %owner, | ||
| 1035 | identifier = %identifier, | ||
| 1036 | "Announcement soft-expired: bare repo deleted, event retained for 24h" | ||
| 1037 | ); | ||
| 1038 | } | ||
| 1039 | } | ||
| 1040 | } | ||
| 1041 | |||
| 681 | let mut state_removed = 0; | 1042 | let mut state_removed = 0; |
| 682 | 1043 | ||
| 683 | // Remove expired state events and mark them as expired | 1044 | // Remove expired state events and mark them as expired |
| @@ -823,17 +1184,17 @@ impl Purgatory { | |||
| 823 | self.pr_events.remove(&event_id_str); | 1184 | self.pr_events.remove(&event_id_str); |
| 824 | } | 1185 | } |
| 825 | 1186 | ||
| 826 | (state_removed, pr_removed) | 1187 | (announcement_removed, state_removed, pr_removed) |
| 827 | } | 1188 | } |
| 828 | 1189 | ||
| 829 | /// Remove expired entries from purgatory (legacy method). | 1190 | /// Remove expired entries from purgatory (legacy method). |
| 830 | /// | 1191 | /// |
| 831 | /// # Returns | 1192 | /// # Returns |
| 832 | /// Total number of entries removed (state + PR events) | 1193 | /// Total number of entries removed (announcement + state + PR events) |
| 833 | #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] | 1194 | #[deprecated(since = "0.1.0", note = "Use cleanup() instead for separate counts")] |
| 834 | pub fn remove_expired(&self) -> usize { | 1195 | pub fn remove_expired(&self) -> usize { |
| 835 | let (state, pr) = self.cleanup(); | 1196 | let (announcement, state, pr) = self.cleanup(); |
| 836 | state + pr | 1197 | announcement + state + pr |
| 837 | } | 1198 | } |
| 838 | 1199 | ||
| 839 | /// Remove old expired event records. | 1200 | /// Remove old expired event records. |
| @@ -867,11 +1228,12 @@ impl Purgatory { | |||
| 867 | /// Get current count of entries in purgatory. | 1228 | /// Get current count of entries in purgatory. |
| 868 | /// | 1229 | /// |
| 869 | /// # Returns | 1230 | /// # Returns |
| 870 | /// Tuple of (state_event_count, pr_event_count) | 1231 | /// Tuple of (announcement_count, state_event_count, pr_event_count) |
| 871 | pub fn count(&self) -> (usize, usize) { | 1232 | pub fn count(&self) -> (usize, usize, usize) { |
| 1233 | let announcement_count = self.announcement_purgatory.len(); | ||
| 872 | let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); | 1234 | let state_count: usize = self.state_events.iter().map(|e| e.value().len()).sum(); |
| 873 | let pr_count = self.pr_events.len(); | 1235 | let pr_count = self.pr_events.len(); |
| 874 | (state_count, pr_count) | 1236 | (announcement_count, state_count, pr_count) |
| 875 | } | 1237 | } |
| 876 | 1238 | ||
| 877 | /// Get count of expired events being tracked. | 1239 | /// Get count of expired events being tracked. |
| @@ -885,6 +1247,7 @@ impl Purgatory { | |||
| 885 | /// Clear all entries from purgatory (for testing). | 1247 | /// Clear all entries from purgatory (for testing). |
| 886 | #[cfg(test)] | 1248 | #[cfg(test)] |
| 887 | pub fn clear(&self) { | 1249 | pub fn clear(&self) { |
| 1250 | self.announcement_purgatory.clear(); | ||
| 888 | self.state_events.clear(); | 1251 | self.state_events.clear(); |
| 889 | self.pr_events.clear(); | 1252 | self.pr_events.clear(); |
| 890 | self.sync_queue.clear(); | 1253 | self.sync_queue.clear(); |
| @@ -949,6 +1312,34 @@ impl Purgatory { | |||
| 949 | let saved_at = SystemTime::now(); | 1312 | let saved_at = SystemTime::now(); |
| 950 | let now_instant = Instant::now(); | 1313 | let now_instant = Instant::now(); |
| 951 | 1314 | ||
| 1315 | // Convert announcement_purgatory to serializable format. | ||
| 1316 | // Skip soft-expired entries: their bare repos have been deleted, so they | ||
| 1317 | // cannot be meaningfully restored (the repo path no longer exists on disk). | ||
| 1318 | let mut announcement_purgatory = HashMap::new(); | ||
| 1319 | for entry in self.announcement_purgatory.iter() { | ||
| 1320 | let e = entry.value(); | ||
| 1321 | if e.soft_expired { | ||
| 1322 | continue; | ||
| 1323 | } | ||
| 1324 | let created_offset = | ||
| 1325 | persistence::instant_to_offset(e.created_at, saved_at, now_instant); | ||
| 1326 | let expires_offset = | ||
| 1327 | persistence::instant_to_offset(e.expires_at, saved_at, now_instant); | ||
| 1328 | let key = format!("{}:{}", e.owner.to_hex(), e.identifier); | ||
| 1329 | announcement_purgatory.insert( | ||
| 1330 | key, | ||
| 1331 | SerializableAnnouncementPurgatoryEntry { | ||
| 1332 | event: e.event.clone(), | ||
| 1333 | identifier: e.identifier.clone(), | ||
| 1334 | owner: e.owner, | ||
| 1335 | repo_path: e.repo_path.clone(), | ||
| 1336 | relays: e.relays.clone(), | ||
| 1337 | created_at_offset_secs: created_offset.as_secs(), | ||
| 1338 | expires_at_offset_secs: expires_offset.as_secs(), | ||
| 1339 | }, | ||
| 1340 | ); | ||
| 1341 | } | ||
| 1342 | |||
| 952 | // Convert state_events to serializable format | 1343 | // Convert state_events to serializable format |
| 953 | let mut state_events = HashMap::new(); | 1344 | let mut state_events = HashMap::new(); |
| 954 | for entry in self.state_events.iter() { | 1345 | for entry in self.state_events.iter() { |
| @@ -1013,6 +1404,7 @@ impl Purgatory { | |||
| 1013 | let state = PurgatoryState { | 1404 | let state = PurgatoryState { |
| 1014 | version: 1, | 1405 | version: 1, |
| 1015 | saved_at, | 1406 | saved_at, |
| 1407 | announcement_purgatory, | ||
| 1016 | state_events, | 1408 | state_events, |
| 1017 | pr_events, | 1409 | pr_events, |
| 1018 | expired_events, | 1410 | expired_events, |
| @@ -1024,6 +1416,7 @@ impl Purgatory { | |||
| 1024 | 1416 | ||
| 1025 | tracing::info!( | 1417 | tracing::info!( |
| 1026 | path = %path.display(), | 1418 | path = %path.display(), |
| 1419 | announcements = state.announcement_purgatory.len(), | ||
| 1027 | state_events = state.state_events.len(), | 1420 | state_events = state.state_events.len(), |
| 1028 | pr_events = state.pr_events.len(), | 1421 | pr_events = state.pr_events.len(), |
| 1029 | expired_events = state.expired_events.len(), | 1422 | expired_events = state.expired_events.len(), |
| @@ -1071,6 +1464,45 @@ impl Purgatory { | |||
| 1071 | 1464 | ||
| 1072 | let now_instant = Instant::now(); | 1465 | let now_instant = Instant::now(); |
| 1073 | 1466 | ||
| 1467 | // Restore announcement_purgatory. | ||
| 1468 | // Skip entries whose bare repo no longer exists on disk — this can happen | ||
| 1469 | // if the repo was deleted externally between save and restore. | ||
| 1470 | for (_key, e) in state.announcement_purgatory { | ||
| 1471 | if !e.repo_path.exists() { | ||
| 1472 | tracing::warn!( | ||
| 1473 | owner = %e.owner, | ||
| 1474 | identifier = %e.identifier, | ||
| 1475 | repo_path = %e.repo_path.display(), | ||
| 1476 | "Skipping announcement restore: bare repo no longer exists" | ||
| 1477 | ); | ||
| 1478 | continue; | ||
| 1479 | } | ||
| 1480 | let created_at = persistence::offset_to_instant( | ||
| 1481 | Duration::from_secs(e.created_at_offset_secs), | ||
| 1482 | state.saved_at, | ||
| 1483 | now_instant, | ||
| 1484 | ); | ||
| 1485 | let expires_at = persistence::offset_to_instant( | ||
| 1486 | Duration::from_secs(e.expires_at_offset_secs), | ||
| 1487 | state.saved_at, | ||
| 1488 | now_instant, | ||
| 1489 | ); | ||
| 1490 | let key = (e.owner, e.identifier.clone()); | ||
| 1491 | self.announcement_purgatory.insert( | ||
| 1492 | key, | ||
| 1493 | AnnouncementPurgatoryEntry { | ||
| 1494 | event: e.event, | ||
| 1495 | identifier: e.identifier, | ||
| 1496 | owner: e.owner, | ||
| 1497 | repo_path: e.repo_path, | ||
| 1498 | relays: e.relays, | ||
| 1499 | created_at, | ||
| 1500 | expires_at, | ||
| 1501 | soft_expired: false, | ||
| 1502 | }, | ||
| 1503 | ); | ||
| 1504 | } | ||
| 1505 | |||
| 1074 | // Restore state_events | 1506 | // Restore state_events |
| 1075 | for (identifier, entries) in state.state_events { | 1507 | for (identifier, entries) in state.state_events { |
| 1076 | let restored_entries: Vec<StatePurgatoryEntry> = entries | 1508 | let restored_entries: Vec<StatePurgatoryEntry> = entries |
| @@ -1140,6 +1572,7 @@ impl Purgatory { | |||
| 1140 | 1572 | ||
| 1141 | tracing::info!( | 1573 | tracing::info!( |
| 1142 | path = %path.display(), | 1574 | path = %path.display(), |
| 1575 | announcements = self.announcement_purgatory.len(), | ||
| 1143 | state_events = self.state_events.len(), | 1576 | state_events = self.state_events.len(), |
| 1144 | pr_events = self.pr_events.len(), | 1577 | pr_events = self.pr_events.len(), |
| 1145 | expired_events = self.expired_events.len(), | 1578 | expired_events = self.expired_events.len(), |
| @@ -1162,7 +1595,8 @@ mod tests { | |||
| 1162 | #[test] | 1595 | #[test] |
| 1163 | fn test_purgatory_creation() { | 1596 | fn test_purgatory_creation() { |
| 1164 | let purgatory = Purgatory::new(PathBuf::new()); | 1597 | let purgatory = Purgatory::new(PathBuf::new()); |
| 1165 | let (state_count, pr_count) = purgatory.count(); | 1598 | let (announcement_count, state_count, pr_count) = purgatory.count(); |
| 1599 | assert_eq!(announcement_count, 0); | ||
| 1166 | assert_eq!(state_count, 0); | 1600 | assert_eq!(state_count, 0); |
| 1167 | assert_eq!(pr_count, 0); | 1601 | assert_eq!(pr_count, 0); |
| 1168 | } | 1602 | } |
| @@ -1190,7 +1624,8 @@ mod tests { | |||
| 1190 | false, | 1624 | false, |
| 1191 | ); | 1625 | ); |
| 1192 | 1626 | ||
| 1193 | let (state_count, pr_count) = purgatory.count(); | 1627 | let (announcement_count, state_count, pr_count) = purgatory.count(); |
| 1628 | assert_eq!(announcement_count, 0); | ||
| 1194 | assert_eq!(state_count, 1); | 1629 | assert_eq!(state_count, 1); |
| 1195 | assert_eq!(pr_count, 1); | 1630 | assert_eq!(pr_count, 1); |
| 1196 | } | 1631 | } |
| @@ -1407,7 +1842,7 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1407 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1842 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1408 | 1843 | ||
| 1409 | // Verify entries are there | 1844 | // Verify entries are there |
| 1410 | let (state_count, pr_count) = purgatory.count(); | 1845 | let (_, state_count, pr_count) = purgatory.count(); |
| 1411 | assert_eq!(state_count, 1); | 1846 | assert_eq!(state_count, 1); |
| 1412 | assert_eq!(pr_count, 2); | 1847 | assert_eq!(pr_count, 2); |
| 1413 | 1848 | ||
| @@ -1425,14 +1860,14 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1425 | } | 1860 | } |
| 1426 | 1861 | ||
| 1427 | // Run cleanup | 1862 | // Run cleanup |
| 1428 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1863 | let (_, state_removed, pr_removed) = purgatory.cleanup(); |
| 1429 | 1864 | ||
| 1430 | // Verify counts | 1865 | // Verify counts |
| 1431 | assert_eq!(state_removed, 1); | 1866 | assert_eq!(state_removed, 1); |
| 1432 | assert_eq!(pr_removed, 2); | 1867 | assert_eq!(pr_removed, 2); |
| 1433 | 1868 | ||
| 1434 | // Verify entries are gone | 1869 | // Verify entries are gone |
| 1435 | let (state_count, pr_count) = purgatory.count(); | 1870 | let (_, state_count, pr_count) = purgatory.count(); |
| 1436 | assert_eq!(state_count, 0); | 1871 | assert_eq!(state_count, 0); |
| 1437 | assert_eq!(pr_count, 0); | 1872 | assert_eq!(pr_count, 0); |
| 1438 | } | 1873 | } |
| @@ -1464,14 +1899,14 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1464 | ); | 1899 | ); |
| 1465 | 1900 | ||
| 1466 | // Run cleanup | 1901 | // Run cleanup |
| 1467 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1902 | let (_, state_removed, pr_removed) = purgatory.cleanup(); |
| 1468 | 1903 | ||
| 1469 | // Nothing should be removed | 1904 | // Nothing should be removed |
| 1470 | assert_eq!(state_removed, 0); | 1905 | assert_eq!(state_removed, 0); |
| 1471 | assert_eq!(pr_removed, 0); | 1906 | assert_eq!(pr_removed, 0); |
| 1472 | 1907 | ||
| 1473 | // Verify entries are still there | 1908 | // Verify entries are still there |
| 1474 | let (state_count, pr_count) = purgatory.count(); | 1909 | let (_, state_count, pr_count) = purgatory.count(); |
| 1475 | assert_eq!(state_count, 1); | 1910 | assert_eq!(state_count, 1); |
| 1476 | assert_eq!(pr_count, 1); | 1911 | assert_eq!(pr_count, 1); |
| 1477 | } | 1912 | } |
| @@ -1518,14 +1953,14 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1518 | } | 1953 | } |
| 1519 | 1954 | ||
| 1520 | // Run cleanup | 1955 | // Run cleanup |
| 1521 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1956 | let (_, state_removed, pr_removed) = purgatory.cleanup(); |
| 1522 | 1957 | ||
| 1523 | // One of each should be removed | 1958 | // One of each should be removed |
| 1524 | assert_eq!(state_removed, 1); | 1959 | assert_eq!(state_removed, 1); |
| 1525 | assert_eq!(pr_removed, 1); | 1960 | assert_eq!(pr_removed, 1); |
| 1526 | 1961 | ||
| 1527 | // Verify remaining counts | 1962 | // Verify remaining counts |
| 1528 | let (state_count, pr_count) = purgatory.count(); | 1963 | let (_, state_count, pr_count) = purgatory.count(); |
| 1529 | assert_eq!(state_count, 1); // One state event remains | 1964 | assert_eq!(state_count, 1); // One state event remains |
| 1530 | assert_eq!(pr_count, 1); // One PR event remains | 1965 | assert_eq!(pr_count, 1); // One PR event remains |
| 1531 | } | 1966 | } |
| @@ -1595,7 +2030,7 @@ fn test_expired_event_tracking() { | |||
| 1595 | } | 2030 | } |
| 1596 | 2031 | ||
| 1597 | // Run cleanup | 2032 | // Run cleanup |
| 1598 | let (state_removed, pr_removed) = purgatory.cleanup(); | 2033 | let (_, state_removed, pr_removed) = purgatory.cleanup(); |
| 1599 | assert_eq!(state_removed, 1); | 2034 | assert_eq!(state_removed, 1); |
| 1600 | assert_eq!(pr_removed, 1); | 2035 | assert_eq!(pr_removed, 1); |
| 1601 | 2036 | ||
| @@ -1705,7 +2140,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1705 | } | 2140 | } |
| 1706 | 2141 | ||
| 1707 | // Event should NOT be re-added | 2142 | // Event should NOT be re-added |
| 1708 | let (state_count, _) = purgatory.count(); | 2143 | let (_, state_count, _) = purgatory.count(); |
| 1709 | assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); | 2144 | assert_eq!(state_count, 0, "Event should not be re-added to purgatory"); |
| 1710 | } | 2145 | } |
| 1711 | 2146 | ||
| @@ -1724,7 +2159,7 @@ fn test_pr_placeholder_not_marked_expired() { | |||
| 1724 | } | 2159 | } |
| 1725 | 2160 | ||
| 1726 | // Run cleanup | 2161 | // Run cleanup |
| 1727 | let (_, pr_removed) = purgatory.cleanup(); | 2162 | let (_, _, pr_removed) = purgatory.cleanup(); |
| 1728 | assert_eq!(pr_removed, 1); | 2163 | assert_eq!(pr_removed, 1); |
| 1729 | 2164 | ||
| 1730 | // Expired count should be 0 (placeholders don't have event IDs to track) | 2165 | // Expired count should be 0 (placeholders don't have event IDs to track) |
| @@ -1820,7 +2255,7 @@ async fn test_save_and_restore_state_events() { | |||
| 1820 | assert!(!state_file.exists()); | 2255 | assert!(!state_file.exists()); |
| 1821 | 2256 | ||
| 1822 | // Verify state events were restored | 2257 | // Verify state events were restored |
| 1823 | let (state_count, _) = purgatory2.count(); | 2258 | let (_, state_count, _) = purgatory2.count(); |
| 1824 | assert_eq!(state_count, 2); | 2259 | assert_eq!(state_count, 2); |
| 1825 | 2260 | ||
| 1826 | let restored_entries = purgatory2.find_state("test-repo"); | 2261 | let restored_entries = purgatory2.find_state("test-repo"); |
| @@ -1877,7 +2312,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 1877 | purgatory2.restore_from_disk(&state_file).unwrap(); | 2312 | purgatory2.restore_from_disk(&state_file).unwrap(); |
| 1878 | 2313 | ||
| 1879 | // Verify PR event was restored | 2314 | // Verify PR event was restored |
| 1880 | let (_, pr_count) = purgatory2.count(); | 2315 | let (_, _, pr_count) = purgatory2.count(); |
| 1881 | assert_eq!(pr_count, 1); | 2316 | assert_eq!(pr_count, 1); |
| 1882 | 2317 | ||
| 1883 | let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); | 2318 | let restored_entry = purgatory2.find_pr("pr-event-id").unwrap(); |
| @@ -1906,7 +2341,7 @@ async fn test_save_and_restore_pr_placeholders() { | |||
| 1906 | purgatory2.restore_from_disk(&state_file).unwrap(); | 2341 | purgatory2.restore_from_disk(&state_file).unwrap(); |
| 1907 | 2342 | ||
| 1908 | // Verify placeholder was restored | 2343 | // Verify placeholder was restored |
| 1909 | let (_, pr_count) = purgatory2.count(); | 2344 | let (_, _, pr_count) = purgatory2.count(); |
| 1910 | assert_eq!(pr_count, 1); | 2345 | assert_eq!(pr_count, 1); |
| 1911 | 2346 | ||
| 1912 | let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); | 2347 | let restored_entry = purgatory2.find_pr("placeholder-id").unwrap(); |
| @@ -1984,7 +2419,7 @@ async fn test_save_and_restore_empty_purgatory() { | |||
| 1984 | purgatory2.restore_from_disk(&state_file).unwrap(); | 2419 | purgatory2.restore_from_disk(&state_file).unwrap(); |
| 1985 | 2420 | ||
| 1986 | // Verify purgatory is still empty | 2421 | // Verify purgatory is still empty |
| 1987 | let (state_count, pr_count) = purgatory2.count(); | 2422 | let (_, state_count, pr_count) = purgatory2.count(); |
| 1988 | assert_eq!(state_count, 0); | 2423 | assert_eq!(state_count, 0); |
| 1989 | assert_eq!(pr_count, 0); | 2424 | assert_eq!(pr_count, 0); |
| 1990 | assert_eq!(purgatory2.expired_count(), 0); | 2425 | assert_eq!(purgatory2.expired_count(), 0); |
| @@ -2004,7 +2439,7 @@ async fn test_restore_missing_file() { | |||
| 2004 | assert!(result.is_err()); | 2439 | assert!(result.is_err()); |
| 2005 | 2440 | ||
| 2006 | // Purgatory should remain empty | 2441 | // Purgatory should remain empty |
| 2007 | let (state_count, pr_count) = purgatory.count(); | 2442 | let (_, state_count, pr_count) = purgatory.count(); |
| 2008 | assert_eq!(state_count, 0); | 2443 | assert_eq!(state_count, 0); |
| 2009 | assert_eq!(pr_count, 0); | 2444 | assert_eq!(pr_count, 0); |
| 2010 | } | 2445 | } |
| @@ -2026,7 +2461,7 @@ async fn test_restore_corrupted_json() { | |||
| 2026 | assert!(result.is_err()); | 2461 | assert!(result.is_err()); |
| 2027 | 2462 | ||
| 2028 | // Purgatory should remain empty | 2463 | // Purgatory should remain empty |
| 2029 | let (state_count, pr_count) = purgatory.count(); | 2464 | let (_, state_count, pr_count) = purgatory.count(); |
| 2030 | assert_eq!(state_count, 0); | 2465 | assert_eq!(state_count, 0); |
| 2031 | assert_eq!(pr_count, 0); | 2466 | assert_eq!(pr_count, 0); |
| 2032 | } | 2467 | } |
| @@ -2263,7 +2698,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2263 | purgatory2.restore_from_disk(&state_file).unwrap(); | 2698 | purgatory2.restore_from_disk(&state_file).unwrap(); |
| 2264 | 2699 | ||
| 2265 | // Verify both were restored correctly | 2700 | // Verify both were restored correctly |
| 2266 | let (_, pr_count) = purgatory2.count(); | 2701 | let (_, _, pr_count) = purgatory2.count(); |
| 2267 | assert_eq!(pr_count, 2); | 2702 | assert_eq!(pr_count, 2); |
| 2268 | 2703 | ||
| 2269 | // Verify PR event | 2704 | // Verify PR event |
| @@ -2310,6 +2745,141 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2310 | } | 2745 | } |
| 2311 | 2746 | ||
| 2312 | #[tokio::test] | 2747 | #[tokio::test] |
| 2748 | async fn test_save_and_restore_announcement_events() { | ||
| 2749 | use tempfile::tempdir; | ||
| 2750 | |||
| 2751 | let temp_dir = tempdir().unwrap(); | ||
| 2752 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2753 | |||
| 2754 | // Create a real bare repo directory so the restore path-existence check passes | ||
| 2755 | let repo_dir = temp_dir.path().join("owner.git"); | ||
| 2756 | std::fs::create_dir_all(&repo_dir).unwrap(); | ||
| 2757 | |||
| 2758 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2759 | let keys = Keys::generate(); | ||
| 2760 | |||
| 2761 | let ann_event = EventBuilder::text_note("announcement event") | ||
| 2762 | .sign_with_keys(&keys) | ||
| 2763 | .unwrap(); | ||
| 2764 | let ann_event_id = ann_event.id; | ||
| 2765 | |||
| 2766 | let mut relays = HashSet::new(); | ||
| 2767 | relays.insert("wss://relay.example.com".to_string()); | ||
| 2768 | |||
| 2769 | purgatory.add_announcement( | ||
| 2770 | ann_event.clone(), | ||
| 2771 | "my-repo".to_string(), | ||
| 2772 | keys.public_key(), | ||
| 2773 | repo_dir.clone(), | ||
| 2774 | relays.clone(), | ||
| 2775 | ); | ||
| 2776 | |||
| 2777 | // Save to disk | ||
| 2778 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2779 | assert!(state_file.exists()); | ||
| 2780 | |||
| 2781 | // Create new purgatory and restore | ||
| 2782 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2783 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2784 | |||
| 2785 | // File should be deleted after restore | ||
| 2786 | assert!(!state_file.exists()); | ||
| 2787 | |||
| 2788 | // Verify announcement was restored | ||
| 2789 | let (ann_count, _, _) = purgatory2.count(); | ||
| 2790 | assert_eq!(ann_count, 1); | ||
| 2791 | |||
| 2792 | let restored = purgatory2 | ||
| 2793 | .find_announcement(&keys.public_key(), "my-repo") | ||
| 2794 | .unwrap(); | ||
| 2795 | assert_eq!(restored.event.id, ann_event_id); | ||
| 2796 | assert_eq!(restored.identifier, "my-repo"); | ||
| 2797 | assert_eq!(restored.owner, keys.public_key()); | ||
| 2798 | assert_eq!(restored.repo_path, repo_dir); | ||
| 2799 | assert_eq!(restored.relays, relays); | ||
| 2800 | assert!(!restored.soft_expired); | ||
| 2801 | } | ||
| 2802 | |||
| 2803 | #[tokio::test] | ||
| 2804 | async fn test_soft_expired_announcements_not_persisted() { | ||
| 2805 | use tempfile::tempdir; | ||
| 2806 | |||
| 2807 | let temp_dir = tempdir().unwrap(); | ||
| 2808 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2809 | |||
| 2810 | let repo_dir = temp_dir.path().join("owner.git"); | ||
| 2811 | std::fs::create_dir_all(&repo_dir).unwrap(); | ||
| 2812 | |||
| 2813 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2814 | let keys = Keys::generate(); | ||
| 2815 | |||
| 2816 | let ann_event = EventBuilder::text_note("announcement event") | ||
| 2817 | .sign_with_keys(&keys) | ||
| 2818 | .unwrap(); | ||
| 2819 | |||
| 2820 | purgatory.add_announcement( | ||
| 2821 | ann_event.clone(), | ||
| 2822 | "my-repo".to_string(), | ||
| 2823 | keys.public_key(), | ||
| 2824 | repo_dir.clone(), | ||
| 2825 | HashSet::new(), | ||
| 2826 | ); | ||
| 2827 | |||
| 2828 | // Manually mark as soft-expired (bare repo deleted) | ||
| 2829 | let key = (keys.public_key(), "my-repo".to_string()); | ||
| 2830 | if let Some(mut entry) = purgatory.announcement_purgatory.get_mut(&key) { | ||
| 2831 | entry.soft_expired = true; | ||
| 2832 | } | ||
| 2833 | |||
| 2834 | // Save to disk — soft-expired entry should be excluded | ||
| 2835 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2836 | |||
| 2837 | // Create new purgatory and restore | ||
| 2838 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2839 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2840 | |||
| 2841 | // Soft-expired announcement should NOT be restored | ||
| 2842 | let (ann_count, _, _) = purgatory2.count(); | ||
| 2843 | assert_eq!(ann_count, 0); | ||
| 2844 | } | ||
| 2845 | |||
| 2846 | #[tokio::test] | ||
| 2847 | async fn test_announcement_with_missing_repo_skipped_on_restore() { | ||
| 2848 | use tempfile::tempdir; | ||
| 2849 | |||
| 2850 | let temp_dir = tempdir().unwrap(); | ||
| 2851 | let state_file = temp_dir.path().join("purgatory_state.json"); | ||
| 2852 | |||
| 2853 | // Point to a repo path that does NOT exist | ||
| 2854 | let missing_repo = temp_dir.path().join("nonexistent.git"); | ||
| 2855 | |||
| 2856 | let purgatory = Purgatory::new(PathBuf::new()); | ||
| 2857 | let keys = Keys::generate(); | ||
| 2858 | |||
| 2859 | let ann_event = EventBuilder::text_note("announcement event") | ||
| 2860 | .sign_with_keys(&keys) | ||
| 2861 | .unwrap(); | ||
| 2862 | |||
| 2863 | purgatory.add_announcement( | ||
| 2864 | ann_event.clone(), | ||
| 2865 | "my-repo".to_string(), | ||
| 2866 | keys.public_key(), | ||
| 2867 | missing_repo.clone(), | ||
| 2868 | HashSet::new(), | ||
| 2869 | ); | ||
| 2870 | |||
| 2871 | // Save to disk (repo path is serialized even though it doesn't exist) | ||
| 2872 | purgatory.save_to_disk(&state_file).unwrap(); | ||
| 2873 | |||
| 2874 | // Create new purgatory and restore — entry should be skipped | ||
| 2875 | let purgatory2 = Purgatory::new(PathBuf::new()); | ||
| 2876 | purgatory2.restore_from_disk(&state_file).unwrap(); | ||
| 2877 | |||
| 2878 | let (ann_count, _, _) = purgatory2.count(); | ||
| 2879 | assert_eq!(ann_count, 0); | ||
| 2880 | } | ||
| 2881 | |||
| 2882 | #[tokio::test] | ||
| 2313 | async fn test_comprehensive_roundtrip() { | 2883 | async fn test_comprehensive_roundtrip() { |
| 2314 | use nostr_sdk::{Kind, Tag, TagKind}; | 2884 | use nostr_sdk::{Kind, Tag, TagKind}; |
| 2315 | use tempfile::tempdir; | 2885 | use tempfile::tempdir; |
| @@ -2317,10 +2887,27 @@ async fn test_comprehensive_roundtrip() { | |||
| 2317 | let temp_dir = tempdir().unwrap(); | 2887 | let temp_dir = tempdir().unwrap(); |
| 2318 | let state_file = temp_dir.path().join("purgatory_state.json"); | 2888 | let state_file = temp_dir.path().join("purgatory_state.json"); |
| 2319 | 2889 | ||
| 2890 | // Create a real bare repo directory for the announcement | ||
| 2891 | let repo_dir = temp_dir.path().join("owner.git"); | ||
| 2892 | std::fs::create_dir_all(&repo_dir).unwrap(); | ||
| 2893 | |||
| 2320 | let purgatory = Purgatory::new(PathBuf::new()); | 2894 | let purgatory = Purgatory::new(PathBuf::new()); |
| 2321 | let keys1 = Keys::generate(); | 2895 | let keys1 = Keys::generate(); |
| 2322 | let keys2 = Keys::generate(); | 2896 | let keys2 = Keys::generate(); |
| 2323 | 2897 | ||
| 2898 | // Add announcement | ||
| 2899 | let ann_event = EventBuilder::text_note("announcement") | ||
| 2900 | .sign_with_keys(&keys1) | ||
| 2901 | .unwrap(); | ||
| 2902 | let ann_event_id = ann_event.id; | ||
| 2903 | purgatory.add_announcement( | ||
| 2904 | ann_event, | ||
| 2905 | "repo1".to_string(), | ||
| 2906 | keys1.public_key(), | ||
| 2907 | repo_dir.clone(), | ||
| 2908 | HashSet::new(), | ||
| 2909 | ); | ||
| 2910 | |||
| 2324 | // Add multiple state events | 2911 | // Add multiple state events |
| 2325 | let state1 = EventBuilder::text_note("state 1") | 2912 | let state1 = EventBuilder::text_note("state 1") |
| 2326 | .sign_with_keys(&keys1) | 2913 | .sign_with_keys(&keys1) |
| @@ -2380,7 +2967,8 @@ async fn test_comprehensive_roundtrip() { | |||
| 2380 | purgatory.cleanup(); | 2967 | purgatory.cleanup(); |
| 2381 | 2968 | ||
| 2382 | // Verify initial state | 2969 | // Verify initial state |
| 2383 | let (state_count, pr_count) = purgatory.count(); | 2970 | let (ann_count, state_count, pr_count) = purgatory.count(); |
| 2971 | assert_eq!(ann_count, 1); // announcement | ||
| 2384 | assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) | 2972 | assert_eq!(state_count, 2); // state1, state2 (expired_event was cleaned up) |
| 2385 | assert_eq!(pr_count, 2); // pr-1, pr-2 | 2973 | assert_eq!(pr_count, 2); // pr-1, pr-2 |
| 2386 | assert_eq!(purgatory.expired_count(), 1); // expired_event | 2974 | assert_eq!(purgatory.expired_count(), 1); // expired_event |
| @@ -2393,11 +2981,18 @@ async fn test_comprehensive_roundtrip() { | |||
| 2393 | purgatory2.restore_from_disk(&state_file).unwrap(); | 2981 | purgatory2.restore_from_disk(&state_file).unwrap(); |
| 2394 | 2982 | ||
| 2395 | // Verify all data was restored correctly | 2983 | // Verify all data was restored correctly |
| 2396 | let (state_count2, pr_count2) = purgatory2.count(); | 2984 | let (ann_count2, state_count2, pr_count2) = purgatory2.count(); |
| 2985 | assert_eq!(ann_count2, 1); | ||
| 2397 | assert_eq!(state_count2, 2); | 2986 | assert_eq!(state_count2, 2); |
| 2398 | assert_eq!(pr_count2, 2); | 2987 | assert_eq!(pr_count2, 2); |
| 2399 | assert_eq!(purgatory2.expired_count(), 1); | 2988 | assert_eq!(purgatory2.expired_count(), 1); |
| 2400 | 2989 | ||
| 2990 | // Verify announcement | ||
| 2991 | let restored_ann = purgatory2 | ||
| 2992 | .find_announcement(&keys1.public_key(), "repo1") | ||
| 2993 | .unwrap(); | ||
| 2994 | assert_eq!(restored_ann.event.id, ann_event_id); | ||
| 2995 | |||
| 2401 | // Verify state events | 2996 | // Verify state events |
| 2402 | assert_eq!(purgatory2.find_state("repo1").len(), 1); | 2997 | assert_eq!(purgatory2.find_state("repo1").len(), 1); |
| 2403 | assert_eq!(purgatory2.find_state("repo2").len(), 1); | 2998 | assert_eq!(purgatory2.find_state("repo2").len(), 1); |
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 904f8af..8297515 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -75,7 +75,12 @@ pub trait SyncContext: Send + Sync { | |||
| 75 | /// # Returns | 75 | /// # Returns |
| 76 | /// Set of clone URLs from PR events in purgatory for this identifier | 76 | /// Set of clone URLs from PR events in purgatory for this identifier |
| 77 | fn collect_pr_clone_urls(&self, identifier: &str) -> HashSet<String>; | 77 | fn collect_pr_clone_urls(&self, identifier: &str) -> HashSet<String>; |
| 78 | /// Get repository data (announcements, clone URLs, etc.) from the database. | 78 | /// Get repository data (announcements, clone URLs, etc.) from the database and purgatory. |
| 79 | /// | ||
| 80 | /// Checks both the database (promoted announcements) and purgatory (announcements | ||
| 81 | /// awaiting git data). This is necessary to obtain clone URLs when an announcement | ||
| 82 | /// has not yet been promoted - without purgatory data, the sync loop would have no | ||
| 83 | /// URLs to fetch from and the announcement could never be promoted (circular deadlock). | ||
| 79 | /// | 84 | /// |
| 80 | /// # Arguments | 85 | /// # Arguments |
| 81 | /// * `identifier` - The repository identifier (d-tag value) | 86 | /// * `identifier` - The repository identifier (d-tag value) |
| @@ -279,7 +284,16 @@ impl SyncContext for RealSyncContext { | |||
| 279 | } | 284 | } |
| 280 | 285 | ||
| 281 | async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> { | 286 | async fn fetch_repository_data(&self, identifier: &str) -> Result<RepositoryData> { |
| 282 | crate::git::authorization::fetch_repository_data(&self.database, identifier).await | 287 | // Use the purgatory-aware variant so that clone URLs from announcements still |
| 288 | // in purgatory (not yet promoted) are available. Without this, the sync loop | ||
| 289 | // would find no URLs to fetch from and the announcement could never be promoted | ||
| 290 | // (circular deadlock: can't promote without git data, can't get git data without URLs). | ||
| 291 | crate::git::authorization::fetch_repository_data_with_purgatory( | ||
| 292 | &self.database, | ||
| 293 | &self.purgatory, | ||
| 294 | identifier, | ||
| 295 | ) | ||
| 296 | .await | ||
| 283 | } | 297 | } |
| 284 | 298 | ||
| 285 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { | 299 | fn collect_needed_oids(&self, identifier: &str) -> HashSet<String> { |
| @@ -487,7 +501,9 @@ impl SyncContext for RealSyncContext { | |||
| 487 | source_repo_path: &Path, | 501 | source_repo_path: &Path, |
| 488 | new_oids: &HashSet<String>, | 502 | new_oids: &HashSet<String>, |
| 489 | ) -> Result<ProcessResult> { | 503 | ) -> Result<ProcessResult> { |
| 490 | // Delegate to the unified function from git::sync | 504 | // Delegate to the unified function from git::sync. |
| 505 | // Pass None for write_policy and rejected_events_index: the purgatory sync path | ||
| 506 | // already handles hot-cache re-processing via SyncManager::process_event_static. | ||
| 491 | let result = crate::git::sync::process_newly_available_git_data( | 507 | let result = crate::git::sync::process_newly_available_git_data( |
| 492 | source_repo_path, | 508 | source_repo_path, |
| 493 | new_oids, | 509 | new_oids, |
| @@ -495,6 +511,8 @@ impl SyncContext for RealSyncContext { | |||
| 495 | self.local_relay.as_ref(), | 511 | self.local_relay.as_ref(), |
| 496 | &self.purgatory, | 512 | &self.purgatory, |
| 497 | &self.git_data_path, | 513 | &self.git_data_path, |
| 514 | None, | ||
| 515 | None, | ||
| 498 | ) | 516 | ) |
| 499 | .await?; | 517 | .await?; |
| 500 | 518 | ||
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index e37a3e1..1af5c4e 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -6,6 +6,8 @@ | |||
| 6 | 6 | ||
| 7 | use nostr_sdk::prelude::*; | 7 | use nostr_sdk::prelude::*; |
| 8 | use serde::{Deserialize, Serialize}; | 8 | use serde::{Deserialize, Serialize}; |
| 9 | use std::collections::HashSet; | ||
| 10 | use std::path::PathBuf; | ||
| 9 | use std::time::Instant; | 11 | use std::time::Instant; |
| 10 | 12 | ||
| 11 | /// Source of an event entering purgatory. | 13 | /// Source of an event entering purgatory. |
| @@ -143,3 +145,40 @@ pub struct PrPurgatoryEntry { | |||
| 143 | #[serde(default)] | 145 | #[serde(default)] |
| 144 | pub source: EventSource, | 146 | pub source: EventSource, |
| 145 | } | 147 | } |
| 148 | |||
| 149 | /// Entry for a repository announcement (kind 30617) waiting in purgatory. | ||
| 150 | /// | ||
| 151 | /// Announcements are held in purgatory until git data arrives, proving | ||
| 152 | /// the repository has actual content. This prevents serving announcements | ||
| 153 | /// for empty repositories. | ||
| 154 | /// | ||
| 155 | /// Note: `Instant` fields cannot be serialized directly. Use the `persistence` | ||
| 156 | /// module to convert to/from serializable wrapper types. | ||
| 157 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 158 | pub struct AnnouncementPurgatoryEntry { | ||
| 159 | /// The nostr announcement event (kind 30617) | ||
| 160 | pub event: Event, | ||
| 161 | |||
| 162 | /// The repository identifier from the event's 'd' tag | ||
| 163 | pub identifier: String, | ||
| 164 | |||
| 165 | /// The owner pubkey (event author) | ||
| 166 | pub owner: PublicKey, | ||
| 167 | |||
| 168 | /// Path to the bare git repository | ||
| 169 | pub repo_path: PathBuf, | ||
| 170 | |||
| 171 | /// Relay URLs from the announcement (for sync registration) | ||
| 172 | pub relays: HashSet<String>, | ||
| 173 | |||
| 174 | /// When this entry was added to purgatory | ||
| 175 | #[serde(skip, default = "instant_now")] | ||
| 176 | pub created_at: Instant, | ||
| 177 | |||
| 178 | /// Expiry deadline (30 min from creation, may be extended) | ||
| 179 | #[serde(skip, default = "instant_now")] | ||
| 180 | pub expires_at: Instant, | ||
| 181 | |||
| 182 | /// Whether the bare repo has been deleted (soft expiry) | ||
| 183 | pub soft_expired: bool, | ||
| 184 | } | ||
diff --git a/src/sync/algorithms.rs b/src/sync/algorithms.rs index 39788bc..9899abc 100644 --- a/src/sync/algorithms.rs +++ b/src/sync/algorithms.rs | |||
| @@ -25,8 +25,10 @@ use super::{ConnectionStatus, PendingBatch, RelayState}; | |||
| 25 | /// this repo need to sync from", it's "what repos does this relay need to sync". | 25 | /// this repo need to sync from", it's "what repos does this relay need to sync". |
| 26 | #[derive(Debug, Clone, Default)] | 26 | #[derive(Debug, Clone, Default)] |
| 27 | pub struct RelaySyncNeeds { | 27 | pub struct RelaySyncNeeds { |
| 28 | /// Repos that need to be synced from this relay | 28 | /// Repos that need full L2+L3 sync from this relay |
| 29 | pub repos: HashSet<String>, | 29 | pub repos: HashSet<String>, |
| 30 | /// Repos that only need state event sync (purgatory announcements) | ||
| 31 | pub state_only_repos: HashSet<String>, | ||
| 30 | /// Root events that need to be tracked from this relay | 32 | /// Root events that need to be tracked from this relay |
| 31 | pub root_events: HashSet<EventId>, | 33 | pub root_events: HashSet<EventId>, |
| 32 | } | 34 | } |
| @@ -67,8 +69,15 @@ pub fn derive_relay_targets( | |||
| 67 | for relay_url in &needs.relays { | 69 | for relay_url in &needs.relays { |
| 68 | let entry = relay_targets.entry(relay_url.clone()).or_default(); | 70 | let entry = relay_targets.entry(relay_url.clone()).or_default(); |
| 69 | 71 | ||
| 70 | entry.repos.insert(repo_id.clone()); | 72 | match needs.sync_level { |
| 71 | entry.root_events.extend(needs.root_events.iter().cloned()); | 73 | super::SyncLevel::Full => { |
| 74 | entry.repos.insert(repo_id.clone()); | ||
| 75 | entry.root_events.extend(needs.root_events.iter().cloned()); | ||
| 76 | } | ||
| 77 | super::SyncLevel::StateOnly => { | ||
| 78 | entry.state_only_repos.insert(repo_id.clone()); | ||
| 79 | } | ||
| 80 | } | ||
| 72 | } | 81 | } |
| 73 | } | 82 | } |
| 74 | 83 | ||
| @@ -96,7 +105,7 @@ pub fn compute_actions( | |||
| 96 | pending: &HashMap<String, Vec<PendingBatch>>, | 105 | pending: &HashMap<String, Vec<PendingBatch>>, |
| 97 | confirmed: &HashMap<String, RelayState>, | 106 | confirmed: &HashMap<String, RelayState>, |
| 98 | ) -> Vec<AddFilters> { | 107 | ) -> Vec<AddFilters> { |
| 99 | use crate::sync::filters::build_layer2_and_layer3_filters; | 108 | use crate::sync::filters::build_sync_level_aware_filters; |
| 100 | 109 | ||
| 101 | let mut actions = Vec::new(); | 110 | let mut actions = Vec::new(); |
| 102 | 111 | ||
| @@ -140,14 +149,22 @@ pub fn compute_actions( | |||
| 140 | .map(|state| state.root_events.clone()) | 149 | .map(|state| state.root_events.clone()) |
| 141 | .unwrap_or_default(); | 150 | .unwrap_or_default(); |
| 142 | 151 | ||
| 143 | // Calculate what's NEW (not in pending, not in confirmed) | 152 | // Calculate what's NEW for full repos (not in pending, not in confirmed) |
| 144 | let new_repos: HashSet<String> = target_needs | 153 | let new_full_repos: HashSet<String> = target_needs |
| 145 | .repos | 154 | .repos |
| 146 | .difference(&pending_repos) | 155 | .difference(&pending_repos) |
| 147 | .filter(|repo| !confirmed_repos.contains(*repo)) | 156 | .filter(|repo| !confirmed_repos.contains(*repo)) |
| 148 | .cloned() | 157 | .cloned() |
| 149 | .collect(); | 158 | .collect(); |
| 150 | 159 | ||
| 160 | // Calculate what's NEW for state-only repos | ||
| 161 | let new_state_only_repos: HashSet<String> = target_needs | ||
| 162 | .state_only_repos | ||
| 163 | .difference(&pending_repos) | ||
| 164 | .filter(|repo| !confirmed_repos.contains(*repo)) | ||
| 165 | .cloned() | ||
| 166 | .collect(); | ||
| 167 | |||
| 151 | let new_events: HashSet<EventId> = target_needs | 168 | let new_events: HashSet<EventId> = target_needs |
| 152 | .root_events | 169 | .root_events |
| 153 | .difference(&pending_events) | 170 | .difference(&pending_events) |
| @@ -156,13 +173,23 @@ pub fn compute_actions( | |||
| 156 | .collect(); | 173 | .collect(); |
| 157 | 174 | ||
| 158 | // If there's anything new, create an AddFilters action | 175 | // If there's anything new, create an AddFilters action |
| 159 | if !new_repos.is_empty() || !new_events.is_empty() { | 176 | if !new_full_repos.is_empty() || !new_state_only_repos.is_empty() || !new_events.is_empty() |
| 160 | let filters = build_layer2_and_layer3_filters(&new_repos, &new_events, None); | 177 | { |
| 178 | let filters = build_sync_level_aware_filters( | ||
| 179 | &new_full_repos, | ||
| 180 | &new_state_only_repos, | ||
| 181 | &new_events, | ||
| 182 | None, | ||
| 183 | ); | ||
| 184 | |||
| 185 | // Combine all repos into pending items (pending tracking doesn't need sync level) | ||
| 186 | let mut all_new_repos = new_full_repos; | ||
| 187 | all_new_repos.extend(new_state_only_repos); | ||
| 161 | 188 | ||
| 162 | actions.push(AddFilters { | 189 | actions.push(AddFilters { |
| 163 | relay_url: relay_url.clone(), | 190 | relay_url: relay_url.clone(), |
| 164 | items: PendingItems { | 191 | items: PendingItems { |
| 165 | repos: new_repos, | 192 | repos: all_new_repos, |
| 166 | root_events: new_events, | 193 | root_events: new_events, |
| 167 | }, | 194 | }, |
| 168 | filters, | 195 | filters, |
| @@ -204,6 +231,7 @@ mod tests { | |||
| 204 | ModRepoSyncNeeds { | 231 | ModRepoSyncNeeds { |
| 205 | relays, | 232 | relays, |
| 206 | root_events, | 233 | root_events, |
| 234 | sync_level: Default::default(), | ||
| 207 | }, | 235 | }, |
| 208 | ); | 236 | ); |
| 209 | 237 | ||
| @@ -229,6 +257,7 @@ mod tests { | |||
| 229 | ModRepoSyncNeeds { | 257 | ModRepoSyncNeeds { |
| 230 | relays, | 258 | relays, |
| 231 | root_events: HashSet::new(), | 259 | root_events: HashSet::new(), |
| 260 | sync_level: Default::default(), | ||
| 232 | }, | 261 | }, |
| 233 | ); | 262 | ); |
| 234 | } | 263 | } |
| @@ -252,6 +281,7 @@ mod tests { | |||
| 252 | ModRepoSyncNeeds { | 281 | ModRepoSyncNeeds { |
| 253 | relays, | 282 | relays, |
| 254 | root_events: HashSet::new(), | 283 | root_events: HashSet::new(), |
| 284 | sync_level: Default::default(), | ||
| 255 | }, | 285 | }, |
| 256 | ); | 286 | ); |
| 257 | 287 | ||
| @@ -285,6 +315,7 @@ mod tests { | |||
| 285 | ModRepoSyncNeeds { | 315 | ModRepoSyncNeeds { |
| 286 | relays: relays1, | 316 | relays: relays1, |
| 287 | root_events: root_events1, | 317 | root_events: root_events1, |
| 318 | sync_level: Default::default(), | ||
| 288 | }, | 319 | }, |
| 289 | ); | 320 | ); |
| 290 | 321 | ||
| @@ -299,6 +330,7 @@ mod tests { | |||
| 299 | ModRepoSyncNeeds { | 330 | ModRepoSyncNeeds { |
| 300 | relays: relays2, | 331 | relays: relays2, |
| 301 | root_events: root_events2, | 332 | root_events: root_events2, |
| 333 | sync_level: Default::default(), | ||
| 302 | }, | 334 | }, |
| 303 | ); | 335 | ); |
| 304 | 336 | ||
| @@ -332,6 +364,7 @@ mod tests { | |||
| 332 | "wss://relay1.com".to_string(), | 364 | "wss://relay1.com".to_string(), |
| 333 | RelaySyncNeeds { | 365 | RelaySyncNeeds { |
| 334 | repos: vec!["repo1".to_string()].into_iter().collect(), | 366 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 367 | state_only_repos: HashSet::new(), | ||
| 335 | root_events: HashSet::new(), | 368 | root_events: HashSet::new(), |
| 336 | }, | 369 | }, |
| 337 | ); | 370 | ); |
| @@ -366,6 +399,7 @@ mod tests { | |||
| 366 | "wss://relay1.com".to_string(), | 399 | "wss://relay1.com".to_string(), |
| 367 | RelaySyncNeeds { | 400 | RelaySyncNeeds { |
| 368 | repos: vec!["repo1".to_string()].into_iter().collect(), | 401 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 402 | state_only_repos: HashSet::new(), | ||
| 369 | root_events: HashSet::new(), | 403 | root_events: HashSet::new(), |
| 370 | }, | 404 | }, |
| 371 | ); | 405 | ); |
| @@ -389,6 +423,7 @@ mod tests { | |||
| 389 | "wss://relay1.com".to_string(), | 423 | "wss://relay1.com".to_string(), |
| 390 | RelaySyncNeeds { | 424 | RelaySyncNeeds { |
| 391 | repos: vec!["repo1".to_string()].into_iter().collect(), | 425 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 426 | state_only_repos: HashSet::new(), | ||
| 392 | root_events: HashSet::new(), | 427 | root_events: HashSet::new(), |
| 393 | }, | 428 | }, |
| 394 | ); | 429 | ); |
| @@ -428,6 +463,7 @@ mod tests { | |||
| 428 | "wss://relay1.com".to_string(), | 463 | "wss://relay1.com".to_string(), |
| 429 | RelaySyncNeeds { | 464 | RelaySyncNeeds { |
| 430 | repos: vec!["repo1".to_string()].into_iter().collect(), | 465 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 466 | state_only_repos: HashSet::new(), | ||
| 431 | root_events: HashSet::new(), | 467 | root_events: HashSet::new(), |
| 432 | }, | 468 | }, |
| 433 | ); | 469 | ); |
| @@ -465,6 +501,7 @@ mod tests { | |||
| 465 | "wss://relay1.com".to_string(), | 501 | "wss://relay1.com".to_string(), |
| 466 | RelaySyncNeeds { | 502 | RelaySyncNeeds { |
| 467 | repos: vec!["repo1".to_string()].into_iter().collect(), | 503 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 504 | state_only_repos: HashSet::new(), | ||
| 468 | root_events: HashSet::new(), | 505 | root_events: HashSet::new(), |
| 469 | }, | 506 | }, |
| 470 | ); | 507 | ); |
| @@ -510,6 +547,7 @@ mod tests { | |||
| 510 | ] | 547 | ] |
| 511 | .into_iter() | 548 | .into_iter() |
| 512 | .collect(), | 549 | .collect(), |
| 550 | state_only_repos: HashSet::new(), | ||
| 513 | root_events: HashSet::new(), | 551 | root_events: HashSet::new(), |
| 514 | }, | 552 | }, |
| 515 | ); | 553 | ); |
| @@ -572,6 +610,7 @@ mod tests { | |||
| 572 | "wss://relay1.com".to_string(), | 610 | "wss://relay1.com".to_string(), |
| 573 | RelaySyncNeeds { | 611 | RelaySyncNeeds { |
| 574 | repos: HashSet::new(), | 612 | repos: HashSet::new(), |
| 613 | state_only_repos: HashSet::new(), | ||
| 575 | root_events: vec![event_id].into_iter().collect(), | 614 | root_events: vec![event_id].into_iter().collect(), |
| 576 | }, | 615 | }, |
| 577 | ); | 616 | ); |
| @@ -599,6 +638,7 @@ mod tests { | |||
| 599 | "wss://new-relay.com".to_string(), | 638 | "wss://new-relay.com".to_string(), |
| 600 | RelaySyncNeeds { | 639 | RelaySyncNeeds { |
| 601 | repos: vec!["repo1".to_string()].into_iter().collect(), | 640 | repos: vec!["repo1".to_string()].into_iter().collect(), |
| 641 | state_only_repos: HashSet::new(), | ||
| 602 | root_events: HashSet::new(), | 642 | root_events: HashSet::new(), |
| 603 | }, | 643 | }, |
| 604 | ); | 644 | ); |
diff --git a/src/sync/filters.rs b/src/sync/filters.rs index 3592489..1215e81 100644 --- a/src/sync/filters.rs +++ b/src/sync/filters.rs | |||
| @@ -245,6 +245,37 @@ pub fn build_layer2_and_layer3_filters( | |||
| 245 | filters | 245 | filters |
| 246 | } | 246 | } |
| 247 | 247 | ||
| 248 | /// Builds filters respecting SyncLevel for each repo | ||
| 249 | /// | ||
| 250 | /// StateOnly repos only get state event filters (kind 30618). | ||
| 251 | /// Full repos get all L2/L3 filters (state + repo-tagging + root event). | ||
| 252 | /// | ||
| 253 | /// # Arguments | ||
| 254 | /// * `full_repos` - Repos needing full L2+L3 sync | ||
| 255 | /// * `state_only_repos` - Repos needing only state event sync (purgatory) | ||
| 256 | /// * `root_events` - Root event IDs (only used for Full repos) | ||
| 257 | /// * `since` - Optional timestamp for incremental sync | ||
| 258 | pub fn build_sync_level_aware_filters( | ||
| 259 | full_repos: &HashSet<String>, | ||
| 260 | state_only_repos: &HashSet<String>, | ||
| 261 | root_events: &HashSet<EventId>, | ||
| 262 | since: Option<Timestamp>, | ||
| 263 | ) -> Vec<Filter> { | ||
| 264 | let mut filters = Vec::new(); | ||
| 265 | |||
| 266 | // All repos (both Full and StateOnly) need state event filters | ||
| 267 | let all_repos: HashSet<String> = full_repos.union(state_only_repos).cloned().collect(); | ||
| 268 | filters.extend(state_event_filters_for_our_repos(&all_repos, since)); | ||
| 269 | |||
| 270 | // Only Full repos get repo-tagging and root event filters | ||
| 271 | if !full_repos.is_empty() { | ||
| 272 | filters.extend(tagged_one_of_our_repo_event_filters(full_repos, since)); | ||
| 273 | } | ||
| 274 | filters.extend(tagged_one_of_our_root_event_filters(root_events, since)); | ||
| 275 | |||
| 276 | filters | ||
| 277 | } | ||
| 278 | |||
| 248 | #[cfg(test)] | 279 | #[cfg(test)] |
| 249 | mod tests { | 280 | mod tests { |
| 250 | use super::*; | 281 | use super::*; |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d6634ff..cd62380 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -85,6 +85,19 @@ use rejected_index::RejectedEventsIndex; | |||
| 85 | // Supporting Data Structures | 85 | // Supporting Data Structures |
| 86 | // ============================================================================= | 86 | // ============================================================================= |
| 87 | 87 | ||
| 88 | /// Level of sync needed for a repository | ||
| 89 | /// | ||
| 90 | /// Purgatory announcements only need state events synced (to validate git data). | ||
| 91 | /// Promoted repos need full L2/L3 sync (patches, issues, PRs, etc.). | ||
| 92 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] | ||
| 93 | pub enum SyncLevel { | ||
| 94 | /// Full L2 + L3 sync (promoted repos with git data) | ||
| 95 | #[default] | ||
| 96 | Full, | ||
| 97 | /// Only state events (kind 30618) - for purgatory announcements | ||
| 98 | StateOnly, | ||
| 99 | } | ||
| 100 | |||
| 88 | /// What repos and root events need to be synced | 101 | /// What repos and root events need to be synced |
| 89 | #[derive(Debug, Clone, Default)] | 102 | #[derive(Debug, Clone, Default)] |
| 90 | pub struct RepoSyncNeeds { | 103 | pub struct RepoSyncNeeds { |
| @@ -92,6 +105,8 @@ pub struct RepoSyncNeeds { | |||
| 92 | pub relays: HashSet<String>, | 105 | pub relays: HashSet<String>, |
| 93 | /// Root event IDs - 1617/1618/1621 - that reference this repo | 106 | /// Root event IDs - 1617/1618/1621 - that reference this repo |
| 94 | pub root_events: HashSet<EventId>, | 107 | pub root_events: HashSet<EventId>, |
| 108 | /// Sync level - StateOnly for purgatory, Full for promoted repos | ||
| 109 | pub sync_level: SyncLevel, | ||
| 95 | } | 110 | } |
| 96 | 111 | ||
| 97 | /// Connection status for a relay | 112 | /// Connection status for a relay |
| @@ -382,6 +397,40 @@ async fn run_daily_timer( | |||
| 382 | } | 397 | } |
| 383 | } | 398 | } |
| 384 | 399 | ||
| 400 | /// Background task that periodically syncs purgatory announcements into repo_sync_index. | ||
| 401 | /// | ||
| 402 | /// Runs every 5 seconds by default (200ms when `NGIT_TEST=1`). | ||
| 403 | /// For each announcement currently in purgatory, ensures there is a `StateOnly` entry in | ||
| 404 | /// `repo_sync_index`. New entries trigger `handle_new_sync_filters` which connects to the | ||
| 405 | /// relay URLs listed in the announcement and subscribes to state events (kind 30618). | ||
| 406 | /// | ||
| 407 | /// This is the sole registration path for purgatory announcements: | ||
| 408 | /// - Sync-path announcements: registered here within one interval of arriving. | ||
| 409 | /// - User-submitted purgatory announcements: the SelfSubscriber never sees them | ||
| 410 | /// (they're rejected from DB), so this timer is the only registration path. | ||
| 411 | async fn run_purgatory_announcement_sync( | ||
| 412 | sync_manager: Arc<Mutex<SyncManager>>, | ||
| 413 | mut shutdown_rx: broadcast::Receiver<()>, | ||
| 414 | ) { | ||
| 415 | let interval = if std::env::var("NGIT_TEST").as_deref() == Ok("1") { | ||
| 416 | Duration::from_millis(200) | ||
| 417 | } else { | ||
| 418 | Duration::from_secs(5) | ||
| 419 | }; | ||
| 420 | loop { | ||
| 421 | tokio::select! { | ||
| 422 | _ = tokio::time::sleep(interval) => { | ||
| 423 | let mut manager = sync_manager.lock().await; | ||
| 424 | manager.sync_purgatory_announcements_to_index().await; | ||
| 425 | } | ||
| 426 | _ = shutdown_rx.recv() => { | ||
| 427 | tracing::debug!("Purgatory announcement sync timer received shutdown signal"); | ||
| 428 | break; | ||
| 429 | } | ||
| 430 | } | ||
| 431 | } | ||
| 432 | } | ||
| 433 | |||
| 385 | // Combined Health and Metrics Checker | 434 | // Combined Health and Metrics Checker |
| 386 | 435 | ||
| 387 | /// Background task for cleaning up expired entries from the rejected events index | 436 | /// Background task for cleaning up expired entries from the rejected events index |
| @@ -936,9 +985,29 @@ impl SyncManager { | |||
| 936 | 985 | ||
| 937 | // Create REQ+EOSE subscriptions using original semantic filters | 986 | // Create REQ+EOSE subscriptions using original semantic filters |
| 938 | // This queries by kind/author/tags instead of by ID, which may | 987 | // This queries by kind/author/tags instead of by ID, which may |
| 939 | // succeed even when ID-based queries fail | 988 | // succeed even when ID-based queries fail. |
| 940 | let fallback_filters = filters::build_layer2_and_layer3_filters( | 989 | // Split batch_repos by SyncLevel to avoid sending Layer 2 filters |
| 941 | &batch_repos, | 990 | // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be |
| 991 | // rejected as orphan and then silently dropped by nostr-sdk deduplication. | ||
| 992 | let (full_repos, state_only_repos) = { | ||
| 993 | let repo_index = self.repo_sync_index.read().await; | ||
| 994 | let mut full = HashSet::new(); | ||
| 995 | let mut state_only = HashSet::new(); | ||
| 996 | for repo_ref in &batch_repos { | ||
| 997 | match repo_index.get(repo_ref).map(|n| n.sync_level) { | ||
| 998 | Some(SyncLevel::StateOnly) => { | ||
| 999 | state_only.insert(repo_ref.clone()); | ||
| 1000 | } | ||
| 1001 | _ => { | ||
| 1002 | full.insert(repo_ref.clone()); | ||
| 1003 | } | ||
| 1004 | } | ||
| 1005 | } | ||
| 1006 | (full, state_only) | ||
| 1007 | }; | ||
| 1008 | let fallback_filters = filters::build_sync_level_aware_filters( | ||
| 1009 | &full_repos, | ||
| 1010 | &state_only_repos, | ||
| 942 | &batch_root_events, | 1011 | &batch_root_events, |
| 943 | None, | 1012 | None, |
| 944 | ); | 1013 | ); |
| @@ -1272,7 +1341,7 @@ impl SyncManager { | |||
| 1272 | /// to be batched and create Layer 2/3 filters before we mark sync complete. | 1341 | /// to be batched and create Layer 2/3 filters before we mark sync complete. |
| 1273 | /// | 1342 | /// |
| 1274 | /// The 6-second delay is based on: | 1343 | /// The 6-second delay is based on: |
| 1275 | /// - Self-subscriber batch window: 5 seconds (configurable via NGIT_SYNC_BATCH_WINDOW_MS) | 1344 | /// - Self-subscriber batch window: 5 seconds (200ms when `NGIT_TEST=1`) |
| 1276 | /// - Buffer for processing: 1 second | 1345 | /// - Buffer for processing: 1 second |
| 1277 | /// | 1346 | /// |
| 1278 | /// Called after each batch is confirmed to detect completion. | 1347 | /// Called after each batch is confirmed to detect completion. |
| @@ -1486,7 +1555,17 @@ impl SyncManager { | |||
| 1486 | run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; | 1555 | run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await; |
| 1487 | }); | 1556 | }); |
| 1488 | 1557 | ||
| 1489 | // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | 1558 | // 11. Spawn purgatory announcement sync timer (every 5s) |
| 1559 | // Ensures purgatory announcements (including user-submitted ones that never | ||
| 1560 | // touch the DB) are registered in repo_sync_index as StateOnly so that | ||
| 1561 | // state event subscriptions are established on their listed relay URLs. | ||
| 1562 | let purgatory_sync_manager = Arc::clone(&sync_manager); | ||
| 1563 | let purgatory_sync_shutdown = shutdown_tx.subscribe(); | ||
| 1564 | tokio::spawn(async move { | ||
| 1565 | run_purgatory_announcement_sync(purgatory_sync_manager, purgatory_sync_shutdown).await; | ||
| 1566 | }); | ||
| 1567 | |||
| 1568 | // 12. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications | ||
| 1490 | loop { | 1569 | loop { |
| 1491 | // Wait for an event without holding the lock | 1570 | // Wait for an event without holding the lock |
| 1492 | tokio::select! { | 1571 | tokio::select! { |
| @@ -1719,6 +1798,10 @@ impl SyncManager { | |||
| 1719 | 1798 | ||
| 1720 | // For sync-triggered events that go to purgatory, trigger immediate sync | 1799 | // For sync-triggered events that go to purgatory, trigger immediate sync |
| 1721 | // (instead of the default 3-minute delay for user-submitted events) | 1800 | // (instead of the default 3-minute delay for user-submitted events) |
| 1801 | // | ||
| 1802 | // Note: announcement events (kind 30617) are registered in repo_sync_index | ||
| 1803 | // by the purgatory announcement sync timer (run_purgatory_announcement_sync) | ||
| 1804 | // rather than inline here. | ||
| 1722 | if result == ProcessResult::Purgatory { | 1805 | if result == ProcessResult::Purgatory { |
| 1723 | // State events (kind 30618) - extract identifier and trigger immediate sync | 1806 | // State events (kind 30618) - extract identifier and trigger immediate sync |
| 1724 | if event.kind.as_u16() == 30618 { | 1807 | if event.kind.as_u16() == 30618 { |
| @@ -2303,6 +2386,80 @@ impl SyncManager { | |||
| 2303 | } | 2386 | } |
| 2304 | } | 2387 | } |
| 2305 | 2388 | ||
| 2389 | /// Sync purgatory announcements into repo_sync_index as StateOnly entries. | ||
| 2390 | /// | ||
| 2391 | /// Called periodically by the purgatory announcement sync timer (every 5s). | ||
| 2392 | /// For each announcement currently in purgatory, ensures a `StateOnly` entry | ||
| 2393 | /// exists in `repo_sync_index`. New entries are then picked up by | ||
| 2394 | /// `handle_new_sync_filters` which connects to listed relay URLs and subscribes | ||
| 2395 | /// to state events for that repo. | ||
| 2396 | /// | ||
| 2397 | /// Idempotent: existing entries are not downgraded (a promoted Full entry stays Full). | ||
| 2398 | async fn sync_purgatory_announcements_to_index(&mut self) { | ||
| 2399 | use crate::sync::algorithms::{compute_actions, derive_relay_targets}; | ||
| 2400 | |||
| 2401 | // Collect all purgatory announcements (snapshot - no async holds) | ||
| 2402 | let announcements = self.purgatory.announcements_for_sync(); | ||
| 2403 | |||
| 2404 | if announcements.is_empty() { | ||
| 2405 | return; | ||
| 2406 | } | ||
| 2407 | |||
| 2408 | // Register any new entries in repo_sync_index as StateOnly | ||
| 2409 | let mut new_relay_urls: std::collections::HashSet<String> = std::collections::HashSet::new(); | ||
| 2410 | { | ||
| 2411 | let mut index = self.repo_sync_index.write().await; | ||
| 2412 | for (repo_id, relays) in &announcements { | ||
| 2413 | let entry = index.entry(repo_id.clone()).or_insert_with(|| { | ||
| 2414 | tracing::debug!( | ||
| 2415 | repo_id = %repo_id, | ||
| 2416 | "Registering purgatory announcement in repo_sync_index as StateOnly" | ||
| 2417 | ); | ||
| 2418 | RepoSyncNeeds { | ||
| 2419 | relays: std::collections::HashSet::new(), | ||
| 2420 | root_events: std::collections::HashSet::new(), | ||
| 2421 | sync_level: SyncLevel::StateOnly, | ||
| 2422 | } | ||
| 2423 | }); | ||
| 2424 | // Don't downgrade an already-Full entry | ||
| 2425 | // Add any new relay URLs | ||
| 2426 | for relay in relays { | ||
| 2427 | if entry.relays.insert(relay.clone()) { | ||
| 2428 | new_relay_urls.insert(relay.clone()); | ||
| 2429 | } | ||
| 2430 | } | ||
| 2431 | } | ||
| 2432 | } | ||
| 2433 | |||
| 2434 | if new_relay_urls.is_empty() { | ||
| 2435 | return; | ||
| 2436 | } | ||
| 2437 | |||
| 2438 | // For any relay URLs that are new, compute and send AddFilters actions | ||
| 2439 | let all_targets = { | ||
| 2440 | let repo_index = self.repo_sync_index.read().await; | ||
| 2441 | derive_relay_targets(&repo_index) | ||
| 2442 | }; | ||
| 2443 | |||
| 2444 | let actions = { | ||
| 2445 | let pending_index = self.pending_sync_index.read().await; | ||
| 2446 | let relay_index = self.relay_sync_index.read().await; | ||
| 2447 | compute_actions(&all_targets, &pending_index, &relay_index) | ||
| 2448 | }; | ||
| 2449 | |||
| 2450 | for action in actions { | ||
| 2451 | // Only act on relays that have new URLs (avoids redundant work) | ||
| 2452 | if new_relay_urls.contains(&action.relay_url) { | ||
| 2453 | tracing::info!( | ||
| 2454 | relay = %action.relay_url, | ||
| 2455 | repos = action.items.repos.len(), | ||
| 2456 | "Purgatory sync timer: connecting to new relay from purgatory announcement" | ||
| 2457 | ); | ||
| 2458 | self.handle_new_sync_filters(action).await; | ||
| 2459 | } | ||
| 2460 | } | ||
| 2461 | } | ||
| 2462 | |||
| 2306 | /// Handle a relay disconnection | 2463 | /// Handle a relay disconnection |
| 2307 | /// | 2464 | /// |
| 2308 | /// This method is called when the event loop terminates and sends a disconnect notification. | 2465 | /// This method is called when the event loop terminates and sends a disconnect notification. |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 86e4583..4d69c9a 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -18,7 +18,7 @@ use tokio::sync::{broadcast, mpsc}; | |||
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | 19 | use crate::nostr::builder::SharedDatabase; |
| 20 | 20 | ||
| 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; |
| 22 | 22 | ||
| 23 | // ============================================================================= | 23 | // ============================================================================= |
| 24 | // LoopControl - Result of notification processing | 24 | // LoopControl - Result of notification processing |
| @@ -60,6 +60,7 @@ impl PendingUpdates { | |||
| 60 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { | 60 | let entry = self.repos.entry(repo_id).or_insert_with(|| RepoSyncNeeds { |
| 61 | relays: HashSet::new(), | 61 | relays: HashSet::new(), |
| 62 | root_events: HashSet::new(), | 62 | root_events: HashSet::new(), |
| 63 | sync_level: SyncLevel::Full, | ||
| 63 | }); | 64 | }); |
| 64 | entry.relays.extend(relays); | 65 | entry.relays.extend(relays); |
| 65 | entry.root_events.extend(root_events); | 66 | entry.root_events.extend(root_events); |
| @@ -132,14 +133,14 @@ impl SelfSubscriber { | |||
| 132 | 133 | ||
| 133 | /// Get batch window from environment or use default | 134 | /// Get batch window from environment or use default |
| 134 | /// | 135 | /// |
| 135 | /// Reads `NGIT_SYNC_BATCH_WINDOW_MS` environment variable. | 136 | /// When `NGIT_TEST=1` is set, uses 200ms for faster test execution. |
| 136 | /// Default: 5000ms (5 seconds) | 137 | /// Default: 5000ms (5 seconds) |
| 137 | fn get_batch_window() -> Duration { | 138 | fn get_batch_window() -> Duration { |
| 138 | std::env::var("NGIT_SYNC_BATCH_WINDOW_MS") | 139 | if std::env::var("NGIT_TEST").as_deref() == Ok("1") { |
| 139 | .ok() | 140 | Duration::from_millis(200) |
| 140 | .and_then(|s| s.parse::<u64>().ok()) | 141 | } else { |
| 141 | .map(Duration::from_millis) | 142 | Duration::from_millis(5000) |
| 142 | .unwrap_or(Duration::from_millis(5000)) | 143 | } |
| 143 | } | 144 | } |
| 144 | 145 | ||
| 145 | /// Load existing events from database on startup | 146 | /// Load existing events from database on startup |
| @@ -197,6 +198,7 @@ impl SelfSubscriber { | |||
| 197 | .or_insert_with(|| RepoSyncNeeds { | 198 | .or_insert_with(|| RepoSyncNeeds { |
| 198 | relays: HashSet::new(), | 199 | relays: HashSet::new(), |
| 199 | root_events: HashSet::new(), | 200 | root_events: HashSet::new(), |
| 201 | sync_level: SyncLevel::StateOnly, | ||
| 200 | }); | 202 | }); |
| 201 | entry.relays.extend(needs.relays.clone()); | 203 | entry.relays.extend(needs.relays.clone()); |
| 202 | } | 204 | } |
| @@ -570,7 +572,12 @@ impl SelfSubscriber { | |||
| 570 | .or_insert_with(|| RepoSyncNeeds { | 572 | .or_insert_with(|| RepoSyncNeeds { |
| 571 | relays: HashSet::new(), | 573 | relays: HashSet::new(), |
| 572 | root_events: HashSet::new(), | 574 | root_events: HashSet::new(), |
| 575 | sync_level: SyncLevel::Full, | ||
| 573 | }); | 576 | }); |
| 577 | // Upgrade sync_level to Full - this handles the case where the entry | ||
| 578 | // already exists as StateOnly (purgatory announcement) and is now being | ||
| 579 | // promoted (git data arrived and the event was broadcast via notify_event). | ||
| 580 | entry.sync_level = SyncLevel::Full; | ||
| 574 | entry.relays.extend(needs.relays); | 581 | entry.relays.extend(needs.relays); |
| 575 | entry.root_events.extend(needs.root_events); | 582 | entry.root_events.extend(needs.root_events); |
| 576 | 583 | ||
| @@ -594,21 +601,26 @@ impl SelfSubscriber { | |||
| 594 | continue; | 601 | continue; |
| 595 | } | 602 | } |
| 596 | 603 | ||
| 597 | // Build filters for these repos | 604 | // Build filters for these repos (sync-level-aware) |
| 598 | let filters = crate::sync::filters::build_layer2_and_layer3_filters( | 605 | let filters = crate::sync::filters::build_sync_level_aware_filters( |
| 599 | &needs.repos, | 606 | &needs.repos, |
| 607 | &needs.state_only_repos, | ||
| 600 | &needs.root_events, | 608 | &needs.root_events, |
| 601 | None, | 609 | None, |
| 602 | ); | 610 | ); |
| 603 | 611 | ||
| 604 | // Log before moving values | 612 | // Log before moving values |
| 605 | let repo_count = needs.repos.len(); | 613 | let repo_count = needs.repos.len() + needs.state_only_repos.len(); |
| 606 | let event_count = needs.root_events.len(); | 614 | let event_count = needs.root_events.len(); |
| 607 | 615 | ||
| 616 | // Combine all repos into pending items | ||
| 617 | let mut all_repos = needs.repos; | ||
| 618 | all_repos.extend(needs.state_only_repos); | ||
| 619 | |||
| 608 | let action = AddFilters { | 620 | let action = AddFilters { |
| 609 | relay_url: relay_url.clone(), | 621 | relay_url: relay_url.clone(), |
| 610 | items: crate::sync::PendingItems { | 622 | items: crate::sync::PendingItems { |
| 611 | repos: needs.repos, | 623 | repos: all_repos, |
| 612 | root_events: needs.root_events, | 624 | root_events: needs.root_events, |
| 613 | }, | 625 | }, |
| 614 | filters, | 626 | filters, |