diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-03 14:50:22 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-03 15:18:23 +0000 |
| commit | 874a8abe1d076cfafd9baf919ec23d7d58200698 (patch) | |
| tree | dce0d0d36bddc496ff32f8555a8790d8dc7be7e4 /src/purgatory | |
| parent | 9fd4350c57bbe986ebf65bf3ea4c996572e81884 (diff) | |
| parent | 92a9a3bfe0bc522e8ae411991a366a3a6310d525 (diff) | |
Merge relay.ngit.dev migration: bug fixes and migration tooling
This merge includes critical bug fixes and comprehensive migration tooling
developed during the relay.ngit.dev migration effort.
Bug Fixes:
- Fix git protocol error handling to return HTTP 200 with ERR pkt-line
- Fix naughty list false positives and DNS failure identification
- Fix database query filters in load_existing_events (remove .since())
- Fix OID fetch tracking to distinguish 0 OIDs from successful fetches
- Fix purgatory event source tracking for filtered expiry logging
- Implement OID retry logic for 'not our ref' errors
Migration Tools & Documentation:
- Complete 5-phase migration analysis pipeline with orchestration script
- Phase 1: Event fetching from source relay
- Phase 2: Git sync verification
- Phase 3: Categorization and relay comparison
- Phase 4: Log extraction (parse failures, purgatory expiry)
- Phase 5: Action classification for migration decisions
- Comprehensive migration guide with lessons learned
- Troubleshooting guide for permission and corruption issues
Configuration:
- Add NGIT_LOG_LEVEL configuration option
- Update git throttle limits to 60/minute
- Improve logging throughout for better observability
Diffstat (limited to 'src/purgatory')
| -rw-r--r-- | src/purgatory/mod.rs | 339 | ||||
| -rw-r--r-- | src/purgatory/sync/context.rs | 163 | ||||
| -rw-r--r-- | src/purgatory/sync/functions.rs | 22 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 30 |
4 files changed, 429 insertions, 125 deletions
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 47798a6..8094450 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -17,10 +17,11 @@ pub mod sync; | |||
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use nostr_sdk::ToBech32; | ||
| 24 | use serde::{Deserialize, Serialize}; | 25 | use serde::{Deserialize, Serialize}; |
| 25 | use std::collections::HashMap; | 26 | use std::collections::HashMap; |
| 26 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| @@ -57,6 +58,9 @@ struct SerializableStatePurgatoryEntry { | |||
| 57 | created_at_offset_secs: u64, | 58 | created_at_offset_secs: u64, |
| 58 | /// Duration offset from saved_at for expires_at | 59 | /// Duration offset from saved_at for expires_at |
| 59 | expires_at_offset_secs: u64, | 60 | expires_at_offset_secs: u64, |
| 61 | /// Source of this event (direct submission vs sync) | ||
| 62 | #[serde(default)] | ||
| 63 | source: types::EventSource, | ||
| 60 | } | 64 | } |
| 61 | 65 | ||
| 62 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | 66 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. |
| @@ -74,6 +78,9 @@ struct SerializablePrPurgatoryEntry { | |||
| 74 | created_at_offset_secs: u64, | 78 | created_at_offset_secs: u64, |
| 75 | /// Duration offset from saved_at for expires_at | 79 | /// Duration offset from saved_at for expires_at |
| 76 | expires_at_offset_secs: u64, | 80 | expires_at_offset_secs: u64, |
| 81 | /// Source of this event (direct submission vs sync) | ||
| 82 | #[serde(default)] | ||
| 83 | source: types::EventSource, | ||
| 77 | } | 84 | } |
| 78 | 85 | ||
| 79 | /// Serializable purgatory state for disk persistence. | 86 | /// Serializable purgatory state for disk persistence. |
| @@ -270,11 +277,38 @@ impl Purgatory { | |||
| 270 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately | 277 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately |
| 271 | /// to override this delay. | 278 | /// to override this delay. |
| 272 | /// | 279 | /// |
| 280 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 281 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 282 | /// | ||
| 273 | /// # Arguments | 283 | /// # Arguments |
| 274 | /// * `event` - The state event (kind 30618) to hold | 284 | /// * `event` - The state event (kind 30618) to hold |
| 275 | /// * `identifier` - The repository identifier from the 'd' tag | 285 | /// * `identifier` - The repository identifier from the 'd' tag |
| 276 | /// * `author` - The event author's public key | 286 | /// * `author` - The event author's public key |
| 277 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 287 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 288 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) { | ||
| 289 | let source = if from_sync { | ||
| 290 | types::EventSource::Sync | ||
| 291 | } else { | ||
| 292 | types::EventSource::Direct | ||
| 293 | }; | ||
| 294 | |||
| 295 | // Check if event already exists - if so, potentially upgrade source | ||
| 296 | if let Some(mut entries) = self.state_events.get_mut(&identifier) { | ||
| 297 | if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) { | ||
| 298 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 299 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 300 | existing.source = types::EventSource::Direct; | ||
| 301 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 302 | tracing::debug!( | ||
| 303 | event_id = %event.id, | ||
| 304 | identifier = %identifier, | ||
| 305 | "Upgraded purgatory entry source from Sync to Direct, reset expiry" | ||
| 306 | ); | ||
| 307 | } | ||
| 308 | return; // Event already exists, don't add duplicate | ||
| 309 | } | ||
| 310 | } | ||
| 311 | |||
| 278 | let now = Instant::now(); | 312 | let now = Instant::now(); |
| 279 | let entry = StatePurgatoryEntry { | 313 | let entry = StatePurgatoryEntry { |
| 280 | event, | 314 | event, |
| @@ -282,6 +316,7 @@ impl Purgatory { | |||
| 282 | author, | 316 | author, |
| 283 | created_at: now, | 317 | created_at: now, |
| 284 | expires_at: now + DEFAULT_EXPIRY, | 318 | expires_at: now + DEFAULT_EXPIRY, |
| 319 | source, | ||
| 285 | }; | 320 | }; |
| 286 | 321 | ||
| 287 | self.state_events | 322 | self.state_events |
| @@ -301,11 +336,35 @@ impl Purgatory { | |||
| 301 | /// Automatically enqueues the referenced repository identifier for background sync | 336 | /// Automatically enqueues the referenced repository identifier for background sync |
| 302 | /// with the default delay (3 minutes), giving time for a git push to arrive. | 337 | /// with the default delay (3 minutes), giving time for a git push to arrive. |
| 303 | /// | 338 | /// |
| 339 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 340 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 341 | /// | ||
| 304 | /// # Arguments | 342 | /// # Arguments |
| 305 | /// * `event` - The PR event (kind 1617/1618) to hold | 343 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 306 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 344 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 307 | /// * `commit` - The commit SHA from the 'c' tag | 345 | /// * `commit` - The commit SHA from the 'c' tag |
| 308 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 346 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 347 | pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) { | ||
| 348 | let source = if from_sync { | ||
| 349 | types::EventSource::Sync | ||
| 350 | } else { | ||
| 351 | types::EventSource::Direct | ||
| 352 | }; | ||
| 353 | |||
| 354 | // Check if event already exists - if so, potentially upgrade source | ||
| 355 | if let Some(mut existing) = self.pr_events.get_mut(&event_id) { | ||
| 356 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 357 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 358 | existing.source = types::EventSource::Direct; | ||
| 359 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 360 | tracing::debug!( | ||
| 361 | event_id = %event_id, | ||
| 362 | "Upgraded PR purgatory entry source from Sync to Direct, reset expiry" | ||
| 363 | ); | ||
| 364 | } | ||
| 365 | return; // Event already exists, don't add duplicate | ||
| 366 | } | ||
| 367 | |||
| 309 | // Extract identifier from the event's `a` tag for sync enqueueing | 368 | // Extract identifier from the event's `a` tag for sync enqueueing |
| 310 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | 369 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); |
| 311 | 370 | ||
| @@ -315,6 +374,7 @@ impl Purgatory { | |||
| 315 | commit, | 374 | commit, |
| 316 | created_at: now, | 375 | created_at: now, |
| 317 | expires_at: now + DEFAULT_EXPIRY, | 376 | expires_at: now + DEFAULT_EXPIRY, |
| 377 | source, | ||
| 318 | }; | 378 | }; |
| 319 | 379 | ||
| 320 | self.pr_events.insert(event_id, entry); | 380 | self.pr_events.insert(event_id, entry); |
| @@ -328,6 +388,8 @@ impl Purgatory { | |||
| 328 | /// Add a PR placeholder (git data arrived before PR event). | 388 | /// Add a PR placeholder (git data arrived before PR event). |
| 329 | /// | 389 | /// |
| 330 | /// Creates a placeholder entry waiting for the corresponding PR event. | 390 | /// Creates a placeholder entry waiting for the corresponding PR event. |
| 391 | /// Placeholders are always marked as `Direct` source since they originate | ||
| 392 | /// from git pushes (direct user action). | ||
| 331 | /// | 393 | /// |
| 332 | /// # Arguments | 394 | /// # Arguments |
| 333 | /// * `event_id` - The expected event ID (from git ref name) | 395 | /// * `event_id` - The expected event ID (from git ref name) |
| @@ -339,6 +401,7 @@ impl Purgatory { | |||
| 339 | commit, | 401 | commit, |
| 340 | created_at: now, | 402 | created_at: now, |
| 341 | expires_at: now + DEFAULT_EXPIRY, | 403 | expires_at: now + DEFAULT_EXPIRY, |
| 404 | source: types::EventSource::Direct, // Git pushes are direct user actions | ||
| 342 | }; | 405 | }; |
| 343 | 406 | ||
| 344 | self.pr_events.insert(event_id, entry); | 407 | self.pr_events.insert(event_id, entry); |
| @@ -608,6 +671,9 @@ impl Purgatory { | |||
| 608 | /// prevent infinite re-sync loops. Events that expire without finding git data | 671 | /// prevent infinite re-sync loops. Events that expire without finding git data |
| 609 | /// will be filtered out during future negentropy/REQ sync operations. | 672 | /// will be filtered out during future negentropy/REQ sync operations. |
| 610 | /// | 673 | /// |
| 674 | /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event | ||
| 675 | /// to support migration scripts and operational monitoring. | ||
| 676 | /// | ||
| 611 | /// # Returns | 677 | /// # Returns |
| 612 | /// Tuple of (num_state_removed, num_pr_removed) | 678 | /// Tuple of (num_state_removed, num_pr_removed) |
| 613 | pub fn cleanup(&self) -> (usize, usize) { | 679 | pub fn cleanup(&self) -> (usize, usize) { |
| @@ -615,18 +681,38 @@ impl Purgatory { | |||
| 615 | let mut state_removed = 0; | 681 | let mut state_removed = 0; |
| 616 | 682 | ||
| 617 | // Remove expired state events and mark them as expired | 683 | // Remove expired state events and mark them as expired |
| 618 | self.state_events.retain(|_, entries| { | 684 | self.state_events.retain(|identifier, entries| { |
| 619 | let original_len = entries.len(); | 685 | let original_len = entries.len(); |
| 620 | // Collect event IDs before removing | ||
| 621 | let expired_ids: Vec<EventId> = entries | ||
| 622 | .iter() | ||
| 623 | .filter(|entry| entry.expires_at <= now) | ||
| 624 | .map(|entry| entry.event.id) | ||
| 625 | .collect(); | ||
| 626 | 686 | ||
| 627 | // Mark as expired to prevent re-sync | 687 | // Log and collect expired entries before removing |
| 628 | for event_id in expired_ids { | 688 | for entry in entries.iter().filter(|e| e.expires_at <= now) { |
| 629 | self.mark_expired(event_id); | 689 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); |
| 690 | let event_id_short = &entry.event.id.to_hex()[..12]; | ||
| 691 | let source_str = if entry.source.is_direct() { "direct" } else { "sync" }; | ||
| 692 | |||
| 693 | // Structured log for migration scripts | ||
| 694 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 695 | if entry.source.is_direct() { | ||
| 696 | tracing::warn!( | ||
| 697 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 698 | identifier, | ||
| 699 | npub, | ||
| 700 | event_id_short, | ||
| 701 | entry.event.kind.as_u16(), | ||
| 702 | source_str | ||
| 703 | ); | ||
| 704 | } else { | ||
| 705 | tracing::debug!( | ||
| 706 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 707 | identifier, | ||
| 708 | npub, | ||
| 709 | event_id_short, | ||
| 710 | entry.event.kind.as_u16(), | ||
| 711 | source_str | ||
| 712 | ); | ||
| 713 | } | ||
| 714 | |||
| 715 | self.mark_expired(entry.event.id); | ||
| 630 | } | 716 | } |
| 631 | 717 | ||
| 632 | // Remove expired entries | 718 | // Remove expired entries |
| @@ -636,21 +722,103 @@ impl Purgatory { | |||
| 636 | }); | 722 | }); |
| 637 | 723 | ||
| 638 | // Remove expired PR events and mark them as expired | 724 | // Remove expired PR events and mark them as expired |
| 639 | let expired_prs: Vec<(String, Option<EventId>)> = self | 725 | let expired_prs: Vec<_> = self |
| 640 | .pr_events | 726 | .pr_events |
| 641 | .iter() | 727 | .iter() |
| 642 | .filter(|entry| entry.value().expires_at <= now) | 728 | .filter(|entry| entry.value().expires_at <= now) |
| 643 | .map(|entry| { | 729 | .map(|entry| { |
| 644 | let event_id = entry.value().event.as_ref().map(|e| e.id); | 730 | let pr_entry = entry.value(); |
| 645 | (entry.key().clone(), event_id) | 731 | let event_id_str = entry.key().clone(); |
| 732 | let event_opt = pr_entry.event.clone(); | ||
| 733 | let commit = pr_entry.commit.clone(); | ||
| 734 | let source = pr_entry.source; | ||
| 735 | (event_id_str, event_opt, commit, source) | ||
| 646 | }) | 736 | }) |
| 647 | .collect(); | 737 | .collect(); |
| 648 | 738 | ||
| 649 | let pr_removed = expired_prs.len(); | 739 | let pr_removed = expired_prs.len(); |
| 650 | for (event_id_str, event_id_opt) in expired_prs { | 740 | for (event_id_str, event_opt, commit, source) in expired_prs { |
| 651 | // Mark actual PR events as expired (not placeholders) | 741 | // Log structured entry for PR events (not placeholders) |
| 652 | if let Some(event_id) = event_id_opt { | 742 | if let Some(ref event) = event_opt { |
| 653 | self.mark_expired(event_id); | 743 | let npub = event |
| 744 | .pubkey | ||
| 745 | .to_bech32() | ||
| 746 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 747 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 748 | let source_str = if source.is_direct() { "direct" } else { "sync" }; | ||
| 749 | |||
| 750 | // Extract ALL repo identifiers from 'a' tags | ||
| 751 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 752 | let repos: Vec<String> = event | ||
| 753 | .tags | ||
| 754 | .iter() | ||
| 755 | .filter_map(|tag| { | ||
| 756 | let tag_vec = tag.clone().to_vec(); | ||
| 757 | if tag_vec.len() >= 2 | ||
| 758 | && tag_vec[0] == "a" | ||
| 759 | && tag_vec[1].starts_with("30617:") | ||
| 760 | { | ||
| 761 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 762 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 763 | if parts.len() >= 3 { | ||
| 764 | Some(parts[2].to_string()) | ||
| 765 | } else { | ||
| 766 | None | ||
| 767 | } | ||
| 768 | } else { | ||
| 769 | None | ||
| 770 | } | ||
| 771 | }) | ||
| 772 | .collect(); | ||
| 773 | |||
| 774 | // Deduplicate while preserving order | ||
| 775 | let mut seen = std::collections::HashSet::new(); | ||
| 776 | let unique_repos: Vec<String> = repos | ||
| 777 | .into_iter() | ||
| 778 | .filter(|r| seen.insert(r.clone())) | ||
| 779 | .collect(); | ||
| 780 | |||
| 781 | let repos_to_log = if unique_repos.is_empty() { | ||
| 782 | vec!["unknown".to_string()] | ||
| 783 | } else { | ||
| 784 | unique_repos | ||
| 785 | }; | ||
| 786 | |||
| 787 | // Structured log for migration scripts - log once per repo | ||
| 788 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 789 | for repo in &repos_to_log { | ||
| 790 | if source.is_direct() { | ||
| 791 | tracing::warn!( | ||
| 792 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 793 | repo, | ||
| 794 | npub, | ||
| 795 | event_id_short, | ||
| 796 | event.kind.as_u16(), | ||
| 797 | &commit[..commit.len().min(12)], | ||
| 798 | source_str | ||
| 799 | ); | ||
| 800 | } else { | ||
| 801 | tracing::debug!( | ||
| 802 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 803 | repo, | ||
| 804 | npub, | ||
| 805 | event_id_short, | ||
| 806 | event.kind.as_u16(), | ||
| 807 | &commit[..commit.len().min(12)], | ||
| 808 | source_str | ||
| 809 | ); | ||
| 810 | } | ||
| 811 | } | ||
| 812 | |||
| 813 | self.mark_expired(event.id); | ||
| 814 | } else { | ||
| 815 | // Placeholder (git data arrived first, but PR event never came) | ||
| 816 | // Placeholders are always Direct source (from git push) | ||
| 817 | tracing::debug!( | ||
| 818 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"", | ||
| 819 | &event_id_str[..event_id_str.len().min(12)], | ||
| 820 | &commit[..commit.len().min(12)] | ||
| 821 | ); | ||
| 654 | } | 822 | } |
| 655 | self.pr_events.remove(&event_id_str); | 823 | self.pr_events.remove(&event_id_str); |
| 656 | } | 824 | } |
| @@ -800,6 +968,7 @@ impl Purgatory { | |||
| 800 | author: e.author, | 968 | author: e.author, |
| 801 | created_at_offset_secs: created_offset.as_secs(), | 969 | created_at_offset_secs: created_offset.as_secs(), |
| 802 | expires_at_offset_secs: expires_offset.as_secs(), | 970 | expires_at_offset_secs: expires_offset.as_secs(), |
| 971 | source: e.source, | ||
| 803 | } | 972 | } |
| 804 | }) | 973 | }) |
| 805 | .collect(); | 974 | .collect(); |
| @@ -822,6 +991,7 @@ impl Purgatory { | |||
| 822 | commit: e.commit.clone(), | 991 | commit: e.commit.clone(), |
| 823 | created_at_offset_secs: created_offset.as_secs(), | 992 | created_at_offset_secs: created_offset.as_secs(), |
| 824 | expires_at_offset_secs: expires_offset.as_secs(), | 993 | expires_at_offset_secs: expires_offset.as_secs(), |
| 994 | source: e.source, | ||
| 825 | }; | 995 | }; |
| 826 | pr_events.insert(event_id, serializable); | 996 | pr_events.insert(event_id, serializable); |
| 827 | } | 997 | } |
| @@ -923,6 +1093,7 @@ impl Purgatory { | |||
| 923 | author: e.author, | 1093 | author: e.author, |
| 924 | created_at, | 1094 | created_at, |
| 925 | expires_at, | 1095 | expires_at, |
| 1096 | source: e.source, | ||
| 926 | } | 1097 | } |
| 927 | }) | 1098 | }) |
| 928 | .collect(); | 1099 | .collect(); |
| @@ -948,6 +1119,7 @@ impl Purgatory { | |||
| 948 | commit: e.commit, | 1119 | commit: e.commit, |
| 949 | created_at, | 1120 | created_at, |
| 950 | expires_at, | 1121 | expires_at, |
| 1122 | source: e.source, | ||
| 951 | }; | 1123 | }; |
| 952 | 1124 | ||
| 953 | self.pr_events.insert(event_id, entry); | 1125 | self.pr_events.insert(event_id, entry); |
| @@ -1005,8 +1177,18 @@ mod tests { | |||
| 1005 | .sign_with_keys(&keys) | 1177 | .sign_with_keys(&keys) |
| 1006 | .unwrap(); | 1178 | .unwrap(); |
| 1007 | 1179 | ||
| 1008 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); | 1180 | purgatory.add_state( |
| 1009 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); | 1181 | event.clone(), |
| 1182 | "test-repo".to_string(), | ||
| 1183 | keys.public_key(), | ||
| 1184 | false, | ||
| 1185 | ); | ||
| 1186 | purgatory.add_pr( | ||
| 1187 | event, | ||
| 1188 | "test-event-id".to_string(), | ||
| 1189 | "abc123".to_string(), | ||
| 1190 | false, | ||
| 1191 | ); | ||
| 1010 | 1192 | ||
| 1011 | let (state_count, pr_count) = purgatory.count(); | 1193 | let (state_count, pr_count) = purgatory.count(); |
| 1012 | assert_eq!(state_count, 1); | 1194 | assert_eq!(state_count, 1); |
| @@ -1057,7 +1239,7 @@ mod tests { | |||
| 1057 | let event = EventBuilder::text_note("state") | 1239 | let event = EventBuilder::text_note("state") |
| 1058 | .sign_with_keys(&keys) | 1240 | .sign_with_keys(&keys) |
| 1059 | .unwrap(); | 1241 | .unwrap(); |
| 1060 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | 1242 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false); |
| 1061 | 1243 | ||
| 1062 | // Now should have pending events | 1244 | // Now should have pending events |
| 1063 | assert!(purgatory.has_pending_events("test-repo")); | 1245 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1087,7 +1269,12 @@ mod tests { | |||
| 1087 | .sign_with_keys(&keys) | 1269 | .sign_with_keys(&keys) |
| 1088 | .unwrap(); | 1270 | .unwrap(); |
| 1089 | 1271 | ||
| 1090 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | 1272 | purgatory.add_pr( |
| 1273 | event, | ||
| 1274 | "pr-event-id".to_string(), | ||
| 1275 | "commit123".to_string(), | ||
| 1276 | false, | ||
| 1277 | ); | ||
| 1091 | 1278 | ||
| 1092 | // Now should have pending events for test-repo | 1279 | // Now should have pending events for test-repo |
| 1093 | assert!(purgatory.has_pending_events("test-repo")); | 1280 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1152,6 +1339,7 @@ fn test_pr_event_vs_placeholder() { | |||
| 1152 | event.clone(), | 1339 | event.clone(), |
| 1153 | "event-id-1".to_string(), | 1340 | "event-id-1".to_string(), |
| 1154 | "commit-abc".to_string(), | 1341 | "commit-abc".to_string(), |
| 1342 | false, | ||
| 1155 | ); | 1343 | ); |
| 1156 | 1344 | ||
| 1157 | // Add a placeholder (no event) | 1345 | // Add a placeholder (no event) |
| @@ -1208,8 +1396,14 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1208 | state_event.clone(), | 1396 | state_event.clone(), |
| 1209 | "test-repo".to_string(), | 1397 | "test-repo".to_string(), |
| 1210 | keys.public_key(), | 1398 | keys.public_key(), |
| 1399 | false, | ||
| 1400 | ); | ||
| 1401 | purgatory.add_pr( | ||
| 1402 | pr_event, | ||
| 1403 | "pr-123".to_string(), | ||
| 1404 | "commit-abc".to_string(), | ||
| 1405 | false, | ||
| 1211 | ); | 1406 | ); |
| 1212 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | ||
| 1213 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1407 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1214 | 1408 | ||
| 1215 | // Verify entries are there | 1409 | // Verify entries are there |
| @@ -1256,8 +1450,18 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1256 | .unwrap(); | 1450 | .unwrap(); |
| 1257 | 1451 | ||
| 1258 | // Add fresh entries | 1452 | // Add fresh entries |
| 1259 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); | 1453 | purgatory.add_state( |
| 1260 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1454 | state_event, |
| 1455 | "test-repo".to_string(), | ||
| 1456 | keys.public_key(), | ||
| 1457 | false, | ||
| 1458 | ); | ||
| 1459 | purgatory.add_pr( | ||
| 1460 | pr_event, | ||
| 1461 | "pr-123".to_string(), | ||
| 1462 | "commit-abc".to_string(), | ||
| 1463 | false, | ||
| 1464 | ); | ||
| 1261 | 1465 | ||
| 1262 | // Run cleanup | 1466 | // Run cleanup |
| 1263 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1467 | let (state_removed, pr_removed) = purgatory.cleanup(); |
| @@ -1287,8 +1491,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1287 | .sign_with_keys(&keys) | 1491 | .sign_with_keys(&keys) |
| 1288 | .unwrap(); | 1492 | .unwrap(); |
| 1289 | 1493 | ||
| 1290 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); | 1494 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false); |
| 1291 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); | 1495 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false); |
| 1292 | 1496 | ||
| 1293 | // Expire only the first one | 1497 | // Expire only the first one |
| 1294 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { | 1498 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { |
| @@ -1305,8 +1509,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1305 | .sign_with_keys(&keys) | 1509 | .sign_with_keys(&keys) |
| 1306 | .unwrap(); | 1510 | .unwrap(); |
| 1307 | 1511 | ||
| 1308 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); | 1512 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false); |
| 1309 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); | 1513 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false); |
| 1310 | 1514 | ||
| 1311 | // Expire only first PR | 1515 | // Expire only first PR |
| 1312 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { | 1516 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { |
| @@ -1338,8 +1542,8 @@ fn test_remove_expired_legacy_method() { | |||
| 1338 | .unwrap(); | 1542 | .unwrap(); |
| 1339 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); | 1543 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); |
| 1340 | 1544 | ||
| 1341 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1545 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1342 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1546 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1343 | 1547 | ||
| 1344 | // Expire both | 1548 | // Expire both |
| 1345 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1549 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1373,8 +1577,8 @@ fn test_expired_event_tracking() { | |||
| 1373 | let pr_event_id = pr_event.id; | 1577 | let pr_event_id = pr_event.id; |
| 1374 | 1578 | ||
| 1375 | // Add events to purgatory | 1579 | // Add events to purgatory |
| 1376 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1580 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1377 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1581 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1378 | 1582 | ||
| 1379 | // Events should not be marked as expired yet | 1583 | // Events should not be marked as expired yet |
| 1380 | assert!(!purgatory.is_expired(&state_event_id)); | 1584 | assert!(!purgatory.is_expired(&state_event_id)); |
| @@ -1426,7 +1630,7 @@ fn test_cleanup_expired_events() { | |||
| 1426 | let event2_id = event2.id; | 1630 | let event2_id = event2.id; |
| 1427 | 1631 | ||
| 1428 | // Add and immediately expire event1 | 1632 | // Add and immediately expire event1 |
| 1429 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); | 1633 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false); |
| 1430 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { | 1634 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { |
| 1431 | for entry in entries.iter_mut() { | 1635 | for entry in entries.iter_mut() { |
| 1432 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1636 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1435,7 +1639,7 @@ fn test_cleanup_expired_events() { | |||
| 1435 | purgatory.cleanup(); | 1639 | purgatory.cleanup(); |
| 1436 | 1640 | ||
| 1437 | // Add and expire event2 (will be more recent) | 1641 | // Add and expire event2 (will be more recent) |
| 1438 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); | 1642 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false); |
| 1439 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { | 1643 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { |
| 1440 | for entry in entries.iter_mut() { | 1644 | for entry in entries.iter_mut() { |
| 1441 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1645 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1477,7 +1681,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1477 | let event_id = event.id; | 1681 | let event_id = event.id; |
| 1478 | 1682 | ||
| 1479 | // Add event to purgatory | 1683 | // Add event to purgatory |
| 1480 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1684 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1481 | 1685 | ||
| 1482 | // Expire it | 1686 | // Expire it |
| 1483 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1687 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1497,7 +1701,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1497 | // This simulates what negentropy/REQ+EOSE should do: | 1701 | // This simulates what negentropy/REQ+EOSE should do: |
| 1498 | // Check if event is in event_ids() before adding | 1702 | // Check if event is in event_ids() before adding |
| 1499 | if !ids.contains(&event_id) { | 1703 | if !ids.contains(&event_id) { |
| 1500 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1704 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1501 | } | 1705 | } |
| 1502 | 1706 | ||
| 1503 | // Event should NOT be re-added | 1707 | // Event should NOT be re-added |
| @@ -1540,7 +1744,7 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1540 | let event_id = event.id; | 1744 | let event_id = event.id; |
| 1541 | 1745 | ||
| 1542 | // Add event to purgatory | 1746 | // Add event to purgatory |
| 1543 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1747 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1544 | 1748 | ||
| 1545 | // Expire it | 1749 | // Expire it |
| 1546 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1750 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1589,8 +1793,18 @@ async fn test_save_and_restore_state_events() { | |||
| 1589 | let event1_id = event1.id; | 1793 | let event1_id = event1.id; |
| 1590 | let event2_id = event2.id; | 1794 | let event2_id = event2.id; |
| 1591 | 1795 | ||
| 1592 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | 1796 | purgatory.add_state( |
| 1593 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | 1797 | event1.clone(), |
| 1798 | "test-repo".to_string(), | ||
| 1799 | keys.public_key(), | ||
| 1800 | false, | ||
| 1801 | ); | ||
| 1802 | purgatory.add_state( | ||
| 1803 | event2.clone(), | ||
| 1804 | "test-repo".to_string(), | ||
| 1805 | keys.public_key(), | ||
| 1806 | false, | ||
| 1807 | ); | ||
| 1594 | 1808 | ||
| 1595 | // Save to disk | 1809 | // Save to disk |
| 1596 | purgatory.save_to_disk(&state_file).unwrap(); | 1810 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -1652,6 +1866,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 1652 | pr_event.clone(), | 1866 | pr_event.clone(), |
| 1653 | "pr-event-id".to_string(), | 1867 | "pr-event-id".to_string(), |
| 1654 | "commit-abc".to_string(), | 1868 | "commit-abc".to_string(), |
| 1869 | false, | ||
| 1655 | ); | 1870 | ); |
| 1656 | 1871 | ||
| 1657 | // Save to disk | 1872 | // Save to disk |
| @@ -1721,7 +1936,7 @@ async fn test_save_and_restore_expired_events() { | |||
| 1721 | let event_id = event.id; | 1936 | let event_id = event.id; |
| 1722 | 1937 | ||
| 1723 | // Add and expire event | 1938 | // Add and expire event |
| 1724 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1939 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1725 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1940 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| 1726 | for entry in entries.iter_mut() { | 1941 | for entry in entries.iter_mut() { |
| 1727 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1942 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1860,7 +2075,7 @@ async fn test_downtime_calculation() { | |||
| 1860 | .sign_with_keys(&keys) | 2075 | .sign_with_keys(&keys) |
| 1861 | .unwrap(); | 2076 | .unwrap(); |
| 1862 | 2077 | ||
| 1863 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2078 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1864 | 2079 | ||
| 1865 | // Get original expiry time | 2080 | // Get original expiry time |
| 1866 | let original_entries = purgatory.find_state("repo"); | 2081 | let original_entries = purgatory.find_state("repo"); |
| @@ -1916,7 +2131,7 @@ async fn test_expiry_times_preserved() { | |||
| 1916 | .sign_with_keys(&keys) | 2131 | .sign_with_keys(&keys) |
| 1917 | .unwrap(); | 2132 | .unwrap(); |
| 1918 | 2133 | ||
| 1919 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2134 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1920 | 2135 | ||
| 1921 | // Manually set expiry to a specific time in the future | 2136 | // Manually set expiry to a specific time in the future |
| 1922 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | 2137 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes |
| @@ -1975,16 +2190,19 @@ async fn test_multiple_state_events_same_identifier() { | |||
| 1975 | event1.clone(), | 2190 | event1.clone(), |
| 1976 | "shared-repo".to_string(), | 2191 | "shared-repo".to_string(), |
| 1977 | keys1.public_key(), | 2192 | keys1.public_key(), |
| 2193 | false, | ||
| 1978 | ); | 2194 | ); |
| 1979 | purgatory.add_state( | 2195 | purgatory.add_state( |
| 1980 | event2.clone(), | 2196 | event2.clone(), |
| 1981 | "shared-repo".to_string(), | 2197 | "shared-repo".to_string(), |
| 1982 | keys2.public_key(), | 2198 | keys2.public_key(), |
| 2199 | false, | ||
| 1983 | ); | 2200 | ); |
| 1984 | purgatory.add_state( | 2201 | purgatory.add_state( |
| 1985 | event3.clone(), | 2202 | event3.clone(), |
| 1986 | "shared-repo".to_string(), | 2203 | "shared-repo".to_string(), |
| 1987 | keys3.public_key(), | 2204 | keys3.public_key(), |
| 2205 | false, | ||
| 1988 | ); | 2206 | ); |
| 1989 | 2207 | ||
| 1990 | // Save to disk | 2208 | // Save to disk |
| @@ -2031,6 +2249,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2031 | pr_event.clone(), | 2249 | pr_event.clone(), |
| 2032 | "pr-with-event".to_string(), | 2250 | "pr-with-event".to_string(), |
| 2033 | "commit-abc".to_string(), | 2251 | "commit-abc".to_string(), |
| 2252 | false, | ||
| 2034 | ); | 2253 | ); |
| 2035 | 2254 | ||
| 2036 | // Add PR placeholder | 2255 | // Add PR placeholder |
| @@ -2076,7 +2295,7 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2076 | let event = EventBuilder::text_note("test") | 2295 | let event = EventBuilder::text_note("test") |
| 2077 | .sign_with_keys(&keys) | 2296 | .sign_with_keys(&keys) |
| 2078 | .unwrap(); | 2297 | .unwrap(); |
| 2079 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2298 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2080 | 2299 | ||
| 2081 | // Save to disk | 2300 | // Save to disk |
| 2082 | purgatory.save_to_disk(&state_file).unwrap(); | 2301 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2110,8 +2329,18 @@ async fn test_comprehensive_roundtrip() { | |||
| 2110 | .sign_with_keys(&keys2) | 2329 | .sign_with_keys(&keys2) |
| 2111 | .unwrap(); | 2330 | .unwrap(); |
| 2112 | 2331 | ||
| 2113 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | 2332 | purgatory.add_state( |
| 2114 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | 2333 | state1.clone(), |
| 2334 | "repo1".to_string(), | ||
| 2335 | keys1.public_key(), | ||
| 2336 | false, | ||
| 2337 | ); | ||
| 2338 | purgatory.add_state( | ||
| 2339 | state2.clone(), | ||
| 2340 | "repo2".to_string(), | ||
| 2341 | keys2.public_key(), | ||
| 2342 | false, | ||
| 2343 | ); | ||
| 2115 | 2344 | ||
| 2116 | // Add PR event | 2345 | // Add PR event |
| 2117 | let tags = vec![Tag::custom( | 2346 | let tags = vec![Tag::custom( |
| @@ -2122,7 +2351,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2122 | .tags(tags) | 2351 | .tags(tags) |
| 2123 | .sign_with_keys(&keys1) | 2352 | .sign_with_keys(&keys1) |
| 2124 | .unwrap(); | 2353 | .unwrap(); |
| 2125 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | 2354 | purgatory.add_pr( |
| 2355 | pr_event.clone(), | ||
| 2356 | "pr-1".to_string(), | ||
| 2357 | "commit-1".to_string(), | ||
| 2358 | false, | ||
| 2359 | ); | ||
| 2126 | 2360 | ||
| 2127 | // Add PR placeholder | 2361 | // Add PR placeholder |
| 2128 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | 2362 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); |
| @@ -2132,7 +2366,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2132 | .sign_with_keys(&keys1) | 2366 | .sign_with_keys(&keys1) |
| 2133 | .unwrap(); | 2367 | .unwrap(); |
| 2134 | let expired_id = expired_event.id; | 2368 | let expired_id = expired_event.id; |
| 2135 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | 2369 | purgatory.add_state( |
| 2370 | expired_event, | ||
| 2371 | "repo3".to_string(), | ||
| 2372 | keys1.public_key(), | ||
| 2373 | false, | ||
| 2374 | ); | ||
| 2136 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | 2375 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { |
| 2137 | for entry in entries.iter_mut() { | 2376 | for entry in entries.iter_mut() { |
| 2138 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2377 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 33c2d12..904f8af 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -361,94 +361,121 @@ impl SyncContext for RealSyncContext { | |||
| 361 | let naughty_list = self.git_naughty_list.clone(); | 361 | let naughty_list = self.git_naughty_list.clone(); |
| 362 | 362 | ||
| 363 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { | 363 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { |
| 364 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history | 364 | let mut remaining_oids = missing_oids.clone(); |
| 365 | let mut args = vec!["fetch", &url]; | 365 | let mut missing_from_remote: Vec<String> = Vec::new(); |
| 366 | args.extend(missing_oids.iter().map(|s| s.as_str())); | 366 | |
| 367 | 367 | // Retry loop: keep fetching until success or no OIDs left | |
| 368 | let output = Command::new("git") | 368 | loop { |
| 369 | .args(&args) | 369 | if remaining_oids.is_empty() { |
| 370 | .current_dir(&repo_path) | 370 | // All OIDs were missing from remote |
| 371 | .output(); | 371 | debug!( |
| 372 | 372 | url = %url, | |
| 373 | match output { | 373 | missing_count = missing_from_remote.len(), |
| 374 | Ok(result) if result.status.success() => { | 374 | "All requested OIDs missing from remote" |
| 375 | // Count how many OIDs we now have | 375 | ); |
| 376 | let fetched: Vec<String> = missing_oids | 376 | return Ok(vec![]); |
| 377 | .iter() | ||
| 378 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) | ||
| 379 | .cloned() | ||
| 380 | .collect(); | ||
| 381 | |||
| 382 | debug!(fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 383 | |||
| 384 | Ok(fetched) | ||
| 385 | } | 377 | } |
| 386 | Ok(result) => { | 378 | |
| 387 | let stderr = String::from_utf8_lossy(&result.stderr); | 379 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history |
| 388 | 380 | let mut args = vec!["fetch".to_string(), url.clone()]; | |
| 389 | // Extract domain and classify error for naughty list | 381 | args.extend(remaining_oids.iter().cloned()); |
| 390 | if let Some(domain) = extract_domain(&url) { | 382 | |
| 391 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | 383 | let output = Command::new("git") |
| 392 | let is_new = naughty_list.record(&domain, category, stderr.to_string()); | 384 | .args(&args) |
| 393 | 385 | .current_dir(&repo_path) | |
| 394 | if is_new { | 386 | .output(); |
| 395 | tracing::warn!( | 387 | |
| 396 | domain = %domain, | 388 | match output { |
| 397 | category = %category, | 389 | Ok(result) if result.status.success() => { |
| 398 | error = %stderr, | 390 | // Fetch succeeded - count how many OIDs we now have |
| 399 | "Git remote domain added to naughty list" | 391 | let fetched: Vec<String> = missing_oids |
| 400 | ); | 392 | .iter() |
| 401 | } else { | 393 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) |
| 402 | debug!( | 394 | .cloned() |
| 403 | domain = %domain, | 395 | .collect(); |
| 404 | category = %category, | 396 | |
| 405 | "Git remote domain still on naughty list" | 397 | if !missing_from_remote.is_empty() { |
| 406 | ); | 398 | debug!( |
| 407 | } | 399 | url = %url, |
| 400 | fetched_count = fetched.len(), | ||
| 401 | missing_count = missing_from_remote.len(), | ||
| 402 | missing_oids = ?missing_from_remote, | ||
| 403 | "Fetch completed after retries - some OIDs were missing from remote" | ||
| 404 | ); | ||
| 405 | } else { | ||
| 406 | debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 408 | } | 407 | } |
| 408 | |||
| 409 | return Ok(fetched); | ||
| 409 | } | 410 | } |
| 411 | Ok(result) => { | ||
| 412 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 410 | 413 | ||
| 411 | // Check for "not our ref" errors and provide a clearer error message | 414 | // Check for "not our ref" error - this is retryable |
| 412 | let error_msg = if stderr.contains("upload-pack: not our ref") { | 415 | if stderr.contains("upload-pack: not our ref") { |
| 413 | // Parse out the missing OID from stderr (git only reports one at a time) | 416 | // Parse out the missing OID from stderr |
| 414 | let missing_oid = stderr | 417 | let missing_oid = stderr.lines().find_map(|line| { |
| 415 | .lines() | ||
| 416 | .find_map(|line| { | ||
| 417 | if line.contains("not our ref") { | 418 | if line.contains("not our ref") { |
| 418 | // Extract the OID from lines like: | 419 | // Extract the OID from lines like: |
| 419 | // "fatal: remote error: upload-pack: not our ref <oid>" | 420 | // "fatal: remote error: upload-pack: not our ref <oid>" |
| 420 | line.split("not our ref").nth(1).map(|s| s.trim().to_string()) | 421 | line.split("not our ref") |
| 422 | .nth(1) | ||
| 423 | .map(|s| s.trim().to_string()) | ||
| 421 | } else { | 424 | } else { |
| 422 | None | 425 | None |
| 423 | } | 426 | } |
| 424 | }); | 427 | }); |
| 425 | 428 | ||
| 426 | let total_requested = missing_oids.len(); | 429 | if let Some(ref oid) = missing_oid { |
| 430 | // Remove the missing OID and retry with remaining | ||
| 431 | remaining_oids.retain(|o| o != oid); | ||
| 432 | missing_from_remote.push(oid.clone()); | ||
| 427 | 433 | ||
| 428 | if let Some(oid) = missing_oid { | 434 | debug!( |
| 429 | if total_requested > 1 { | ||
| 430 | // BUG: Git stops at first missing OID, so we don't know if the others exist | ||
| 431 | // We need retry logic to fetch remaining OIDs individually | ||
| 432 | tracing::warn!( | ||
| 433 | url = %url, | 435 | url = %url, |
| 434 | missing_oid = %oid, | 436 | missing_oid = %oid, |
| 435 | total_requested = total_requested, | 437 | remaining_count = remaining_oids.len(), |
| 436 | "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." | 438 | "OID not found on remote, retrying with remaining OIDs" |
| 437 | ); | 439 | ); |
| 438 | format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) | 440 | |
| 439 | } else { | 441 | continue; // Retry with remaining OIDs |
| 440 | format!("remote missing only oid requested: {}", oid) | 442 | } |
| 443 | } | ||
| 444 | |||
| 445 | // Non-retryable error - record to naughty list and return error | ||
| 446 | if let Some(domain) = extract_domain(&url) { | ||
| 447 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | ||
| 448 | let is_new = | ||
| 449 | naughty_list.record(&domain, category, stderr.to_string()); | ||
| 450 | |||
| 451 | if is_new { | ||
| 452 | tracing::warn!( | ||
| 453 | domain = %domain, | ||
| 454 | category = %category, | ||
| 455 | error = %stderr, | ||
| 456 | "Git remote domain added to naughty list" | ||
| 457 | ); | ||
| 458 | } else { | ||
| 459 | debug!( | ||
| 460 | domain = %domain, | ||
| 461 | category = %category, | ||
| 462 | error = %stderr, | ||
| 463 | "Git fetch failed (domain on naughty list)" | ||
| 464 | ); | ||
| 465 | } | ||
| 441 | } | 466 | } |
| 442 | } else { | ||
| 443 | format!("git fetch failed: {}", stderr) | ||
| 444 | } | 467 | } |
| 445 | } else { | ||
| 446 | format!("git fetch failed: {}", stderr) | ||
| 447 | }; | ||
| 448 | 468 | ||
| 449 | Err(anyhow::anyhow!("{}", error_msg)) | 469 | return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr)); |
| 470 | } | ||
| 471 | Err(e) => { | ||
| 472 | return Err(anyhow::anyhow!( | ||
| 473 | "git fetch command error for {}: {}", | ||
| 474 | url, | ||
| 475 | e | ||
| 476 | )) | ||
| 477 | } | ||
| 450 | } | 478 | } |
| 451 | Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)), | ||
| 452 | } | 479 | } |
| 453 | }) | 480 | }) |
| 454 | .await | 481 | .await |
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index 65d29af..9207d58 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; | 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; |
| 369 | throttle_manager.complete_request(&domain); | 369 | throttle_manager.complete_request(&domain); |
| 370 | 370 | ||
| 371 | let oids_fetched = match fetch_result { | 371 | let fetched_oids = match fetch_result { |
| 372 | Ok(fetched) => { | 372 | Ok(fetched) if !fetched.is_empty() => { |
| 373 | debug!( | 373 | debug!( |
| 374 | identifier = %identifier, | 374 | identifier = %identifier, |
| 375 | url = %url, | 375 | url = %url, |
| 376 | oids_fetched = fetched.len(), | 376 | oids_fetched = fetched.len(), |
| 377 | "Fetch succeeded" | 377 | "Fetch succeeded" |
| 378 | ); | 378 | ); |
| 379 | fetched.len() | 379 | fetched |
| 380 | } | ||
| 381 | Ok(_) => { | ||
| 382 | debug!( | ||
| 383 | identifier = %identifier, | ||
| 384 | url = %url, | ||
| 385 | "Fetch returned no OIDs (not available on remote)" | ||
| 386 | ); | ||
| 387 | vec![] | ||
| 380 | } | 388 | } |
| 381 | Err(e) => { | 389 | Err(e) => { |
| 382 | debug!( | 390 | debug!( |
| @@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 385 | error = %e, | 393 | error = %e, |
| 386 | "Fetch failed" | 394 | "Fetch failed" |
| 387 | ); | 395 | ); |
| 388 | 0 | 396 | vec![] |
| 389 | } | 397 | } |
| 390 | }; | 398 | }; |
| 391 | 399 | ||
| 392 | // Try to process any events that can now be satisfied | 400 | // Try to process any events that can now be satisfied |
| 393 | if oids_fetched > 0 { | 401 | if !fetched_oids.is_empty() { |
| 394 | let new_oids: HashSet<String> = needed_oids.into_iter().collect(); | 402 | let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect(); |
| 395 | if let Err(e) = ctx | 403 | if let Err(e) = ctx |
| 396 | .process_newly_available_git_data(&target_repo, &new_oids) | 404 | .process_newly_available_git_data(&target_repo, &new_oids) |
| 397 | .await | 405 | .await |
| @@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 404 | } | 412 | } |
| 405 | } | 413 | } |
| 406 | 414 | ||
| 407 | oids_fetched | 415 | fetched_oids.len() |
| 408 | } | 416 | } |
| 409 | 417 | ||
| 410 | /// Sync git data for an identifier. | 418 | /// Sync git data for an identifier. |
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 919504b..e37a3e1 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -8,6 +8,28 @@ use nostr_sdk::prelude::*; | |||
| 8 | use serde::{Deserialize, Serialize}; | 8 | use serde::{Deserialize, Serialize}; |
| 9 | use std::time::Instant; | 9 | use std::time::Instant; |
| 10 | 10 | ||
| 11 | /// Source of an event entering purgatory. | ||
| 12 | /// | ||
| 13 | /// Tracks whether an event was submitted directly by a user or fetched via | ||
| 14 | /// proactive sync from another relay. This distinction is used for: | ||
| 15 | /// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG | ||
| 16 | /// - Operational monitoring: Helps identify user-facing issues vs sync noise | ||
| 17 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] | ||
| 18 | pub enum EventSource { | ||
| 19 | /// Event was published directly to this relay by a user | ||
| 20 | #[default] | ||
| 21 | Direct, | ||
| 22 | /// Event was fetched via proactive sync from another relay | ||
| 23 | Sync, | ||
| 24 | } | ||
| 25 | |||
| 26 | impl EventSource { | ||
| 27 | /// Returns true if this is a direct submission (not synced) | ||
| 28 | pub fn is_direct(&self) -> bool { | ||
| 29 | matches!(self, EventSource::Direct) | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 11 | /// Default value for Instant fields during deserialization | 33 | /// Default value for Instant fields during deserialization |
| 12 | fn instant_now() -> Instant { | 34 | fn instant_now() -> Instant { |
| 13 | Instant::now() | 35 | Instant::now() |
| @@ -86,6 +108,10 @@ pub struct StatePurgatoryEntry { | |||
| 86 | /// Expiry deadline (30 min from creation, may be extended) | 108 | /// Expiry deadline (30 min from creation, may be extended) |
| 87 | #[serde(skip, default = "instant_now")] | 109 | #[serde(skip, default = "instant_now")] |
| 88 | pub expires_at: Instant, | 110 | pub expires_at: Instant, |
| 111 | |||
| 112 | /// Source of this event (direct submission vs sync) | ||
| 113 | #[serde(default)] | ||
| 114 | pub source: EventSource, | ||
| 89 | } | 115 | } |
| 90 | 116 | ||
| 91 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. | 117 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. |
| @@ -112,4 +138,8 @@ pub struct PrPurgatoryEntry { | |||
| 112 | /// Expiry deadline (30 min from creation, may be extended) | 138 | /// Expiry deadline (30 min from creation, may be extended) |
| 113 | #[serde(skip, default = "instant_now")] | 139 | #[serde(skip, default = "instant_now")] |
| 114 | pub expires_at: Instant, | 140 | pub expires_at: Instant, |
| 141 | |||
| 142 | /// Source of this event (direct submission vs sync) | ||
| 143 | #[serde(default)] | ||
| 144 | pub source: EventSource, | ||
| 115 | } | 145 | } |