diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 17:12:17 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-18 17:12:17 +0000 |
| commit | d76003b629a4a03dba23a8a1c41da6e4ac4c30cf (patch) | |
| tree | 38412fdeed3e7996923603fe1964db4e5ce94bdc /src/purgatory | |
| parent | 806936e7d1aab5dfd0c2ad6b98a115122dc1785c (diff) | |
feat: upgrade repo to Full sync and trigger PR event subscription after announcement promotion
When git data arrives for a purgatory announcement and promotes it to the
database, the relay now:
1. Upgrades the announcement's sync level in RepoSyncIndex from StateOnly
to Full (git/sync.rs: process_purgatory_announcements)
2. Sends AddFilters actions to SyncManager for all connected relays, using
Full sync filters (Layer 2 #a/#A/#q) to subscribe to PR events
(purgatory/sync/context.rs: RealSyncContext.process_newly_available_git_data)
3. For user-submitted purgatory announcements, registers the repo in
RepoSyncIndex with StateOnly level and sends AddFilters to SyncManager
so it discovers and connects to relays listed in the announcement tags
(nostr/builder.rs: handle_announcement AcceptPurgatory path)
The RealSyncContext now accepts optional repo_sync_index and sync_action_tx
parameters. main.rs wires these up from SyncManager. PolicyContext gains
repo_sync_index and sync_action_tx fields for the write policy path.
Diffstat (limited to 'src/purgatory')
| -rw-r--r-- | src/purgatory/sync/context.rs | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 3568e89..4dbb402 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -193,6 +193,7 @@ use crate::nostr::builder::SharedDatabase; | |||
| 193 | use crate::nostr::events::RepositoryState; | 193 | use crate::nostr::events::RepositoryState; |
| 194 | use crate::purgatory::Purgatory; | 194 | use crate::purgatory::Purgatory; |
| 195 | use crate::sync::naughty_list::NaughtyListTracker; | 195 | use crate::sync::naughty_list::NaughtyListTracker; |
| 196 | use crate::sync::RepoSyncIndex; | ||
| 196 | 197 | ||
| 197 | use super::functions::extract_domain; | 198 | use super::functions::extract_domain; |
| 198 | 199 | ||
| @@ -221,6 +222,13 @@ pub struct RealSyncContext { | |||
| 221 | 222 | ||
| 222 | /// Naughty list tracker for git remote domains with persistent errors | 223 | /// Naughty list tracker for git remote domains with persistent errors |
| 223 | git_naughty_list: Arc<NaughtyListTracker>, | 224 | git_naughty_list: Arc<NaughtyListTracker>, |
| 225 | |||
| 226 | /// Optional repo sync index for upgrading sync level on promotion | ||
| 227 | repo_sync_index: Option<RepoSyncIndex>, | ||
| 228 | |||
| 229 | /// Optional sender for AddFilters actions to SyncManager. | ||
| 230 | /// Used after announcement promotion to trigger PR event subscription on connected relays. | ||
| 231 | sync_action_tx: Option<tokio::sync::mpsc::Sender<crate::sync::AddFilters>>, | ||
| 224 | } | 232 | } |
| 225 | 233 | ||
| 226 | impl RealSyncContext { | 234 | impl RealSyncContext { |
| @@ -233,6 +241,9 @@ impl RealSyncContext { | |||
| 233 | /// * `our_domain` - Our domain to exclude from clone URLs | 241 | /// * `our_domain` - Our domain to exclude from clone URLs |
| 234 | /// * `local_relay` - Local relay for WebSocket notifications | 242 | /// * `local_relay` - Local relay for WebSocket notifications |
| 235 | /// * `git_naughty_list` - Naughty list tracker for git remote domains | 243 | /// * `git_naughty_list` - Naughty list tracker for git remote domains |
| 244 | /// * `repo_sync_index` - Optional repo sync index for upgrading sync level on promotion | ||
| 245 | /// * `sync_action_tx` - Optional sender for triggering filter recomputation after promotion | ||
| 246 | #[allow(clippy::too_many_arguments)] | ||
| 236 | pub fn new( | 247 | pub fn new( |
| 237 | purgatory: Arc<Purgatory>, | 248 | purgatory: Arc<Purgatory>, |
| 238 | database: SharedDatabase, | 249 | database: SharedDatabase, |
| @@ -240,6 +251,8 @@ impl RealSyncContext { | |||
| 240 | our_domain: Option<String>, | 251 | our_domain: Option<String>, |
| 241 | local_relay: Option<LocalRelay>, | 252 | local_relay: Option<LocalRelay>, |
| 242 | git_naughty_list: Arc<NaughtyListTracker>, | 253 | git_naughty_list: Arc<NaughtyListTracker>, |
| 254 | repo_sync_index: Option<RepoSyncIndex>, | ||
| 255 | sync_action_tx: Option<tokio::sync::mpsc::Sender<crate::sync::AddFilters>>, | ||
| 243 | ) -> Self { | 256 | ) -> Self { |
| 244 | Self { | 257 | Self { |
| 245 | purgatory, | 258 | purgatory, |
| @@ -248,9 +261,23 @@ impl RealSyncContext { | |||
| 248 | our_domain_value: our_domain, | 261 | our_domain_value: our_domain, |
| 249 | local_relay, | 262 | local_relay, |
| 250 | git_naughty_list, | 263 | git_naughty_list, |
| 264 | repo_sync_index, | ||
| 265 | sync_action_tx, | ||
| 251 | } | 266 | } |
| 252 | } | 267 | } |
| 253 | 268 | ||
| 269 | /// Set the sync action sender for triggering filter recomputation after announcement promotion. | ||
| 270 | /// | ||
| 271 | /// When an announcement is promoted from purgatory to Full sync level, the SyncManager | ||
| 272 | /// needs to subscribe to PR events for that repo on all connected relays. This sender | ||
| 273 | /// is used to trigger that subscription. | ||
| 274 | pub fn set_sync_action_tx( | ||
| 275 | &mut self, | ||
| 276 | tx: tokio::sync::mpsc::Sender<crate::sync::AddFilters>, | ||
| 277 | ) { | ||
| 278 | self.sync_action_tx = Some(tx); | ||
| 279 | } | ||
| 280 | |||
| 254 | /// Get reference to the git naughty list tracker | 281 | /// Get reference to the git naughty list tracker |
| 255 | pub fn git_naughty_list(&self) -> &Arc<NaughtyListTracker> { | 282 | pub fn git_naughty_list(&self) -> &Arc<NaughtyListTracker> { |
| 256 | &self.git_naughty_list | 283 | &self.git_naughty_list |
| @@ -482,9 +509,98 @@ impl SyncContext for RealSyncContext { | |||
| 482 | self.local_relay.as_ref(), | 509 | self.local_relay.as_ref(), |
| 483 | &self.purgatory, | 510 | &self.purgatory, |
| 484 | &self.git_data_path, | 511 | &self.git_data_path, |
| 512 | self.repo_sync_index.clone(), | ||
| 485 | ) | 513 | ) |
| 486 | .await?; | 514 | .await?; |
| 487 | 515 | ||
| 516 | // If announcements were promoted (now Full sync level), notify SyncManager to | ||
| 517 | // recompute filters so PR event subscriptions are created on connected relays. | ||
| 518 | if result.announcements_released > 0 { | ||
| 519 | if let (Some(ref tx), Some(ref repo_sync_index)) = | ||
| 520 | (&self.sync_action_tx, &self.repo_sync_index) | ||
| 521 | { | ||
| 522 | let index = repo_sync_index.read().await; | ||
| 523 | for (repo_id, needs) in index.iter() { | ||
| 524 | if needs.sync_level == crate::sync::SyncLevel::Full | ||
| 525 | && !needs.root_events.is_empty() | ||
| 526 | { | ||
| 527 | // Send AddFilters for Full repos with root events | ||
| 528 | for relay_url in &needs.relays { | ||
| 529 | if let Some(ref domain) = self.our_domain_value { | ||
| 530 | if relay_url.contains(domain.as_str()) { | ||
| 531 | continue; | ||
| 532 | } | ||
| 533 | } | ||
| 534 | let full_repos: std::collections::HashSet<String> = | ||
| 535 | std::iter::once(repo_id.clone()).collect(); | ||
| 536 | let filters = | ||
| 537 | crate::sync::filters::build_sync_level_aware_filters( | ||
| 538 | &full_repos, | ||
| 539 | &std::collections::HashSet::new(), | ||
| 540 | &needs.root_events, | ||
| 541 | None, | ||
| 542 | ); | ||
| 543 | let action = crate::sync::AddFilters { | ||
| 544 | relay_url: relay_url.clone(), | ||
| 545 | items: crate::sync::PendingItems { | ||
| 546 | repos: full_repos.clone(), | ||
| 547 | root_events: needs.root_events.clone(), | ||
| 548 | }, | ||
| 549 | filters, | ||
| 550 | }; | ||
| 551 | if let Err(e) = tx.send(action).await { | ||
| 552 | debug!( | ||
| 553 | relay = %relay_url, | ||
| 554 | error = %e, | ||
| 555 | "Failed to send AddFilters after announcement promotion" | ||
| 556 | ); | ||
| 557 | } else { | ||
| 558 | debug!( | ||
| 559 | relay = %relay_url, | ||
| 560 | repo_id = %repo_id, | ||
| 561 | "Sent AddFilters to SyncManager after announcement promotion" | ||
| 562 | ); | ||
| 563 | } | ||
| 564 | } | ||
| 565 | } else if needs.sync_level == crate::sync::SyncLevel::Full { | ||
| 566 | // Even without root_events, send empty repo filter to ensure | ||
| 567 | // Layer 2 subscriptions (PR events) are set up | ||
| 568 | for relay_url in &needs.relays { | ||
| 569 | if let Some(ref domain) = self.our_domain_value { | ||
| 570 | if relay_url.contains(domain.as_str()) { | ||
| 571 | continue; | ||
| 572 | } | ||
| 573 | } | ||
| 574 | let full_repos: std::collections::HashSet<String> = | ||
| 575 | std::iter::once(repo_id.clone()).collect(); | ||
| 576 | let filters = | ||
| 577 | crate::sync::filters::build_sync_level_aware_filters( | ||
| 578 | &full_repos, | ||
| 579 | &std::collections::HashSet::new(), | ||
| 580 | &std::collections::HashSet::new(), | ||
| 581 | None, | ||
| 582 | ); | ||
| 583 | let action = crate::sync::AddFilters { | ||
| 584 | relay_url: relay_url.clone(), | ||
| 585 | items: crate::sync::PendingItems { | ||
| 586 | repos: full_repos.clone(), | ||
| 587 | root_events: std::collections::HashSet::new(), | ||
| 588 | }, | ||
| 589 | filters, | ||
| 590 | }; | ||
| 591 | if let Err(e) = tx.send(action).await { | ||
| 592 | debug!( | ||
| 593 | relay = %relay_url, | ||
| 594 | error = %e, | ||
| 595 | "Failed to send AddFilters (no root_events) after announcement promotion" | ||
| 596 | ); | ||
| 597 | } | ||
| 598 | } | ||
| 599 | } | ||
| 600 | } | ||
| 601 | } | ||
| 602 | } | ||
| 603 | |||
| 488 | // Convert from git::sync::ProcessResult to our ProcessResult | 604 | // Convert from git::sync::ProcessResult to our ProcessResult |
| 489 | Ok(ProcessResult { | 605 | Ok(ProcessResult { |
| 490 | states_released: result.states_released, | 606 | states_released: result.states_released, |