| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 08:39:19 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-07 08:39:19 +0000 |
| commit | da0f173bf4b68928c6b4e3e720562d0b4c0775ac (patch) | |
| tree | 937e590b188e0c500c429b675583140223ab55c3 /docs/explanation | |
| parent | 33b716cf9eb88d95394423b77b779009056d4d5a (diff) | |
docs: purgatory design improve process_satisfiable_events
Diffstat (limited to 'docs/explanation')
| -rw-r--r-- | docs/explanation/purgatory-sync-redesign.md | 294 |
1 files changed, 290 insertions, 4 deletions
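As a rough illustration of the per-fetch release this commit documents, the sketch below releases events as soon as their OIDs are available instead of waiting for every URL to be tried. `PendingEvent`, `release_satisfiable`, and `simulate` are hypothetical names used only for this example and are not part of the documented API. The real `process_satisfiable_events` also copies OIDs, aligns refs, saves to the database, and notifies subscribers.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a purgatory entry: an event id plus the OIDs it needs.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingEvent {
    pub id: String,
    pub required_oids: Vec<String>,
}

/// Releases every pending event whose required OIDs are all in `available`.
/// Returns the released ids; unsatisfied events stay in `pending`.
pub fn release_satisfiable(
    pending: &mut Vec<PendingEvent>,
    available: &HashSet<String>,
) -> Vec<String> {
    let mut released = Vec::new();
    pending.retain(|event| {
        let satisfiable = event
            .required_oids
            .iter()
            .all(|oid| available.contains(oid));
        if satisfiable {
            released.push(event.id.clone());
        }
        // Keep only the events that still miss at least one OID.
        !satisfiable
    });
    released
}

/// Simulates a sequence of fetches. `Some(oids)` is a successful fetch that
/// makes those OIDs available and triggers a release pass; `None` is a failed
/// fetch that leaves the pending list untouched.
pub fn simulate(
    fetches: &[Option<Vec<&str>>],
    pending: &mut Vec<PendingEvent>,
) -> Vec<Vec<String>> {
    let mut available = HashSet::new();
    let mut rounds = Vec::new();
    for fetch in fetches {
        match fetch {
            Some(oids) => {
                available.extend(oids.iter().map(|oid| oid.to_string()));
                rounds.push(release_satisfiable(pending, &available));
            }
            None => rounds.push(Vec::new()),
        }
    }
    rounds
}
```

With three pending events A, B, and C that each need one OID, and fetches that succeed, succeed, and fail in that order, the first pass releases A, the second releases B, and C remains pending for a later retry.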
diff --git a/docs/explanation/purgatory-sync-redesign.md b/docs/explanation/purgatory-sync-redesign.md index 382f683..6ff94bb 100644 --- a/docs/explanation/purgatory-sync-redesign.md +++ b/docs/explanation/purgatory-sync-redesign.md | |||
| @@ -27,6 +27,40 @@ Redesign purgatory sync to be **identifier-based** rather than **event-based**, | |||
| 27 | 5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default) | 27 | 5. Debouncing for burst event arrivals (500ms for sync-triggered, 3min default) |
| 28 | 6. **Clean separation of concerns**: Domain throttle handles rate limiting only; sync logic tracks its own tried URLs | 28 | 6. **Clean separation of concerns**: Domain throttle handles rate limiting only; sync logic tracks its own tried URLs |
| 29 | 29 | ||
| 30 | ### Key Design Decision: Where Does OID Copying Happen? | ||
| 31 | |||
| 32 | **Answer: In `process_satisfiable_events`, NOT after the entire sync completes.** | ||
| 33 | |||
| 34 | The current implementation (`sync_state_git_data`) fetches all OIDs first, then at the end: | ||
| 35 | 1. Copies OIDs to all authorized owner repos | ||
| 36 | 2. Aligns refs with state | ||
| 37 | 3. Saves to database | ||
| 38 | 4. Notifies subscribers | ||
| 39 | 5. Removes from purgatory | ||
| 40 | |||
| 41 | The redesign moves all of this into `process_satisfiable_events`, which is called after **each successful URL fetch**. The two approaches compare as follows: | ||
| 42 | |||
| 43 | | Aspect | Current (end-of-sync) | Redesign (per-fetch) | | ||
| 44 | |--------|----------------------|---------------------| | ||
| 45 | | **When events release** | Only after all URLs tried | As soon as OIDs available | | ||
| 46 | | **Partial success** | All or nothing per event | Events release independently | | ||
| 47 | | **Multiple state events** | All wait for slowest | Each releases when ready | | ||
| 48 | | **Authorization check** | Once at start | At release time (picks up maintainer-set changes made while the event waits) | | ||
| 49 | |||
| 50 | **Why this matters:** | ||
| 51 | |||
| 52 | Consider syncing an identifier with 3 state events from different maintainers: | ||
| 53 | - State A needs OIDs from `server1.com` (fast) | ||
| 54 | - State B needs OIDs from `server2.com` (slow) | ||
| 55 | - State C needs OIDs from `server3.com` (down) | ||
| 56 | |||
| 57 | With the redesign: | ||
| 58 | 1. Fetch from `server1.com` succeeds → `process_satisfiable_events` releases State A immediately | ||
| 59 | 2. Fetch from `server2.com` succeeds → `process_satisfiable_events` releases State B | ||
| 60 | 3. Fetch from `server3.com` fails → State C stays in purgatory, retries with backoff | ||
| 61 | |||
| 62 | The current implementation would release nothing until every URL had been tried, so States A and B would wait on the unreachable `server3.com`. | ||
| 63 | |||
| 30 | ## Architecture | 64 | ## Architecture |
| 31 | 65 | ||
| 32 | ### Overview | 66 | ### Overview |
| @@ -600,8 +634,24 @@ pub trait SyncContext: Send + Sync { | |||
| 600 | /// Fetch OIDs from a remote server | 634 | /// Fetch OIDs from a remote server |
| 601 | async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>; | 635 | async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String]) -> Result<Vec<String>>; |
| 602 | 636 | ||
| 603 | /// Process events that can now be satisfied (save to DB, notify, remove from purgatory) | 637 | /// Process events that can now be satisfied. |
| 604 | async fn process_satisfiable_events(&self, identifier: &str) -> Result<()>; | 638 | /// |
| 639 | /// For each purgatory event (state or PR) for this identifier: | ||
| 640 | /// 1. Check if all required OIDs are now available in the source repo | ||
| 641 | /// 2. For satisfiable state events: | ||
| 642 | /// a. Check if this state is authorized and should be applied (vs existing states) | ||
| 643 | /// b. Copy OIDs to all owner repos that authorize this state author | ||
| 644 | /// c. Align refs with state in each authorized repo | ||
| 645 | /// d. Save state event to database | ||
| 646 | /// e. Notify WebSocket subscribers | ||
| 647 | /// f. Remove from purgatory | ||
| 648 | /// 3. For satisfiable PR events: | ||
| 649 | /// a. Copy PR commit to owner repos that share maintainers with tagged owners | ||
| 650 | /// b. Create refs/nostr/<event-id> in each repo | ||
| 651 | /// c. Save PR event to database | ||
| 652 | /// d. Notify WebSocket subscribers | ||
| 653 | /// e. Remove from purgatory | ||
| 654 | async fn process_satisfiable_events(&self, identifier: &str) -> Result<ProcessResult>; | ||
| 605 | 655 | ||
| 606 | /// Check if there are still pending events for this identifier | 656 | /// Check if there are still pending events for this identifier |
| 607 | fn has_pending_events(&self, identifier: &str) -> bool; | 657 | fn has_pending_events(&self, identifier: &str) -> bool; |
| @@ -612,8 +662,62 @@ pub trait SyncContext: Send + Sync { | |||
| 612 | /// Our domain (to exclude from clone URLs) | 662 | /// Our domain (to exclude from clone URLs) |
| 613 | fn our_domain(&self) -> Option<&str>; | 663 | fn our_domain(&self) -> Option<&str>; |
| 614 | } | 664 | } |
| 665 | |||
| 666 | /// Real implementation of SyncContext with all dependencies | ||
| 667 | pub struct RealSyncContext { | ||
| 668 | purgatory: Purgatory, | ||
| 669 | database: SharedDatabase, | ||
| 670 | git_data_path: PathBuf, | ||
| 671 | our_domain: Option<String>, | ||
| 672 | local_relay: Option<nostr_relay_builder::LocalRelay>, | ||
| 673 | } | ||
| 674 | |||
| 675 | impl RealSyncContext { | ||
| 676 | pub fn new( | ||
| 677 | purgatory: Purgatory, | ||
| 678 | database: SharedDatabase, | ||
| 679 | git_data_path: PathBuf, | ||
| 680 | our_domain: Option<String>, | ||
| 681 | local_relay: Option<nostr_relay_builder::LocalRelay>, | ||
| 682 | ) -> Self { | ||
| 683 | Self { | ||
| 684 | purgatory, | ||
| 685 | database, | ||
| 686 | git_data_path, | ||
| 687 | our_domain, | ||
| 688 | local_relay, | ||
| 689 | } | ||
| 690 | } | ||
| 691 | } | ||
| 692 | |||
| 693 | #[async_trait] | ||
| 694 | impl SyncContext for RealSyncContext { | ||
| 695 | // ... other methods ... | ||
| 696 | |||
| 697 | async fn process_satisfiable_events(&self, identifier: &str) -> Result<ProcessResult> { | ||
| 698 | // Get repository data and find source repo | ||
| 699 | let db_repo_data = fetch_repository_data(&self.database, identifier).await?; | ||
| 700 | let source_repo_path = self.find_target_repo(&db_repo_data) | ||
| 701 | .ok_or_else(|| anyhow::anyhow!("No target repo found"))?; | ||
| 702 | |||
| 703 | // Call the standalone function with all dependencies | ||
| 704 | process_satisfiable_events_impl( | ||
| 705 | identifier, | ||
| 706 | &source_repo_path, | ||
| 707 | &db_repo_data, | ||
| 708 | &self.git_data_path, | ||
| 709 | &self.database, | ||
| 710 | self.local_relay.as_ref(), | ||
| 711 | &self.purgatory, | ||
| 712 | ).await | ||
| 713 | } | ||
| 714 | |||
| 715 | // ... other methods ... | ||
| 716 | } | ||
| 615 | ``` | 717 | ``` |
| 616 | 718 | ||
| 719 | **Note**: The `SyncContext` trait abstracts away the dependencies for testability. The real implementation (`RealSyncContext`) owns the purgatory, database, and other dependencies, and its `process_satisfiable_events` method passes them to `process_satisfiable_events_impl`. This keeps the sync logic functions (`sync_identifier_next_url`, `sync_identifier_from_url`) clean and testable with mocks. | ||
| 720 | |||
| 617 | ## Core Sync Logic | 721 | ## Core Sync Logic |
| 618 | 722 | ||
| 619 | ### Two-Function Design | 723 | ### Two-Function Design |
| @@ -871,6 +975,188 @@ pub async fn sync_identifier_from_url<C: SyncContext>( | |||
| 871 | } | 975 | } |
| 872 | ``` | 976 | ``` |
| 873 | 977 | ||
| 978 | ### process_satisfiable_events | ||
| 979 | |||
| 980 | This is the core function that handles the "release from purgatory" logic. It's called after each successful fetch to check if any purgatory events can now be satisfied with the available git data. | ||
| 981 | |||
| 982 | **Key Design Decision**: OID copying and ref alignment happen in `process_satisfiable_events`, NOT after the entire sync completes. This enables: | ||
| 983 | |||
| 984 | 1. **Incremental progress**: Events can be released as soon as their OIDs are available, even if other events for the same identifier still need data | ||
| 985 | 2. **Partial success**: If we fetch OIDs for one state event but not another, the first can be released immediately | ||
| 986 | 3. **Cleaner separation**: `sync_identifier_from_url` only fetches; `process_satisfiable_events` handles all the "what to do with the data" logic | ||
| 987 | |||
| 988 | ```rust | ||
| 989 | /// Result of processing satisfiable events | ||
| 990 | #[derive(Debug, Default)] | ||
| 991 | pub struct ProcessResult { | ||
| 992 | /// Number of state events released from purgatory | ||
| 993 | pub states_released: usize, | ||
| 994 | /// Number of PR events released from purgatory | ||
| 995 | pub prs_released: usize, | ||
| 996 | /// Number of repositories synced (OIDs copied + refs aligned) | ||
| 997 | pub repos_synced: usize, | ||
| 998 | /// Errors encountered | ||
| 999 | pub errors: Vec<String>, | ||
| 1000 | } | ||
| 1001 | |||
| 1002 | /// Process purgatory events that can now be satisfied with available git data. | ||
| 1003 | /// | ||
| 1004 | /// This function is called after each successful OID fetch. It: | ||
| 1005 | /// 1. Iterates through all purgatory events for this identifier | ||
| 1006 | /// 2. For each event, checks if all required OIDs are now available | ||
| 1007 | /// 3. For satisfiable events, performs the full "release" workflow | ||
| 1008 | /// | ||
| 1009 | /// The release workflow for STATE events: | ||
| 1010 | /// 1. Check authorization: is this state author authorized by any owner's maintainer set? | ||
| 1011 | /// 2. Check priority: is this state newer than existing states for those owners? | ||
| 1012 | /// 3. Copy OIDs to all authorized owner repos (using sync_to_owner_repos logic) | ||
| 1013 | /// 4. Align refs with state in each authorized repo | ||
| 1014 | /// 5. Save state event to database | ||
| 1015 | /// 6. Notify WebSocket subscribers | ||
| 1016 | /// 7. Remove from purgatory | ||
| 1017 | /// | ||
| 1018 | /// The release workflow for PR events: | ||
| 1019 | /// 1. Copy PR commit to owner repos that share maintainers with tagged owners | ||
| 1020 | /// 2. Create refs/nostr/<event-id> in each repo | ||
| 1021 | /// 3. Save PR event to database | ||
| 1022 | /// 4. Notify WebSocket subscribers | ||
| 1023 | /// 5. Remove from purgatory | ||
| 1024 | /// | ||
| 1025 | /// Note: This is the implementation function called by RealSyncContext. | ||
| 1026 | /// The SyncContext trait method has a simpler signature because the | ||
| 1027 | /// implementation has access to all dependencies via self. | ||
| 1028 | pub async fn process_satisfiable_events_impl( | ||
| 1029 | identifier: &str, | ||
| 1030 | source_repo_path: &Path, | ||
| 1031 | db_repo_data: &RepositoryData, | ||
| 1032 | git_data_path: &Path, | ||
| 1033 | database: &SharedDatabase, | ||
| 1034 | local_relay: Option<&nostr_relay_builder::LocalRelay>, | ||
| 1035 | purgatory: &Purgatory, | ||
| 1036 | ) -> Result<ProcessResult> { | ||
| 1037 | let mut result = ProcessResult::default(); | ||
| 1038 | |||
| 1039 | // Process state events | ||
| 1040 | let state_entries = purgatory.find_state(identifier); | ||
| 1041 | for entry in state_entries { | ||
| 1042 | // Parse the state event | ||
| 1043 | let state = match RepositoryState::from_event(&entry.event) { | ||
| 1044 | Ok(s) => s, | ||
| 1045 | Err(e) => { | ||
| 1046 | tracing::warn!( | ||
| 1047 | event_id = %entry.event.id, | ||
| 1048 | error = %e, | ||
| 1049 | "Failed to parse state event from purgatory" | ||
| 1050 | ); | ||
| 1051 | continue; | ||
| 1052 | } | ||
| 1053 | }; | ||
| 1054 | |||
| 1055 | // Check if all OIDs are available in the source repo | ||
| 1056 | let missing_oids = identify_missing_oids(&state, source_repo_path); | ||
| 1057 | if !missing_oids.is_empty() { | ||
| 1058 | tracing::debug!( | ||
| 1059 | event_id = %entry.event.id, | ||
| 1060 | missing = missing_oids.len(), | ||
| 1061 | "State event still missing OIDs, skipping" | ||
| 1062 | ); | ||
| 1063 | continue; | ||
| 1064 | } | ||
| 1065 | |||
| 1066 | // All OIDs available - proceed with release | ||
| 1067 | tracing::info!( | ||
| 1068 | identifier = %identifier, | ||
| 1069 | event_id = %entry.event.id, | ||
| 1070 | "All OIDs available, releasing state event from purgatory" | ||
| 1071 | ); | ||
| 1072 | |||
| 1073 | // Sync to owner repos (copy OIDs + align refs) | ||
| 1074 | // This handles authorization checks internally | ||
| 1075 | let sync_result = sync_to_owner_repos( | ||
| 1076 | source_repo_path, | ||
| 1077 | &state, | ||
| 1078 | db_repo_data, | ||
| 1079 | git_data_path, | ||
| 1080 | ); | ||
| 1081 | result.repos_synced += sync_result.repos_synced; | ||
| 1082 | |||
| 1083 | if sync_result.repos_synced == 0 { | ||
| 1084 | tracing::warn!( | ||
| 1085 | identifier = %identifier, | ||
| 1086 | event_id = %entry.event.id, | ||
| 1087 | "No repos synced - state author may not be authorized" | ||
| 1088 | ); | ||
| 1089 | // Don't remove from purgatory - maybe authorization will change | ||
| 1090 | continue; | ||
| 1091 | } | ||
| 1092 | |||
| 1093 | // Save to database | ||
| 1094 | match database.save_event(&entry.event).await { | ||
| 1095 | Ok(_) => { | ||
| 1096 | tracing::info!( | ||
| 1097 | identifier = %identifier, | ||
| 1098 | event_id = %entry.event.id, | ||
| 1099 | "Saved state event to database" | ||
| 1100 | ); | ||
| 1101 | |||
| 1102 | // Notify WebSocket subscribers | ||
| 1103 | if let Some(relay) = local_relay { | ||
| 1104 | relay.notify_event(entry.event.clone()); | ||
| 1105 | } | ||
| 1106 | |||
| 1107 | // Remove from purgatory | ||
| 1108 | purgatory.remove_state_event(identifier, &entry.event.id); | ||
| 1109 | result.states_released += 1; | ||
| 1110 | } | ||
| 1111 | Err(e) => { | ||
| 1112 | tracing::warn!( | ||
| 1113 | event_id = %entry.event.id, | ||
| 1114 | error = %e, | ||
| 1115 | "Failed to save state event to database" | ||
| 1116 | ); | ||
| 1117 | result.errors.push(format!("Failed to save state {}: {}", entry.event.id, e)); | ||
| 1118 | } | ||
| 1119 | } | ||
| 1120 | } | ||
| 1121 | |||
| 1122 | // TODO: Process PR events similarly | ||
| 1123 | // For now, PR events are handled separately, so prs_released stays 0 | ||
| 1124 | |||
| 1125 | Ok(result) | ||
| 1126 | } | ||
| 1127 | |||
| 1128 | /// Identify OIDs in a state that are missing from a repository | ||
| 1129 | fn identify_missing_oids(state: &RepositoryState, repo_path: &Path) -> Vec<String> { | ||
| 1130 | let mut missing = Vec::new(); | ||
| 1131 | |||
| 1132 | for branch in &state.branches { | ||
| 1133 | if !branch.commit.starts_with("ref: ") && !oid_exists(repo_path, &branch.commit) { | ||
| 1134 | missing.push(branch.commit.clone()); | ||
| 1135 | } | ||
| 1136 | } | ||
| 1137 | |||
| 1138 | for tag in &state.tags { | ||
| 1139 | if !tag.commit.starts_with("ref: ") && !oid_exists(repo_path, &tag.commit) { | ||
| 1140 | missing.push(tag.commit.clone()); | ||
| 1141 | } | ||
| 1142 | } | ||
| 1143 | |||
| 1144 | missing | ||
| 1145 | } | ||
| 1146 | ``` | ||
| 1147 | |||
| 1148 | **Why this design?** | ||
| 1149 | |||
| 1150 | The key insight is that `process_satisfiable_events` is called after *each* successful URL fetch, not just at the end of the sync. This means: | ||
| 1151 | |||
| 1152 | 1. **Early release**: If we fetch from `server1.com` and get all OIDs for state event A, we immediately release A even if state event B still needs OIDs from `server2.com` | ||
| 1153 | |||
| 1154 | 2. **Idempotent**: The function can be called multiple times safely. It only processes events that are satisfiable, and a released event is removed from purgatory, so a later call does not process it again. | ||
| 1155 | |||
| 1156 | 3. **Independent per event**: Each event is processed on its own. If saving one event fails, the error is recorded in `ProcessResult::errors`, that event stays in purgatory, and the others can still succeed. | ||
| 1157 | |||
| 1158 | 4. **Authorization at release time**: We check authorization when releasing, not when adding to purgatory. This handles the case where maintainer sets change while an event is in purgatory. | ||
| 1159 | |||
| 874 | ### The Sync Identifier Loop (Main Sync) | 1160 | ### The Sync Identifier Loop (Main Sync) |
| 875 | 1161 | ||
| 876 | ```rust | 1162 | ```rust |
| @@ -1087,9 +1373,9 @@ mod tests { | |||
| 1087 | Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) | 1373 | Ok(self.fetch_results.borrow().get(url).cloned().unwrap_or_default()) |
| 1088 | } | 1374 | } |
| 1089 | 1375 | ||
| 1090 | async fn process_satisfiable_events(&self, _id: &str) -> Result<()> { | 1376 | async fn process_satisfiable_events(&self, _id: &str) -> Result<ProcessResult> { |
| 1091 | *self.processed_count.borrow_mut() += 1; | 1377 | *self.processed_count.borrow_mut() += 1; |
| 1092 | Ok(()) | 1378 | Ok(ProcessResult::default()) |
| 1093 | } | 1379 | } |
| 1094 | 1380 | ||
| 1095 | fn has_pending_events(&self, _id: &str) -> bool { | 1381 | fn has_pending_events(&self, _id: &str) -> bool { |