upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 12:34:59 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 12:34:59 +0000
commitd78d3a86ba81a5b59cde527a448f5c9d131db8d6 (patch)
treebd197adb274fb19513a15429ebbe77f760e39508 /src
parente72edbae86affcb9fc0429bd197639bf438ffb6c (diff)
Refactor handle_receive_pack to use unified process_newly_available_git_data
Replace ~100 lines of duplicated post-push processing in handle_receive_pack with a single call to the unified process_newly_available_git_data function. The unified function handles all post-git-data-available processing: - Discovering satisfiable events from purgatory (state and PR events) - Syncing OIDs to authorized owner repos - Aligning refs (+ setting HEAD) in all owner repos - Saving events to database - Notifying WebSocket subscribers - Removing from purgatory This ensures consistent behavior regardless of how git data arrives (git push vs purgatory sync fetching from remote servers). Also mark test-only internal methods with #[cfg(test)] to silence dead code warnings.
Diffstat (limited to 'src')
-rw-r--r--src/git/handlers.rs189
-rw-r--r--src/purgatory/sync/throttle.rs2
2 files changed, 53 insertions, 138 deletions
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 2325106..c5de5db 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -6,6 +6,7 @@ use http_body_util::Full;
6use hyper::{body::Bytes, Response, StatusCode}; 6use hyper::{body::Bytes, Response, StatusCode};
7use nostr_relay_builder::LocalRelay; 7use nostr_relay_builder::LocalRelay;
8use nostr_sdk::prelude::*; 8use nostr_sdk::prelude::*;
9use std::collections::HashSet;
9use std::path::PathBuf; 10use std::path::PathBuf;
10use std::sync::Arc; 11use std::sync::Arc;
11use tokio::io::{AsyncReadExt, AsyncWriteExt}; 12use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -15,10 +16,9 @@ use super::protocol::{GitService, PktLine};
15use super::subprocess::GitSubprocess; 16use super::subprocess::GitSubprocess;
16use super::try_set_head_if_available; 17use super::try_set_head_if_available;
17 18
18use crate::git::authorization::{authorize_push, fetch_repository_data, parse_pushed_refs}; 19use crate::git::authorization::{authorize_push, parse_pushed_refs};
19use crate::git::sync::{sync_pr_refs_to_tagged_owner_repos, sync_to_owner_repos}; 20use crate::git::sync::process_newly_available_git_data;
20use crate::nostr::builder::SharedDatabase; 21use crate::nostr::builder::SharedDatabase;
21use crate::nostr::events::{KIND_PR, KIND_PR_UPDATE, KIND_REPOSITORY_STATE};
22use crate::purgatory::Purgatory; 22use crate::purgatory::Purgatory;
23 23
24/// Handle GET /info/refs?service=git-{upload,receive}-pack 24/// Handle GET /info/refs?service=git-{upload,receive}-pack
@@ -280,6 +280,10 @@ pub async fn handle_receive_pack(
280 // GRASP-01: Set HEAD after git data is received 280 // GRASP-01: Set HEAD after git data is received
281 // "MUST set repository HEAD per repository state announcement 281 // "MUST set repository HEAD per repository state announcement
282 // as soon as the git data related to that branch has been received." 282 // as soon as the git data related to that branch has been received."
283 //
284 // Note: HEAD setting is also handled by process_newly_available_git_data via
285 // align_repository_with_state, but we do it here first for the pushed-to repo
286 // to ensure it's set immediately after the push succeeds.
283 if let Some(ref state) = auth_result.state { 287 if let Some(ref state) = auth_result.state {
284 if let Some(head_ref) = &state.head { 288 if let Some(head_ref) = &state.head {
285 if let Some(branch_name) = state.get_head_branch() { 289 if let Some(branch_name) = state.get_head_branch() {
@@ -303,148 +307,57 @@ pub async fn handle_receive_pack(
303 } 307 }
304 } 308 }
305 309
306 // Save all events from purgatory that authorized this push and remove them from purgatory 310 // Process newly available git data using the unified function
307 // This includes state events, PR events, and PR-update events 311 // This handles:
308 info!( 312 // - Discovering satisfiable events from purgatory (state events and PR events)
309 "Saving {} purgatory event(s) to database after successful push", 313 // - Syncing OIDs to authorized owner repos
310 auth_result.purgatory_events.len() 314 // - Aligning refs (+ setting HEAD) in all owner repos
311 ); 315 // - Saving events to database
312 316 // - Notifying WebSocket subscribers
313 for event in &auth_result.purgatory_events { 317 // - Removing from purgatory
314 match database.save_event(event).await { 318 //
315 Ok(_) => { 319 // Parse pushed refs to collect new OIDs
316 // Remove from purgatory based on event kind
317 if event.kind == Kind::from(KIND_REPOSITORY_STATE) {
318 info!("Saved purgatory state event {} to database", event.id);
319 purgatory.remove_state_event(identifier, &event.id);
320 info!("Removed saved state event {} from purgatory", event.id);
321 } else if event.kind == Kind::from(KIND_PR)
322 || event.kind == Kind::from(KIND_PR_UPDATE)
323 {
324 info!("Saved purgatory PR event {} to database", event.id);
325 // Extract event ID from the event itself (it's the event.id)
326 let event_id_hex = event.id.to_hex();
327 purgatory.remove_pr(&event_id_hex);
328 info!("Removed saved PR event {} from purgatory", event.id);
329 }
330 // Broadcast to WebSocket subscribers
331 if relay.notify_event(event.clone()) {
332 info!(
333 "Broadcast purgatory event {} to websocket listeners",
334 event.id
335 );
336 } else {
337 warn!(
338 "Failed to broadcast purgatory event {} to websocket listeners",
339 event.id
340 );
341 }
342 }
343 Err(e) => {
344 warn!(
345 "Failed to save purgatory event {} to database: {}",
346 event.id, e
347 );
348 }
349 }
350 }
351
352 // TODO figure out what atomic pushes look like in GRASP (we cant accepted differnte state events changing different branches at the same time)
353
354 // Sync git data to other owner repositories that authorize the same state event
355 // This ensures all owners who share maintainers get the same git data
356 if let Some(ref state) = auth_result.state {
357 // Fetch repository data for sync
358 match fetch_repository_data(&database, identifier).await {
359 Ok(db_repo_data) => {
360 let git_data_path_buf = std::path::PathBuf::from(git_data_path);
361 let sync_result =
362 sync_to_owner_repos(&repo_path, state, &db_repo_data, &git_data_path_buf);
363
364 if sync_result.repos_synced > 0 {
365 info!(
366 "Synced git data to {} other owner repositories for {}",
367 sync_result.repos_synced, identifier
368 );
369 }
370
371 if !sync_result.errors.is_empty() {
372 for (repo, error) in &sync_result.errors {
373 warn!("Error syncing to {}: {}", repo, error);
374 }
375 }
376 }
377 Err(e) => {
378 warn!(
379 "Failed to fetch repository data for sync after push to {}: {}",
380 identifier, e
381 );
382 }
383 }
384 }
385
386 // Sync PR data (refs/nostr/<event-id>) to other owner repositories
387 // Parse pushed refs to find refs/nostr/* refs
388 let pushed_refs = parse_pushed_refs(&request_body); 320 let pushed_refs = parse_pushed_refs(&request_body);
389 let pr_refs: Vec<(String, String)> = pushed_refs 321 let new_oids: HashSet<String> = pushed_refs
390 .iter() 322 .iter()
391 .filter_map(|(_, new_oid, ref_name)| { 323 .filter(|(_, new_oid, _)| new_oid != "0000000000000000000000000000000000000000")
392 ref_name 324 .map(|(_, new_oid, _)| new_oid.clone())
393 .strip_prefix("refs/nostr/")
394 .map(|event_id| (event_id.to_string(), new_oid.clone()))
395 })
396 .collect(); 325 .collect();
397 326
398 if !pr_refs.is_empty() { 327 let git_data_path_buf = std::path::Path::new(git_data_path);
399 // Extract PR events from purgatory_events (filter for KIND_PR and KIND_PR_UPDATE)
400 let purgatory_pr_events: Vec<_> = auth_result
401 .purgatory_events
402 .iter()
403 .filter(|e| e.kind == Kind::from(KIND_PR) || e.kind == Kind::from(KIND_PR_UPDATE))
404 .cloned()
405 .collect();
406
407 match fetch_repository_data(&database, identifier).await {
408 Ok(db_repo_data) => {
409 let git_data_path_buf = std::path::PathBuf::from(git_data_path);
410
411 // sync to owner repos and repos of other owners that list them as maintainers
412 // This uses the `a` tags from PR events to find tagged owner repos
413 if !purgatory_pr_events.is_empty() {
414 let tagged_sync_result = sync_pr_refs_to_tagged_owner_repos(
415 &repo_path,
416 &pr_refs,
417 &purgatory_pr_events,
418 &db_repo_data,
419 &git_data_path_buf,
420 owner_pubkey,
421 );
422
423 if tagged_sync_result.repos_synced > 0 {
424 info!(
425 "Synced {} PR refs to {} other owner repositories for {} (via tagged owners)",
426 tagged_sync_result.refs_created,
427 tagged_sync_result.repos_synced,
428 identifier
429 );
430 }
431 328
432 if !tagged_sync_result.errors.is_empty() { 329 match process_newly_available_git_data(
433 for (repo, error) in &tagged_sync_result.errors { 330 &repo_path,
434 warn!( 331 &new_oids,
435 "Error syncing PR ref to {} (via tagged owner): {}", 332 &database,
436 repo, error 333 Some(&relay),
437 ); 334 &purgatory,
438 } 335 git_data_path_buf,
439 } 336 )
440 } 337 .await
441 } 338 {
442 Err(e) => { 339 Ok(result) => {
443 warn!( 340 if result.released_any() {
444 "Failed to fetch repository data for PR sync after push to {}: {}", 341 info!(
445 identifier, e 342 "Processed push for {}: {} states released, {} PRs released, {} repos synced",
343 identifier,
344 result.states_released,
345 result.prs_released,
346 result.repos_synced
446 ); 347 );
447 } 348 }
349
350 if !result.errors.is_empty() {
351 for error in &result.errors {
352 warn!("Error during post-push processing for {}: {}", identifier, error);
353 }
354 }
355 }
356 Err(e) => {
357 warn!(
358 "Failed to process newly available git data after push to {}: {}",
359 identifier, e
360 );
448 } 361 }
449 } 362 }
450 363
diff --git a/src/purgatory/sync/throttle.rs b/src/purgatory/sync/throttle.rs
index a310a91..05b0878 100644
--- a/src/purgatory/sync/throttle.rs
+++ b/src/purgatory/sync/throttle.rs
@@ -343,6 +343,7 @@ impl ThrottleManager {
343 /// 343 ///
344 /// Decrements in-flight count and cleans up old timestamps. 344 /// Decrements in-flight count and cleans up old timestamps.
345 /// Does not trigger processing of queued identifiers. 345 /// Does not trigger processing of queued identifiers.
346 #[cfg(test)]
346 fn complete_request_internal(&self, domain: &str) { 347 fn complete_request_internal(&self, domain: &str) {
347 if let Some(entry) = self.throttles.get(domain) { 348 if let Some(entry) = self.throttles.get(domain) {
348 let mut throttle = entry.lock().unwrap(); 349 let mut throttle = entry.lock().unwrap();
@@ -377,6 +378,7 @@ impl ThrottleManager {
377 /// 378 ///
378 /// If the identifier is already queued for this domain, merges the tried_urls sets. 379 /// If the identifier is already queued for this domain, merges the tried_urls sets.
379 /// Does not trigger processing. 380 /// Does not trigger processing.
381 #[cfg(test)]
380 fn enqueue_identifier_internal( 382 fn enqueue_identifier_internal(
381 &self, 383 &self,
382 domain: &str, 384 domain: &str,