upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 14:19:27 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-07 14:46:17 +0000
commit3dfec1e449f260295e8c5c505dd1edb82d787c58 (patch)
treef093ebfd02cbc16ca2c3c36f98601a78cf0876fd /src
parent74979c1de32f69a39e0e290f56435ef687c2b6f6 (diff)
Wire up new purgatory sync loop, remove legacy sync_state_git_data
Phase 13 of purgatory-sync-redesign: - Add sync loop startup in main.rs (RealSyncContext + ThrottleManager + start_sync_loop) - Update add_state() and add_pr() to automatically enqueue for background sync - Remove start_state_sync() call from state.rs (now handled by sync loop) - Remove orphaned legacy functions: sync_state_git_data, fetch_missing_oids_from_server, get_most_complete_local_repo, identify_missing_oids, get_date_of_most_recent_commit_on_default_branch - Clean up unused imports in purgatory/mod.rs
Diffstat (limited to 'src')
-rw-r--r--src/main.rs20
-rw-r--r--src/nostr/policy/state.rs9
-rw-r--r--src/purgatory/mod.rs484
3 files changed, 41 insertions, 472 deletions
diff --git a/src/main.rs b/src/main.rs
index 59edc09..b4a42af 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,7 +11,7 @@ use ngit_grasp::{
11 git, http, 11 git, http,
12 metrics::Metrics, 12 metrics::Metrics,
13 nostr, 13 nostr,
14 purgatory::Purgatory, 14 purgatory::{sync::RealSyncContext, sync::ThrottleManager, Purgatory},
15 sync::SyncManager, 15 sync::SyncManager,
16}; 16};
17 17
@@ -111,6 +111,24 @@ async fn main() -> Result<()> {
111 }); 111 });
112 info!("Purgatory cleanup task started (60s interval)"); 112 info!("Purgatory cleanup task started (60s interval)");
113 113
114 // Start purgatory sync loop for background git data fetching
115 let sync_ctx = Arc::new(RealSyncContext::new(
116 purgatory.clone(),
117 relay_with_db.database.clone(),
118 PathBuf::from(config.effective_git_data_path()),
119 Some(config.domain.clone()),
120 Some(relay_with_db.relay.clone()),
121 ));
122
123 // Create throttle manager for rate limiting remote git servers
124 // Default: 5 concurrent requests per domain, 30 requests per minute per domain
125 let throttle_manager = Arc::new(ThrottleManager::new(5, 30));
126 throttle_manager.set_context(sync_ctx.clone());
127
128 // Start the sync loop
129 let _sync_loop_handle = purgatory.clone().start_sync_loop(sync_ctx, throttle_manager);
130 info!("Purgatory sync loop started (1s interval)");
131
114 // Setup shutdown handler for purgatory cleanup 132 // Setup shutdown handler for purgatory cleanup
115 let shutdown_purgatory = purgatory.clone(); 133 let shutdown_purgatory = purgatory.clone();
116 let git_data_path = config.effective_git_data_path(); 134 let git_data_path = config.effective_git_data_path();
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index a85e351..7d69d7d 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -152,18 +152,11 @@ impl StatePolicy {
152 Ok(WritePolicyResult::Accept) // event should be saved and broadcast 152 Ok(WritePolicyResult::Accept) // event should be saved and broadcast
153 } else { 153 } else {
154 // if no git data - add to purgatory 154 // if no git data - add to purgatory
155 // (add_state automatically enqueues for background sync)
155 self.ctx 156 self.ctx
156 .purgatory 157 .purgatory
157 .add_state(event.clone(), state.identifier.clone(), event.pubkey); 158 .add_state(event.clone(), state.identifier.clone(), event.pubkey);
158 159
159 // Trigger background git data sync from remote servers
160 self.ctx.purgatory.start_state_sync(
161 state.clone(),
162 self.ctx.database.clone(),
163 Some(self.ctx.domain.clone()),
164 self.ctx.get_local_relay(),
165 );
166
167 tracing::info!( 160 tracing::info!(
168 "state event added to purgatory: eventid: {}, identifier: {}", 161 "state event added to purgatory: eventid: {}, identifier: {}",
169 state.event.id, 162 state.event.id,
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 499e534..7045923 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -15,24 +15,16 @@ mod helpers;
15pub mod sync; 15pub mod sync;
16mod types; 16mod types;
17 17
18use anyhow::{bail, Result};
19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 18pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs};
20pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 19pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 20
22use dashmap::DashMap; 21use dashmap::DashMap;
23use nostr_sdk::prelude::*; 22use nostr_sdk::prelude::*;
24use std::collections::HashSet; 23use std::collections::HashSet;
25use std::path::{Path, PathBuf}; 24use std::path::PathBuf;
26use std::process::Command;
27use std::sync::Arc; 25use std::sync::Arc;
28use std::time::{Duration, Instant}; 26use std::time::{Duration, Instant};
29 27
30use crate::git::authorization::{fetch_repository_data, pubkey_authorised_for_repo_owners, RepositoryData};
31use crate::git::oid_exists;
32use crate::git::sync::sync_to_owner_repos;
33use crate::nostr::builder::SharedDatabase;
34use crate::nostr::events::RepositoryState;
35
36pub use sync::SyncQueueEntry; 28pub use sync::SyncQueueEntry;
37 29
38/// Default expiry duration for purgatory entries (30 minutes) 30/// Default expiry duration for purgatory entries (30 minutes)
@@ -195,6 +187,9 @@ impl Purgatory {
195 /// The event will expire after the default duration unless matched with git data. 187 /// The event will expire after the default duration unless matched with git data.
196 /// Multiple state events for the same identifier are allowed (from different authors). 188 /// Multiple state events for the same identifier are allowed (from different authors).
197 /// 189 ///
190 /// Automatically enqueues the identifier for background sync with the default delay
191 /// (3 minutes), giving time for a git push to arrive after the nostr event.
192 ///
198 /// # Arguments 193 /// # Arguments
199 /// * `event` - The state event (kind 30618) to hold 194 /// * `event` - The state event (kind 30618) to hold
200 /// * `identifier` - The repository identifier from the 'd' tag 195 /// * `identifier` - The repository identifier from the 'd' tag
@@ -209,84 +204,30 @@ impl Purgatory {
209 expires_at: now + DEFAULT_EXPIRY, 204 expires_at: now + DEFAULT_EXPIRY,
210 }; 205 };
211 206
212 self.state_events.entry(identifier).or_default().push(entry); 207 self.state_events
213 } 208 .entry(identifier.clone())
209 .or_default()
210 .push(entry);
214 211
215 /// Trigger a background git data sync for a state event. 212 // Enqueue for background sync with default delay
216 /// 213 self.enqueue_sync_default(&identifier);
217 /// This method spawns a background task to attempt fetching missing git data
218 /// from remote servers listed in the repository announcements. It's called
219 /// when a state event arrives but the required git data isn't available locally.
220 ///
221 /// After successfully syncing OIDs:
222 /// 1. Syncs OIDs to other owner repositories that authorize this state
223 /// 2. Aligns refs with state for each repository
224 /// 3. Saves the state event to database
225 /// 4. Notifies WebSocket subscribers
226 /// 5. Removes the event from purgatory
227 ///
228 /// # Arguments
229 /// * `state` - The parsed repository state event
230 /// * `database` - Database to query for repository announcements and save events
231 /// * `our_domain` - Our service domain to exclude from fetch targets
232 /// * `local_relay` - Local relay for notifying WebSocket subscribers (optional)
233 pub fn start_state_sync(
234 &self,
235 state: RepositoryState,
236 database: SharedDatabase,
237 our_domain: Option<String>,
238 local_relay: Option<nostr_relay_builder::LocalRelay>,
239 ) {
240 let git_data_path = self.git_data_path.clone();
241 let identifier = state.identifier.clone();
242 let event_id = state.event.id;
243 let purgatory = self.clone();
244
245 tokio::spawn(async move {
246 tracing::debug!(
247 identifier = %identifier,
248 event_id = %event_id,
249 "Starting background git data sync for purgatory state event"
250 );
251
252 match sync_state_git_data(
253 state,
254 &database,
255 &git_data_path,
256 our_domain.as_deref(),
257 local_relay.as_ref(),
258 &purgatory,
259 )
260 .await
261 {
262 Ok(()) => {
263 tracing::info!(
264 identifier = %identifier,
265 event_id = %event_id,
266 "Successfully synced git data and released state event from purgatory"
267 );
268 }
269 Err(e) => {
270 tracing::warn!(
271 identifier = %identifier,
272 event_id = %event_id,
273 error = %e,
274 "Failed to sync git data for purgatory state event"
275 );
276 }
277 }
278 });
279 } 214 }
280 215
281 /// Add a PR event to purgatory. 216 /// Add a PR event to purgatory.
282 /// 217 ///
283 /// The event will expire after the default duration unless matched with git data. 218 /// The event will expire after the default duration unless matched with git data.
284 /// 219 ///
220 /// Automatically enqueues the referenced repository identifier for background sync
221 /// with the default delay (3 minutes), giving time for a git push to arrive.
222 ///
285 /// # Arguments 223 /// # Arguments
286 /// * `event` - The PR event (kind 1617/1618) to hold 224 /// * `event` - The PR event (kind 1617/1618) to hold
287 /// * `event_id` - The event ID (hex string) from the 'e' tag 225 /// * `event_id` - The event ID (hex string) from the 'e' tag
288 /// * `commit` - The commit SHA from the 'c' tag 226 /// * `commit` - The commit SHA from the 'c' tag
289 pub fn add_pr(&self, event: Event, event_id: String, commit: String) { 227 pub fn add_pr(&self, event: Event, event_id: String, commit: String) {
228 // Extract identifier from the event's `a` tag for sync enqueueing
229 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event);
230
290 let now = Instant::now(); 231 let now = Instant::now();
291 let entry = PrPurgatoryEntry { 232 let entry = PrPurgatoryEntry {
292 event: Some(event), 233 event: Some(event),
@@ -296,6 +237,11 @@ impl Purgatory {
296 }; 237 };
297 238
298 self.pr_events.insert(event_id, entry); 239 self.pr_events.insert(event_id, entry);
240
241 // Enqueue the identifier for background sync if we could extract it
242 if let Some(id) = identifier {
243 self.enqueue_sync_default(&id);
244 }
299 } 245 }
300 246
301 /// Add a PR placeholder (git data arrived before PR event). 247 /// Add a PR placeholder (git data arrived before PR event).
@@ -604,394 +550,6 @@ impl Purgatory {
604 } 550 }
605} 551}
606 552
607/// Async function to sync git data for a state event from remote servers.
608///
609/// This function:
610/// 1. Fetches repository data from the database
611/// 2. Identifies which owners authorize the state event author
612/// 3. Collects clone URLs from authorized announcements
613/// 4. Finds the most complete local repo to fetch into
614/// 5. Identifies missing OIDs and fetches them from remote servers
615/// 6. After OIDs are synced, copies them to other owner repositories
616/// 7. Aligns refs with state for each authorized repository
617/// 8. Saves the state event to database
618/// 9. Notifies WebSocket subscribers
619/// 10. Removes the event from purgatory
620async fn sync_state_git_data(
621 state: RepositoryState,
622 database: &SharedDatabase,
623 git_data_path: &Path,
624 our_domain: Option<&str>,
625 local_relay: Option<&nostr_relay_builder::LocalRelay>,
626 purgatory: &Purgatory,
627) -> Result<()> {
628 // Fetch repository data from database
629 let db_repo_data = fetch_repository_data(database, &state.identifier).await?;
630
631 if db_repo_data.announcements.is_empty() {
632 bail!(
633 "No announcements found for identifier: {}",
634 state.identifier
635 );
636 }
637
638 // Find owners that authorize this pubkey as a maintainer
639 let repo_owners_authorising_pubkey =
640 pubkey_authorised_for_repo_owners(&state.event.pubkey, &db_repo_data);
641
642 if repo_owners_authorising_pubkey.is_empty() {
643 bail!(
644 "No owners authorize pubkey {} for identifier {}",
645 state.event.pubkey,
646 state.identifier
647 );
648 }
649
650 // Collect clone URLs from authorized announcements, excluding our own service
651 let servers: HashSet<String> = db_repo_data
652 .announcements
653 .iter()
654 .filter(|a| repo_owners_authorising_pubkey.contains(&a.event.pubkey.to_hex()))
655 .flat_map(|a| a.clone_urls.iter().cloned())
656 .filter(|url| {
657 // Exclude our own domain if specified
658 if let Some(domain) = our_domain {
659 !url.contains(domain)
660 } else {
661 true
662 }
663 })
664 .collect();
665
666 if servers.is_empty() {
667 bail!(
668 "No external clone URLs found for identifier: {}",
669 state.identifier
670 );
671 }
672
673 tracing::debug!(
674 identifier = %state.identifier,
675 servers = ?servers,
676 "Found {} external servers for git data sync",
677 servers.len()
678 );
679
680 // Find the most complete local repo to fetch into
681 let (source_repo_path, missing_oids) =
682 get_most_complete_local_repo(&db_repo_data, &state, git_data_path)?;
683
684 // Fetch missing OIDs from remote servers
685 if !missing_oids.is_empty() {
686 tracing::info!(
687 identifier = %state.identifier,
688 repo_path = %source_repo_path.display(),
689 missing_oids = ?missing_oids,
690 "Attempting to fetch {} missing OIDs from remote servers",
691 missing_oids.len()
692 );
693
694 // Try to fetch from each server until we get all missing OIDs
695 let mut last_error: Option<String> = None;
696 for server_url in &servers {
697 match fetch_missing_oids_from_server(&source_repo_path, server_url, &missing_oids).await
698 {
699 Ok(fetched) => {
700 if fetched > 0 {
701 tracing::info!(
702 identifier = %state.identifier,
703 server = %server_url,
704 fetched = %fetched,
705 "Successfully fetched git data"
706 );
707 }
708
709 // Check if all OIDs are now available
710 let still_missing: Vec<_> = missing_oids
711 .iter()
712 .filter(|oid| !oid_exists(&source_repo_path, oid))
713 .collect();
714
715 if still_missing.is_empty() {
716 tracing::info!(
717 identifier = %state.identifier,
718 "All missing OIDs fetched successfully"
719 );
720 break;
721 }
722 }
723 Err(e) => {
724 tracing::debug!(
725 identifier = %state.identifier,
726 server = %server_url,
727 error = %e,
728 "Failed to fetch from server"
729 );
730 last_error = Some(e.to_string());
731 }
732 }
733 }
734
735 // Check final state - if still missing OIDs, fail
736 let still_missing: Vec<_> = missing_oids
737 .iter()
738 .filter(|oid| !oid_exists(&source_repo_path, oid))
739 .collect();
740
741 if !still_missing.is_empty() {
742 bail!(
743 "Failed to fetch {} OIDs from any server. Last error: {:?}",
744 still_missing.len(),
745 last_error
746 );
747 }
748 } else {
749 tracing::debug!(
750 identifier = %state.identifier,
751 repo_path = %source_repo_path.display(),
752 "No missing OIDs - git data is already complete"
753 );
754 }
755
756 // Now that we have all OIDs, sync to other owner repositories and align refs
757 let sync_result = sync_to_owner_repos(&source_repo_path, &state, &db_repo_data, git_data_path);
758
759 tracing::info!(
760 identifier = %state.identifier,
761 event_id = %state.event.id,
762 repos_synced = sync_result.repos_synced,
763 "Synced git data and aligned {} repositories from purgatory",
764 sync_result.repos_synced
765 );
766
767 // Save state event to database
768 match database.save_event(&state.event).await {
769 Ok(_) => {
770 tracing::info!(
771 identifier = %state.identifier,
772 event_id = %state.event.id,
773 "Saved purgatory state event to database after git sync"
774 );
775
776 // Notify WebSocket subscribers
777 if let Some(relay) = local_relay {
778 if relay.notify_event(state.event.clone()) {
779 tracing::info!(
780 identifier = %state.identifier,
781 event_id = %state.event.id,
782 "Broadcast purgatory state event to websocket listeners"
783 );
784 } else {
785 tracing::warn!(
786 identifier = %state.identifier,
787 event_id = %state.event.id,
788 "Failed to broadcast purgatory state event to websocket listeners"
789 );
790 }
791 }
792
793 // Remove from purgatory
794 purgatory.remove_state_event(&state.identifier, &state.event.id);
795 tracing::info!(
796 identifier = %state.identifier,
797 event_id = %state.event.id,
798 "Removed state event from purgatory after successful sync"
799 );
800 }
801 Err(e) => {
802 tracing::warn!(
803 identifier = %state.identifier,
804 event_id = %state.event.id,
805 error = %e,
806 "Failed to save purgatory state event to database"
807 );
808 // Don't remove from purgatory if save failed - it will retry or expire
809 bail!("Failed to save state event to database: {}", e);
810 }
811 }
812
813 Ok(())
814}
815
816/// Fetch missing OIDs from a remote git server.
817///
818/// Uses `git fetch` to retrieve specific commits from the server.
819async fn fetch_missing_oids_from_server(
820 repo_path: &Path,
821 server_url: &str,
822 missing_oids: &[String],
823) -> Result<usize> {
824 if missing_oids.is_empty() {
825 return Ok(0);
826 }
827
828 // Use tokio::task::spawn_blocking for the git operations since they're blocking
829 let repo_path = repo_path.to_path_buf();
830 let server_url = server_url.to_string();
831 let oids = missing_oids.to_vec();
832
833 tokio::task::spawn_blocking(move || {
834 // Filter to only OIDs that don't already exist
835 let missing: Vec<&String> = oids
836 .iter()
837 .filter(|oid| !oid_exists(&repo_path, oid))
838 .collect();
839
840 if missing.is_empty() {
841 return Ok(0);
842 }
843
844 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs in one command
845 let mut args = vec!["fetch", "--depth=1", &server_url];
846 args.extend(missing.iter().map(|s| s.as_str()));
847
848 tracing::debug!(
849 oids = ?missing,
850 server = %server_url,
851 "Fetching OIDs"
852 );
853
854 let output = Command::new("git")
855 .args(&args)
856 .current_dir(&repo_path)
857 .output();
858
859 match output {
860 Ok(result) if result.status.success() => {
861 // Count how many OIDs we now have
862 let fetched_count = missing
863 .iter()
864 .filter(|oid| oid_exists(&repo_path, oid))
865 .count();
866
867 tracing::debug!(
868 fetched_count = fetched_count,
869 server = %server_url,
870 "Successfully fetched OIDs"
871 );
872
873 Ok(fetched_count)
874 }
875 Ok(result) => {
876 let stderr = String::from_utf8_lossy(&result.stderr);
877 tracing::debug!(
878 oids = ?missing,
879 server = %server_url,
880 stderr = %stderr,
881 "git fetch failed for OIDs"
882 );
883 Ok(0)
884 }
885 Err(e) => {
886 tracing::debug!(
887 oids = ?missing,
888 server = %server_url,
889 error = %e,
890 "git fetch command error"
891 );
892 Ok(0)
893 }
894 }
895 })
896 .await?
897}
898
899fn get_most_complete_local_repo(
900 db_repo_data: &RepositoryData,
901 state: &RepositoryState,
902 git_path: &Path,
903) -> Result<(PathBuf, Vec<String>)> {
904 // should we filter for those where pubkey is authorised?
905
906 let repo_onwers_authorising_pubkey =
907 pubkey_authorised_for_repo_owners(&state.event.pubkey, db_repo_data);
908
909 let mut res: Option<(Timestamp, PathBuf, Vec<String>)> = None;
910 for announcement in &db_repo_data.announcements {
911 if !repo_onwers_authorising_pubkey.contains(&announcement.event.pubkey.to_hex()) {
912 continue; // skip where event author isn't a maintainer
913 }
914 let repo_path = git_path.join(announcement.repo_path().clone());
915 if let Ok(missing_oids) = identify_missing_oids(state, &repo_path) {
916 let commit_date = get_date_of_most_recent_commit_on_default_branch(&repo_path)
917 .unwrap_or(Timestamp::zero());
918 let newest_commmit_date = if let Some((d, _, _)) = &res {
919 d
920 } else {
921 &Timestamp::zero()
922 };
923 if commit_date.gt(newest_commmit_date) {
924 res = Some((commit_date, repo_path, missing_oids));
925 }
926 }
927 }
928 if let Some((_newest_commit_date, repo_path, missing_oids)) = res {
929 Ok((repo_path, missing_oids))
930 } else {
931 bail!("no repo directories exists yet");
932 }
933}
934
935fn identify_missing_oids(state: &RepositoryState, git_repo_path: &Path) -> Result<Vec<String>> {
936 if !git_repo_path.exists() {
937 bail!("repo directory doesn't exists");
938 }
939 let mut missing_oids = vec![];
940 for branch_state in &state.branches {
941 if !branch_state.commit.starts_with("ref: ")
942 && !oid_exists(git_repo_path, &branch_state.commit)
943 {
944 missing_oids.push(branch_state.commit.clone());
945 }
946 }
947 for tag_state in &state.tags {
948 if !tag_state.commit.starts_with("ref: ") && !oid_exists(git_repo_path, &tag_state.commit) {
949 missing_oids.push(tag_state.commit.clone());
950 }
951 }
952 Ok(missing_oids)
953}
954
955fn get_date_of_most_recent_commit_on_default_branch(git_repo_path: &Path) -> Result<Timestamp> {
956 if !git_repo_path.exists() {
957 bail!("repo directory doesn't exists");
958 }
959
960 // Get the default branch (HEAD)
961 let head_output = std::process::Command::new("git")
962 .args(["symbolic-ref", "HEAD"])
963 .current_dir(git_repo_path)
964 .output()?;
965
966 if !head_output.status.success() {
967 bail!("Failed to get repository HEAD");
968 }
969
970 let head_ref = String::from_utf8_lossy(&head_output.stdout)
971 .trim()
972 .to_string();
973
974 // Get the most recent commit timestamp on the default branch
975 // Use %ct to get the committer date as Unix timestamp
976 let log_output = std::process::Command::new("git")
977 .args(["log", "-1", "--format=%ct", &head_ref])
978 .current_dir(git_repo_path)
979 .output()?;
980
981 if !log_output.status.success() {
982 bail!("Failed to get commit timestamp for {}", head_ref);
983 }
984
985 let timestamp_str = String::from_utf8_lossy(&log_output.stdout)
986 .trim()
987 .to_string();
988 let unix_timestamp: u64 = timestamp_str
989 .parse()
990 .map_err(|_| anyhow::anyhow!("Failed to parse timestamp: {}", timestamp_str))?;
991
992 Ok(Timestamp::from(unix_timestamp))
993}
994
995#[cfg(test)] 553#[cfg(test)]
996mod tests { 554mod tests {
997 use super::*; 555 use super::*;