upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

author: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-07 08:39:19 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-07 08:39:19 +0000
commit: da0f173bf4b68928c6b4e3e720562d0b4c0775ac
tree: 937e590b188e0c500c429b675583140223ab55c3 /docs/explanation
parent: 33b716cf9eb88d95394423b77b779009056d4d5a
docs: purgatory design improve process_satisfiable_events
Diffstat (limited to 'docs/explanation')
-rw-r--r-- docs/explanation/purgatory-sync-redesign.md 294
1 files changed, 290 insertions, 4 deletions
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md
index 382f683..6ff94bb 100644
--- a/docs/explanation/purgatory-sync-redesign.md
+++ b/docs/explanation/purgatory-sync-redesign.md
@@ -27,6 +27,40 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**,
 5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default)
 6. **Clean separation of concerns**: Domain throttle handles rate limiting only; sync logic tracks its own tried URLs
 
+### Key Design Decision: Where Does OID Copying Happen?
+
+**Answer: In `process_satisfiable_events`, NOT after the entire sync completes.**
+
+The current implementation (`sync_state_git_data`) fetches all OIDs first, then at the end:
+1. Copies OIDs to all authorized owner repos
+2. Aligns refs with state
+3. Saves to database
+4. Notifies subscribers
+5. Removes from purgatory
+
+The redesign moves all of this into `process_satisfiable_events`, which is called after **each successful URL fetch**. This enables:
+
+| Aspect | Current (end-of-sync) | Redesign (per-fetch) |
+|--------|----------------------|---------------------|
+| **When events release** | Only after all URLs tried | As soon as OIDs available |
+| **Partial success** | All or nothing per event | Events release independently |
+| **Multiple state events** | All wait for slowest | Each releases when ready |
+| **Authorization check** | Once at start | At release time (handles changes) |
+
+**Why this matters:**
+
+Consider syncing an identifier with 3 state events from different maintainers:
+- State A needs OIDs from `server1.com` (fast)
+- State B needs OIDs from `server2.com` (slow)
+- State C needs OIDs from `server3.com` (down)
+
+With the redesign:
+1. Fetch from `server1.com` succeeds → `process_satisfiable_events` releases State A immediately
+2. Fetch from `server2.com` succeeds → `process_satisfiable_events` releases State B
+3. Fetch from `server3.com` fails → State C stays in purgatory, retries with backoff
+
+The current implementation would wait for all servers before releasing any events.
+
 ## Architecture
 
 ### Overview
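The three-server scenario added in the hunk above can be sketched as a small, dependency-free simulation. This is only an illustration under stated assumptions: `PendingState`, `process_satisfiable`, `sync_per_fetch`, and the OID strings are hypothetical stand-ins, not types or functions from the repository, and fetching is modeled as a plain `Option` rather than real network I/O.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of a state event waiting in purgatory for its OIDs.
#[derive(Debug, Clone)]
struct PendingState {
    name: &'static str,
    required_oids: Vec<&'static str>,
}

/// Simulated fetch outcome: Some(oids) on success, None when the server is down.
type FetchOutcome = Option<Vec<&'static str>>;

/// Release every pending state whose required OIDs are all available.
/// Returns the names of the states released by this call.
fn process_satisfiable(
    pending: &mut Vec<PendingState>,
    available: &BTreeSet<&'static str>,
) -> Vec<&'static str> {
    let mut released = Vec::new();
    pending.retain(|state| {
        let ready = state.required_oids.iter().all(|oid| available.contains(oid));
        if ready {
            released.push(state.name);
        }
        !ready // keep only the states that are still missing data
    });
    released
}

/// Try each URL in order and run the release step after every successful fetch.
fn sync_per_fetch(
    pending: &mut Vec<PendingState>,
    fetches: &[(&'static str, FetchOutcome)],
) -> Vec<(&'static str, Vec<&'static str>)> {
    let mut available = BTreeSet::new();
    let mut log = Vec::new();
    for (url, outcome) in fetches {
        if let Some(oids) = outcome {
            available.extend(oids.iter().copied());
            log.push((*url, process_satisfiable(pending, &available)));
        }
        // A failed fetch adds no OIDs, so there is nothing new to release.
    }
    log
}

/// States A, B, C need OIDs from server1 (fast), server2 (slow), server3 (down).
fn three_server_scenario() -> (Vec<(&'static str, Vec<&'static str>)>, Vec<&'static str>) {
    let mut pending = vec![
        PendingState { name: "A", required_oids: vec!["a1"] },
        PendingState { name: "B", required_oids: vec!["b1"] },
        PendingState { name: "C", required_oids: vec!["c1"] },
    ];
    let fetches = [
        ("server1.com", Some(vec!["a1"])),
        ("server2.com", Some(vec!["b1"])),
        ("server3.com", None),
    ];
    let log = sync_per_fetch(&mut pending, &fetches);
    let still_pending: Vec<&'static str> = pending.iter().map(|s| s.name).collect();
    (log, still_pending)
}
```

Calling `three_server_scenario()` releases A after the first fetch and B after the second, while C remains pending. An end-of-sync design would instead hold A and B until the `server3.com` attempt had finished.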
@@ -600,8 +634,24 @@ pub trait SyncContext: Send + Sync {
     /// Fetch OIDs from a remote server
     async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>;
 
-    /// Process events that can now be satisfied (save to DB, notify, remove from purgatory)
-    async fn process_satisfiable_events(&self, identifier: &str) -> Result<()>;
+    /// Process events that can now be satisfied.
+    ///
+    /// For each purgatory event (state or PR) for this identifier:
+    /// 1. Check if all required OIDs are now available in the source repo
+    /// 2. For satisfiable state events:
+    ///    a. Check if this state is authorized and should be applied (vs existing states)
+    ///    b. Copy OIDs to all owner repos that authorize this state author
+    ///    c. Align refs with state in each authorized repo
+    ///    d. Save state event to database
+    ///    e. Notify WebSocket subscribers
+    ///    f. Remove from purgatory
+    /// 3. For satisfiable PR events:
+    ///    a. Copy PR commit to owner repos that share maintainers with tagged owners
+    ///    b. Create refs/nostr/<event-id> in each repo
+    ///    c. Save PR event to database
+    ///    d. Notify WebSocket subscribers
+    ///    e. Remove from purgatory
+    async fn process_satisfiable_events(&self, identifier: &str) -> Result<ProcessResult>;
 
     /// Check if there are still pending events for this identifier
     fn has_pending_events(&self, identifier: &str) -> bool;
@@ -612,8 +662,62 @@ pub trait SyncContext: Send + Sync {
     /// Our domain (to exclude from clone URLs)
     fn our_domain(&self) -> Option<&str>;
 }
+
+/// Real implementation of SyncContext with all dependencies
+pub struct RealSyncContext {
+    purgatory: Purgatory,
+    database: SharedDatabase,
+    git_data_path: PathBuf,
+    our_domain: Option<String>,
+    local_relay: Option<nostr_relay_builder::LocalRelay>,
+}
+
+impl RealSyncContext {
+    pub fn new(
+        purgatory: Purgatory,
+        database: SharedDatabase,
+        git_data_path: PathBuf,
+        our_domain: Option<String>,
+        local_relay: Option<nostr_relay_builder::LocalRelay>,
+    ) -> Self {
+        Self {
+            purgatory,
+            database,
+            git_data_path,
+            our_domain,
+            local_relay,
+        }
+    }
+}
+
+#[async_trait]
+impl SyncContext for RealSyncContext {
+    // ... other methods ...
+
+    async fn process_satisfiable_events(&self, identifier: &str) -> Result<ProcessResult> {
+        // Get repository data and find source repo
+        let db_repo_data = fetch_repository_data(&self.database, identifier).await?;
+        let source_repo_path = self.find_target_repo(&db_repo_data)
+            .ok_or_else(|| anyhow::anyhow!("No target repo found"))?;
+
+        // Call the standalone function with all dependencies
+        process_satisfiable_events_impl(
+            identifier,
+            &source_repo_path,
+            &db_repo_data,
+            &self.git_data_path,
+            &self.database,
+            self.local_relay.as_ref(),
+            &self.purgatory,
+        ).await
+    }
+
+    // ... other methods ...
+}
 ```
 
+**Note**: The `SyncContext` trait abstracts away the dependencies for testability. The real implementation (`RealSyncContext`) holds references to purgatory, database, etc., and the `process_satisfiable_events` method uses them internally. This keeps the sync logic functions (`sync_identifier_next_url`, `sync_identifier_from_url`) clean and testable with mocks.
+
 ## Core Sync Logic
 
 ### Two-Function Design
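The testability claim in the note from the hunk above can be illustrated with a pared-down mock. This sketch makes several assumptions that are not in the commit: `MiniSyncContext` is a hypothetical, synchronous reduction of `SyncContext` (the real trait is async and has more methods), `sync_from_url` is a simplified stand-in for `sync_identifier_from_url`, and the URLs and OID are invented.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Hypothetical, synchronous reduction of the SyncContext trait.
trait MiniSyncContext {
    /// Return the OIDs that could be fetched from `url`.
    fn fetch_oids(&self, url: &str, oids: &[String]) -> Result<Vec<String>, String>;
    /// Release whatever is now satisfiable; returns the number of released events.
    fn process_satisfiable_events(&self, identifier: &str) -> Result<usize, String>;
}

/// Sync logic under test: it only fetches, then delegates the release step.
fn sync_from_url<C: MiniSyncContext>(
    ctx: &C,
    identifier: &str,
    url: &str,
    needed: &[String],
) -> Result<usize, String> {
    let fetched = ctx.fetch_oids(url, needed)?;
    if fetched.is_empty() {
        // Nothing new arrived, so there is no reason to run the release step.
        return Ok(0);
    }
    ctx.process_satisfiable_events(identifier)
}

/// Mock context: canned fetch results plus a counter for release calls.
struct MockContext {
    fetch_results: HashMap<String, Vec<String>>,
    processed_count: RefCell<usize>,
}

impl MiniSyncContext for MockContext {
    fn fetch_oids(&self, url: &str, _oids: &[String]) -> Result<Vec<String>, String> {
        Ok(self.fetch_results.get(url).cloned().unwrap_or_default())
    }

    fn process_satisfiable_events(&self, _identifier: &str) -> Result<usize, String> {
        *self.processed_count.borrow_mut() += 1;
        Ok(1) // pretend exactly one event was released
    }
}

/// Returns (released after a good fetch, released after an empty fetch, release calls).
fn mock_demo() -> (usize, usize, usize) {
    let good_url = "https://server1.example/repo.git";
    let mut fetch_results = HashMap::new();
    fetch_results.insert(good_url.to_string(), vec!["abc123".to_string()]);
    let ctx = MockContext { fetch_results, processed_count: RefCell::new(0) };

    let needed = vec!["abc123".to_string()];
    let released_ok = sync_from_url(&ctx, "my-repo", good_url, &needed).unwrap();
    let released_empty =
        sync_from_url(&ctx, "my-repo", "https://unknown.example/repo.git", &needed).unwrap();
    let processed = *ctx.processed_count.borrow();
    (released_ok, released_empty, processed)
}
```

Because the sync function sees only the trait, the mock can check that the release step runs once per productive fetch without touching git, the database, or the relay.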
@@ -871,6 +975,188 @@ pub async fn sync_identifier_from_url<C: SyncContext>(
 }
 ```
 
+### process_satisfiable_events
+
+This is the core function that handles the "release from purgatory" logic. It's called after each successful fetch to check if any purgatory events can now be satisfied with the available git data.
+
+**Key Design Decision**: OID copying and ref alignment happen in `process_satisfiable_events`, NOT after the entire sync completes. This enables:
+
+1. **Incremental progress**: Events can be released as soon as their OIDs are available, even if other events for the same identifier still need data
+2. **Partial success**: If we fetch OIDs for one state event but not another, the first can be released immediately
+3. **Cleaner separation**: `sync_identifier_from_url` only fetches; `process_satisfiable_events` handles all the "what to do with the data" logic
+
+```rust
+/// Result of processing satisfiable events
+#[derive(Debug, Default)]
+pub struct ProcessResult {
+    /// Number of state events released from purgatory
+    pub states_released: usize,
+    /// Number of PR events released from purgatory
+    pub prs_released: usize,
+    /// Number of repositories synced (OIDs copied + refs aligned)
+    pub repos_synced: usize,
+    /// Errors encountered
+    pub errors: Vec<String>,
+}
+
+/// Process purgatory events that can now be satisfied with available git data.
+///
+/// This function is called after each successful OID fetch. It:
+/// 1. Iterates through all purgatory events for this identifier
+/// 2. For each event, checks if all required OIDs are now available
+/// 3. For satisfiable events, performs the full "release" workflow
+///
+/// The release workflow for STATE events:
+/// 1. Check authorization: is this state author authorized by any owner's maintainer set?
+/// 2. Check priority: is this state newer than existing states for those owners?
+/// 3. Copy OIDs to all authorized owner repos (using sync_to_owner_repos logic)
+/// 4. Align refs with state in each authorized repo
+/// 5. Save state event to database
+/// 6. Notify WebSocket subscribers
+/// 7. Remove from purgatory
+///
+/// The release workflow for PR events:
+/// 1. Copy PR commit to owner repos that share maintainers with tagged owners
+/// 2. Create refs/nostr/<event-id> in each repo
+/// 3. Save PR event to database
+/// 4. Notify WebSocket subscribers
+/// 5. Remove from purgatory
+///
+/// Note: This is the implementation function called by RealSyncContext.
+/// The SyncContext trait method has a simpler signature because the
+/// implementation has access to all dependencies via self.
+pub async fn process_satisfiable_events_impl(
+    identifier: &str,
+    source_repo_path: &Path,
+    db_repo_data: &RepositoryData,
+    git_data_path: &Path,
+    database: &SharedDatabase,
+    local_relay: Option<&nostr_relay_builder::LocalRelay>,
+    purgatory: &Purgatory,
+) -> Result<ProcessResult> {
+    let mut result = ProcessResult::default();
+
+    // Process state events
+    let state_entries = purgatory.find_state(identifier);
+    for entry in state_entries {
+        // Parse the state event
+        let state = match RepositoryState::from_event(&entry.event) {
+            Ok(s) => s,
+            Err(e) => {
+                tracing::warn!(
+                    event_id = %entry.event.id,
+                    error = %e,
+                    "Failed to parse state event from purgatory"
+                );
+                continue;
+            }
+        };
+
+        // Check if all OIDs are available in the source repo
+        let missing_oids = identify_missing_oids(&state, source_repo_path);
+        if !missing_oids.is_empty() {
+            tracing::debug!(
+                event_id = %entry.event.id,
+                missing = missing_oids.len(),
+                "State event still missing OIDs, skipping"
+            );
+            continue;
+        }
+
+        // All OIDs available - proceed with release
+        tracing::info!(
+            identifier = %identifier,
+            event_id = %entry.event.id,
+            "All OIDs available, releasing state event from purgatory"
+        );
+
+        // Sync to owner repos (copy OIDs + align refs)
+        // This handles authorization checks internally
+        let sync_result = sync_to_owner_repos(
+            source_repo_path,
+            &state,
+            db_repo_data,
+            git_data_path,
+        );
+        result.repos_synced += sync_result.repos_synced;
+
+        if sync_result.repos_synced == 0 {
+            tracing::warn!(
+                identifier = %identifier,
+                event_id = %entry.event.id,
+                "No repos synced - state author may not be authorized"
+            );
+            // Don't remove from purgatory - maybe authorization will change
+            continue;
+        }
+
+        // Save to database
+        match database.save_event(&entry.event).await {
+            Ok(_) => {
+                tracing::info!(
+                    identifier = %identifier,
+                    event_id = %entry.event.id,
+                    "Saved state event to database"
+                );
+
+                // Notify WebSocket subscribers
+                if let Some(relay) = local_relay {
+                    relay.notify_event(entry.event.clone());
+                }
+
+                // Remove from purgatory
+                purgatory.remove_state_event(identifier, &entry.event.id);
+                result.states_released += 1;
+            }
+            Err(e) => {
+                tracing::warn!(
+                    event_id = %entry.event.id,
+                    error = %e,
+                    "Failed to save state event to database"
+                );
+                result.errors.push(format!("Failed to save state {}: {}", entry.event.id, e));
+            }
+        }
+    }
+
+    // TODO: Process PR events similarly
+    // For now, PR events are handled separately
+
+    Ok(result)
+}
+
+/// Identify OIDs in a state that are missing from a repository
+fn identify_missing_oids(state: &RepositoryState, repo_path: &Path) -> Vec<String> {
+    let mut missing = Vec::new();
+
+    for branch in &state.branches {
+        if !branch.commit.starts_with("ref: ") && !oid_exists(repo_path, &branch.commit) {
+            missing.push(branch.commit.clone());
+        }
+    }
+
+    for tag in &state.tags {
+        if !tag.commit.starts_with("ref: ") && !oid_exists(repo_path, &tag.commit) {
+            missing.push(tag.commit.clone());
+        }
+    }
+
+    missing
+}
+```
+
+**Why this design?**
+
+The key insight is that `process_satisfiable_events` is called after *each* successful URL fetch, not just at the end of the sync. This means:
+
+1. **Early release**: If we fetch from `server1.com` and get all OIDs for state event A, we immediately release A even if state event B still needs OIDs from `server2.com`
+
+2. **Idempotent**: The function can be called multiple times safely. It only processes events that are actually satisfiable.
+
+3. **Atomic per-event**: Each event is processed independently. If saving one event fails, others can still succeed.
+
+4. **Authorization at release time**: We check authorization when releasing, not when adding to purgatory. This handles the case where maintainer sets change while an event is in purgatory.
+
 ### The Sync Identifier Loop (Main Sync)
 
 ```rust
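The idempotency and release-time authorization points in the "Why this design?" list from the hunk above can be checked with a small in-memory model. Everything here is an assumption made for illustration: `Entry`, `Released`, and `process` are hypothetical names, purgatory is a plain `Vec`, and authorization is reduced to membership in a maintainer set, which is far simpler than the `sync_to_owner_repos` logic the commit describes.

```rust
use std::collections::HashSet;

/// Hypothetical purgatory entry: a state event, its author, and the OIDs it needs.
struct Entry {
    author: &'static str,
    oids: Vec<&'static str>,
}

/// Hypothetical per-call summary, loosely modeled on ProcessResult.
#[derive(Debug, Default, PartialEq)]
struct Released {
    states_released: usize,
    skipped_unauthorized: usize,
}

/// Release entries that are both satisfiable and authorized *right now*.
/// Entries that fail either check stay in purgatory for a later call.
fn process(
    purgatory: &mut Vec<Entry>,
    available: &HashSet<&'static str>,
    maintainers: &HashSet<&'static str>,
) -> Released {
    let mut result = Released::default();
    purgatory.retain(|entry| {
        if !entry.oids.iter().all(|oid| available.contains(oid)) {
            return true; // still missing git data
        }
        if !maintainers.contains(entry.author) {
            result.skipped_unauthorized += 1;
            return true; // keep it: authorization may change later
        }
        result.states_released += 1;
        false // released, so drop it from purgatory
    });
    result
}

/// Call `process` three times: twice unchanged, then after authorizing "bob".
fn idempotency_demo() -> (Released, Released, Released) {
    let mut purgatory = vec![
        Entry { author: "alice", oids: vec!["a1"] },
        Entry { author: "bob", oids: vec!["b1"] },
    ];
    let available: HashSet<&'static str> = ["a1", "b1"].iter().copied().collect();
    let mut maintainers: HashSet<&'static str> = ["alice"].iter().copied().collect();

    let first = process(&mut purgatory, &available, &maintainers);
    let second = process(&mut purgatory, &available, &maintainers);
    maintainers.insert("bob"); // the maintainer set changes while bob's event waits
    let third = process(&mut purgatory, &available, &maintainers);
    (first, second, third)
}
```

In this model the second call releases nothing new, and the third call releases the previously unauthorized entry once the maintainer set includes its author. That matches the behaviour the list describes, though the real function also saves to the database and notifies subscribers.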
@@ -1087,9 +1373,9 @@ mod tests {
             Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default())
         }
 
-        async fn process_satisfiable_events(&self, _id: &str) -> Result<()> {
+        async fn process_satisfiable_events(&self, _id: &str) -> Result<ProcessResult> {
             *self.processed_count.borrow_mut() += 1;
-            Ok(())
+            Ok(ProcessResult::default())
         }
 
         fn has_pending_events(&self, _id: &str) -> bool {