diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-23 15:20:59 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-23 15:20:59 +0000 |
| commit | 113928aa84894ea8f65c247d9987527e792b32a9 (patch) | |
| tree | ec967d6195d9f7ec4f061449596611afe3a0950f /src/purgatory | |
| parent | 26f608e5011b9d1ad6036da75b89272835e69695 (diff) | |
| parent | e0ad39a489b3398f8208713bf728db0cb11475b0 (diff) | |
Merge master into 3ca0-announcements-purgatory
Diffstat (limited to 'src/purgatory')
| -rw-r--r-- | src/purgatory/helpers.rs | 204 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 341 | ||||
| -rw-r--r-- | src/purgatory/sync/context.rs | 163 | ||||
| -rw-r--r-- | src/purgatory/sync/functions.rs | 22 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 30 |
5 files changed, 634 insertions, 126 deletions
diff --git a/src/purgatory/helpers.rs b/src/purgatory/helpers.rs index 193ef99..a9f6e66 100644 --- a/src/purgatory/helpers.rs +++ b/src/purgatory/helpers.rs | |||
| @@ -225,6 +225,117 @@ pub fn get_unpushed_refs(event: &Event, pushed_refs: &[RefPair]) -> Vec<RefPair> | |||
| 225 | .collect() | 225 | .collect() |
| 226 | } | 226 | } |
| 227 | 227 | ||
| 228 | /// Diagnose why a state event doesn't match the push. | ||
| 229 | /// | ||
| 230 | /// Returns a human-readable explanation of the mismatch between the state event | ||
| 231 | /// and what would result from applying the push to local refs. | ||
| 232 | /// | ||
| 233 | /// # Arguments | ||
| 234 | /// * `event` - The state event to check | ||
| 235 | /// * `pushed_updates` - Ref updates in the current push operation | ||
| 236 | /// * `local_refs` - Refs already existing locally (ref_name -> SHA) | ||
| 237 | /// | ||
| 238 | /// # Returns | ||
| 239 | /// String explaining why the state doesn't match, or None if it matches | ||
| 240 | pub fn diagnose_state_mismatch( | ||
| 241 | event: &Event, | ||
| 242 | pushed_updates: &[RefUpdate], | ||
| 243 | local_refs: &HashMap<String, String>, | ||
| 244 | ) -> Option<String> { | ||
| 245 | let state_refs = extract_refs_from_state(event); | ||
| 246 | |||
| 247 | // Filter local_refs to only branches and tags | ||
| 248 | let mut would_be_state: HashMap<String, String> = local_refs | ||
| 249 | .iter() | ||
| 250 | .filter(|(ref_name, _)| { | ||
| 251 | ref_name.starts_with("refs/heads/") || ref_name.starts_with("refs/tags/") | ||
| 252 | }) | ||
| 253 | .map(|(k, v)| (k.clone(), v.clone())) | ||
| 254 | .collect(); | ||
| 255 | |||
| 256 | // Apply all pushed updates to create the would-be state | ||
| 257 | for update in pushed_updates { | ||
| 258 | // Only process branches and tags | ||
| 259 | if !update.ref_name.starts_with("refs/heads/") && !update.ref_name.starts_with("refs/tags/") | ||
| 260 | { | ||
| 261 | continue; | ||
| 262 | } | ||
| 263 | |||
| 264 | if update.is_deletion() { | ||
| 265 | would_be_state.remove(&update.ref_name); | ||
| 266 | } else { | ||
| 267 | would_be_state.insert(update.ref_name.clone(), update.new_oid.clone()); | ||
| 268 | } | ||
| 269 | } | ||
| 270 | |||
| 271 | // Convert event's state refs to a HashMap for comparison | ||
| 272 | let declared_state: HashMap<String, String> = state_refs | ||
| 273 | .into_iter() | ||
| 274 | .map(|r| (r.ref_name, r.object_sha)) | ||
| 275 | .collect(); | ||
| 276 | |||
| 277 | // Check if they match | ||
| 278 | if would_be_state == declared_state { | ||
| 279 | return None; // No mismatch | ||
| 280 | } | ||
| 281 | |||
| 282 | // Build diagnostic message | ||
| 283 | let mut reasons = Vec::new(); | ||
| 284 | |||
| 285 | // Check for refs in declared state but not in would-be state | ||
| 286 | for (ref_name, declared_sha) in &declared_state { | ||
| 287 | if let Some(would_be_sha) = would_be_state.get(ref_name) { | ||
| 288 | if would_be_sha != declared_sha { | ||
| 289 | let would_be_short = if would_be_sha.len() >= 8 { | ||
| 290 | &would_be_sha[..8] | ||
| 291 | } else { | ||
| 292 | would_be_sha.as_str() | ||
| 293 | }; | ||
| 294 | let declared_short = if declared_sha.len() >= 8 { | ||
| 295 | &declared_sha[..8] | ||
| 296 | } else { | ||
| 297 | declared_sha.as_str() | ||
| 298 | }; | ||
| 299 | reasons.push(format!( | ||
| 300 | "{} would be at {} but state declares {}", | ||
| 301 | ref_name, would_be_short, declared_short | ||
| 302 | )); | ||
| 303 | } | ||
| 304 | } else { | ||
| 305 | let declared_short = if declared_sha.len() >= 8 { | ||
| 306 | &declared_sha[..8] | ||
| 307 | } else { | ||
| 308 | declared_sha.as_str() | ||
| 309 | }; | ||
| 310 | reasons.push(format!( | ||
| 311 | "{} missing (state declares {})", | ||
| 312 | ref_name, declared_short | ||
| 313 | )); | ||
| 314 | } | ||
| 315 | } | ||
| 316 | |||
| 317 | // Check for refs in would-be state but not in declared state | ||
| 318 | for (ref_name, would_be_sha) in &would_be_state { | ||
| 319 | if !declared_state.contains_key(ref_name) { | ||
| 320 | let would_be_short = if would_be_sha.len() >= 8 { | ||
| 321 | &would_be_sha[..8] | ||
| 322 | } else { | ||
| 323 | would_be_sha.as_str() | ||
| 324 | }; | ||
| 325 | reasons.push(format!( | ||
| 326 | "{} would exist at {} but state doesn't declare it", | ||
| 327 | ref_name, would_be_short | ||
| 328 | )); | ||
| 329 | } | ||
| 330 | } | ||
| 331 | |||
| 332 | if reasons.is_empty() { | ||
| 333 | Some("Unknown mismatch".to_string()) | ||
| 334 | } else { | ||
| 335 | Some(reasons.join("; ")) | ||
| 336 | } | ||
| 337 | } | ||
| 338 | |||
| 228 | #[cfg(test)] | 339 | #[cfg(test)] |
| 229 | mod tests { | 340 | mod tests { |
| 230 | use super::*; | 341 | use super::*; |
| @@ -695,4 +806,97 @@ mod tests { | |||
| 695 | // Should return true - real OID exists, symbolic ref skipped | 806 | // Should return true - real OID exists, symbolic ref skipped |
| 696 | assert!(can_apply_state(&event, repo_path)); | 807 | assert!(can_apply_state(&event, repo_path)); |
| 697 | } | 808 | } |
| 809 | |||
| 810 | #[test] | ||
| 811 | fn test_diagnose_state_mismatch_missing_ref() { | ||
| 812 | // State declares both main and test branches | ||
| 813 | let event = create_test_state_event( | ||
| 814 | "test-repo", | ||
| 815 | vec![("refs/heads/main", "abc123"), ("refs/heads/test", "def456")], | ||
| 816 | ); | ||
| 817 | |||
| 818 | // Push only creates test branch | ||
| 819 | let pushed_updates = vec![RefUpdate { | ||
| 820 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 821 | new_oid: "def456".to_string(), | ||
| 822 | ref_name: "refs/heads/test".to_string(), | ||
| 823 | }]; | ||
| 824 | |||
| 825 | // No local refs | ||
| 826 | let local_refs = HashMap::new(); | ||
| 827 | |||
| 828 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 829 | assert!(diagnosis.is_some()); | ||
| 830 | let msg = diagnosis.unwrap(); | ||
| 831 | assert!(msg.contains("refs/heads/main")); | ||
| 832 | assert!(msg.contains("missing")); | ||
| 833 | } | ||
| 834 | |||
| 835 | #[test] | ||
| 836 | fn test_diagnose_state_mismatch_wrong_sha() { | ||
| 837 | // State declares main at abc123 | ||
| 838 | let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]); | ||
| 839 | |||
| 840 | // Push updates main to different SHA | ||
| 841 | let pushed_updates = vec![RefUpdate { | ||
| 842 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 843 | new_oid: "wrong123".to_string(), | ||
| 844 | ref_name: "refs/heads/main".to_string(), | ||
| 845 | }]; | ||
| 846 | |||
| 847 | let local_refs = HashMap::new(); | ||
| 848 | |||
| 849 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 850 | assert!(diagnosis.is_some()); | ||
| 851 | let msg = diagnosis.unwrap(); | ||
| 852 | assert!(msg.contains("refs/heads/main")); | ||
| 853 | assert!(msg.contains("would be at")); | ||
| 854 | assert!(msg.contains("state declares")); | ||
| 855 | } | ||
| 856 | |||
| 857 | #[test] | ||
| 858 | fn test_diagnose_state_mismatch_extra_ref() { | ||
| 859 | // State declares only main | ||
| 860 | let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]); | ||
| 861 | |||
| 862 | // Push creates both main and test | ||
| 863 | let pushed_updates = vec![ | ||
| 864 | RefUpdate { | ||
| 865 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 866 | new_oid: "abc123".to_string(), | ||
| 867 | ref_name: "refs/heads/main".to_string(), | ||
| 868 | }, | ||
| 869 | RefUpdate { | ||
| 870 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 871 | new_oid: "def456".to_string(), | ||
| 872 | ref_name: "refs/heads/test".to_string(), | ||
| 873 | }, | ||
| 874 | ]; | ||
| 875 | |||
| 876 | let local_refs = HashMap::new(); | ||
| 877 | |||
| 878 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 879 | assert!(diagnosis.is_some()); | ||
| 880 | let msg = diagnosis.unwrap(); | ||
| 881 | assert!(msg.contains("refs/heads/test")); | ||
| 882 | assert!(msg.contains("doesn't declare")); | ||
| 883 | } | ||
| 884 | |||
| 885 | #[test] | ||
| 886 | fn test_diagnose_state_mismatch_no_mismatch() { | ||
| 887 | // State declares main | ||
| 888 | let event = create_test_state_event("test-repo", vec![("refs/heads/main", "abc123")]); | ||
| 889 | |||
| 890 | // Push creates main at correct SHA | ||
| 891 | let pushed_updates = vec![RefUpdate { | ||
| 892 | old_oid: "0000000000000000000000000000000000000000".to_string(), | ||
| 893 | new_oid: "abc123".to_string(), | ||
| 894 | ref_name: "refs/heads/main".to_string(), | ||
| 895 | }]; | ||
| 896 | |||
| 897 | let local_refs = HashMap::new(); | ||
| 898 | |||
| 899 | let diagnosis = diagnose_state_mismatch(&event, &pushed_updates, &local_refs); | ||
| 900 | assert!(diagnosis.is_none()); // No mismatch | ||
| 901 | } | ||
| 698 | } | 902 | } |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 9a63bf6..bb6ff54 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -16,11 +16,12 @@ pub mod persistence; | |||
| 16 | pub mod sync; | 16 | pub mod sync; |
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, diagnose_state_mismatch, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{AnnouncementPurgatoryEntry, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{AnnouncementPurgatoryEntry, EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use nostr_sdk::ToBech32; | ||
| 24 | use serde::{Deserialize, Serialize}; | 25 | use serde::{Deserialize, Serialize}; |
| 25 | use std::collections::HashMap; | 26 | use std::collections::HashMap; |
| 26 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| @@ -64,6 +65,9 @@ struct SerializableStatePurgatoryEntry { | |||
| 64 | created_at_offset_secs: u64, | 65 | created_at_offset_secs: u64, |
| 65 | /// Duration offset from saved_at for expires_at | 66 | /// Duration offset from saved_at for expires_at |
| 66 | expires_at_offset_secs: u64, | 67 | expires_at_offset_secs: u64, |
| 68 | /// Source of this event (direct submission vs sync) | ||
| 69 | #[serde(default)] | ||
| 70 | source: types::EventSource, | ||
| 67 | } | 71 | } |
| 68 | 72 | ||
| 69 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | 73 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. |
| @@ -81,6 +85,9 @@ struct SerializablePrPurgatoryEntry { | |||
| 81 | created_at_offset_secs: u64, | 85 | created_at_offset_secs: u64, |
| 82 | /// Duration offset from saved_at for expires_at | 86 | /// Duration offset from saved_at for expires_at |
| 83 | expires_at_offset_secs: u64, | 87 | expires_at_offset_secs: u64, |
| 88 | /// Source of this event (direct submission vs sync) | ||
| 89 | #[serde(default)] | ||
| 90 | source: types::EventSource, | ||
| 84 | } | 91 | } |
| 85 | 92 | ||
| 86 | /// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. | 93 | /// Serializable wrapper for `AnnouncementPurgatoryEntry` with time offsets. |
| @@ -313,11 +320,38 @@ impl Purgatory { | |||
| 313 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately | 320 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately |
| 314 | /// to override this delay. | 321 | /// to override this delay. |
| 315 | /// | 322 | /// |
| 323 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 324 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 325 | /// | ||
| 316 | /// # Arguments | 326 | /// # Arguments |
| 317 | /// * `event` - The state event (kind 30618) to hold | 327 | /// * `event` - The state event (kind 30618) to hold |
| 318 | /// * `identifier` - The repository identifier from the 'd' tag | 328 | /// * `identifier` - The repository identifier from the 'd' tag |
| 319 | /// * `author` - The event author's public key | 329 | /// * `author` - The event author's public key |
| 320 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 330 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 331 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) { | ||
| 332 | let source = if from_sync { | ||
| 333 | types::EventSource::Sync | ||
| 334 | } else { | ||
| 335 | types::EventSource::Direct | ||
| 336 | }; | ||
| 337 | |||
| 338 | // Check if event already exists - if so, potentially upgrade source | ||
| 339 | if let Some(mut entries) = self.state_events.get_mut(&identifier) { | ||
| 340 | if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) { | ||
| 341 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 342 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 343 | existing.source = types::EventSource::Direct; | ||
| 344 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 345 | tracing::debug!( | ||
| 346 | event_id = %event.id, | ||
| 347 | identifier = %identifier, | ||
| 348 | "Upgraded purgatory entry source from Sync to Direct, reset expiry" | ||
| 349 | ); | ||
| 350 | } | ||
| 351 | return; // Event already exists, don't add duplicate | ||
| 352 | } | ||
| 353 | } | ||
| 354 | |||
| 321 | let now = Instant::now(); | 355 | let now = Instant::now(); |
| 322 | let entry = StatePurgatoryEntry { | 356 | let entry = StatePurgatoryEntry { |
| 323 | event, | 357 | event, |
| @@ -325,6 +359,7 @@ impl Purgatory { | |||
| 325 | author, | 359 | author, |
| 326 | created_at: now, | 360 | created_at: now, |
| 327 | expires_at: now + DEFAULT_EXPIRY, | 361 | expires_at: now + DEFAULT_EXPIRY, |
| 362 | source, | ||
| 328 | }; | 363 | }; |
| 329 | 364 | ||
| 330 | self.state_events | 365 | self.state_events |
| @@ -344,11 +379,35 @@ impl Purgatory { | |||
| 344 | /// Automatically enqueues the referenced repository identifier for background sync | 379 | /// Automatically enqueues the referenced repository identifier for background sync |
| 345 | /// with the default delay (3 minutes), giving time for a git push to arrive. | 380 | /// with the default delay (3 minutes), giving time for a git push to arrive. |
| 346 | /// | 381 | /// |
| 382 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 383 | /// is direct (`!from_sync`), the source is upgraded to `Direct` without extending expiry. | ||
| 384 | /// | ||
| 347 | /// # Arguments | 385 | /// # Arguments |
| 348 | /// * `event` - The PR event (kind 1617/1618) to hold | 386 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 349 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 387 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 350 | /// * `commit` - The commit SHA from the 'c' tag | 388 | /// * `commit` - The commit SHA from the 'c' tag |
| 351 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 389 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 390 | pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) { | ||
| 391 | let source = if from_sync { | ||
| 392 | types::EventSource::Sync | ||
| 393 | } else { | ||
| 394 | types::EventSource::Direct | ||
| 395 | }; | ||
| 396 | |||
| 397 | // Check if event already exists - if so, potentially upgrade source | ||
| 398 | if let Some(mut existing) = self.pr_events.get_mut(&event_id) { | ||
| 399 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 400 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 401 | existing.source = types::EventSource::Direct; | ||
| 402 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 403 | tracing::debug!( | ||
| 404 | event_id = %event_id, | ||
| 405 | "Upgraded PR purgatory entry source from Sync to Direct, reset expiry" | ||
| 406 | ); | ||
| 407 | } | ||
| 408 | return; // Event already exists, don't add duplicate | ||
| 409 | } | ||
| 410 | |||
| 352 | // Extract identifier from the event's `a` tag for sync enqueueing | 411 | // Extract identifier from the event's `a` tag for sync enqueueing |
| 353 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | 412 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); |
| 354 | 413 | ||
| @@ -358,6 +417,7 @@ impl Purgatory { | |||
| 358 | commit, | 417 | commit, |
| 359 | created_at: now, | 418 | created_at: now, |
| 360 | expires_at: now + DEFAULT_EXPIRY, | 419 | expires_at: now + DEFAULT_EXPIRY, |
| 420 | source, | ||
| 361 | }; | 421 | }; |
| 362 | 422 | ||
| 363 | self.pr_events.insert(event_id, entry); | 423 | self.pr_events.insert(event_id, entry); |
| @@ -371,6 +431,8 @@ impl Purgatory { | |||
| 371 | /// Add a PR placeholder (git data arrived before PR event). | 431 | /// Add a PR placeholder (git data arrived before PR event). |
| 372 | /// | 432 | /// |
| 373 | /// Creates a placeholder entry waiting for the corresponding PR event. | 433 | /// Creates a placeholder entry waiting for the corresponding PR event. |
| 434 | /// Placeholders are always marked as `Direct` source since they originate | ||
| 435 | /// from git pushes (direct user action). | ||
| 374 | /// | 436 | /// |
| 375 | /// # Arguments | 437 | /// # Arguments |
| 376 | /// * `event_id` - The expected event ID (from git ref name) | 438 | /// * `event_id` - The expected event ID (from git ref name) |
| @@ -382,6 +444,7 @@ impl Purgatory { | |||
| 382 | commit, | 444 | commit, |
| 383 | created_at: now, | 445 | created_at: now, |
| 384 | expires_at: now + DEFAULT_EXPIRY, | 446 | expires_at: now + DEFAULT_EXPIRY, |
| 447 | source: types::EventSource::Direct, // Git pushes are direct user actions | ||
| 385 | }; | 448 | }; |
| 386 | 449 | ||
| 387 | self.pr_events.insert(event_id, entry); | 450 | self.pr_events.insert(event_id, entry); |
| @@ -892,6 +955,9 @@ impl Purgatory { | |||
| 892 | /// prevent infinite re-sync loops. Events that expire without finding git data | 955 | /// prevent infinite re-sync loops. Events that expire without finding git data |
| 893 | /// will be filtered out during future negentropy/REQ sync operations. | 956 | /// will be filtered out during future negentropy/REQ sync operations. |
| 894 | /// | 957 | /// |
| 958 | /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event | ||
| 959 | /// to support migration scripts and operational monitoring. | ||
| 960 | /// | ||
| 895 | /// # Returns | 961 | /// # Returns |
| 896 | /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) | 962 | /// Tuple of (num_announcement_removed, num_state_removed, num_pr_removed) |
| 897 | pub fn cleanup(&self) -> (usize, usize, usize) { | 963 | pub fn cleanup(&self) -> (usize, usize, usize) { |
| @@ -976,18 +1042,38 @@ impl Purgatory { | |||
| 976 | let mut state_removed = 0; | 1042 | let mut state_removed = 0; |
| 977 | 1043 | ||
| 978 | // Remove expired state events and mark them as expired | 1044 | // Remove expired state events and mark them as expired |
| 979 | self.state_events.retain(|_, entries| { | 1045 | self.state_events.retain(|identifier, entries| { |
| 980 | let original_len = entries.len(); | 1046 | let original_len = entries.len(); |
| 981 | // Collect event IDs before removing | ||
| 982 | let expired_ids: Vec<EventId> = entries | ||
| 983 | .iter() | ||
| 984 | .filter(|entry| entry.expires_at <= now) | ||
| 985 | .map(|entry| entry.event.id) | ||
| 986 | .collect(); | ||
| 987 | 1047 | ||
| 988 | // Mark as expired to prevent re-sync | 1048 | // Log and collect expired entries before removing |
| 989 | for event_id in expired_ids { | 1049 | for entry in entries.iter().filter(|e| e.expires_at <= now) { |
| 990 | self.mark_expired(event_id); | 1050 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); |
| 1051 | let event_id_short = &entry.event.id.to_hex()[..12]; | ||
| 1052 | let source_str = if entry.source.is_direct() { "direct" } else { "sync" }; | ||
| 1053 | |||
| 1054 | // Structured log for migration scripts | ||
| 1055 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 1056 | if entry.source.is_direct() { | ||
| 1057 | tracing::warn!( | ||
| 1058 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1059 | identifier, | ||
| 1060 | npub, | ||
| 1061 | event_id_short, | ||
| 1062 | entry.event.kind.as_u16(), | ||
| 1063 | source_str | ||
| 1064 | ); | ||
| 1065 | } else { | ||
| 1066 | tracing::debug!( | ||
| 1067 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1068 | identifier, | ||
| 1069 | npub, | ||
| 1070 | event_id_short, | ||
| 1071 | entry.event.kind.as_u16(), | ||
| 1072 | source_str | ||
| 1073 | ); | ||
| 1074 | } | ||
| 1075 | |||
| 1076 | self.mark_expired(entry.event.id); | ||
| 991 | } | 1077 | } |
| 992 | 1078 | ||
| 993 | // Remove expired entries | 1079 | // Remove expired entries |
| @@ -997,21 +1083,103 @@ impl Purgatory { | |||
| 997 | }); | 1083 | }); |
| 998 | 1084 | ||
| 999 | // Remove expired PR events and mark them as expired | 1085 | // Remove expired PR events and mark them as expired |
| 1000 | let expired_prs: Vec<(String, Option<EventId>)> = self | 1086 | let expired_prs: Vec<_> = self |
| 1001 | .pr_events | 1087 | .pr_events |
| 1002 | .iter() | 1088 | .iter() |
| 1003 | .filter(|entry| entry.value().expires_at <= now) | 1089 | .filter(|entry| entry.value().expires_at <= now) |
| 1004 | .map(|entry| { | 1090 | .map(|entry| { |
| 1005 | let event_id = entry.value().event.as_ref().map(|e| e.id); | 1091 | let pr_entry = entry.value(); |
| 1006 | (entry.key().clone(), event_id) | 1092 | let event_id_str = entry.key().clone(); |
| 1093 | let event_opt = pr_entry.event.clone(); | ||
| 1094 | let commit = pr_entry.commit.clone(); | ||
| 1095 | let source = pr_entry.source; | ||
| 1096 | (event_id_str, event_opt, commit, source) | ||
| 1007 | }) | 1097 | }) |
| 1008 | .collect(); | 1098 | .collect(); |
| 1009 | 1099 | ||
| 1010 | let pr_removed = expired_prs.len(); | 1100 | let pr_removed = expired_prs.len(); |
| 1011 | for (event_id_str, event_id_opt) in expired_prs { | 1101 | for (event_id_str, event_opt, commit, source) in expired_prs { |
| 1012 | // Mark actual PR events as expired (not placeholders) | 1102 | // Log structured entry for PR events (not placeholders) |
| 1013 | if let Some(event_id) = event_id_opt { | 1103 | if let Some(ref event) = event_opt { |
| 1014 | self.mark_expired(event_id); | 1104 | let npub = event |
| 1105 | .pubkey | ||
| 1106 | .to_bech32() | ||
| 1107 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 1108 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 1109 | let source_str = if source.is_direct() { "direct" } else { "sync" }; | ||
| 1110 | |||
| 1111 | // Extract ALL repo identifiers from 'a' tags | ||
| 1112 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 1113 | let repos: Vec<String> = event | ||
| 1114 | .tags | ||
| 1115 | .iter() | ||
| 1116 | .filter_map(|tag| { | ||
| 1117 | let tag_vec = tag.clone().to_vec(); | ||
| 1118 | if tag_vec.len() >= 2 | ||
| 1119 | && tag_vec[0] == "a" | ||
| 1120 | && tag_vec[1].starts_with("30617:") | ||
| 1121 | { | ||
| 1122 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 1123 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 1124 | if parts.len() >= 3 { | ||
| 1125 | Some(parts[2].to_string()) | ||
| 1126 | } else { | ||
| 1127 | None | ||
| 1128 | } | ||
| 1129 | } else { | ||
| 1130 | None | ||
| 1131 | } | ||
| 1132 | }) | ||
| 1133 | .collect(); | ||
| 1134 | |||
| 1135 | // Deduplicate while preserving order | ||
| 1136 | let mut seen = std::collections::HashSet::new(); | ||
| 1137 | let unique_repos: Vec<String> = repos | ||
| 1138 | .into_iter() | ||
| 1139 | .filter(|r| seen.insert(r.clone())) | ||
| 1140 | .collect(); | ||
| 1141 | |||
| 1142 | let repos_to_log = if unique_repos.is_empty() { | ||
| 1143 | vec!["unknown".to_string()] | ||
| 1144 | } else { | ||
| 1145 | unique_repos | ||
| 1146 | }; | ||
| 1147 | |||
| 1148 | // Structured log for migration scripts - log once per repo | ||
| 1149 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 1150 | for repo in &repos_to_log { | ||
| 1151 | if source.is_direct() { | ||
| 1152 | tracing::warn!( | ||
| 1153 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1154 | repo, | ||
| 1155 | npub, | ||
| 1156 | event_id_short, | ||
| 1157 | event.kind.as_u16(), | ||
| 1158 | &commit[..commit.len().min(12)], | ||
| 1159 | source_str | ||
| 1160 | ); | ||
| 1161 | } else { | ||
| 1162 | tracing::debug!( | ||
| 1163 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 1164 | repo, | ||
| 1165 | npub, | ||
| 1166 | event_id_short, | ||
| 1167 | event.kind.as_u16(), | ||
| 1168 | &commit[..commit.len().min(12)], | ||
| 1169 | source_str | ||
| 1170 | ); | ||
| 1171 | } | ||
| 1172 | } | ||
| 1173 | |||
| 1174 | self.mark_expired(event.id); | ||
| 1175 | } else { | ||
| 1176 | // Placeholder (git data arrived first, but PR event never came) | ||
| 1177 | // Placeholders are always Direct source (from git push) | ||
| 1178 | tracing::debug!( | ||
| 1179 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"", | ||
| 1180 | &event_id_str[..event_id_str.len().min(12)], | ||
| 1181 | &commit[..commit.len().min(12)] | ||
| 1182 | ); | ||
| 1015 | } | 1183 | } |
| 1016 | self.pr_events.remove(&event_id_str); | 1184 | self.pr_events.remove(&event_id_str); |
| 1017 | } | 1185 | } |
| @@ -1191,6 +1359,7 @@ impl Purgatory { | |||
| 1191 | author: e.author, | 1359 | author: e.author, |
| 1192 | created_at_offset_secs: created_offset.as_secs(), | 1360 | created_at_offset_secs: created_offset.as_secs(), |
| 1193 | expires_at_offset_secs: expires_offset.as_secs(), | 1361 | expires_at_offset_secs: expires_offset.as_secs(), |
| 1362 | source: e.source, | ||
| 1194 | } | 1363 | } |
| 1195 | }) | 1364 | }) |
| 1196 | .collect(); | 1365 | .collect(); |
| @@ -1213,6 +1382,7 @@ impl Purgatory { | |||
| 1213 | commit: e.commit.clone(), | 1382 | commit: e.commit.clone(), |
| 1214 | created_at_offset_secs: created_offset.as_secs(), | 1383 | created_at_offset_secs: created_offset.as_secs(), |
| 1215 | expires_at_offset_secs: expires_offset.as_secs(), | 1384 | expires_at_offset_secs: expires_offset.as_secs(), |
| 1385 | source: e.source, | ||
| 1216 | }; | 1386 | }; |
| 1217 | pr_events.insert(event_id, serializable); | 1387 | pr_events.insert(event_id, serializable); |
| 1218 | } | 1388 | } |
| @@ -1355,6 +1525,7 @@ impl Purgatory { | |||
| 1355 | author: e.author, | 1525 | author: e.author, |
| 1356 | created_at, | 1526 | created_at, |
| 1357 | expires_at, | 1527 | expires_at, |
| 1528 | source: e.source, | ||
| 1358 | } | 1529 | } |
| 1359 | }) | 1530 | }) |
| 1360 | .collect(); | 1531 | .collect(); |
| @@ -1380,6 +1551,7 @@ impl Purgatory { | |||
| 1380 | commit: e.commit, | 1551 | commit: e.commit, |
| 1381 | created_at, | 1552 | created_at, |
| 1382 | expires_at, | 1553 | expires_at, |
| 1554 | source: e.source, | ||
| 1383 | }; | 1555 | }; |
| 1384 | 1556 | ||
| 1385 | self.pr_events.insert(event_id, entry); | 1557 | self.pr_events.insert(event_id, entry); |
| @@ -1439,8 +1611,18 @@ mod tests { | |||
| 1439 | .sign_with_keys(&keys) | 1611 | .sign_with_keys(&keys) |
| 1440 | .unwrap(); | 1612 | .unwrap(); |
| 1441 | 1613 | ||
| 1442 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); | 1614 | purgatory.add_state( |
| 1443 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); | 1615 | event.clone(), |
| 1616 | "test-repo".to_string(), | ||
| 1617 | keys.public_key(), | ||
| 1618 | false, | ||
| 1619 | ); | ||
| 1620 | purgatory.add_pr( | ||
| 1621 | event, | ||
| 1622 | "test-event-id".to_string(), | ||
| 1623 | "abc123".to_string(), | ||
| 1624 | false, | ||
| 1625 | ); | ||
| 1444 | 1626 | ||
| 1445 | let (announcement_count, state_count, pr_count) = purgatory.count(); | 1627 | let (announcement_count, state_count, pr_count) = purgatory.count(); |
| 1446 | assert_eq!(announcement_count, 0); | 1628 | assert_eq!(announcement_count, 0); |
| @@ -1492,7 +1674,7 @@ mod tests { | |||
| 1492 | let event = EventBuilder::text_note("state") | 1674 | let event = EventBuilder::text_note("state") |
| 1493 | .sign_with_keys(&keys) | 1675 | .sign_with_keys(&keys) |
| 1494 | .unwrap(); | 1676 | .unwrap(); |
| 1495 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | 1677 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false); |
| 1496 | 1678 | ||
| 1497 | // Now should have pending events | 1679 | // Now should have pending events |
| 1498 | assert!(purgatory.has_pending_events("test-repo")); | 1680 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1522,7 +1704,12 @@ mod tests { | |||
| 1522 | .sign_with_keys(&keys) | 1704 | .sign_with_keys(&keys) |
| 1523 | .unwrap(); | 1705 | .unwrap(); |
| 1524 | 1706 | ||
| 1525 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | 1707 | purgatory.add_pr( |
| 1708 | event, | ||
| 1709 | "pr-event-id".to_string(), | ||
| 1710 | "commit123".to_string(), | ||
| 1711 | false, | ||
| 1712 | ); | ||
| 1526 | 1713 | ||
| 1527 | // Now should have pending events for test-repo | 1714 | // Now should have pending events for test-repo |
| 1528 | assert!(purgatory.has_pending_events("test-repo")); | 1715 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1587,6 +1774,7 @@ fn test_pr_event_vs_placeholder() { | |||
| 1587 | event.clone(), | 1774 | event.clone(), |
| 1588 | "event-id-1".to_string(), | 1775 | "event-id-1".to_string(), |
| 1589 | "commit-abc".to_string(), | 1776 | "commit-abc".to_string(), |
| 1777 | false, | ||
| 1590 | ); | 1778 | ); |
| 1591 | 1779 | ||
| 1592 | // Add a placeholder (no event) | 1780 | // Add a placeholder (no event) |
| @@ -1643,8 +1831,14 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1643 | state_event.clone(), | 1831 | state_event.clone(), |
| 1644 | "test-repo".to_string(), | 1832 | "test-repo".to_string(), |
| 1645 | keys.public_key(), | 1833 | keys.public_key(), |
| 1834 | false, | ||
| 1835 | ); | ||
| 1836 | purgatory.add_pr( | ||
| 1837 | pr_event, | ||
| 1838 | "pr-123".to_string(), | ||
| 1839 | "commit-abc".to_string(), | ||
| 1840 | false, | ||
| 1646 | ); | 1841 | ); |
| 1647 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | ||
| 1648 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1842 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1649 | 1843 | ||
| 1650 | // Verify entries are there | 1844 | // Verify entries are there |
| @@ -1691,8 +1885,18 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1691 | .unwrap(); | 1885 | .unwrap(); |
| 1692 | 1886 | ||
| 1693 | // Add fresh entries | 1887 | // Add fresh entries |
| 1694 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); | 1888 | purgatory.add_state( |
| 1695 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1889 | state_event, |
| 1890 | "test-repo".to_string(), | ||
| 1891 | keys.public_key(), | ||
| 1892 | false, | ||
| 1893 | ); | ||
| 1894 | purgatory.add_pr( | ||
| 1895 | pr_event, | ||
| 1896 | "pr-123".to_string(), | ||
| 1897 | "commit-abc".to_string(), | ||
| 1898 | false, | ||
| 1899 | ); | ||
| 1696 | 1900 | ||
| 1697 | // Run cleanup | 1901 | // Run cleanup |
| 1698 | let (_, state_removed, pr_removed) = purgatory.cleanup(); | 1902 | let (_, state_removed, pr_removed) = purgatory.cleanup(); |
| @@ -1722,8 +1926,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1722 | .sign_with_keys(&keys) | 1926 | .sign_with_keys(&keys) |
| 1723 | .unwrap(); | 1927 | .unwrap(); |
| 1724 | 1928 | ||
| 1725 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); | 1929 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false); |
| 1726 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); | 1930 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false); |
| 1727 | 1931 | ||
| 1728 | // Expire only the first one | 1932 | // Expire only the first one |
| 1729 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { | 1933 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { |
| @@ -1740,8 +1944,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1740 | .sign_with_keys(&keys) | 1944 | .sign_with_keys(&keys) |
| 1741 | .unwrap(); | 1945 | .unwrap(); |
| 1742 | 1946 | ||
| 1743 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); | 1947 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false); |
| 1744 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); | 1948 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false); |
| 1745 | 1949 | ||
| 1746 | // Expire only first PR | 1950 | // Expire only first PR |
| 1747 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { | 1951 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { |
| @@ -1773,8 +1977,8 @@ fn test_remove_expired_legacy_method() { | |||
| 1773 | .unwrap(); | 1977 | .unwrap(); |
| 1774 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); | 1978 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); |
| 1775 | 1979 | ||
| 1776 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1980 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1777 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1981 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1778 | 1982 | ||
| 1779 | // Expire both | 1983 | // Expire both |
| 1780 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1984 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1808,8 +2012,8 @@ fn test_expired_event_tracking() { | |||
| 1808 | let pr_event_id = pr_event.id; | 2012 | let pr_event_id = pr_event.id; |
| 1809 | 2013 | ||
| 1810 | // Add events to purgatory | 2014 | // Add events to purgatory |
| 1811 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 2015 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1812 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 2016 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1813 | 2017 | ||
| 1814 | // Events should not be marked as expired yet | 2018 | // Events should not be marked as expired yet |
| 1815 | assert!(!purgatory.is_expired(&state_event_id)); | 2019 | assert!(!purgatory.is_expired(&state_event_id)); |
| @@ -1861,7 +2065,7 @@ fn test_cleanup_expired_events() { | |||
| 1861 | let event2_id = event2.id; | 2065 | let event2_id = event2.id; |
| 1862 | 2066 | ||
| 1863 | // Add and immediately expire event1 | 2067 | // Add and immediately expire event1 |
| 1864 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); | 2068 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false); |
| 1865 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { | 2069 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { |
| 1866 | for entry in entries.iter_mut() { | 2070 | for entry in entries.iter_mut() { |
| 1867 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2071 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1870,7 +2074,7 @@ fn test_cleanup_expired_events() { | |||
| 1870 | purgatory.cleanup(); | 2074 | purgatory.cleanup(); |
| 1871 | 2075 | ||
| 1872 | // Add and expire event2 (will be more recent) | 2076 | // Add and expire event2 (will be more recent) |
| 1873 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); | 2077 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false); |
| 1874 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { | 2078 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { |
| 1875 | for entry in entries.iter_mut() { | 2079 | for entry in entries.iter_mut() { |
| 1876 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2080 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1912,7 +2116,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1912 | let event_id = event.id; | 2116 | let event_id = event.id; |
| 1913 | 2117 | ||
| 1914 | // Add event to purgatory | 2118 | // Add event to purgatory |
| 1915 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2119 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1916 | 2120 | ||
| 1917 | // Expire it | 2121 | // Expire it |
| 1918 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 2122 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1932,7 +2136,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1932 | // This simulates what negentropy/REQ+EOSE should do: | 2136 | // This simulates what negentropy/REQ+EOSE should do: |
| 1933 | // Check if event is in event_ids() before adding | 2137 | // Check if event is in event_ids() before adding |
| 1934 | if !ids.contains(&event_id) { | 2138 | if !ids.contains(&event_id) { |
| 1935 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2139 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1936 | } | 2140 | } |
| 1937 | 2141 | ||
| 1938 | // Event should NOT be re-added | 2142 | // Event should NOT be re-added |
| @@ -1975,7 +2179,7 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1975 | let event_id = event.id; | 2179 | let event_id = event.id; |
| 1976 | 2180 | ||
| 1977 | // Add event to purgatory | 2181 | // Add event to purgatory |
| 1978 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2182 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1979 | 2183 | ||
| 1980 | // Expire it | 2184 | // Expire it |
| 1981 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 2185 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -2024,8 +2228,18 @@ async fn test_save_and_restore_state_events() { | |||
| 2024 | let event1_id = event1.id; | 2228 | let event1_id = event1.id; |
| 2025 | let event2_id = event2.id; | 2229 | let event2_id = event2.id; |
| 2026 | 2230 | ||
| 2027 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | 2231 | purgatory.add_state( |
| 2028 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | 2232 | event1.clone(), |
| 2233 | "test-repo".to_string(), | ||
| 2234 | keys.public_key(), | ||
| 2235 | false, | ||
| 2236 | ); | ||
| 2237 | purgatory.add_state( | ||
| 2238 | event2.clone(), | ||
| 2239 | "test-repo".to_string(), | ||
| 2240 | keys.public_key(), | ||
| 2241 | false, | ||
| 2242 | ); | ||
| 2029 | 2243 | ||
| 2030 | // Save to disk | 2244 | // Save to disk |
| 2031 | purgatory.save_to_disk(&state_file).unwrap(); | 2245 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2087,6 +2301,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 2087 | pr_event.clone(), | 2301 | pr_event.clone(), |
| 2088 | "pr-event-id".to_string(), | 2302 | "pr-event-id".to_string(), |
| 2089 | "commit-abc".to_string(), | 2303 | "commit-abc".to_string(), |
| 2304 | false, | ||
| 2090 | ); | 2305 | ); |
| 2091 | 2306 | ||
| 2092 | // Save to disk | 2307 | // Save to disk |
| @@ -2156,7 +2371,7 @@ async fn test_save_and_restore_expired_events() { | |||
| 2156 | let event_id = event.id; | 2371 | let event_id = event.id; |
| 2157 | 2372 | ||
| 2158 | // Add and expire event | 2373 | // Add and expire event |
| 2159 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2374 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2160 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 2375 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| 2161 | for entry in entries.iter_mut() { | 2376 | for entry in entries.iter_mut() { |
| 2162 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2377 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -2295,7 +2510,7 @@ async fn test_downtime_calculation() { | |||
| 2295 | .sign_with_keys(&keys) | 2510 | .sign_with_keys(&keys) |
| 2296 | .unwrap(); | 2511 | .unwrap(); |
| 2297 | 2512 | ||
| 2298 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2513 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 2299 | 2514 | ||
| 2300 | // Get original expiry time | 2515 | // Get original expiry time |
| 2301 | let original_entries = purgatory.find_state("repo"); | 2516 | let original_entries = purgatory.find_state("repo"); |
| @@ -2351,7 +2566,7 @@ async fn test_expiry_times_preserved() { | |||
| 2351 | .sign_with_keys(&keys) | 2566 | .sign_with_keys(&keys) |
| 2352 | .unwrap(); | 2567 | .unwrap(); |
| 2353 | 2568 | ||
| 2354 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2569 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 2355 | 2570 | ||
| 2356 | // Manually set expiry to a specific time in the future | 2571 | // Manually set expiry to a specific time in the future |
| 2357 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | 2572 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes |
| @@ -2410,16 +2625,19 @@ async fn test_multiple_state_events_same_identifier() { | |||
| 2410 | event1.clone(), | 2625 | event1.clone(), |
| 2411 | "shared-repo".to_string(), | 2626 | "shared-repo".to_string(), |
| 2412 | keys1.public_key(), | 2627 | keys1.public_key(), |
| 2628 | false, | ||
| 2413 | ); | 2629 | ); |
| 2414 | purgatory.add_state( | 2630 | purgatory.add_state( |
| 2415 | event2.clone(), | 2631 | event2.clone(), |
| 2416 | "shared-repo".to_string(), | 2632 | "shared-repo".to_string(), |
| 2417 | keys2.public_key(), | 2633 | keys2.public_key(), |
| 2634 | false, | ||
| 2418 | ); | 2635 | ); |
| 2419 | purgatory.add_state( | 2636 | purgatory.add_state( |
| 2420 | event3.clone(), | 2637 | event3.clone(), |
| 2421 | "shared-repo".to_string(), | 2638 | "shared-repo".to_string(), |
| 2422 | keys3.public_key(), | 2639 | keys3.public_key(), |
| 2640 | false, | ||
| 2423 | ); | 2641 | ); |
| 2424 | 2642 | ||
| 2425 | // Save to disk | 2643 | // Save to disk |
| @@ -2466,6 +2684,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2466 | pr_event.clone(), | 2684 | pr_event.clone(), |
| 2467 | "pr-with-event".to_string(), | 2685 | "pr-with-event".to_string(), |
| 2468 | "commit-abc".to_string(), | 2686 | "commit-abc".to_string(), |
| 2687 | false, | ||
| 2469 | ); | 2688 | ); |
| 2470 | 2689 | ||
| 2471 | // Add PR placeholder | 2690 | // Add PR placeholder |
| @@ -2511,7 +2730,7 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2511 | let event = EventBuilder::text_note("test") | 2730 | let event = EventBuilder::text_note("test") |
| 2512 | .sign_with_keys(&keys) | 2731 | .sign_with_keys(&keys) |
| 2513 | .unwrap(); | 2732 | .unwrap(); |
| 2514 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2733 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2515 | 2734 | ||
| 2516 | // Save to disk | 2735 | // Save to disk |
| 2517 | purgatory.save_to_disk(&state_file).unwrap(); | 2736 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2697,8 +2916,18 @@ async fn test_comprehensive_roundtrip() { | |||
| 2697 | .sign_with_keys(&keys2) | 2916 | .sign_with_keys(&keys2) |
| 2698 | .unwrap(); | 2917 | .unwrap(); |
| 2699 | 2918 | ||
| 2700 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | 2919 | purgatory.add_state( |
| 2701 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | 2920 | state1.clone(), |
| 2921 | "repo1".to_string(), | ||
| 2922 | keys1.public_key(), | ||
| 2923 | false, | ||
| 2924 | ); | ||
| 2925 | purgatory.add_state( | ||
| 2926 | state2.clone(), | ||
| 2927 | "repo2".to_string(), | ||
| 2928 | keys2.public_key(), | ||
| 2929 | false, | ||
| 2930 | ); | ||
| 2702 | 2931 | ||
| 2703 | // Add PR event | 2932 | // Add PR event |
| 2704 | let tags = vec![Tag::custom( | 2933 | let tags = vec![Tag::custom( |
| @@ -2709,7 +2938,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2709 | .tags(tags) | 2938 | .tags(tags) |
| 2710 | .sign_with_keys(&keys1) | 2939 | .sign_with_keys(&keys1) |
| 2711 | .unwrap(); | 2940 | .unwrap(); |
| 2712 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | 2941 | purgatory.add_pr( |
| 2942 | pr_event.clone(), | ||
| 2943 | "pr-1".to_string(), | ||
| 2944 | "commit-1".to_string(), | ||
| 2945 | false, | ||
| 2946 | ); | ||
| 2713 | 2947 | ||
| 2714 | // Add PR placeholder | 2948 | // Add PR placeholder |
| 2715 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | 2949 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); |
| @@ -2719,7 +2953,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2719 | .sign_with_keys(&keys1) | 2953 | .sign_with_keys(&keys1) |
| 2720 | .unwrap(); | 2954 | .unwrap(); |
| 2721 | let expired_id = expired_event.id; | 2955 | let expired_id = expired_event.id; |
| 2722 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | 2956 | purgatory.add_state( |
| 2957 | expired_event, | ||
| 2958 | "repo3".to_string(), | ||
| 2959 | keys1.public_key(), | ||
| 2960 | false, | ||
| 2961 | ); | ||
| 2723 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | 2962 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { |
| 2724 | for entry in entries.iter_mut() { | 2963 | for entry in entries.iter_mut() { |
| 2725 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2964 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index ece8cd6..8297515 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -375,94 +375,121 @@ impl SyncContext for RealSyncContext { | |||
| 375 | let naughty_list = self.git_naughty_list.clone(); | 375 | let naughty_list = self.git_naughty_list.clone(); |
| 376 | 376 | ||
| 377 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { | 377 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { |
| 378 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history | 378 | let mut remaining_oids = missing_oids.clone(); |
| 379 | let mut args = vec!["fetch", &url]; | 379 | let mut missing_from_remote: Vec<String> = Vec::new(); |
| 380 | args.extend(missing_oids.iter().map(|s| s.as_str())); | 380 | |
| 381 | 381 | // Retry loop: keep fetching until success or no OIDs left | |
| 382 | let output = Command::new("git") | 382 | loop { |
| 383 | .args(&args) | 383 | if remaining_oids.is_empty() { |
| 384 | .current_dir(&repo_path) | 384 | // All OIDs were missing from remote |
| 385 | .output(); | 385 | debug!( |
| 386 | 386 | url = %url, | |
| 387 | match output { | 387 | missing_count = missing_from_remote.len(), |
| 388 | Ok(result) if result.status.success() => { | 388 | "All requested OIDs missing from remote" |
| 389 | // Count how many OIDs we now have | 389 | ); |
| 390 | let fetched: Vec<String> = missing_oids | 390 | return Ok(vec![]); |
| 391 | .iter() | ||
| 392 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) | ||
| 393 | .cloned() | ||
| 394 | .collect(); | ||
| 395 | |||
| 396 | debug!(fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 397 | |||
| 398 | Ok(fetched) | ||
| 399 | } | 391 | } |
| 400 | Ok(result) => { | 392 | |
| 401 | let stderr = String::from_utf8_lossy(&result.stderr); | 393 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history |
| 402 | 394 | let mut args = vec!["fetch".to_string(), url.clone()]; | |
| 403 | // Extract domain and classify error for naughty list | 395 | args.extend(remaining_oids.iter().cloned()); |
| 404 | if let Some(domain) = extract_domain(&url) { | 396 | |
| 405 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | 397 | let output = Command::new("git") |
| 406 | let is_new = naughty_list.record(&domain, category, stderr.to_string()); | 398 | .args(&args) |
| 407 | 399 | .current_dir(&repo_path) | |
| 408 | if is_new { | 400 | .output(); |
| 409 | tracing::warn!( | 401 | |
| 410 | domain = %domain, | 402 | match output { |
| 411 | category = %category, | 403 | Ok(result) if result.status.success() => { |
| 412 | error = %stderr, | 404 | // Fetch succeeded - count how many OIDs we now have |
| 413 | "Git remote domain added to naughty list" | 405 | let fetched: Vec<String> = missing_oids |
| 414 | ); | 406 | .iter() |
| 415 | } else { | 407 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) |
| 416 | debug!( | 408 | .cloned() |
| 417 | domain = %domain, | 409 | .collect(); |
| 418 | category = %category, | 410 | |
| 419 | "Git remote domain still on naughty list" | 411 | if !missing_from_remote.is_empty() { |
| 420 | ); | 412 | debug!( |
| 421 | } | 413 | url = %url, |
| 414 | fetched_count = fetched.len(), | ||
| 415 | missing_count = missing_from_remote.len(), | ||
| 416 | missing_oids = ?missing_from_remote, | ||
| 417 | "Fetch completed after retries - some OIDs were missing from remote" | ||
| 418 | ); | ||
| 419 | } else { | ||
| 420 | debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 422 | } | 421 | } |
| 422 | |||
| 423 | return Ok(fetched); | ||
| 423 | } | 424 | } |
| 425 | Ok(result) => { | ||
| 426 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 424 | 427 | ||
| 425 | // Check for "not our ref" errors and provide a clearer error message | 428 | // Check for "not our ref" error - this is retryable |
| 426 | let error_msg = if stderr.contains("upload-pack: not our ref") { | 429 | if stderr.contains("upload-pack: not our ref") { |
| 427 | // Parse out the missing OID from stderr (git only reports one at a time) | 430 | // Parse out the missing OID from stderr |
| 428 | let missing_oid = stderr | 431 | let missing_oid = stderr.lines().find_map(|line| { |
| 429 | .lines() | ||
| 430 | .find_map(|line| { | ||
| 431 | if line.contains("not our ref") { | 432 | if line.contains("not our ref") { |
| 432 | // Extract the OID from lines like: | 433 | // Extract the OID from lines like: |
| 433 | // "fatal: remote error: upload-pack: not our ref <oid>" | 434 | // "fatal: remote error: upload-pack: not our ref <oid>" |
| 434 | line.split("not our ref").nth(1).map(|s| s.trim().to_string()) | 435 | line.split("not our ref") |
| 436 | .nth(1) | ||
| 437 | .map(|s| s.trim().to_string()) | ||
| 435 | } else { | 438 | } else { |
| 436 | None | 439 | None |
| 437 | } | 440 | } |
| 438 | }); | 441 | }); |
| 439 | 442 | ||
| 440 | let total_requested = missing_oids.len(); | 443 | if let Some(ref oid) = missing_oid { |
| 444 | // Remove the missing OID and retry with remaining | ||
| 445 | remaining_oids.retain(|o| o != oid); | ||
| 446 | missing_from_remote.push(oid.clone()); | ||
| 441 | 447 | ||
| 442 | if let Some(oid) = missing_oid { | 448 | debug!( |
| 443 | if total_requested > 1 { | ||
| 444 | // BUG: Git stops at first missing OID, so we don't know if the others exist | ||
| 445 | // We need retry logic to fetch remaining OIDs individually | ||
| 446 | tracing::warn!( | ||
| 447 | url = %url, | 449 | url = %url, |
| 448 | missing_oid = %oid, | 450 | missing_oid = %oid, |
| 449 | total_requested = total_requested, | 451 | remaining_count = remaining_oids.len(), |
| 450 | "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." | 452 | "OID not found on remote, retrying with remaining OIDs" |
| 451 | ); | 453 | ); |
| 452 | format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) | 454 | |
| 453 | } else { | 455 | continue; // Retry with remaining OIDs |
| 454 | format!("remote missing only oid requested: {}", oid) | 456 | } |
| 457 | } | ||
| 458 | |||
| 459 | // Non-retryable error - record to naughty list and return error | ||
| 460 | if let Some(domain) = extract_domain(&url) { | ||
| 461 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | ||
| 462 | let is_new = | ||
| 463 | naughty_list.record(&domain, category, stderr.to_string()); | ||
| 464 | |||
| 465 | if is_new { | ||
| 466 | tracing::warn!( | ||
| 467 | domain = %domain, | ||
| 468 | category = %category, | ||
| 469 | error = %stderr, | ||
| 470 | "Git remote domain added to naughty list" | ||
| 471 | ); | ||
| 472 | } else { | ||
| 473 | debug!( | ||
| 474 | domain = %domain, | ||
| 475 | category = %category, | ||
| 476 | error = %stderr, | ||
| 477 | "Git fetch failed (domain on naughty list)" | ||
| 478 | ); | ||
| 479 | } | ||
| 455 | } | 480 | } |
| 456 | } else { | ||
| 457 | format!("git fetch failed: {}", stderr) | ||
| 458 | } | 481 | } |
| 459 | } else { | ||
| 460 | format!("git fetch failed: {}", stderr) | ||
| 461 | }; | ||
| 462 | 482 | ||
| 463 | Err(anyhow::anyhow!("{}", error_msg)) | 483 | return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr)); |
| 484 | } | ||
| 485 | Err(e) => { | ||
| 486 | return Err(anyhow::anyhow!( | ||
| 487 | "git fetch command error for {}: {}", | ||
| 488 | url, | ||
| 489 | e | ||
| 490 | )) | ||
| 491 | } | ||
| 464 | } | 492 | } |
| 465 | Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)), | ||
| 466 | } | 493 | } |
| 467 | }) | 494 | }) |
| 468 | .await | 495 | .await |
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index 65d29af..9207d58 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; | 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; |
| 369 | throttle_manager.complete_request(&domain); | 369 | throttle_manager.complete_request(&domain); |
| 370 | 370 | ||
| 371 | let oids_fetched = match fetch_result { | 371 | let fetched_oids = match fetch_result { |
| 372 | Ok(fetched) => { | 372 | Ok(fetched) if !fetched.is_empty() => { |
| 373 | debug!( | 373 | debug!( |
| 374 | identifier = %identifier, | 374 | identifier = %identifier, |
| 375 | url = %url, | 375 | url = %url, |
| 376 | oids_fetched = fetched.len(), | 376 | oids_fetched = fetched.len(), |
| 377 | "Fetch succeeded" | 377 | "Fetch succeeded" |
| 378 | ); | 378 | ); |
| 379 | fetched.len() | 379 | fetched |
| 380 | } | ||
| 381 | Ok(_) => { | ||
| 382 | debug!( | ||
| 383 | identifier = %identifier, | ||
| 384 | url = %url, | ||
| 385 | "Fetch returned no OIDs (not available on remote)" | ||
| 386 | ); | ||
| 387 | vec![] | ||
| 380 | } | 388 | } |
| 381 | Err(e) => { | 389 | Err(e) => { |
| 382 | debug!( | 390 | debug!( |
| @@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 385 | error = %e, | 393 | error = %e, |
| 386 | "Fetch failed" | 394 | "Fetch failed" |
| 387 | ); | 395 | ); |
| 388 | 0 | 396 | vec![] |
| 389 | } | 397 | } |
| 390 | }; | 398 | }; |
| 391 | 399 | ||
| 392 | // Try to process any events that can now be satisfied | 400 | // Try to process any events that can now be satisfied |
| 393 | if oids_fetched > 0 { | 401 | if !fetched_oids.is_empty() { |
| 394 | let new_oids: HashSet<String> = needed_oids.into_iter().collect(); | 402 | let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect(); |
| 395 | if let Err(e) = ctx | 403 | if let Err(e) = ctx |
| 396 | .process_newly_available_git_data(&target_repo, &new_oids) | 404 | .process_newly_available_git_data(&target_repo, &new_oids) |
| 397 | .await | 405 | .await |
| @@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 404 | } | 412 | } |
| 405 | } | 413 | } |
| 406 | 414 | ||
| 407 | oids_fetched | 415 | fetched_oids.len() |
| 408 | } | 416 | } |
| 409 | 417 | ||
| 410 | /// Sync git data for an identifier. | 418 | /// Sync git data for an identifier. |
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index d891bc9..1af5c4e 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -10,6 +10,28 @@ use std::collections::HashSet; | |||
| 10 | use std::path::PathBuf; | 10 | use std::path::PathBuf; |
| 11 | use std::time::Instant; | 11 | use std::time::Instant; |
| 12 | 12 | ||
| 13 | /// Source of an event entering purgatory. | ||
| 14 | /// | ||
| 15 | /// Tracks whether an event was submitted directly by a user or fetched via | ||
| 16 | /// proactive sync from another relay. This distinction is used for: | ||
| 17 | /// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG | ||
| 18 | /// - Operational monitoring: Helps identify user-facing issues vs sync noise | ||
| 19 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] | ||
| 20 | pub enum EventSource { | ||
| 21 | /// Event was published directly to this relay by a user | ||
| 22 | #[default] | ||
| 23 | Direct, | ||
| 24 | /// Event was fetched via proactive sync from another relay | ||
| 25 | Sync, | ||
| 26 | } | ||
| 27 | |||
| 28 | impl EventSource { | ||
| 29 | /// Returns true if this is a direct submission (not synced) | ||
| 30 | pub fn is_direct(&self) -> bool { | ||
| 31 | matches!(self, EventSource::Direct) | ||
| 32 | } | ||
| 33 | } | ||
| 34 | |||
| 13 | /// Default value for Instant fields during deserialization | 35 | /// Default value for Instant fields during deserialization |
| 14 | fn instant_now() -> Instant { | 36 | fn instant_now() -> Instant { |
| 15 | Instant::now() | 37 | Instant::now() |
| @@ -88,6 +110,10 @@ pub struct StatePurgatoryEntry { | |||
| 88 | /// Expiry deadline (30 min from creation, may be extended) | 110 | /// Expiry deadline (30 min from creation, may be extended) |
| 89 | #[serde(skip, default = "instant_now")] | 111 | #[serde(skip, default = "instant_now")] |
| 90 | pub expires_at: Instant, | 112 | pub expires_at: Instant, |
| 113 | |||
| 114 | /// Source of this event (direct submission vs sync) | ||
| 115 | #[serde(default)] | ||
| 116 | pub source: EventSource, | ||
| 91 | } | 117 | } |
| 92 | 118 | ||
| 93 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. | 119 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. |
| @@ -114,6 +140,10 @@ pub struct PrPurgatoryEntry { | |||
| 114 | /// Expiry deadline (30 min from creation, may be extended) | 140 | /// Expiry deadline (30 min from creation, may be extended) |
| 115 | #[serde(skip, default = "instant_now")] | 141 | #[serde(skip, default = "instant_now")] |
| 116 | pub expires_at: Instant, | 142 | pub expires_at: Instant, |
| 143 | |||
| 144 | /// Source of this event (direct submission vs sync) | ||
| 145 | #[serde(default)] | ||
| 146 | pub source: EventSource, | ||
| 117 | } | 147 | } |
| 118 | 148 | ||
| 119 | /// Entry for a repository announcement (kind 30617) waiting in purgatory. | 149 | /// Entry for a repository announcement (kind 30617) waiting in purgatory. |