upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/git/authorization.rs4
-rw-r--r--src/git/process.rs66
-rw-r--r--src/git/sync.rs40
-rw-r--r--src/main.rs4
-rw-r--r--src/nostr/builder.rs9
-rw-r--r--src/nostr/policy/pr_event.rs1
-rw-r--r--src/nostr/policy/state.rs10
-rw-r--r--src/purgatory/helpers.rs22
-rw-r--r--src/purgatory/mod.rs3
-rw-r--r--src/purgatory/sync/context.rs19
-rw-r--r--src/purgatory/sync/functions.rs69
-rw-r--r--src/purgatory/sync/loop.rs5
-rw-r--r--src/purgatory/sync/throttle.rs31
-rw-r--r--src/sync/mod.rs4
14 files changed, 150 insertions, 137 deletions
diff --git a/src/git/authorization.rs b/src/git/authorization.rs
index fbddb98..7502a52 100644
--- a/src/git/authorization.rs
+++ b/src/git/authorization.rs
@@ -134,11 +134,11 @@ pub async fn authorize_push(
134 e 134 e
135 ))); 135 )));
136 } 136 }
137 137
138 // Create placeholder for git-data-first scenario 138 // Create placeholder for git-data-first scenario
139 // This allows cleanup if the PR event never arrives 139 // This allows cleanup if the PR event never arrives
140 purgatory.add_pr_placeholder(event_id_hex.to_string(), new_oid.clone()); 140 purgatory.add_pr_placeholder(event_id_hex.to_string(), new_oid.clone());
141 141
142 debug!( 142 debug!(
143 "Created placeholder for {} - awaiting PR event (will expire in 30min if event doesn't arrive)", 143 "Created placeholder for {} - awaiting PR event (will expire in 30min if event doesn't arrive)",
144 event_id_hex 144 event_id_hex
diff --git a/src/git/process.rs b/src/git/process.rs
index d052c04..215b423 100644
--- a/src/git/process.rs
+++ b/src/git/process.rs
@@ -6,12 +6,15 @@
6//! - When events are released from purgatory (purgatory sync) 6//! - When events are released from purgatory (purgatory sync)
7//! - When git pushes trigger purgatory releases (receive-pack handler) 7//! - When git pushes trigger purgatory releases (receive-pack handler)
8 8
9use std::path::Path;
10use nostr_sdk::Event;
11use crate::git::authorization::{collect_authorized_maintainers, RepositoryData};
12use crate::git::sync::{align_repository_with_state, sync_pr_refs_to_tagged_owner_repos, copy_missing_oids_between_repos};
13use crate::git; 9use crate::git;
10use crate::git::authorization::{collect_authorized_maintainers, RepositoryData};
11use crate::git::sync::{
12 align_repository_with_state, copy_missing_oids_between_repos,
13 sync_pr_refs_to_tagged_owner_repos,
14};
14use crate::nostr::events::RepositoryState; 15use crate::nostr::events::RepositoryState;
16use nostr_sdk::Event;
17use std::path::Path;
15 18
16/// Result of processing a state event with git data 19/// Result of processing a state event with git data
17#[derive(Debug, Default, Clone)] 20#[derive(Debug, Default, Clone)]
@@ -68,19 +71,19 @@ pub fn process_state_with_git_data(
68 git_data_path: &Path, 71 git_data_path: &Path,
69) -> ProcessStateResult { 72) -> ProcessStateResult {
70 let mut result = ProcessStateResult::default(); 73 let mut result = ProcessStateResult::default();
71 74
72 let state_author = state.event.pubkey.to_hex(); 75 let state_author = state.event.pubkey.to_hex();
73 76
74 // Collect authorized maintainers per owner 77 // Collect authorized maintainers per owner
75 let by_owner = collect_authorized_maintainers(&db_repo_data.announcements); 78 let by_owner = collect_authorized_maintainers(&db_repo_data.announcements);
76 79
77 // Step 1: Identify owner repos that the state event author is maintainer for 80 // Step 1: Identify owner repos that the state event author is maintainer for
78 let authorized_owners: Vec<&String> = by_owner 81 let authorized_owners: Vec<&String> = by_owner
79 .iter() 82 .iter()
80 .filter(|(_, maintainers)| maintainers.contains(&state_author)) 83 .filter(|(_, maintainers)| maintainers.contains(&state_author))
81 .map(|(owner, _)| owner) 84 .map(|(owner, _)| owner)
82 .collect(); 85 .collect();
83 86
84 if authorized_owners.is_empty() { 87 if authorized_owners.is_empty() {
85 tracing::debug!( 88 tracing::debug!(
86 identifier = %state.identifier, 89 identifier = %state.identifier,
@@ -89,18 +92,18 @@ pub fn process_state_with_git_data(
89 ); 92 );
90 return result; 93 return result;
91 } 94 }
92 95
93 // Process each owner repo that authorizes this state event author 96 // Process each owner repo that authorizes this state event author
94 for owner in &authorized_owners { 97 for owner in &authorized_owners {
95 let maintainers = by_owner.get(*owner).unwrap(); 98 let maintainers = by_owner.get(*owner).unwrap();
96 99
97 // Step 2: Check if this state event is the latest authorized for this owner 100 // Step 2: Check if this state event is the latest authorized for this owner
98 let is_latest = crate::git::sync::is_latest_authorized_state_public( 101 let is_latest = crate::git::sync::is_latest_authorized_state_public(
99 state, 102 state,
100 maintainers, 103 maintainers,
101 &db_repo_data.states, 104 &db_repo_data.states,
102 ); 105 );
103 106
104 if !is_latest { 107 if !is_latest {
105 tracing::debug!( 108 tracing::debug!(
106 identifier = %state.identifier, 109 identifier = %state.identifier,
@@ -109,7 +112,7 @@ pub fn process_state_with_git_data(
109 ); 112 );
110 continue; 113 continue;
111 } 114 }
112 115
113 // Find the announcement for this owner 116 // Find the announcement for this owner
114 let Some(announcement) = db_repo_data 117 let Some(announcement) = db_repo_data
115 .announcements 118 .announcements
@@ -118,9 +121,9 @@ pub fn process_state_with_git_data(
118 else { 121 else {
119 continue; 122 continue;
120 }; 123 };
121 124
122 let target_repo_path = git_data_path.join(announcement.repo_path()); 125 let target_repo_path = git_data_path.join(announcement.repo_path());
123 126
124 // Step 3: Check git repo exists for that owner 127 // Step 3: Check git repo exists for that owner
125 if !target_repo_path.exists() { 128 if !target_repo_path.exists() {
126 tracing::debug!( 129 tracing::debug!(
@@ -131,14 +134,12 @@ pub fn process_state_with_git_data(
131 ); 134 );
132 continue; 135 continue;
133 } 136 }
134 137
135 // Step 4: Copy all required OIDs to that repo (unless it's source_repo_path) 138 // Step 4: Copy all required OIDs to that repo (unless it's source_repo_path)
136 if target_repo_path != source_repo_path { 139 if target_repo_path != source_repo_path {
137 if let Err(e) = copy_missing_oids_between_repos( 140 if let Err(e) =
138 source_repo_path, 141 copy_missing_oids_between_repos(source_repo_path, &target_repo_path, state)
139 &target_repo_path, 142 {
140 state,
141 ) {
142 tracing::warn!( 143 tracing::warn!(
143 identifier = %state.identifier, 144 identifier = %state.identifier,
144 source = %source_repo_path.display(), 145 source = %source_repo_path.display(),
@@ -150,14 +151,14 @@ pub fn process_state_with_git_data(
150 continue; // Skip this owner repo 151 continue; // Skip this owner repo
151 } 152 }
152 } 153 }
153 154
154 // Step 5: Reset the git state in that repo to match the state event 155 // Step 5: Reset the git state in that repo to match the state event
155 let align_result = align_repository_with_state(&target_repo_path, state); 156 let align_result = align_repository_with_state(&target_repo_path, state);
156 result.repos_synced += 1; 157 result.repos_synced += 1;
157 result.refs_created += align_result.refs_created; 158 result.refs_created += align_result.refs_created;
158 result.refs_updated += align_result.refs_updated; 159 result.refs_updated += align_result.refs_updated;
159 result.refs_deleted += align_result.refs_deleted; 160 result.refs_deleted += align_result.refs_deleted;
160 161
161 tracing::info!( 162 tracing::info!(
162 identifier = %state.identifier, 163 identifier = %state.identifier,
163 owner = %owner, 164 owner = %owner,
@@ -169,7 +170,7 @@ pub fn process_state_with_git_data(
169 "Aligned repository with state" 170 "Aligned repository with state"
170 ); 171 );
171 } 172 }
172 173
173 result 174 result
174} 175}
175 176
@@ -205,13 +206,13 @@ pub fn process_pr_with_git_data(
205 source_owner_pubkey: &str, 206 source_owner_pubkey: &str,
206) -> ProcessPrResult { 207) -> ProcessPrResult {
207 let mut result = ProcessPrResult::default(); 208 let mut result = ProcessPrResult::default();
208 209
209 let event_id = event.id.to_hex(); 210 let event_id = event.id.to_hex();
210 211
211 // Sync PR ref to owner repos using tagged maintainer logic 212 // Sync PR ref to owner repos using tagged maintainer logic
212 let pr_refs = vec![(event_id.clone(), commit.to_string())]; 213 let pr_refs = vec![(event_id.clone(), commit.to_string())];
213 let pr_events = vec![event.clone()]; 214 let pr_events = vec![event.clone()];
214 215
215 let sync_result = sync_pr_refs_to_tagged_owner_repos( 216 let sync_result = sync_pr_refs_to_tagged_owner_repos(
216 source_repo_path, 217 source_repo_path,
217 &pr_refs, 218 &pr_refs,
@@ -222,13 +223,10 @@ pub fn process_pr_with_git_data(
222 ); 223 );
223 result.repos_synced += sync_result.repos_synced; 224 result.repos_synced += sync_result.repos_synced;
224 result.refs_created += sync_result.refs_created; 225 result.refs_created += sync_result.refs_created;
225 result.errors.extend( 226 result
226 sync_result 227 .errors
227 .errors 228 .extend(sync_result.errors.into_iter().map(|(_, e)| e));
228 .into_iter() 229
229 .map(|(_, e)| e),
230 );
231
232 // Create the ref in the source repo if it doesn't exist 230 // Create the ref in the source repo if it doesn't exist
233 let ref_name = format!("refs/nostr/{}", event_id); 231 let ref_name = format!("refs/nostr/{}", event_id);
234 if git::get_ref_commit(source_repo_path, &ref_name).is_none() { 232 if git::get_ref_commit(source_repo_path, &ref_name).is_none() {
@@ -250,6 +248,6 @@ pub fn process_pr_with_git_data(
250 ); 248 );
251 } 249 }
252 } 250 }
253 251
254 result 252 result
255} 253}
diff --git a/src/git/sync.rs b/src/git/sync.rs
index 5e2d3f2..06013a5 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -837,13 +837,27 @@ pub async fn process_newly_available_git_data(
837 ); 837 );
838 838
839 // Process state events from purgatory 839 // Process state events from purgatory
840 let state_result = 840 let state_result = process_purgatory_state_events(
841 process_purgatory_state_events(&identifier, source_repo_path, database, local_relay, purgatory, git_data_path).await; 841 &identifier,
842 source_repo_path,
843 database,
844 local_relay,
845 purgatory,
846 git_data_path,
847 )
848 .await;
842 result.merge(state_result); 849 result.merge(state_result);
843 850
844 // Process PR events from purgatory 851 // Process PR events from purgatory
845 let pr_result = 852 let pr_result = process_purgatory_pr_events(
846 process_purgatory_pr_events(&identifier, source_repo_path, database, local_relay, purgatory, git_data_path).await; 853 &identifier,
854 source_repo_path,
855 database,
856 local_relay,
857 purgatory,
858 git_data_path,
859 )
860 .await;
847 result.merge(pr_result); 861 result.merge(pr_result);
848 862
849 if result.released_any() { 863 if result.released_any() {
@@ -1113,7 +1127,9 @@ async fn process_purgatory_pr_events(
1113 error = %e, 1127 error = %e,
1114 "Failed to fetch repository data for PR events" 1128 "Failed to fetch repository data for PR events"
1115 ); 1129 );
1116 result.errors.push(format!("Failed to fetch repo data: {}", e)); 1130 result
1131 .errors
1132 .push(format!("Failed to fetch repo data: {}", e));
1117 return result; 1133 return result;
1118 } 1134 }
1119 }; 1135 };
@@ -1137,8 +1153,8 @@ async fn process_purgatory_pr_events(
1137 } 1153 }
1138 1154
1139 // Extract owner pubkey 1155 // Extract owner pubkey
1140 let owner_pubkey = extract_owner_from_repo_path(source_repo_path, git_data_path) 1156 let owner_pubkey =
1141 .unwrap_or_default(); 1157 extract_owner_from_repo_path(source_repo_path, git_data_path).unwrap_or_default();
1142 1158
1143 // Use unified processing function 1159 // Use unified processing function
1144 let process_result = crate::git::process::process_pr_with_git_data( 1160 let process_result = crate::git::process::process_pr_with_git_data(
@@ -1192,7 +1208,9 @@ async fn process_purgatory_pr_events(
1192 error = %e, 1208 error = %e,
1193 "Failed to save PR event to database" 1209 "Failed to save PR event to database"
1194 ); 1210 );
1195 result.errors.push(format!("Failed to save PR event: {}", e)); 1211 result
1212 .errors
1213 .push(format!("Failed to save PR event: {}", e));
1196 } 1214 }
1197 } 1215 }
1198 } 1216 }
@@ -1527,11 +1545,7 @@ mod tests {
1527 } 1545 }
1528 1546
1529 // Helper function to create a test state event with specific timestamp 1547 // Helper function to create a test state event with specific timestamp
1530 fn create_test_state_event( 1548 fn create_test_state_event(keys: &Keys, identifier: &str, created_at: u64) -> RepositoryState {
1531 keys: &Keys,
1532 identifier: &str,
1533 created_at: u64,
1534 ) -> RepositoryState {
1535 create_test_state_event_with_nonce(keys, identifier, created_at, "") 1549 create_test_state_event_with_nonce(keys, identifier, created_at, "")
1536 } 1550 }
1537 1551
diff --git a/src/main.rs b/src/main.rs
index 8b870dc..5e9e2d0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -146,7 +146,9 @@ async fn main() -> Result<()> {
146 throttle_manager.set_context(sync_ctx.clone()); 146 throttle_manager.set_context(sync_ctx.clone());
147 147
148 // Start the sync loop 148 // Start the sync loop
149 let _sync_loop_handle = purgatory.clone().start_sync_loop(sync_ctx, throttle_manager); 149 let _sync_loop_handle = purgatory
150 .clone()
151 .start_sync_loop(sync_ctx, throttle_manager);
150 info!("Purgatory sync loop started (1s interval)"); 152 info!("Purgatory sync loop started (1s interval)");
151 153
152 // Setup shutdown handler for purgatory cleanup 154 // Setup shutdown handler for purgatory cleanup
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 0e5c18a..81f7fbb 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -162,7 +162,11 @@ impl Nip34WritePolicy {
162 match self.state_policy.validate(event) { 162 match self.state_policy.validate(event) {
163 StateResult::Accept => { 163 StateResult::Accept => {
164 // Process state alignment asynchronously 164 // Process state alignment asynchronously
165 match self.state_policy.process_state_event(event, is_synced).await { 165 match self
166 .state_policy
167 .process_state_event(event, is_synced)
168 .await
169 {
166 Ok(poilicy_result) => poilicy_result, 170 Ok(poilicy_result) => poilicy_result,
167 Err(e) => { 171 Err(e) => {
168 tracing::warn!("Failed to process state event {}: {}", event_id_str, e); 172 tracing::warn!("Failed to process state event {}: {}", event_id_str, e);
@@ -247,7 +251,8 @@ impl Nip34WritePolicy {
247 ); 251 );
248 return WritePolicyResult::Reject { 252 return WritePolicyResult::Reject {
249 status: false, 253 status: false,
250 message: "invalid: previously expired from purgatory without git data".into(), 254 message: "invalid: previously expired from purgatory without git data"
255 .into(),
251 }; 256 };
252 } 257 }
253 258
diff --git a/src/nostr/policy/pr_event.rs b/src/nostr/policy/pr_event.rs
index 9942a6a..00e09c3 100644
--- a/src/nostr/policy/pr_event.rs
+++ b/src/nostr/policy/pr_event.rs
@@ -237,5 +237,4 @@ impl PrEventPolicy {
237 237
238 Ok(repo_paths) 238 Ok(repo_paths)
239 } 239 }
240
241} 240}
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index 7bbb379..acb76a3 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -9,8 +9,8 @@ use nostr_relay_builder::builder::WritePolicyResult;
9use nostr_relay_builder::prelude::Event; 9use nostr_relay_builder::prelude::Event;
10 10
11use super::PolicyContext; 11use super::PolicyContext;
12use crate::git::authorization::fetch_repository_data;
13use crate::git; 12use crate::git;
13use crate::git::authorization::fetch_repository_data;
14use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState}; 14use crate::nostr::events::{validate_state, RepositoryAnnouncement, RepositoryState};
15 15
16/// Result of state policy evaluation 16/// Result of state policy evaluation
@@ -48,7 +48,11 @@ impl StatePolicy {
48 /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) 48 /// * `is_synced` - True if this event came from proactive sync (vs user-submitted)
49 /// 49 ///
50 /// Returns the true if git data already availale or false if added to purgatory 50 /// Returns the true if git data already availale or false if added to purgatory
51 pub async fn process_state_event(&self, event: &Event, is_synced: bool) -> Result<WritePolicyResult> { 51 pub async fn process_state_event(
52 &self,
53 event: &Event,
54 is_synced: bool,
55 ) -> Result<WritePolicyResult> {
52 // Parse state to get HEAD and branch info 56 // Parse state to get HEAD and branch info
53 let state = 57 let state =
54 RepositoryState::from_event(event.clone()).context("Failed to parse state event")?; 58 RepositoryState::from_event(event.clone()).context("Failed to parse state event")?;
@@ -155,8 +159,6 @@ impl StatePolicy {
155 }) 159 })
156 } 160 }
157 } 161 }
158
159
160} 162}
161 163
162fn find_repo_with_git_data( 164fn find_repo_with_git_data(
diff --git a/src/purgatory/helpers.rs b/src/purgatory/helpers.rs
index 93dc378..193ef99 100644
--- a/src/purgatory/helpers.rs
+++ b/src/purgatory/helpers.rs
@@ -515,7 +515,7 @@ mod tests {
515 515
516 // Create a working repo to generate a commit 516 // Create a working repo to generate a commit
517 let work_dir = tempfile::tempdir().unwrap(); 517 let work_dir = tempfile::tempdir().unwrap();
518 518
519 Command::new("git") 519 Command::new("git")
520 .args(["init"]) 520 .args(["init"])
521 .current_dir(work_dir.path()) 521 .current_dir(work_dir.path())
@@ -585,7 +585,7 @@ mod tests {
585 use std::process::Command; 585 use std::process::Command;
586 586
587 let temp_dir = tempfile::tempdir().unwrap(); 587 let temp_dir = tempfile::tempdir().unwrap();
588 588
589 Command::new("git") 589 Command::new("git")
590 .args(["init", "--bare"]) 590 .args(["init", "--bare"])
591 .current_dir(temp_dir.path()) 591 .current_dir(temp_dir.path())
@@ -603,10 +603,7 @@ mod tests {
603 let commit_hash = commit_hash.expect("Should have a commit"); 603 let commit_hash = commit_hash.expect("Should have a commit");
604 604
605 // Create a state event referencing that commit 605 // Create a state event referencing that commit
606 let event = create_test_state_event( 606 let event = create_test_state_event("test-repo", vec![("refs/heads/main", &commit_hash)]);
607 "test-repo",
608 vec![("refs/heads/main", &commit_hash)],
609 );
610 607
611 // Should return true since the OID exists 608 // Should return true since the OID exists
612 assert!(can_apply_state(&event, repo_path)); 609 assert!(can_apply_state(&event, repo_path));
@@ -621,7 +618,10 @@ mod tests {
621 // Create a state event referencing a non-existent commit 618 // Create a state event referencing a non-existent commit
622 let event = create_test_state_event( 619 let event = create_test_state_event(
623 "test-repo", 620 "test-repo",
624 vec![("refs/heads/main", "0000000000000000000000000000000000000000")], 621 vec![(
622 "refs/heads/main",
623 "0000000000000000000000000000000000000000",
624 )],
625 ); 625 );
626 626
627 // Should return false since the OID doesn't exist 627 // Should return false since the OID doesn't exist
@@ -655,8 +655,8 @@ mod tests {
655 let event = create_test_state_event( 655 let event = create_test_state_event(
656 "test-repo", 656 "test-repo",
657 vec![ 657 vec![
658 ("refs/heads/main", &commit_hash), // exists 658 ("refs/heads/main", &commit_hash), // exists
659 ("refs/heads/dev", "0000000000000000000000000000000000000000"), // doesn't exist 659 ("refs/heads/dev", "0000000000000000000000000000000000000000"), // doesn't exist
660 ], 660 ],
661 ); 661 );
662 662
@@ -687,8 +687,8 @@ mod tests {
687 let event = create_test_state_event( 687 let event = create_test_state_event(
688 "test-repo", 688 "test-repo",
689 vec![ 689 vec![
690 ("refs/heads/main", &commit_hash), // real OID that exists 690 ("refs/heads/main", &commit_hash), // real OID that exists
691 ("refs/heads/alias", "ref: refs/heads/main"), // symbolic ref 691 ("refs/heads/alias", "ref: refs/heads/main"), // symbolic ref
692 ], 692 ],
693 ); 693 );
694 694
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index fe0a439..20df19b 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -1134,7 +1134,8 @@ fn test_cleanup_expired_events() {
1134 1134
1135 // Manually set event1's expiry time to be old 1135 // Manually set event1's expiry time to be old
1136 if let Some(mut entry) = purgatory.expired_events.get_mut(&event1_id) { 1136 if let Some(mut entry) = purgatory.expired_events.get_mut(&event1_id) {
1137 *entry.value_mut() = Instant::now() - Duration::from_secs(8 * 24 * 3600); // 8 days ago 1137 *entry.value_mut() = Instant::now() - Duration::from_secs(8 * 24 * 3600);
1138 // 8 days ago
1138 } 1139 }
1139 1140
1140 // Clean up expired events older than 7 days 1141 // Clean up expired events older than 7 days
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 2922f10..9e195c7 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -119,12 +119,8 @@ pub trait SyncContext: Send + Sync {
119 /// 119 ///
120 /// # Returns 120 /// # Returns
121 /// List of OIDs that were successfully fetched 121 /// List of OIDs that were successfully fetched
122 async fn fetch_oids( 122 async fn fetch_oids(&self, repo_path: &Path, url: &str, oids: &[String])
123 &self, 123 -> Result<Vec<String>>;
124 repo_path: &Path,
125 url: &str,
126 oids: &[String],
127 ) -> Result<Vec<String>>;
128 124
129 /// Process newly available git data. 125 /// Process newly available git data.
130 /// 126 ///
@@ -368,10 +364,7 @@ impl SyncContext for RealSyncContext {
368 .cloned() 364 .cloned()
369 .collect(); 365 .collect();
370 366
371 debug!( 367 debug!(fetched_count = fetched.len(), "Successfully fetched OIDs");
372 fetched_count = fetched.len(),
373 "Successfully fetched OIDs"
374 );
375 368
376 fetched 369 fetched
377 } 370 }
@@ -702,11 +695,7 @@ pub mod mock {
702 } 695 }
703 696
704 // Get OIDs this URL can provide 697 // Get OIDs this URL can provide
705 let provides = self 698 let provides = self.url_provides_oids.get(url).cloned().unwrap_or_default();
706 .url_provides_oids
707 .get(url)
708 .cloned()
709 .unwrap_or_default();
710 699
711 // Find which requested OIDs this URL can provide 700 // Find which requested OIDs this URL can provide
712 let fetched: Vec<String> = oids 701 let fetched: Vec<String> = oids
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index bb7c0b9..370990e 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -32,15 +32,17 @@ use super::throttle::ThrottleManager;
32fn extract_domain(url: &str) -> Option<String> { 32fn extract_domain(url: &str) -> Option<String> {
33 // Simple URL parsing for HTTP(S) URLs 33 // Simple URL parsing for HTTP(S) URLs
34 // Format: scheme://[user@]host[:port]/path 34 // Format: scheme://[user@]host[:port]/path
35 let url = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://"))?; 35 let url = url
36 36 .strip_prefix("https://")
37 .or_else(|| url.strip_prefix("http://"))?;
38
37 // Remove user info if present (e.g., "user@host" -> "host") 39 // Remove user info if present (e.g., "user@host" -> "host")
38 let url = url.split('@').next_back()?; 40 let url = url.split('@').next_back()?;
39 41
40 // Extract host (before first '/' or ':') 42 // Extract host (before first '/' or ':')
41 let host = url.split('/').next()?; 43 let host = url.split('/').next()?;
42 let host = host.split(':').next()?; 44 let host = host.split(':').next()?;
43 45
44 if host.is_empty() { 46 if host.is_empty() {
45 None 47 None
46 } else { 48 } else {
@@ -112,17 +114,17 @@ pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
112 114
113 // 4. Collect clone URLs from announcements AND PR events in purgatory 115 // 4. Collect clone URLs from announcements AND PR events in purgatory
114 let our_domain = ctx.our_domain(); 116 let our_domain = ctx.our_domain();
115 117
116 // Get clone URLs from repository announcements 118 // Get clone URLs from repository announcements
117 let announcement_urls: HashSet<String> = repo_data 119 let announcement_urls: HashSet<String> = repo_data
118 .announcements 120 .announcements
119 .iter() 121 .iter()
120 .flat_map(|a| a.clone_urls.iter().cloned()) 122 .flat_map(|a| a.clone_urls.iter().cloned())
121 .collect(); 123 .collect();
122 124
123 // Get clone URLs from PR events in purgatory 125 // Get clone URLs from PR events in purgatory
124 let pr_urls = ctx.collect_pr_clone_urls(identifier); 126 let pr_urls = ctx.collect_pr_clone_urls(identifier);
125 127
126 // Merge and filter out our domain 128 // Merge and filter out our domain
127 let all_urls: HashSet<String> = announcement_urls 129 let all_urls: HashSet<String> = announcement_urls
128 .union(&pr_urls) 130 .union(&pr_urls)
@@ -151,11 +153,9 @@ pub async fn sync_identifier_next_url<C: SyncContext + ?Sized>(
151 match domain { 153 match domain {
152 Some(specific_domain) => { 154 Some(specific_domain) => {
153 // Only look at URLs from this specific domain 155 // Only look at URLs from this specific domain
154 urls_by_domain.get(specific_domain).and_then(|urls| { 156 urls_by_domain
155 urls.iter() 157 .get(specific_domain)
156 .find(|url| !tried_urls.contains(*url)) 158 .and_then(|urls| urls.iter().find(|url| !tried_urls.contains(*url)).cloned())
157 .cloned()
158 })
159 } 159 }
160 None => { 160 None => {
161 // Try any non-throttled domain 161 // Try any non-throttled domain
@@ -217,17 +217,17 @@ pub async fn get_throttled_domains_with_untried_urls<C: SyncContext + ?Sized>(
217 }; 217 };
218 218
219 let our_domain = ctx.our_domain(); 219 let our_domain = ctx.our_domain();
220 220
221 // Get clone URLs from repository announcements 221 // Get clone URLs from repository announcements
222 let announcement_urls: HashSet<String> = repo_data 222 let announcement_urls: HashSet<String> = repo_data
223 .announcements 223 .announcements
224 .iter() 224 .iter()
225 .flat_map(|a| a.clone_urls.iter().cloned()) 225 .flat_map(|a| a.clone_urls.iter().cloned())
226 .collect(); 226 .collect();
227 227
228 // Get clone URLs from PR events in purgatory 228 // Get clone URLs from PR events in purgatory
229 let pr_urls = ctx.collect_pr_clone_urls(identifier); 229 let pr_urls = ctx.collect_pr_clone_urls(identifier);
230 230
231 // Merge and filter out our domain 231 // Merge and filter out our domain
232 let all_urls: HashSet<String> = announcement_urls 232 let all_urls: HashSet<String> = announcement_urls
233 .union(&pr_urls) 233 .union(&pr_urls)
@@ -766,9 +766,13 @@ mod tests {
766 let mut tried_urls = HashSet::new(); 766 let mut tried_urls = HashSet::new();
767 tried_urls.insert("https://github.com/foo/bar.git".to_string()); 767 tried_urls.insert("https://github.com/foo/bar.git".to_string());
768 768
769 let throttled = 769 let throttled = get_throttled_domains_with_untried_urls(
770 get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) 770 &mock,
771 .await; 771 "test-repo",
772 &tried_urls,
773 &throttle_manager,
774 )
775 .await;
772 776
773 // Should only include gitlab.com (throttled with untried URLs) 777 // Should only include gitlab.com (throttled with untried URLs)
774 // github.com is throttled but URL was tried 778 // github.com is throttled but URL was tried
@@ -885,11 +889,10 @@ mod tests {
885 #[tokio::test] 889 #[tokio::test]
886 async fn test_collect_pr_clone_urls_returns_configured_urls() { 890 async fn test_collect_pr_clone_urls_returns_configured_urls() {
887 // Test that MockSyncContext returns configured PR clone URLs 891 // Test that MockSyncContext returns configured PR clone URLs
888 let mock = MockSyncContext::new() 892 let mock = MockSyncContext::new().with_pr_clone_urls(&[
889 .with_pr_clone_urls(&[ 893 "https://pr-server.com/fork.git",
890 "https://pr-server.com/fork.git", 894 "https://another-server.com/fork.git",
891 "https://another-server.com/fork.git", 895 ]);
892 ]);
893 896
894 let pr_urls = mock.collect_pr_clone_urls("test-repo"); 897 let pr_urls = mock.collect_pr_clone_urls("test-repo");
895 898
@@ -945,7 +948,7 @@ mod tests {
945 .with_urls(&["https://github.com/owner/repo.git"]) 948 .with_urls(&["https://github.com/owner/repo.git"])
946 .with_pr_clone_urls(&[ 949 .with_pr_clone_urls(&[
947 "https://our-relay.com/fork.git", // Should be filtered 950 "https://our-relay.com/fork.git", // Should be filtered
948 "https://external.com/fork.git", // Should be included 951 "https://external.com/fork.git", // Should be included
949 ]) 952 ])
950 .with_our_domain("our-relay.com") 953 .with_our_domain("our-relay.com")
951 .with_needed_oids(&["abc123"]) 954 .with_needed_oids(&["abc123"])
@@ -957,8 +960,7 @@ mod tests {
957 // Collect all available URLs 960 // Collect all available URLs
958 let mut available_urls = Vec::new(); 961 let mut available_urls = Vec::new();
959 while let Some(url) = 962 while let Some(url) =
960 sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager) 963 sync_identifier_next_url(&mock, "test-repo", None, &tried_urls, &throttle_manager).await
961 .await
962 { 964 {
963 available_urls.push(url.clone()); 965 available_urls.push(url.clone());
964 tried_urls.insert(url); 966 tried_urls.insert(url);
@@ -1006,16 +1008,17 @@ mod tests {
1006 1008
1007 let tried_urls = HashSet::new(); 1009 let tried_urls = HashSet::new();
1008 1010
1009 let throttled = 1011 let throttled = get_throttled_domains_with_untried_urls(
1010 get_throttled_domains_with_untried_urls(&mock, "test-repo", &tried_urls, &throttle_manager) 1012 &mock,
1011 .await; 1013 "test-repo",
1014 &tried_urls,
1015 &throttle_manager,
1016 )
1017 .await;
1012 1018
1013 // Should include both throttled domains 1019 // Should include both throttled domains
1014 let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect(); 1020 let domains: Vec<&str> = throttled.iter().map(|t| t.domain.as_str()).collect();
1015 assert!( 1021 assert!(domains.contains(&"github.com"), "Should include github.com");
1016 domains.contains(&"github.com"),
1017 "Should include github.com"
1018 );
1019 assert!( 1022 assert!(
1020 domains.contains(&"pr-server.com"), 1023 domains.contains(&"pr-server.com"),
1021 "Should include pr-server.com from PR clone URLs" 1024 "Should include pr-server.com from PR clone URLs"
diff --git a/src/purgatory/sync/loop.rs b/src/purgatory/sync/loop.rs
index ebca766..92e0594 100644
--- a/src/purgatory/sync/loop.rs
+++ b/src/purgatory/sync/loop.rs
@@ -62,7 +62,10 @@ impl Purgatory {
62 ctx: Arc<dyn SyncContext>, 62 ctx: Arc<dyn SyncContext>,
63 throttle_manager: Arc<ThrottleManager>, 63 throttle_manager: Arc<ThrottleManager>,
64 ) -> JoinHandle<()> { 64 ) -> JoinHandle<()> {
65 info!("Starting purgatory sync loop (interval: {:?})", SYNC_LOOP_INTERVAL); 65 info!(
66 "Starting purgatory sync loop (interval: {:?})",
67 SYNC_LOOP_INTERVAL
68 );
66 69
67 tokio::spawn(async move { 70 tokio::spawn(async move {
68 let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL); 71 let mut interval = tokio::time::interval(SYNC_LOOP_INTERVAL);
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index e6efe1f..ad6e8ea 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -316,15 +316,13 @@ impl ThrottleManager {
316 } 316 }
317 317
318 // Create new throttle 318 // Create new throttle
319 self.throttles 319 self.throttles.entry(domain.to_string()).or_insert_with(|| {
320 .entry(domain.to_string()) 320 Mutex::new(DomainThrottle::new(
321 .or_insert_with(|| { 321 domain.to_string(),
322 Mutex::new(DomainThrottle::new( 322 self.max_concurrent_per_domain,
323 domain.to_string(), 323 self.max_per_minute_per_domain,
324 self.max_concurrent_per_domain, 324 ))
325 self.max_per_minute_per_domain, 325 });
326 ))
327 });
328 326
329 // Return the entry (we know it exists now) 327 // Return the entry (we know it exists now)
330 self.throttles.get(domain).unwrap() 328 self.throttles.get(domain).unwrap()
@@ -438,7 +436,9 @@ impl ThrottleManager {
438 let domain = domain.to_string(); 436 let domain = domain.to_string();
439 437
440 tokio::spawn(async move { 438 tokio::spawn(async move {
441 manager.process_queued_identifier(&domain, &identifier).await; 439 manager
440 .process_queued_identifier(&domain, &identifier)
441 .await;
442 }); 442 });
443 } 443 }
444 } 444 }
@@ -480,14 +480,9 @@ impl ThrottleManager {
480 }; 480 };
481 481
482 // Get next URL for this identifier on this specific domain 482 // Get next URL for this identifier on this specific domain
483 let url = sync_identifier_next_url( 483 let url =
484 ctx.as_ref(), 484 sync_identifier_next_url(ctx.as_ref(), identifier, Some(domain), &tried_urls, self)
485 identifier, 485 .await;
486 Some(domain),
487 &tried_urls,
488 self,
489 )
490 .await;
491 486
492 match url { 487 match url {
493 Some(url) => { 488 Some(url) => {
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 7d60ea4..fa44ab1 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1069,7 +1069,9 @@ impl SyncManager {
1069 } 1069 }
1070 // PR events (kind 1617/1618) - extract identifier from 'a' tag 1070 // PR events (kind 1617/1618) - extract identifier from 'a' tag
1071 else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 { 1071 else if event.kind.as_u16() == 1617 || event.kind.as_u16() == 1618 {
1072 if let Some(identifier) = crate::git::sync::extract_identifier_from_pr_event(&event) { 1072 if let Some(identifier) =
1073 crate::git::sync::extract_identifier_from_pr_event(&event)
1074 {
1073 tracing::debug!( 1075 tracing::debug!(
1074 event_id = %event.id, 1076 event_id = %event.id,
1075 identifier = %identifier, 1077 identifier = %identifier,