upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/config.rs 74
-rw-r--r-- src/git/handlers.rs 66
-rw-r--r-- src/main.rs 20
-rw-r--r-- src/nostr/builder.rs 159
-rw-r--r-- src/nostr/events.rs 16
-rw-r--r-- src/nostr/policy/state.rs 9
-rw-r--r-- src/purgatory/mod.rs 339
-rw-r--r-- src/purgatory/sync/context.rs 163
-rw-r--r-- src/purgatory/sync/functions.rs 22
-rw-r--r-- src/purgatory/types.rs 30
-rw-r--r-- src/sync/mod.rs 3
-rw-r--r-- src/sync/naughty_list.rs 485
-rw-r--r-- src/sync/self_subscriber.rs 141
13 files changed, 1116 insertions, 411 deletions
diff --git a/src/config.rs b/src/config.rs
index 271a340..dd7b1e3 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -109,22 +109,25 @@ impl WhitelistEntry {
109} 109}
110 110
111/// GRASP-05 Archive mode configuration 111/// GRASP-05 Archive mode configuration
112#[derive(Debug, Clone, Serialize, Deserialize)] 112#[derive(Debug, Clone, Serialize, Deserialize, Default)]
113pub struct ArchiveConfig { 113pub struct ArchiveConfig {
114 /// Accept all repository announcements (no filtering) 114 /// Accept all repository announcements (no filtering)
115 /// 115 ///
116 /// WARNING: Setting this to true allows anyone to mirror any repository 116 /// WARNING: Setting this to true allows anyone to mirror any repository
117 /// to this relay, potentially causing storage/bandwidth exhaustion. 117 /// to this relay, potentially causing storage/bandwidth exhaustion.
118 #[serde(default)]
118 pub archive_all: bool, 119 pub archive_all: bool,
119 120
120 /// Whitelist entries for selective archiving 121 /// Whitelist entries for selective archiving
121 /// 122 ///
122 /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode). 123 /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode).
124 #[serde(default)]
123 pub whitelist: Vec<WhitelistEntry>, 125 pub whitelist: Vec<WhitelistEntry>,
124 126
125 /// GRASP server domains to archive (archive all repositories from these domains) 127 /// GRASP server domains to archive (archive all repositories from these domains)
126 /// 128 ///
127 /// If non-empty, archives all repositories from the specified GRASP server domains. 129 /// If non-empty, archives all repositories from the specified GRASP server domains.
130 #[serde(default)]
128 pub grasp_services: Vec<String>, 131 pub grasp_services: Vec<String>,
129 132
130 /// Read-only archive mode: relay is a read-only sync of archived repositories 133 /// Read-only archive mode: relay is a read-only sync of archived repositories
@@ -132,6 +135,7 @@ pub struct ArchiveConfig {
132 /// When true, the relay ONLY accepts announcements matching the archive whitelist/all. 135 /// When true, the relay ONLY accepts announcements matching the archive whitelist/all.
133 /// Announcements listing the relay but not in the whitelist are rejected. 136 /// Announcements listing the relay but not in the whitelist are rejected.
134 /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos. 137 /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos.
138 #[serde(default)]
135 pub read_only: bool, 139 pub read_only: bool,
136} 140}
137 141
@@ -146,6 +150,7 @@ impl ArchiveConfig {
146 /// Returns true if: 150 /// Returns true if:
147 /// - archive_all is true, OR 151 /// - archive_all is true, OR
148 /// - announcement matches any whitelist entry 152 /// - announcement matches any whitelist entry
153 ///
149 /// Note: grasp_services matching is handled via matches_grasp_services() 154 /// Note: grasp_services matching is handled via matches_grasp_services()
150 pub fn matches(&self, npub: &str, identifier: &str) -> bool { 155 pub fn matches(&self, npub: &str, identifier: &str) -> bool {
151 if self.archive_all { 156 if self.archive_all {
@@ -171,23 +176,13 @@ impl ArchiveConfig {
171 } 176 }
172} 177}
173 178
174impl Default for ArchiveConfig {
175 fn default() -> Self {
176 Self {
177 archive_all: false,
178 whitelist: Vec::new(),
179 grasp_services: Vec::new(),
180 read_only: false,
181 }
182 }
183}
184
185/// Repository whitelist configuration 179/// Repository whitelist configuration
186#[derive(Debug, Clone, Serialize, Deserialize)] 180#[derive(Debug, Clone, Serialize, Deserialize, Default)]
187pub struct RepositoryConfig { 181pub struct RepositoryConfig {
188 /// Whitelist entries for selective repository acceptance 182 /// Whitelist entries for selective repository acceptance
189 /// 183 ///
190 /// If empty, all repositories listing the service are accepted (GRASP-01 mode). 184 /// If empty, all repositories listing the service are accepted (GRASP-01 mode).
185 #[serde(default)]
191 pub whitelist: Vec<WhitelistEntry>, 186 pub whitelist: Vec<WhitelistEntry>,
192} 187}
193 188
@@ -207,21 +202,14 @@ impl RepositoryConfig {
207 } 202 }
208} 203}
209 204
210impl Default for RepositoryConfig {
211 fn default() -> Self {
212 Self {
213 whitelist: Vec::new(),
214 }
215 }
216}
217
218/// Repository blacklist configuration 205/// Repository blacklist configuration
219#[derive(Debug, Clone, Serialize, Deserialize)] 206#[derive(Debug, Clone, Serialize, Deserialize, Default)]
220pub struct BlacklistConfig { 207pub struct BlacklistConfig {
221 /// Blacklist entries for blocking specific repositories 208 /// Blacklist entries for blocking specific repositories
222 /// 209 ///
223 /// If empty, no repositories are blacklisted. 210 /// If empty, no repositories are blacklisted.
224 /// Blacklist takes precedence over both archive and repository whitelists. 211 /// Blacklist takes precedence over both archive and repository whitelists.
212 #[serde(default)]
225 pub blacklist: Vec<WhitelistEntry>, 213 pub blacklist: Vec<WhitelistEntry>,
226} 214}
227 215
@@ -256,21 +244,14 @@ impl BlacklistConfig {
256 } 244 }
257} 245}
258 246
259impl Default for BlacklistConfig {
260 fn default() -> Self {
261 Self {
262 blacklist: Vec::new(),
263 }
264 }
265}
266
267/// Event blacklist configuration for blocking events by author npub 247/// Event blacklist configuration for blocking events by author npub
268#[derive(Debug, Clone, Serialize, Deserialize)] 248#[derive(Debug, Clone, Serialize, Deserialize, Default)]
269pub struct EventBlacklistConfig { 249pub struct EventBlacklistConfig {
270 /// Blacklisted npubs - events from these authors are rejected 250 /// Blacklisted npubs - events from these authors are rejected
271 /// 251 ///
272 /// If empty, no events are blacklisted by author. 252 /// If empty, no events are blacklisted by author.
273 /// Applies to ALL event types, preventing events from reaching both the relay and purgatory. 253 /// Applies to ALL event types, preventing events from reaching both the relay and purgatory.
254 #[serde(default)]
274 pub blacklisted_npubs: Vec<String>, 255 pub blacklisted_npubs: Vec<String>,
275} 256}
276 257
@@ -292,14 +273,6 @@ impl EventBlacklistConfig {
292 } 273 }
293} 274}
294 275
295impl Default for EventBlacklistConfig {
296 fn default() -> Self {
297 Self {
298 blacklisted_npubs: Vec::new(),
299 }
300 }
301}
302
303/// Database backend type for the relay 276/// Database backend type for the relay
304#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, ValueEnum)] 277#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, ValueEnum)]
305#[serde(rename_all = "lowercase")] 278#[serde(rename_all = "lowercase")]
@@ -500,6 +473,10 @@ pub struct Config {
500 /// Prevents connection exhaustion DoS attacks 473 /// Prevents connection exhaustion DoS attacks
501 #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)] 474 #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)]
502 pub max_connections: usize, 475 pub max_connections: usize,
476
477 /// Log level for application logging
478 #[arg(long, env = "NGIT_LOG_LEVEL", default_value = "info")]
479 pub log_level: String,
503} 480}
504 481
505impl Config { 482impl Config {
@@ -782,6 +759,7 @@ impl Config {
782 repository_blacklist: String::new(), 759 repository_blacklist: String::new(),
783 event_blacklist: String::new(), 760 event_blacklist: String::new(),
784 max_connections: 500, 761 max_connections: 500,
762 log_level: "debug".to_string(),
785 } 763 }
786 } 764 }
787} 765}
@@ -1103,14 +1081,14 @@ mod tests {
1103 fn test_archive_read_only_defaults() { 1081 fn test_archive_read_only_defaults() {
1104 // Default: false when no archive mode 1082 // Default: false when no archive mode
1105 let config = Config::for_testing(); 1083 let config = Config::for_testing();
1106 assert_eq!(config.archive_config().read_only, false); 1084 assert!(!config.archive_config().read_only);
1107 1085
1108 // Default: true when archive_all is set 1086 // Default: true when archive_all is set
1109 let config = Config { 1087 let config = Config {
1110 archive_all: true, 1088 archive_all: true,
1111 ..Config::for_testing() 1089 ..Config::for_testing()
1112 }; 1090 };
1113 assert_eq!(config.archive_config().read_only, true); 1091 assert!(config.archive_config().read_only);
1114 1092
1115 // Default: true when archive_whitelist is set 1093 // Default: true when archive_whitelist is set
1116 let keys = Keys::generate(); 1094 let keys = Keys::generate();
@@ -1119,7 +1097,7 @@ mod tests {
1119 archive_whitelist: test_npub, 1097 archive_whitelist: test_npub,
1120 ..Config::for_testing() 1098 ..Config::for_testing()
1121 }; 1099 };
1122 assert_eq!(config.archive_config().read_only, true); 1100 assert!(config.archive_config().read_only);
1123 } 1101 }
1124 1102
1125 #[test] 1103 #[test]
@@ -1130,7 +1108,7 @@ mod tests {
1130 archive_read_only: Some(true), 1108 archive_read_only: Some(true),
1131 ..Config::for_testing() 1109 ..Config::for_testing()
1132 }; 1110 };
1133 assert_eq!(config.archive_config().read_only, true); 1111 assert!(config.archive_config().read_only);
1134 1112
1135 // Explicit false with archive_all (unusual but allowed) 1113 // Explicit false with archive_all (unusual but allowed)
1136 let config = Config { 1114 let config = Config {
@@ -1138,14 +1116,14 @@ mod tests {
1138 archive_read_only: Some(false), 1116 archive_read_only: Some(false),
1139 ..Config::for_testing() 1117 ..Config::for_testing()
1140 }; 1118 };
1141 assert_eq!(config.archive_config().read_only, false); 1119 assert!(!config.archive_config().read_only);
1142 1120
1143 // Explicit false without archive mode 1121 // Explicit false without archive mode
1144 let config = Config { 1122 let config = Config {
1145 archive_read_only: Some(false), 1123 archive_read_only: Some(false),
1146 ..Config::for_testing() 1124 ..Config::for_testing()
1147 }; 1125 };
1148 assert_eq!(config.archive_config().read_only, false); 1126 assert!(!config.archive_config().read_only);
1149 } 1127 }
1150 1128
1151 #[test] 1129 #[test]
@@ -1548,7 +1526,7 @@ mod tests {
1548 }; 1526 };
1549 let archive_config = config.archive_config(); 1527 let archive_config = config.archive_config();
1550 assert!(archive_config.enabled()); 1528 assert!(archive_config.enabled());
1551 assert_eq!(archive_config.read_only, true); // Default to true 1529 assert!(archive_config.read_only); // Default to true
1552 } 1530 }
1553 1531
1554 #[test] 1532 #[test]
@@ -1558,7 +1536,7 @@ mod tests {
1558 archive_grasp_services: "git.example.com".to_string(), 1536 archive_grasp_services: "git.example.com".to_string(),
1559 ..Config::for_testing() 1537 ..Config::for_testing()
1560 }; 1538 };
1561 assert_eq!(config.archive_config().read_only, true); 1539 assert!(config.archive_config().read_only);
1562 } 1540 }
1563 1541
1564 #[test] 1542 #[test]
@@ -1569,7 +1547,7 @@ mod tests {
1569 archive_read_only: Some(false), 1547 archive_read_only: Some(false),
1570 ..Config::for_testing() 1548 ..Config::for_testing()
1571 }; 1549 };
1572 assert_eq!(config.archive_config().read_only, false); 1550 assert!(!config.archive_config().read_only);
1573 } 1551 }
1574 1552
1575 #[test] 1553 #[test]
diff --git a/src/git/handlers.rs b/src/git/handlers.rs
index 017eee4..e3a6ad4 100644
--- a/src/git/handlers.rs
+++ b/src/git/handlers.rs
@@ -99,6 +99,42 @@ pub async fn handle_info_refs(
99 .unwrap()) 99 .unwrap())
100} 100}
101 101
102/// Build an HTTP 200 OK response with an ERR pkt-line for git protocol errors.
103///
104/// Per the git smart HTTP protocol spec, protocol-level errors (like "not our ref")
105/// should be returned as HTTP 200 OK with the error message in pkt-line format:
106/// `PKT-LINE("ERR" SP explanation-text)`
107///
108/// This allows git clients to properly parse and display the error message.
109fn build_git_protocol_error_response(
110 service: GitService,
111 error_message: &str,
112) -> Response<Full<Bytes>> {
113 // Format: "ERR <message>\n"
114 let err_content = format!("ERR {}\n", error_message.trim());
115 let err_pktline = PktLine::data(err_content.as_bytes()).encode();
116
117 Response::builder()
118 .status(StatusCode::OK)
119 .header("content-type", service.result_content_type())
120 .header("cache-control", "no-cache")
121 .body(Full::new(Bytes::from(err_pktline)))
122 .unwrap()
123}
124
125/// Check if a git process failure is a protocol error (vs transport error).
126///
127/// Protocol errors are communicated via stderr when git exits with code 128.
128/// These should be returned to the client as HTTP 200 with ERR pkt-line.
129///
130/// Transport errors (process spawn failures, I/O errors, signals) should
131/// remain as HTTP 500 errors.
132fn is_git_protocol_error(exit_code: Option<i32>, stderr: &[u8]) -> bool {
133 // Git uses exit code 128 for protocol/usage errors
134 // If there's stderr content, it's a protocol error message
135 exit_code == Some(128) && !stderr.is_empty()
136}
137
102/// Handle POST /git-upload-pack (clone/fetch) 138/// Handle POST /git-upload-pack (clone/fetch)
103pub async fn handle_upload_pack( 139pub async fn handle_upload_pack(
104 repo_path: PathBuf, 140 repo_path: PathBuf,
@@ -150,6 +186,21 @@ pub async fn handle_upload_pack(
150 186
151 if !status.success() { 187 if !status.success() {
152 let stderr_str = String::from_utf8_lossy(&stderr_output); 188 let stderr_str = String::from_utf8_lossy(&stderr_output);
189
190 // Check if this is a git protocol error (exit code 128 with stderr)
191 // Protocol errors should be returned as HTTP 200 with ERR pkt-line
192 if is_git_protocol_error(status.code(), &stderr_output) {
193 warn!(
194 "Git upload-pack protocol error (returning ERR pkt-line): {}",
195 stderr_str
196 );
197 return Ok(build_git_protocol_error_response(
198 GitService::UploadPack,
199 &stderr_str,
200 ));
201 }
202
203 // Transport errors (spawn failures, signals, etc.) remain as HTTP 500
153 error!("Git upload-pack failed: {}", stderr_str); 204 error!("Git upload-pack failed: {}", stderr_str);
154 return Err(GitError::GitFailed(status.code())); 205 return Err(GitError::GitFailed(status.code()));
155 } 206 }
@@ -277,6 +328,21 @@ pub async fn handle_receive_pack(
277 328
278 if !status.success() { 329 if !status.success() {
279 let stderr_str = String::from_utf8_lossy(&stderr_output); 330 let stderr_str = String::from_utf8_lossy(&stderr_output);
331
332 // Check if this is a git protocol error (exit code 128 with stderr)
333 // Protocol errors should be returned as HTTP 200 with ERR pkt-line
334 if is_git_protocol_error(status.code(), &stderr_output) {
335 warn!(
336 "Git receive-pack protocol error (returning ERR pkt-line): {}",
337 stderr_str
338 );
339 return Ok(build_git_protocol_error_response(
340 GitService::ReceivePack,
341 &stderr_str,
342 ));
343 }
344
345 // Transport errors (spawn failures, signals, etc.) remain as HTTP 500
280 error!("Git receive-pack failed: {}", stderr_str); 346 error!("Git receive-pack failed: {}", stderr_str);
281 return Err(GitError::GitFailed(status.code())); 347 return Err(GitError::GitFailed(status.code()));
282 } 348 }
diff --git a/src/main.rs b/src/main.rs
index 5e5b83a..6c9da05 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,8 +3,8 @@ use std::{path::PathBuf, sync::Arc};
3 3
4use anyhow::Result; 4use anyhow::Result;
5use tokio::signal; 5use tokio::signal;
6use tracing::{error, info, warn, Level}; 6use tracing::{error, info, warn};
7use tracing_subscriber::FmtSubscriber; 7use tracing_subscriber::{EnvFilter, FmtSubscriber};
8 8
9use ngit_grasp::{ 9use ngit_grasp::{
10 config::{Config, DatabaseBackend}, 10 config::{Config, DatabaseBackend},
@@ -17,16 +17,16 @@ use ngit_grasp::{
17 17
18#[tokio::main] 18#[tokio::main]
19async fn main() -> Result<()> { 19async fn main() -> Result<()> {
20 // Initialize tracing 20 // Load configuration first (priority: CLI flags > env vars > .env file > defaults)
21 let config = Config::load()?;
22
23 // Initialize tracing with configured log level
21 let subscriber = FmtSubscriber::builder() 24 let subscriber = FmtSubscriber::builder()
22 .with_max_level(Level::DEBUG) 25 .with_env_filter(EnvFilter::new(&config.log_level))
23 .finish(); 26 .finish();
24 tracing::subscriber::set_global_default(subscriber)?; 27 tracing::subscriber::set_global_default(subscriber)?;
25 28
26 info!("Starting ngit-grasp with nostr-relay-builder..."); 29 info!("Starting ngit-grasp with log level: {}", config.log_level);
27
28 // Load configuration (priority: CLI flags > env vars > .env file > defaults)
29 let config = Config::load()?;
30 30
31 // Validate configuration and fail fast on fatal errors 31 // Validate configuration and fail fast on fatal errors
32 // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings 32 // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings
@@ -187,8 +187,8 @@ async fn main() -> Result<()> {
187 )); 187 ));
188 188
189 // Create throttle manager for rate limiting remote git servers 189 // Create throttle manager for rate limiting remote git servers
190 // Default: 5 concurrent requests per domain, 30 requests per minute per domain 190 // Default: 5 concurrent requests per domain, 60 requests per minute per domain
191 let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); 191 let throttle_manager = Arc::new(ThrottleManager::new(5, 60));
192 throttle_manager.set_context(sync_ctx.clone()); 192 throttle_manager.set_context(sync_ctx.clone());
193 throttle_manager.set_git_naughty_list(git_naughty_list.clone()); 193 throttle_manager.set_git_naughty_list(git_naughty_list.clone());
194 194
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 34014db..3baa2ff 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -98,6 +98,62 @@ impl Nip34WritePolicy {
98 self.ctx.set_local_relay(relay); 98 self.ctx.set_local_relay(relay);
99 } 99 }
100 100
101 /// Extract repository identifier from event's 'd' tag.
102 ///
103 /// Used for structured logging when parsing fails - we try to extract
104 /// the identifier even if full parsing failed.
105 fn extract_identifier_from_event(event: &Event) -> String {
106 use nostr_relay_builder::prelude::TagKind;
107 event
108 .tags
109 .iter()
110 .find(|t| t.kind() == TagKind::d())
111 .and_then(|t| t.content())
112 .map(|s| s.to_string())
113 .unwrap_or_else(|| "unknown".to_string())
114 }
115
116 /// Extract ALL repository identifiers from PR event's 'a' tags.
117 ///
118 /// PR events can reference multiple repositories via multiple 'a' tags
119 /// (e.g., when there are multiple maintainers). Each tag has format
120 /// `30617:<owner_pubkey>:<identifier>`.
121 ///
122 /// Returns a vector of unique identifiers, or `["unknown"]` if none found.
123 fn extract_repos_from_pr_event(event: &Event) -> Vec<String> {
124 let repos: Vec<String> = event
125 .tags
126 .iter()
127 .filter_map(|tag| {
128 let tag_vec = tag.clone().to_vec();
129 if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") {
130 // Format: 30617:<owner_pubkey>:<identifier>
131 let parts: Vec<&str> = tag_vec[1].split(':').collect();
132 if parts.len() >= 3 {
133 Some(parts[2].to_string())
134 } else {
135 None
136 }
137 } else {
138 None
139 }
140 })
141 .collect();
142
143 // Deduplicate while preserving order
144 let mut seen = std::collections::HashSet::new();
145 let unique_repos: Vec<String> = repos
146 .into_iter()
147 .filter(|r| seen.insert(r.clone()))
148 .collect();
149
150 if unique_repos.is_empty() {
151 vec!["unknown".to_string()]
152 } else {
153 unique_repos
154 }
155 }
156
101 /// Handle repository announcement event 157 /// Handle repository announcement event
102 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { 158 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult {
103 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 159 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
@@ -129,10 +185,21 @@ impl Nip34WritePolicy {
129 WritePolicyResult::Accept 185 WritePolicyResult::Accept
130 } 186 }
131 Err(e) => { 187 Err(e) => {
188 let npub = event
189 .pubkey
190 .to_bech32()
191 .unwrap_or_else(|_| event.pubkey.to_hex());
192 let event_id_short = &event.id.to_hex()[..12];
193 // Try to extract repo identifier from 'd' tag even if parsing failed
194 let repo = Self::extract_identifier_from_event(event);
195 // Structured log for migration scripts
132 tracing::warn!( 196 tracing::warn!(
133 "Failed to parse repository announcement {}: {}", 197 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
134 event_id_str, 198 event.kind.as_u16(),
135 e 199 event_id_short,
200 e,
201 repo,
202 npub
136 ); 203 );
137 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) 204 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e))
138 } 205 }
@@ -157,10 +224,21 @@ impl Nip34WritePolicy {
157 WritePolicyResult::Accept 224 WritePolicyResult::Accept
158 } 225 }
159 Err(e) => { 226 Err(e) => {
227 let npub = event
228 .pubkey
229 .to_bech32()
230 .unwrap_or_else(|_| event.pubkey.to_hex());
231 let event_id_short = &event.id.to_hex()[..12];
232 // Try to extract repo identifier from 'd' tag even if parsing failed
233 let repo = Self::extract_identifier_from_event(event);
234 // Structured log for migration scripts
160 tracing::warn!( 235 tracing::warn!(
161 "Failed to parse maintainer announcement {}: {}", 236 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
162 event_id_str, 237 event.kind.as_u16(),
163 e 238 event_id_short,
239 e,
240 repo,
241 npub
164 ); 242 );
165 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) 243 WritePolicyResult::reject(format!("Failed to parse announcement: {}", e))
166 } 244 }
@@ -183,8 +261,6 @@ impl Nip34WritePolicy {
183 /// * `event` - The state event to validate 261 /// * `event` - The state event to validate
184 /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) 262 /// * `is_synced` - True if this event came from proactive sync (vs user-submitted)
185 async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult { 263 async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult {
186 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
187
188 match self.state_policy.validate(event) { 264 match self.state_policy.validate(event) {
189 StateResult::Accept => { 265 StateResult::Accept => {
190 // Process state alignment asynchronously 266 // Process state alignment asynchronously
@@ -195,7 +271,22 @@ impl Nip34WritePolicy {
195 { 271 {
196 Ok(poilicy_result) => poilicy_result, 272 Ok(poilicy_result) => poilicy_result,
197 Err(e) => { 273 Err(e) => {
198 tracing::warn!("Failed to process state event {}: {}", event_id_str, e); 274 let npub = event
275 .pubkey
276 .to_bech32()
277 .unwrap_or_else(|_| event.pubkey.to_hex());
278 let event_id_short = &event.id.to_hex()[..12];
279 // Try to extract repo identifier from 'd' tag even if parsing failed
280 let repo = Self::extract_identifier_from_event(event);
281 // Structured log for migration scripts
282 tracing::warn!(
283 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
284 event.kind.as_u16(),
285 event_id_short,
286 e,
287 repo,
288 npub
289 );
199 // reject if processing failed 290 // reject if processing failed
200 WritePolicyResult::Reject { 291 WritePolicyResult::Reject {
201 status: false, 292 status: false,
@@ -205,7 +296,22 @@ impl Nip34WritePolicy {
205 } 296 }
206 } 297 }
207 StateResult::Reject(reason) => { 298 StateResult::Reject(reason) => {
208 tracing::warn!("Rejected repository state {}: {}", event_id_str, reason); 299 let npub = event
300 .pubkey
301 .to_bech32()
302 .unwrap_or_else(|_| event.pubkey.to_hex());
303 let event_id_short = &event.id.to_hex()[..12];
304 // Try to extract repo identifier from 'd' tag even if parsing failed
305 let repo = Self::extract_identifier_from_event(event);
306 // Structured log for migration scripts
307 tracing::warn!(
308 "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}",
309 event.kind.as_u16(),
310 event_id_short,
311 reason,
312 repo,
313 npub
314 );
209 WritePolicyResult::reject(reason) 315 WritePolicyResult::reject(reason)
210 } 316 }
211 } 317 }
@@ -303,9 +409,12 @@ impl Nip34WritePolicy {
303 ); 409 );
304 410
305 // Add to purgatory 411 // Add to purgatory
306 self.ctx 412 self.ctx.purgatory.add_pr(
307 .purgatory 413 event.clone(),
308 .add_pr(event.clone(), event.id.to_hex(), commit.clone()); 414 event.id.to_hex(),
415 commit.clone(),
416 is_synced,
417 );
309 418
310 WritePolicyResult::Reject { 419 WritePolicyResult::Reject {
311 status: true, // Client sees OK 420 status: true, // Client sees OK
@@ -323,11 +432,25 @@ impl Nip34WritePolicy {
323 } 432 }
324 Err(e) => { 433 Err(e) => {
325 // Error checking git data - reject event 434 // Error checking git data - reject event
326 tracing::warn!( 435 let npub = event
327 "Failed to check git data for PR event {}: {}", 436 .pubkey
328 event_id_str, 437 .to_bech32()
329 e 438 .unwrap_or_else(|_| event.pubkey.to_hex());
330 ); 439 let event_id_short = &event.id.to_hex()[..12];
440 // Extract ALL repo identifiers from 'a' tags for PR events
441 // (PR events can reference multiple repos when there are multiple maintainers)
442 let repos = Self::extract_repos_from_pr_event(event);
443 // Structured log for migration scripts - log once per repo
444 for repo in &repos {
445 tracing::warn!(
446 "[PARSE_FAIL] kind={} event_id={}... reason=\"git data check failed: {}\" repo={} npub={}",
447 event.kind.as_u16(),
448 event_id_short,
449 e,
450 repo,
451 npub
452 );
453 }
331 WritePolicyResult::reject(format!("Failed to check git data: {}", e)) 454 WritePolicyResult::reject(format!("Failed to check git data: {}", e))
332 } 455 }
333 } 456 }
diff --git a/src/nostr/events.rs b/src/nostr/events.rs
index 718633e..a441742 100644
--- a/src/nostr/events.rs
+++ b/src/nostr/events.rs
@@ -419,14 +419,14 @@ pub fn validate_announcement(
419 // GRASP-01: Normal mode - accept if announcement lists our service AND matches repository whitelist (if enabled) 419 // GRASP-01: Normal mode - accept if announcement lists our service AND matches repository whitelist (if enabled)
420 if lists_service && !archive_config.read_only { 420 if lists_service && !archive_config.read_only {
421 // Check repository whitelist if enabled 421 // Check repository whitelist if enabled
422 if repository_config.enabled() { 422 if repository_config.enabled()
423 if !repository_config.matches(&npub, &announcement.identifier) { 423 && !repository_config.matches(&npub, &announcement.identifier)
424 return AnnouncementResult::Reject(format!( 424 {
425 "Announcement lists service but does not match repository whitelist. \ 425 return AnnouncementResult::Reject(format!(
426 Repository {}/{} not in whitelist", 426 "Announcement lists service but does not match repository whitelist. \
427 npub, announcement.identifier 427 Repository {}/{} not in whitelist",
428 )); 428 npub, announcement.identifier
429 } 429 ));
430 } 430 }
431 return AnnouncementResult::Accept; 431 return AnnouncementResult::Accept;
432 } 432 }
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index f94f004..3411077 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -205,9 +205,12 @@ impl StatePolicy {
205 205
206 // If no git data - add to purgatory 206 // If no git data - add to purgatory
207 // (add_state automatically enqueues for background sync) 207 // (add_state automatically enqueues for background sync)
208 self.ctx 208 self.ctx.purgatory.add_state(
209 .purgatory 209 event.clone(),
210 .add_state(event.clone(), state.identifier.clone(), event.pubkey); 210 state.identifier.clone(),
211 event.pubkey,
212 is_synced,
213 );
211 214
212 tracing::info!( 215 tracing::info!(
213 "state event added to purgatory: eventid: {}, identifier: {}", 216 "state event added to purgatory: eventid: {}, identifier: {}",
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs
index 47798a6..8094450 100644
--- a/src/purgatory/mod.rs
+++ b/src/purgatory/mod.rs
@@ -17,10 +17,11 @@ pub mod sync;
17mod types; 17mod types;
18 18
19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; 19pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs};
20pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; 20pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry};
21 21
22use dashmap::DashMap; 22use dashmap::DashMap;
23use nostr_sdk::prelude::*; 23use nostr_sdk::prelude::*;
24use nostr_sdk::ToBech32;
24use serde::{Deserialize, Serialize}; 25use serde::{Deserialize, Serialize};
25use std::collections::HashMap; 26use std::collections::HashMap;
26use std::collections::HashSet; 27use std::collections::HashSet;
@@ -57,6 +58,9 @@ struct SerializableStatePurgatoryEntry {
57 created_at_offset_secs: u64, 58 created_at_offset_secs: u64,
58 /// Duration offset from saved_at for expires_at 59 /// Duration offset from saved_at for expires_at
59 expires_at_offset_secs: u64, 60 expires_at_offset_secs: u64,
61 /// Source of this event (direct submission vs sync)
62 #[serde(default)]
63 source: types::EventSource,
60} 64}
61 65
62/// Serializable wrapper for `PrPurgatoryEntry` with time offsets. 66/// Serializable wrapper for `PrPurgatoryEntry` with time offsets.
@@ -74,6 +78,9 @@ struct SerializablePrPurgatoryEntry {
74 created_at_offset_secs: u64, 78 created_at_offset_secs: u64,
75 /// Duration offset from saved_at for expires_at 79 /// Duration offset from saved_at for expires_at
76 expires_at_offset_secs: u64, 80 expires_at_offset_secs: u64,
81 /// Source of this event (direct submission vs sync)
82 #[serde(default)]
83 source: types::EventSource,
77} 84}
78 85
79/// Serializable purgatory state for disk persistence. 86/// Serializable purgatory state for disk persistence.
@@ -270,11 +277,38 @@ impl Purgatory {
270 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately 277 /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately
271 /// to override this delay. 278 /// to override this delay.
272 /// 279 ///
280 /// If an event already exists in purgatory with `Sync` source and the new submission
281 /// is direct (`!from_sync`), the source is upgraded to `Direct` and the expiry is reset to `DEFAULT_EXPIRY` from now.
282 ///
273 /// # Arguments 283 /// # Arguments
274 /// * `event` - The state event (kind 30618) to hold 284 /// * `event` - The state event (kind 30618) to hold
275 /// * `identifier` - The repository identifier from the 'd' tag 285 /// * `identifier` - The repository identifier from the 'd' tag
276 /// * `author` - The event author's public key 286 /// * `author` - The event author's public key
277 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { 287 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
288 pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) {
289 let source = if from_sync {
290 types::EventSource::Sync
291 } else {
292 types::EventSource::Direct
293 };
294
295 // Check if event already exists - if so, potentially upgrade source
296 if let Some(mut entries) = self.state_events.get_mut(&identifier) {
297 if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) {
298 // Upgrade source from Sync to Direct if new submission is direct
299 if existing.source == types::EventSource::Sync && !from_sync {
300 existing.source = types::EventSource::Direct;
301 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
302 tracing::debug!(
303 event_id = %event.id,
304 identifier = %identifier,
305 "Upgraded purgatory entry source from Sync to Direct, reset expiry"
306 );
307 }
308 return; // Event already exists, don't add duplicate
309 }
310 }
311
278 let now = Instant::now(); 312 let now = Instant::now();
279 let entry = StatePurgatoryEntry { 313 let entry = StatePurgatoryEntry {
280 event, 314 event,
@@ -282,6 +316,7 @@ impl Purgatory {
282 author, 316 author,
283 created_at: now, 317 created_at: now,
284 expires_at: now + DEFAULT_EXPIRY, 318 expires_at: now + DEFAULT_EXPIRY,
319 source,
285 }; 320 };
286 321
287 self.state_events 322 self.state_events
@@ -301,11 +336,35 @@ impl Purgatory {
301 /// Automatically enqueues the referenced repository identifier for background sync 336 /// Automatically enqueues the referenced repository identifier for background sync
302 /// with the default delay (3 minutes), giving time for a git push to arrive. 337 /// with the default delay (3 minutes), giving time for a git push to arrive.
303 /// 338 ///
339 /// If an event already exists in purgatory with `Sync` source and the new submission
340 /// is direct (`!from_sync`), the source is upgraded to `Direct` and the expiry is reset to `DEFAULT_EXPIRY` from now.
341 ///
304 /// # Arguments 342 /// # Arguments
305 /// * `event` - The PR event (kind 1617/1618) to hold 343 /// * `event` - The PR event (kind 1617/1618) to hold
306 /// * `event_id` - The event ID (hex string) from the 'e' tag 344 /// * `event_id` - The event ID (hex string) from the 'e' tag
307 /// * `commit` - The commit SHA from the 'c' tag 345 /// * `commit` - The commit SHA from the 'c' tag
308 pub fn add_pr(&self, event: Event, event_id: String, commit: String) { 346 /// * `from_sync` - True if this event came from proactive sync (vs user-submitted)
347 pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) {
348 let source = if from_sync {
349 types::EventSource::Sync
350 } else {
351 types::EventSource::Direct
352 };
353
354 // Check if event already exists - if so, potentially upgrade source
355 if let Some(mut existing) = self.pr_events.get_mut(&event_id) {
356 // Upgrade source from Sync to Direct if new submission is direct
357 if existing.source == types::EventSource::Sync && !from_sync {
358 existing.source = types::EventSource::Direct;
359 existing.expires_at = Instant::now() + DEFAULT_EXPIRY;
360 tracing::debug!(
361 event_id = %event_id,
362 "Upgraded PR purgatory entry source from Sync to Direct, reset expiry"
363 );
364 }
365 return; // Event already exists, don't add duplicate
366 }
367
309 // Extract identifier from the event's `a` tag for sync enqueueing 368 // Extract identifier from the event's `a` tag for sync enqueueing
310 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); 369 let identifier = crate::git::sync::extract_identifier_from_pr_event(&event);
311 370
@@ -315,6 +374,7 @@ impl Purgatory {
315 commit, 374 commit,
316 created_at: now, 375 created_at: now,
317 expires_at: now + DEFAULT_EXPIRY, 376 expires_at: now + DEFAULT_EXPIRY,
377 source,
318 }; 378 };
319 379
320 self.pr_events.insert(event_id, entry); 380 self.pr_events.insert(event_id, entry);
@@ -328,6 +388,8 @@ impl Purgatory {
328 /// Add a PR placeholder (git data arrived before PR event). 388 /// Add a PR placeholder (git data arrived before PR event).
329 /// 389 ///
330 /// Creates a placeholder entry waiting for the corresponding PR event. 390 /// Creates a placeholder entry waiting for the corresponding PR event.
391 /// Placeholders are always marked as `Direct` source since they originate
392 /// from git pushes (direct user action).
331 /// 393 ///
332 /// # Arguments 394 /// # Arguments
333 /// * `event_id` - The expected event ID (from git ref name) 395 /// * `event_id` - The expected event ID (from git ref name)
@@ -339,6 +401,7 @@ impl Purgatory {
339 commit, 401 commit,
340 created_at: now, 402 created_at: now,
341 expires_at: now + DEFAULT_EXPIRY, 403 expires_at: now + DEFAULT_EXPIRY,
404 source: types::EventSource::Direct, // Git pushes are direct user actions
342 }; 405 };
343 406
344 self.pr_events.insert(event_id, entry); 407 self.pr_events.insert(event_id, entry);
@@ -608,6 +671,9 @@ impl Purgatory {
608 /// prevent infinite re-sync loops. Events that expire without finding git data 671 /// prevent infinite re-sync loops. Events that expire without finding git data
609 /// will be filtered out during future negentropy/REQ sync operations. 672 /// will be filtered out during future negentropy/REQ sync operations.
610 /// 673 ///
674 /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event
675 /// to support migration scripts and operational monitoring.
676 ///
611 /// # Returns 677 /// # Returns
612 /// Tuple of (num_state_removed, num_pr_removed) 678 /// Tuple of (num_state_removed, num_pr_removed)
613 pub fn cleanup(&self) -> (usize, usize) { 679 pub fn cleanup(&self) -> (usize, usize) {
@@ -615,18 +681,38 @@ impl Purgatory {
615 let mut state_removed = 0; 681 let mut state_removed = 0;
616 682
617 // Remove expired state events and mark them as expired 683 // Remove expired state events and mark them as expired
618 self.state_events.retain(|_, entries| { 684 self.state_events.retain(|identifier, entries| {
619 let original_len = entries.len(); 685 let original_len = entries.len();
620 // Collect event IDs before removing
621 let expired_ids: Vec<EventId> = entries
622 .iter()
623 .filter(|entry| entry.expires_at <= now)
624 .map(|entry| entry.event.id)
625 .collect();
626 686
627 // Mark as expired to prevent re-sync 687 // Log and collect expired entries before removing
628 for event_id in expired_ids { 688 for entry in entries.iter().filter(|e| e.expires_at <= now) {
629 self.mark_expired(event_id); 689 let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex());
690 let event_id_short = &entry.event.id.to_hex()[..12];
691 let source_str = if entry.source.is_direct() { "direct" } else { "sync" };
692
693 // Structured log for migration scripts
694 // Direct submissions log at WARN, synced events at DEBUG
695 if entry.source.is_direct() {
696 tracing::warn!(
697 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
698 identifier,
699 npub,
700 event_id_short,
701 entry.event.kind.as_u16(),
702 source_str
703 );
704 } else {
705 tracing::debug!(
706 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"",
707 identifier,
708 npub,
709 event_id_short,
710 entry.event.kind.as_u16(),
711 source_str
712 );
713 }
714
715 self.mark_expired(entry.event.id);
630 } 716 }
631 717
632 // Remove expired entries 718 // Remove expired entries
@@ -636,21 +722,103 @@ impl Purgatory {
636 }); 722 });
637 723
638 // Remove expired PR events and mark them as expired 724 // Remove expired PR events and mark them as expired
639 let expired_prs: Vec<(String, Option<EventId>)> = self 725 let expired_prs: Vec<_> = self
640 .pr_events 726 .pr_events
641 .iter() 727 .iter()
642 .filter(|entry| entry.value().expires_at <= now) 728 .filter(|entry| entry.value().expires_at <= now)
643 .map(|entry| { 729 .map(|entry| {
644 let event_id = entry.value().event.as_ref().map(|e| e.id); 730 let pr_entry = entry.value();
645 (entry.key().clone(), event_id) 731 let event_id_str = entry.key().clone();
732 let event_opt = pr_entry.event.clone();
733 let commit = pr_entry.commit.clone();
734 let source = pr_entry.source;
735 (event_id_str, event_opt, commit, source)
646 }) 736 })
647 .collect(); 737 .collect();
648 738
649 let pr_removed = expired_prs.len(); 739 let pr_removed = expired_prs.len();
650 for (event_id_str, event_id_opt) in expired_prs { 740 for (event_id_str, event_opt, commit, source) in expired_prs {
651 // Mark actual PR events as expired (not placeholders) 741 // Log structured entry for PR events (not placeholders)
652 if let Some(event_id) = event_id_opt { 742 if let Some(ref event) = event_opt {
653 self.mark_expired(event_id); 743 let npub = event
744 .pubkey
745 .to_bech32()
746 .unwrap_or_else(|_| event.pubkey.to_hex());
747 let event_id_short = &event.id.to_hex()[..12];
748 let source_str = if source.is_direct() { "direct" } else { "sync" };
749
750 // Extract ALL repo identifiers from 'a' tags
751 // (PR events can reference multiple repos when there are multiple maintainers)
752 let repos: Vec<String> = event
753 .tags
754 .iter()
755 .filter_map(|tag| {
756 let tag_vec = tag.clone().to_vec();
757 if tag_vec.len() >= 2
758 && tag_vec[0] == "a"
759 && tag_vec[1].starts_with("30617:")
760 {
761 // Format: 30617:<owner_pubkey>:<identifier>
762 let parts: Vec<&str> = tag_vec[1].split(':').collect();
763 if parts.len() >= 3 {
764 Some(parts[2].to_string())
765 } else {
766 None
767 }
768 } else {
769 None
770 }
771 })
772 .collect();
773
774 // Deduplicate while preserving order
775 let mut seen = std::collections::HashSet::new();
776 let unique_repos: Vec<String> = repos
777 .into_iter()
778 .filter(|r| seen.insert(r.clone()))
779 .collect();
780
781 let repos_to_log = if unique_repos.is_empty() {
782 vec!["unknown".to_string()]
783 } else {
784 unique_repos
785 };
786
787 // Structured log for migration scripts - log once per repo
788 // Direct submissions log at WARN, synced events at DEBUG
789 for repo in &repos_to_log {
790 if source.is_direct() {
791 tracing::warn!(
792 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
793 repo,
794 npub,
795 event_id_short,
796 event.kind.as_u16(),
797 &commit[..commit.len().min(12)],
798 source_str
799 );
800 } else {
801 tracing::debug!(
802 "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"",
803 repo,
804 npub,
805 event_id_short,
806 event.kind.as_u16(),
807 &commit[..commit.len().min(12)],
808 source_str
809 );
810 }
811 }
812
813 self.mark_expired(event.id);
814 } else {
815 // Placeholder (git data arrived first, but PR event never came)
816 // Placeholders are always Direct source (from git push)
817 tracing::debug!(
818 "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"",
819 &event_id_str[..event_id_str.len().min(12)],
820 &commit[..commit.len().min(12)]
821 );
654 } 822 }
655 self.pr_events.remove(&event_id_str); 823 self.pr_events.remove(&event_id_str);
656 } 824 }
@@ -800,6 +968,7 @@ impl Purgatory {
800 author: e.author, 968 author: e.author,
801 created_at_offset_secs: created_offset.as_secs(), 969 created_at_offset_secs: created_offset.as_secs(),
802 expires_at_offset_secs: expires_offset.as_secs(), 970 expires_at_offset_secs: expires_offset.as_secs(),
971 source: e.source,
803 } 972 }
804 }) 973 })
805 .collect(); 974 .collect();
@@ -822,6 +991,7 @@ impl Purgatory {
822 commit: e.commit.clone(), 991 commit: e.commit.clone(),
823 created_at_offset_secs: created_offset.as_secs(), 992 created_at_offset_secs: created_offset.as_secs(),
824 expires_at_offset_secs: expires_offset.as_secs(), 993 expires_at_offset_secs: expires_offset.as_secs(),
994 source: e.source,
825 }; 995 };
826 pr_events.insert(event_id, serializable); 996 pr_events.insert(event_id, serializable);
827 } 997 }
@@ -923,6 +1093,7 @@ impl Purgatory {
923 author: e.author, 1093 author: e.author,
924 created_at, 1094 created_at,
925 expires_at, 1095 expires_at,
1096 source: e.source,
926 } 1097 }
927 }) 1098 })
928 .collect(); 1099 .collect();
@@ -948,6 +1119,7 @@ impl Purgatory {
948 commit: e.commit, 1119 commit: e.commit,
949 created_at, 1120 created_at,
950 expires_at, 1121 expires_at,
1122 source: e.source,
951 }; 1123 };
952 1124
953 self.pr_events.insert(event_id, entry); 1125 self.pr_events.insert(event_id, entry);
@@ -1005,8 +1177,18 @@ mod tests {
1005 .sign_with_keys(&keys) 1177 .sign_with_keys(&keys)
1006 .unwrap(); 1178 .unwrap();
1007 1179
1008 purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); 1180 purgatory.add_state(
1009 purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); 1181 event.clone(),
1182 "test-repo".to_string(),
1183 keys.public_key(),
1184 false,
1185 );
1186 purgatory.add_pr(
1187 event,
1188 "test-event-id".to_string(),
1189 "abc123".to_string(),
1190 false,
1191 );
1010 1192
1011 let (state_count, pr_count) = purgatory.count(); 1193 let (state_count, pr_count) = purgatory.count();
1012 assert_eq!(state_count, 1); 1194 assert_eq!(state_count, 1);
@@ -1057,7 +1239,7 @@ mod tests {
1057 let event = EventBuilder::text_note("state") 1239 let event = EventBuilder::text_note("state")
1058 .sign_with_keys(&keys) 1240 .sign_with_keys(&keys)
1059 .unwrap(); 1241 .unwrap();
1060 purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); 1242 purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false);
1061 1243
1062 // Now should have pending events 1244 // Now should have pending events
1063 assert!(purgatory.has_pending_events("test-repo")); 1245 assert!(purgatory.has_pending_events("test-repo"));
@@ -1087,7 +1269,12 @@ mod tests {
1087 .sign_with_keys(&keys) 1269 .sign_with_keys(&keys)
1088 .unwrap(); 1270 .unwrap();
1089 1271
1090 purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); 1272 purgatory.add_pr(
1273 event,
1274 "pr-event-id".to_string(),
1275 "commit123".to_string(),
1276 false,
1277 );
1091 1278
1092 // Now should have pending events for test-repo 1279 // Now should have pending events for test-repo
1093 assert!(purgatory.has_pending_events("test-repo")); 1280 assert!(purgatory.has_pending_events("test-repo"));
@@ -1152,6 +1339,7 @@ fn test_pr_event_vs_placeholder() {
1152 event.clone(), 1339 event.clone(),
1153 "event-id-1".to_string(), 1340 "event-id-1".to_string(),
1154 "commit-abc".to_string(), 1341 "commit-abc".to_string(),
1342 false,
1155 ); 1343 );
1156 1344
1157 // Add a placeholder (no event) 1345 // Add a placeholder (no event)
@@ -1208,8 +1396,14 @@ fn test_cleanup_removes_expired_entries() {
1208 state_event.clone(), 1396 state_event.clone(),
1209 "test-repo".to_string(), 1397 "test-repo".to_string(),
1210 keys.public_key(), 1398 keys.public_key(),
1399 false,
1400 );
1401 purgatory.add_pr(
1402 pr_event,
1403 "pr-123".to_string(),
1404 "commit-abc".to_string(),
1405 false,
1211 ); 1406 );
1212 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string());
1213 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); 1407 purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string());
1214 1408
1215 // Verify entries are there 1409 // Verify entries are there
@@ -1256,8 +1450,18 @@ fn test_cleanup_preserves_non_expired_entries() {
1256 .unwrap(); 1450 .unwrap();
1257 1451
1258 // Add fresh entries 1452 // Add fresh entries
1259 purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); 1453 purgatory.add_state(
1260 purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); 1454 state_event,
1455 "test-repo".to_string(),
1456 keys.public_key(),
1457 false,
1458 );
1459 purgatory.add_pr(
1460 pr_event,
1461 "pr-123".to_string(),
1462 "commit-abc".to_string(),
1463 false,
1464 );
1261 1465
1262 // Run cleanup 1466 // Run cleanup
1263 let (state_removed, pr_removed) = purgatory.cleanup(); 1467 let (state_removed, pr_removed) = purgatory.cleanup();
@@ -1287,8 +1491,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1287 .sign_with_keys(&keys) 1491 .sign_with_keys(&keys)
1288 .unwrap(); 1492 .unwrap();
1289 1493
1290 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); 1494 purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false);
1291 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); 1495 purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false);
1292 1496
1293 // Expire only the first one 1497 // Expire only the first one
1294 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { 1498 if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") {
@@ -1305,8 +1509,8 @@ fn test_cleanup_mixed_expired_and_fresh() {
1305 .sign_with_keys(&keys) 1509 .sign_with_keys(&keys)
1306 .unwrap(); 1510 .unwrap();
1307 1511
1308 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); 1512 purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false);
1309 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); 1513 purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false);
1310 1514
1311 // Expire only first PR 1515 // Expire only first PR
1312 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { 1516 if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") {
@@ -1338,8 +1542,8 @@ fn test_remove_expired_legacy_method() {
1338 .unwrap(); 1542 .unwrap();
1339 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); 1543 let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap();
1340 1544
1341 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 1545 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1342 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 1546 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1343 1547
1344 // Expire both 1548 // Expire both
1345 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1549 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1373,8 +1577,8 @@ fn test_expired_event_tracking() {
1373 let pr_event_id = pr_event.id; 1577 let pr_event_id = pr_event.id;
1374 1578
1375 // Add events to purgatory 1579 // Add events to purgatory
1376 purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); 1580 purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false);
1377 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); 1581 purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false);
1378 1582
1379 // Events should not be marked as expired yet 1583 // Events should not be marked as expired yet
1380 assert!(!purgatory.is_expired(&state_event_id)); 1584 assert!(!purgatory.is_expired(&state_event_id));
@@ -1426,7 +1630,7 @@ fn test_cleanup_expired_events() {
1426 let event2_id = event2.id; 1630 let event2_id = event2.id;
1427 1631
1428 // Add and immediately expire event1 1632 // Add and immediately expire event1
1429 purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); 1633 purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false);
1430 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { 1634 if let Some(mut entries) = purgatory.state_events.get_mut("repo1") {
1431 for entry in entries.iter_mut() { 1635 for entry in entries.iter_mut() {
1432 entry.expires_at = Instant::now() - Duration::from_secs(1); 1636 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1435,7 +1639,7 @@ fn test_cleanup_expired_events() {
1435 purgatory.cleanup(); 1639 purgatory.cleanup();
1436 1640
1437 // Add and expire event2 (will be more recent) 1641 // Add and expire event2 (will be more recent)
1438 purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); 1642 purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false);
1439 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { 1643 if let Some(mut entries) = purgatory.state_events.get_mut("repo2") {
1440 for entry in entries.iter_mut() { 1644 for entry in entries.iter_mut() {
1441 entry.expires_at = Instant::now() - Duration::from_secs(1); 1645 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1477,7 +1681,7 @@ fn test_expired_events_prevent_readdition() {
1477 let event_id = event.id; 1681 let event_id = event.id;
1478 1682
1479 // Add event to purgatory 1683 // Add event to purgatory
1480 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 1684 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1481 1685
1482 // Expire it 1686 // Expire it
1483 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1687 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1497,7 +1701,7 @@ fn test_expired_events_prevent_readdition() {
1497 // This simulates what negentropy/REQ+EOSE should do: 1701 // This simulates what negentropy/REQ+EOSE should do:
1498 // Check if event is in event_ids() before adding 1702 // Check if event is in event_ids() before adding
1499 if !ids.contains(&event_id) { 1703 if !ids.contains(&event_id) {
1500 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 1704 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
1501 } 1705 }
1502 1706
1503 // Event should NOT be re-added 1707 // Event should NOT be re-added
@@ -1540,7 +1744,7 @@ fn test_user_can_resubmit_expired_event() {
1540 let event_id = event.id; 1744 let event_id = event.id;
1541 1745
1542 // Add event to purgatory 1746 // Add event to purgatory
1543 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 1747 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1544 1748
1545 // Expire it 1749 // Expire it
1546 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1750 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
@@ -1589,8 +1793,18 @@ async fn test_save_and_restore_state_events() {
1589 let event1_id = event1.id; 1793 let event1_id = event1.id;
1590 let event2_id = event2.id; 1794 let event2_id = event2.id;
1591 1795
1592 purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); 1796 purgatory.add_state(
1593 purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); 1797 event1.clone(),
1798 "test-repo".to_string(),
1799 keys.public_key(),
1800 false,
1801 );
1802 purgatory.add_state(
1803 event2.clone(),
1804 "test-repo".to_string(),
1805 keys.public_key(),
1806 false,
1807 );
1594 1808
1595 // Save to disk 1809 // Save to disk
1596 purgatory.save_to_disk(&state_file).unwrap(); 1810 purgatory.save_to_disk(&state_file).unwrap();
@@ -1652,6 +1866,7 @@ async fn test_save_and_restore_pr_events() {
1652 pr_event.clone(), 1866 pr_event.clone(),
1653 "pr-event-id".to_string(), 1867 "pr-event-id".to_string(),
1654 "commit-abc".to_string(), 1868 "commit-abc".to_string(),
1869 false,
1655 ); 1870 );
1656 1871
1657 // Save to disk 1872 // Save to disk
@@ -1721,7 +1936,7 @@ async fn test_save_and_restore_expired_events() {
1721 let event_id = event.id; 1936 let event_id = event.id;
1722 1937
1723 // Add and expire event 1938 // Add and expire event
1724 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 1939 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
1725 if let Some(mut entries) = purgatory.state_events.get_mut("repo") { 1940 if let Some(mut entries) = purgatory.state_events.get_mut("repo") {
1726 for entry in entries.iter_mut() { 1941 for entry in entries.iter_mut() {
1727 entry.expires_at = Instant::now() - Duration::from_secs(1); 1942 entry.expires_at = Instant::now() - Duration::from_secs(1);
@@ -1860,7 +2075,7 @@ async fn test_downtime_calculation() {
1860 .sign_with_keys(&keys) 2075 .sign_with_keys(&keys)
1861 .unwrap(); 2076 .unwrap();
1862 2077
1863 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2078 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1864 2079
1865 // Get original expiry time 2080 // Get original expiry time
1866 let original_entries = purgatory.find_state("repo"); 2081 let original_entries = purgatory.find_state("repo");
@@ -1916,7 +2131,7 @@ async fn test_expiry_times_preserved() {
1916 .sign_with_keys(&keys) 2131 .sign_with_keys(&keys)
1917 .unwrap(); 2132 .unwrap();
1918 2133
1919 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); 2134 purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false);
1920 2135
1921 // Manually set expiry to a specific time in the future 2136 // Manually set expiry to a specific time in the future
1922 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes 2137 let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes
@@ -1975,16 +2190,19 @@ async fn test_multiple_state_events_same_identifier() {
1975 event1.clone(), 2190 event1.clone(),
1976 "shared-repo".to_string(), 2191 "shared-repo".to_string(),
1977 keys1.public_key(), 2192 keys1.public_key(),
2193 false,
1978 ); 2194 );
1979 purgatory.add_state( 2195 purgatory.add_state(
1980 event2.clone(), 2196 event2.clone(),
1981 "shared-repo".to_string(), 2197 "shared-repo".to_string(),
1982 keys2.public_key(), 2198 keys2.public_key(),
2199 false,
1983 ); 2200 );
1984 purgatory.add_state( 2201 purgatory.add_state(
1985 event3.clone(), 2202 event3.clone(),
1986 "shared-repo".to_string(), 2203 "shared-repo".to_string(),
1987 keys3.public_key(), 2204 keys3.public_key(),
2205 false,
1988 ); 2206 );
1989 2207
1990 // Save to disk 2208 // Save to disk
@@ -2031,6 +2249,7 @@ async fn test_mixed_pr_events_and_placeholders() {
2031 pr_event.clone(), 2249 pr_event.clone(),
2032 "pr-with-event".to_string(), 2250 "pr-with-event".to_string(),
2033 "commit-abc".to_string(), 2251 "commit-abc".to_string(),
2252 false,
2034 ); 2253 );
2035 2254
2036 // Add PR placeholder 2255 // Add PR placeholder
@@ -2076,7 +2295,7 @@ async fn test_file_cleanup_after_successful_restore() {
2076 let event = EventBuilder::text_note("test") 2295 let event = EventBuilder::text_note("test")
2077 .sign_with_keys(&keys) 2296 .sign_with_keys(&keys)
2078 .unwrap(); 2297 .unwrap();
2079 purgatory.add_state(event, "repo".to_string(), keys.public_key()); 2298 purgatory.add_state(event, "repo".to_string(), keys.public_key(), false);
2080 2299
2081 // Save to disk 2300 // Save to disk
2082 purgatory.save_to_disk(&state_file).unwrap(); 2301 purgatory.save_to_disk(&state_file).unwrap();
@@ -2110,8 +2329,18 @@ async fn test_comprehensive_roundtrip() {
2110 .sign_with_keys(&keys2) 2329 .sign_with_keys(&keys2)
2111 .unwrap(); 2330 .unwrap();
2112 2331
2113 purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); 2332 purgatory.add_state(
2114 purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); 2333 state1.clone(),
2334 "repo1".to_string(),
2335 keys1.public_key(),
2336 false,
2337 );
2338 purgatory.add_state(
2339 state2.clone(),
2340 "repo2".to_string(),
2341 keys2.public_key(),
2342 false,
2343 );
2115 2344
2116 // Add PR event 2345 // Add PR event
2117 let tags = vec![Tag::custom( 2346 let tags = vec![Tag::custom(
@@ -2122,7 +2351,12 @@ async fn test_comprehensive_roundtrip() {
2122 .tags(tags) 2351 .tags(tags)
2123 .sign_with_keys(&keys1) 2352 .sign_with_keys(&keys1)
2124 .unwrap(); 2353 .unwrap();
2125 purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); 2354 purgatory.add_pr(
2355 pr_event.clone(),
2356 "pr-1".to_string(),
2357 "commit-1".to_string(),
2358 false,
2359 );
2126 2360
2127 // Add PR placeholder 2361 // Add PR placeholder
2128 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); 2362 purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string());
@@ -2132,7 +2366,12 @@ async fn test_comprehensive_roundtrip() {
2132 .sign_with_keys(&keys1) 2366 .sign_with_keys(&keys1)
2133 .unwrap(); 2367 .unwrap();
2134 let expired_id = expired_event.id; 2368 let expired_id = expired_event.id;
2135 purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); 2369 purgatory.add_state(
2370 expired_event,
2371 "repo3".to_string(),
2372 keys1.public_key(),
2373 false,
2374 );
2136 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { 2375 if let Some(mut entries) = purgatory.state_events.get_mut("repo3") {
2137 for entry in entries.iter_mut() { 2376 for entry in entries.iter_mut() {
2138 entry.expires_at = Instant::now() - Duration::from_secs(1); 2377 entry.expires_at = Instant::now() - Duration::from_secs(1);
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs
index 33c2d12..904f8af 100644
--- a/src/purgatory/sync/context.rs
+++ b/src/purgatory/sync/context.rs
@@ -361,94 +361,121 @@ impl SyncContext for RealSyncContext {
361 let naughty_list = self.git_naughty_list.clone(); 361 let naughty_list = self.git_naughty_list.clone();
362 362
363 tokio::task::spawn_blocking(move || -> Result<Vec<String>> { 363 tokio::task::spawn_blocking(move || -> Result<Vec<String>> {
364 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history 364 let mut remaining_oids = missing_oids.clone();
365 let mut args = vec!["fetch", &url]; 365 let mut missing_from_remote: Vec<String> = Vec::new();
366 args.extend(missing_oids.iter().map(|s| s.as_str())); 366
367 367 // Retry loop: keep fetching until success or no OIDs left
368 let output = Command::new("git") 368 loop {
369 .args(&args) 369 if remaining_oids.is_empty() {
370 .current_dir(&repo_path) 370 // All OIDs were missing from remote
371 .output(); 371 debug!(
372 372 url = %url,
373 match output { 373 missing_count = missing_from_remote.len(),
374 Ok(result) if result.status.success() => { 374 "All requested OIDs missing from remote"
375 // Count how many OIDs we now have 375 );
376 let fetched: Vec<String> = missing_oids 376 return Ok(vec![]);
377 .iter()
378 .filter(|oid| crate::git::oid_exists(&repo_path, oid))
379 .cloned()
380 .collect();
381
382 debug!(fetched_count = fetched.len(), "Successfully fetched OIDs");
383
384 Ok(fetched)
385 } 377 }
386 Ok(result) => { 378
387 let stderr = String::from_utf8_lossy(&result.stderr); 379 // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history
388 380 let mut args = vec!["fetch".to_string(), url.clone()];
389 // Extract domain and classify error for naughty list 381 args.extend(remaining_oids.iter().cloned());
390 if let Some(domain) = extract_domain(&url) { 382
391 if let Some(category) = NaughtyListTracker::classify_error(&stderr) { 383 let output = Command::new("git")
392 let is_new = naughty_list.record(&domain, category, stderr.to_string()); 384 .args(&args)
393 385 .current_dir(&repo_path)
394 if is_new { 386 .output();
395 tracing::warn!( 387
396 domain = %domain, 388 match output {
397 category = %category, 389 Ok(result) if result.status.success() => {
398 error = %stderr, 390 // Fetch succeeded - count how many OIDs we now have
399 "Git remote domain added to naughty list" 391 let fetched: Vec<String> = missing_oids
400 ); 392 .iter()
401 } else { 393 .filter(|oid| crate::git::oid_exists(&repo_path, oid))
402 debug!( 394 .cloned()
403 domain = %domain, 395 .collect();
404 category = %category, 396
405 "Git remote domain still on naughty list" 397 if !missing_from_remote.is_empty() {
406 ); 398 debug!(
407 } 399 url = %url,
400 fetched_count = fetched.len(),
401 missing_count = missing_from_remote.len(),
402 missing_oids = ?missing_from_remote,
403 "Fetch completed after retries - some OIDs were missing from remote"
404 );
405 } else {
406 debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs");
408 } 407 }
408
409 return Ok(fetched);
409 } 410 }
411 Ok(result) => {
412 let stderr = String::from_utf8_lossy(&result.stderr);
410 413
411 // Check for "not our ref" errors and provide a clearer error message 414 // Check for "not our ref" error - this is retryable
412 let error_msg = if stderr.contains("upload-pack: not our ref") { 415 if stderr.contains("upload-pack: not our ref") {
413 // Parse out the missing OID from stderr (git only reports one at a time) 416 // Parse out the missing OID from stderr
414 let missing_oid = stderr 417 let missing_oid = stderr.lines().find_map(|line| {
415 .lines()
416 .find_map(|line| {
417 if line.contains("not our ref") { 418 if line.contains("not our ref") {
418 // Extract the OID from lines like: 419 // Extract the OID from lines like:
419 // "fatal: remote error: upload-pack: not our ref <oid>" 420 // "fatal: remote error: upload-pack: not our ref <oid>"
420 line.split("not our ref").nth(1).map(|s| s.trim().to_string()) 421 line.split("not our ref")
422 .nth(1)
423 .map(|s| s.trim().to_string())
421 } else { 424 } else {
422 None 425 None
423 } 426 }
424 }); 427 });
425 428
426 let total_requested = missing_oids.len(); 429 if let Some(ref oid) = missing_oid {
430 // Remove the missing OID and retry with remaining
431 remaining_oids.retain(|o| o != oid);
432 missing_from_remote.push(oid.clone());
427 433
428 if let Some(oid) = missing_oid { 434 debug!(
429 if total_requested > 1 {
430 // BUG: Git stops at first missing OID, so we don't know if the others exist
431 // We need retry logic to fetch remaining OIDs individually
432 tracing::warn!(
433 url = %url, 435 url = %url,
434 missing_oid = %oid, 436 missing_oid = %oid,
435 total_requested = total_requested, 437 remaining_count = remaining_oids.len(),
436 "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." 438 "OID not found on remote, retrying with remaining OIDs"
437 ); 439 );
438 format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) 440
439 } else { 441 continue; // Retry with remaining OIDs
440 format!("remote missing only oid requested: {}", oid) 442 }
443 }
444
445 // Non-retryable error - record to naughty list and return error
446 if let Some(domain) = extract_domain(&url) {
447 if let Some(category) = NaughtyListTracker::classify_error(&stderr) {
448 let is_new =
449 naughty_list.record(&domain, category, stderr.to_string());
450
451 if is_new {
452 tracing::warn!(
453 domain = %domain,
454 category = %category,
455 error = %stderr,
456 "Git remote domain added to naughty list"
457 );
458 } else {
459 debug!(
460 domain = %domain,
461 category = %category,
462 error = %stderr,
463 "Git fetch failed (domain on naughty list)"
464 );
465 }
441 } 466 }
442 } else {
443 format!("git fetch failed: {}", stderr)
444 } 467 }
445 } else {
446 format!("git fetch failed: {}", stderr)
447 };
448 468
449 Err(anyhow::anyhow!("{}", error_msg)) 469 return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr));
470 }
471 Err(e) => {
472 return Err(anyhow::anyhow!(
473 "git fetch command error for {}: {}",
474 url,
475 e
476 ))
477 }
450 } 478 }
451 Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)),
452 } 479 }
453 }) 480 })
454 .await 481 .await
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs
index 65d29af..9207d58 100644
--- a/src/purgatory/sync/functions.rs
+++ b/src/purgatory/sync/functions.rs
@@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
368 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; 368 let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await;
369 throttle_manager.complete_request(&domain); 369 throttle_manager.complete_request(&domain);
370 370
371 let oids_fetched = match fetch_result { 371 let fetched_oids = match fetch_result {
372 Ok(fetched) => { 372 Ok(fetched) if !fetched.is_empty() => {
373 debug!( 373 debug!(
374 identifier = %identifier, 374 identifier = %identifier,
375 url = %url, 375 url = %url,
376 oids_fetched = fetched.len(), 376 oids_fetched = fetched.len(),
377 "Fetch succeeded" 377 "Fetch succeeded"
378 ); 378 );
379 fetched.len() 379 fetched
380 }
381 Ok(_) => {
382 debug!(
383 identifier = %identifier,
384 url = %url,
385 "Fetch returned no OIDs (not available on remote)"
386 );
387 vec![]
380 } 388 }
381 Err(e) => { 389 Err(e) => {
382 debug!( 390 debug!(
@@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
385 error = %e, 393 error = %e,
386 "Fetch failed" 394 "Fetch failed"
387 ); 395 );
388 0 396 vec![]
389 } 397 }
390 }; 398 };
391 399
392 // Try to process any events that can now be satisfied 400 // Try to process any events that can now be satisfied
393 if oids_fetched > 0 { 401 if !fetched_oids.is_empty() {
394 let new_oids: HashSet<String> = needed_oids.into_iter().collect(); 402 let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect();
395 if let Err(e) = ctx 403 if let Err(e) = ctx
396 .process_newly_available_git_data(&target_repo, &new_oids) 404 .process_newly_available_git_data(&target_repo, &new_oids)
397 .await 405 .await
@@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>(
404 } 412 }
405 } 413 }
406 414
407 oids_fetched 415 fetched_oids.len()
408} 416}
409 417
410/// Sync git data for an identifier. 418/// Sync git data for an identifier.
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs
index 919504b..e37a3e1 100644
--- a/src/purgatory/types.rs
+++ b/src/purgatory/types.rs
@@ -8,6 +8,28 @@ use nostr_sdk::prelude::*;
8use serde::{Deserialize, Serialize}; 8use serde::{Deserialize, Serialize};
9use std::time::Instant; 9use std::time::Instant;
10 10
11/// Source of an event entering purgatory.
12///
13/// Tracks whether an event was submitted directly by a user or fetched via
14/// proactive sync from another relay. This distinction is used for:
15/// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG
16/// - Operational monitoring: Helps identify user-facing issues vs sync noise
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
18pub enum EventSource {
19 /// Event was published directly to this relay by a user
20 #[default]
21 Direct,
22 /// Event was fetched via proactive sync from another relay
23 Sync,
24}
25
26impl EventSource {
27 /// Returns true if this is a direct submission (not synced)
28 pub fn is_direct(&self) -> bool {
29 matches!(self, EventSource::Direct)
30 }
31}
32
11/// Default value for Instant fields during deserialization 33/// Default value for Instant fields during deserialization
12fn instant_now() -> Instant { 34fn instant_now() -> Instant {
13 Instant::now() 35 Instant::now()
@@ -86,6 +108,10 @@ pub struct StatePurgatoryEntry {
86 /// Expiry deadline (30 min from creation, may be extended) 108 /// Expiry deadline (30 min from creation, may be extended)
87 #[serde(skip, default = "instant_now")] 109 #[serde(skip, default = "instant_now")]
88 pub expires_at: Instant, 110 pub expires_at: Instant,
111
112 /// Source of this event (direct submission vs sync)
113 #[serde(default)]
114 pub source: EventSource,
89} 115}
90 116
91/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. 117/// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory.
@@ -112,4 +138,8 @@ pub struct PrPurgatoryEntry {
112 /// Expiry deadline (30 min from creation, may be extended) 138 /// Expiry deadline (30 min from creation, may be extended)
113 #[serde(skip, default = "instant_now")] 139 #[serde(skip, default = "instant_now")]
114 pub expires_at: Instant, 140 pub expires_at: Instant,
141
142 /// Source of this event (direct submission vs sync)
143 #[serde(default)]
144 pub source: EventSource,
115} 145}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index bc8c428..d6634ff 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -584,6 +584,7 @@ impl SyncManager {
584 /// * `config` - Configuration for sync settings 584 /// * `config` - Configuration for sync settings
585 /// * `data_path` - Path to git data directory (for persistence) 585 /// * `data_path` - Path to git data directory (for persistence)
586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) 586 /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled)
587 #[allow(clippy::too_many_arguments)]
587 pub fn new( 588 pub fn new(
588 bootstrap_relay_url: Option<String>, 589 bootstrap_relay_url: Option<String>,
589 service_domain: String, 590 service_domain: String,
@@ -1442,6 +1443,7 @@ impl SyncManager {
1442 self.service_domain.clone(), 1443 self.service_domain.clone(),
1443 Arc::clone(&self.repo_sync_index), 1444 Arc::clone(&self.repo_sync_index),
1444 action_tx, 1445 action_tx,
1446 self.database.clone(),
1445 ); 1447 );
1446 let subscriber_shutdown = shutdown_tx.subscribe(); 1448 let subscriber_shutdown = shutdown_tx.subscribe();
1447 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); 1449 tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await });
@@ -2811,6 +2813,7 @@ impl SyncManager {
2811 event_id = %event.id, 2813 event_id = %event.id,
2812 kind = %event.kind.as_u16(), 2814 kind = %event.kind.as_u16(),
2813 identifier = %identifier, 2815 identifier = %identifier,
2816 pubkey = %event.pubkey,
2814 "Added rejected announcement to two-tier index" 2817 "Added rejected announcement to two-tier index"
2815 ); 2818 );
2816 } 2819 }
diff --git a/src/sync/naughty_list.rs b/src/sync/naughty_list.rs
index 35fcc0f..0abb986 100644
--- a/src/sync/naughty_list.rs
+++ b/src/sync/naughty_list.rs
@@ -101,6 +101,69 @@ impl NaughtyListTracker {
101 Self::new(12) 101 Self::new(12)
102 } 102 }
103 103
104 /// Strip URLs from an error message to prevent false positives from URL components.
105 ///
106 /// URLs can contain path components, repository names, or user identifiers that
107 /// accidentally match error patterns (e.g., "my-openssl-project", "ssl-team",
108 /// "certificate-manager"). By stripping URLs before classification, we ensure
109 /// only the actual error message text is analyzed.
110 ///
111 /// Handles: http://, https://, git://, ws://, wss://
112 fn strip_urls(error: &str) -> String {
113 let mut result = String::with_capacity(error.len());
114 let mut chars = error.chars().peekable();
115
116 while let Some(c) = chars.next() {
117 // Check for URL start patterns
118 let potential_url = match c {
119 'h' => {
120 // Check for http:// or https://
121 let rest: String = chars.clone().take(7).collect();
122 rest.starts_with("ttp://") || rest.starts_with("ttps://")
123 }
124 'g' => {
125 // Check for git://
126 let rest: String = chars.clone().take(5).collect();
127 rest.starts_with("it://")
128 }
129 'w' => {
130 // Check for ws:// or wss://
131 let rest: String = chars.clone().take(5).collect();
132 rest.starts_with("s://") || rest.starts_with("ss://")
133 }
134 _ => false,
135 };
136
137 if potential_url {
138 // Found URL start, consume until URL end
139 result.push_str("[URL]");
140
141 // Skip until we hit a URL terminator
142 loop {
143 match chars.peek() {
144 Some(&ch) if Self::is_url_char(ch) => {
145 chars.next();
146 }
147 _ => break,
148 }
149 }
150 } else {
151 result.push(c);
152 }
153 }
154
155 result
156 }
157
/// Decide whether `c` may still belong to the URL currently being scanned.
#[inline]
fn is_url_char(c: char) -> bool {
    // Whitespace, quotes and a few closing brackets are treated as the end of
    // a URL. Real URLs may legally contain more than this allows for, but in
    // git error output a URL is typically followed by one of these characters.
    const TERMINATORS: [char; 9] = [' ', '\t', '\n', '\r', '"', '\'', '>', ']', ')'];

    !TERMINATORS.contains(&c)
}
166
104 /// Classify an error string into a naughty category or return None for transient errors 167 /// Classify an error string into a naughty category or return None for transient errors
105 /// 168 ///
106 /// # Arguments 169 /// # Arguments
@@ -112,21 +175,56 @@ impl NaughtyListTracker {
112 /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue 175 /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue
113 /// - `None` if the error is a transient network issue (use HealthTracker backoff) 176 /// - `None` if the error is a transient network issue (use HealthTracker backoff)
114 pub fn classify_error(error: &str) -> Option<NaughtyCategory> { 177 pub fn classify_error(error: &str) -> Option<NaughtyCategory> {
115 let error_lower = error.to_lowercase(); 178 // Filter out remote warnings - these are informational messages from the remote
179 // server that don't indicate infrastructure problems with the domain itself.
180 // Example: "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"
181 // These warnings are about the remote server's internal configuration, not connectivity.
182 let filtered_error: String = error
183 .lines()
184 .filter(|line| {
185 let line_lower = line.to_lowercase();
186 // Keep lines that are NOT remote warnings
187 !(line_lower.starts_with("remote: warning:")
188 || line_lower.starts_with("warning: remote"))
189 })
190 .collect::<Vec<_>>()
191 .join("\n");
192
193 // If after filtering we have no content, this was just warnings - not a real error
194 if filtered_error.trim().is_empty() {
195 return None;
196 }
197
198 // Strip URLs to prevent false positives from URL components
199 // (e.g., repository named "openssl-test" or path containing "certificate")
200 let url_stripped = Self::strip_urls(&filtered_error);
201 let error_lower = url_stripped.to_lowercase();
116 202
117 // DNS lookup failures 203 // DNS lookup failures
118 if error_lower.contains("failed to lookup address") 204 if error_lower.contains("failed to lookup address")
119 || error_lower.contains("name or service not known") 205 || error_lower.contains("name or service not known")
120 || error_lower.contains("nodename nor servname provided") 206 || error_lower.contains("nodename nor servname provided")
121 || (error_lower.contains("dns") && !error_lower.contains("timeout")) 207 || error_lower.contains("dns error")
208 || error_lower.contains("dns lookup")
209 || error_lower.contains("dns resolution")
210 || error_lower.contains("getaddrinfo")
122 { 211 {
123 return Some(NaughtyCategory::DnsLookupFailed); 212 return Some(NaughtyCategory::DnsLookupFailed);
124 } 213 }
125 214
126 // TLS certificate errors 215 // TLS certificate errors
127 if error_lower.contains("certificate") 216 if error_lower.contains("certificate")
128 || error_lower.contains("ssl") 217 || error_lower.contains("ssl error")
129 || error_lower.contains("tls") 218 || error_lower.contains("ssl certificate")
219 || error_lower.contains("ssl handshake")
220 || error_lower.contains("ssl_error")
221 || error_lower.contains("tls error")
222 || error_lower.contains("tls handshake")
223 || error_lower.contains("tls alert")
224 || error_lower.contains("tls_error")
225 || error_lower.contains("openssl")
226 || error_lower.contains("schannel")
227 || error_lower.contains("secure channel")
130 { 228 {
131 // Exclude timeout errors that mention TLS 229 // Exclude timeout errors that mention TLS
132 if !error_lower.contains("timeout") && !error_lower.contains("timed out") { 230 if !error_lower.contains("timeout") && !error_lower.contains("timed out") {
@@ -134,12 +232,12 @@ impl NaughtyListTracker {
134 } 232 }
135 } 233 }
136 234
137 // Protocol errors 235 // Protocol errors - specifically WebSocket/Nostr protocol violations
138 if error_lower.contains("websocket") 236 // Note: We check for "websocket" specifically, NOT generic "protocol" keyword
139 || error_lower.contains("protocol") 237 // because git errors often contain "protocol error" (e.g., "fatal: protocol error: bad line length")
140 || error_lower.contains("invalid frame") 238 // which are transient network issues, not persistent infrastructure problems.
141 { 239 if error_lower.contains("websocket") || error_lower.contains("invalid frame") {
142 // Exclude connection errors 240 // Exclude connection errors (transient)
143 if !error_lower.contains("connection") 241 if !error_lower.contains("connection")
144 && !error_lower.contains("timeout") 242 && !error_lower.contains("timeout")
145 && !error_lower.contains("refused") 243 && !error_lower.contains("refused")
@@ -290,183 +388,216 @@ impl NaughtyListTracker {
290mod tests { 388mod tests {
291 use super::*; 389 use super::*;
292 390
391 // =========================================================================
392 // URL STRIPPING TESTS
393 // =========================================================================
394
293 #[test] 395 #[test]
294 fn test_classify_dns_errors() { 396 fn test_strip_urls_basic_protocols() {
295 assert_eq!( 397 // HTTP/HTTPS
296 NaughtyListTracker::classify_error("failed to lookup address information"),
297 Some(NaughtyCategory::DnsLookupFailed)
298 );
299 assert_eq!( 398 assert_eq!(
300 NaughtyListTracker::classify_error("Name or service not known"), 399 NaughtyListTracker::strip_urls("error: https://example.com/repo.git failed"),
301 Some(NaughtyCategory::DnsLookupFailed) 400 "error: [URL] failed"
302 );
303 assert_eq!(
304 NaughtyListTracker::classify_error("nodename nor servname provided"),
305 Some(NaughtyCategory::DnsLookupFailed)
306 ); 401 );
307 assert_eq!( 402 assert_eq!(
308 NaughtyListTracker::classify_error("dns error: NXDOMAIN"), 403 NaughtyListTracker::strip_urls("error: http://example.com/path failed"),
309 Some(NaughtyCategory::DnsLookupFailed) 404 "error: [URL] failed"
310 ); 405 );
311 }
312 406
313 #[test] 407 // Git protocol
314 fn test_classify_tls_errors() {
315 assert_eq!( 408 assert_eq!(
316 NaughtyListTracker::classify_error("certificate not valid for 'example.com'"), 409 NaughtyListTracker::strip_urls("fatal: git://github.com/user/repo.git not found"),
317 Some(NaughtyCategory::TlsCertificateInvalid) 410 "fatal: [URL] not found"
318 ); 411 );
412
413 // WebSocket protocols (used for relay URLs)
319 assert_eq!( 414 assert_eq!(
320 NaughtyListTracker::classify_error("SSL certificate problem"), 415 NaughtyListTracker::strip_urls("error: wss://relay.example.com failed"),
321 Some(NaughtyCategory::TlsCertificateInvalid) 416 "error: [URL] failed"
322 ); 417 );
323 assert_eq!( 418 assert_eq!(
324 NaughtyListTracker::classify_error("TLS handshake failed"), 419 NaughtyListTracker::strip_urls("error: ws://localhost:8080 failed"),
325 Some(NaughtyCategory::TlsCertificateInvalid) 420 "error: [URL] failed"
326 ); 421 );
422 }
327 423
328 // TLS timeout should NOT be classified as naughty 424 #[test]
329 assert_eq!( 425 fn test_strip_urls_multiple() {
330 NaughtyListTracker::classify_error("TLS connection timed out"), 426 let error = "failed to clone https://a.com/repo.git and wss://relay.com";
331 None 427 let stripped = NaughtyListTracker::strip_urls(error);
332 ); 428 assert_eq!(stripped, "failed to clone [URL] and [URL]");
333 } 429 }
334 430
335 #[test] 431 #[test]
336 fn test_classify_protocol_errors() { 432 fn test_strip_urls_preserves_error_text() {
337 assert_eq!( 433 let error =
338 NaughtyListTracker::classify_error("websocket protocol error"), 434 "fatal: unable to access 'https://example.com/repo.git/': SSL certificate problem";
339 Some(NaughtyCategory::ProtocolError) 435 let stripped = NaughtyListTracker::strip_urls(error);
340 ); 436 assert!(stripped.contains("SSL certificate problem"));
437 assert!(!stripped.contains("example.com"));
438 }
439
440 // =========================================================================
441 // EDGE CASES: TIMEOUT/CONNECTION EXCEPTIONS
442 // These are the "unusual rules" where a pattern matches but should be excluded
443 // =========================================================================
444
445 #[test]
446 fn test_tls_timeout_not_naughty() {
447 // TLS errors with timeout should NOT be classified as naughty
448 // (timeout is transient, not a certificate problem)
341 assert_eq!( 449 assert_eq!(
342 NaughtyListTracker::classify_error("invalid frame header"), 450 NaughtyListTracker::classify_error("TLS connection timed out"),
343 Some(NaughtyCategory::ProtocolError) 451 None
344 ); 452 );
345
346 // WebSocket connection errors should NOT be classified as naughty
347 assert_eq!( 453 assert_eq!(
348 NaughtyListTracker::classify_error("websocket connection refused"), 454 NaughtyListTracker::classify_error("SSL handshake timeout"),
349 None 455 None
350 ); 456 );
351 } 457 }
352 458
353 #[test] 459 #[test]
354 fn test_classify_transient_errors() { 460 fn test_websocket_connection_errors_not_naughty() {
355 // Timeouts are transient 461 // WebSocket connection errors are transient, not protocol violations
356 assert_eq!( 462 assert_eq!(
357 NaughtyListTracker::classify_error("connection timed out"), 463 NaughtyListTracker::classify_error("websocket connection refused"),
358 None 464 None
359 ); 465 );
360 assert_eq!( 466 assert_eq!(
361 NaughtyListTracker::classify_error("operation timed out"), 467 NaughtyListTracker::classify_error("websocket connection timeout"),
362 None 468 None
363 ); 469 );
470 }
364 471
365 // Connection refused is transient 472 #[test]
473 fn test_remote_warnings_filtered() {
474 // Remote warnings should be filtered out before classification
475 let warning_only =
476 "remote: warning: unable to access '/root/.config/git/attributes': Permission denied";
477 assert_eq!(NaughtyListTracker::classify_error(warning_only), None);
478
479 // But real errors after warnings should still be classified
480 let warning_with_error = "remote: warning: something\nfatal: failed to lookup address";
366 assert_eq!( 481 assert_eq!(
367 NaughtyListTracker::classify_error("connection refused"), 482 NaughtyListTracker::classify_error(warning_with_error),
368 None 483 Some(NaughtyCategory::DnsLookupFailed)
369 ); 484 );
485 }
370 486
371 // Generic network errors are transient 487 // =========================================================================
488 // INTEGRATION: FULL CLASSIFICATION FLOW
489 // Verify URL stripping + classification work together correctly
490 // =========================================================================
491
492 #[test]
493 fn test_url_with_keywords_not_false_positive() {
494 // URLs containing keywords should NOT trigger classification
495 let cases = [
496 ("https://example.com/my-openssl-project.git", "not found"),
497 ("https://example.com/ssl-team/repo.git", "not found"),
498 ("https://example.com/certificate-manager.git", "not found"),
499 ("https://example.com/dns-tools.git", "not found"),
500 ("wss://relay-tls-test.example.com", "connection refused"),
501 ];
502
503 for (url, suffix) in cases {
504 let error = format!("fatal: repository '{}/' {}", url, suffix);
505 assert_eq!(
506 NaughtyListTracker::classify_error(&error),
507 None,
508 "URL '{}' should not trigger false positive",
509 url
510 );
511 }
512 }
513
514 #[test]
515 fn test_real_errors_still_detected() {
516 // Real errors in the message text (not URL) should still be detected
372 assert_eq!( 517 assert_eq!(
373 NaughtyListTracker::classify_error("network unreachable"), 518 NaughtyListTracker::classify_error(
374 None 519 "fatal: 'https://example.com/repo.git': SSL certificate problem"
520 ),
521 Some(NaughtyCategory::TlsCertificateInvalid)
522 );
523 assert_eq!(
524 NaughtyListTracker::classify_error(
525 "fatal: 'https://example.com/repo.git': failed to lookup address"
526 ),
527 Some(NaughtyCategory::DnsLookupFailed)
528 );
529 assert_eq!(
530 NaughtyListTracker::classify_error("websocket protocol error"),
531 Some(NaughtyCategory::ProtocolError)
375 ); 532 );
376 } 533 }
377 534
378 #[test] 535 #[test]
379 fn test_record_new_entry() { 536 fn test_url_with_keyword_and_real_error() {
380 let tracker = NaughtyListTracker::with_defaults(); 537 // URL contains keyword AND there's a real error - should detect the error
381 let url = "wss://bad-relay.example.com"; 538 let error = "fatal: 'https://example.com/ssl-tools/repo.git': SSL certificate problem";
382 539 assert_eq!(
383 let is_new = tracker.record( 540 NaughtyListTracker::classify_error(error),
384 url, 541 Some(NaughtyCategory::TlsCertificateInvalid)
385 NaughtyCategory::DnsLookupFailed,
386 "failed to lookup address".to_string(),
387 ); 542 );
388
389 assert!(is_new);
390 assert!(tracker.is_naughty(url));
391
392 let entry = tracker.get_entry(url).unwrap();
393 assert_eq!(entry.category, NaughtyCategory::DnsLookupFailed);
394 assert_eq!(entry.occurrence_count, 1);
395 } 543 }
396 544
545 // =========================================================================
546 // TRACKER FUNCTIONALITY
547 // =========================================================================
548
397 #[test] 549 #[test]
398 fn test_record_updates_existing() { 550 fn test_tracker_record_and_update() {
399 let tracker = NaughtyListTracker::with_defaults(); 551 let tracker = NaughtyListTracker::with_defaults();
400 let url = "wss://bad-relay.example.com"; 552 let url = "wss://bad-relay.example.com";
401 553
402 // First occurrence 554 // First occurrence
403 let is_new1 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); 555 let is_new = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string());
404 assert!(is_new1); 556 assert!(is_new);
557 assert!(tracker.is_naughty(url));
405 558
406 // Second occurrence 559 // Second occurrence updates existing
407 let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); 560 let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string());
408 assert!(!is_new2); 561 assert!(!is_new2);
409 562
410 let entry = tracker.get_entry(url).unwrap(); 563 let entry = tracker.get_entry(url).unwrap();
411 assert_eq!(entry.occurrence_count, 2); 564 assert_eq!(entry.occurrence_count, 2);
412 assert_eq!(entry.reason, "error 2"); // Updated to latest 565 assert_eq!(entry.reason, "error 2");
413 } 566 }
414 567
415 #[test] 568 #[test]
416 fn test_is_naughty() { 569 fn test_tracker_expiration() {
417 let tracker = NaughtyListTracker::with_defaults(); 570 let tracker = NaughtyListTracker::new(0); // Expire immediately
418 let url = "wss://bad-relay.example.com";
419
420 assert!(!tracker.is_naughty(url));
421 571
422 tracker.record( 572 tracker.record(
423 url, 573 "wss://relay.example.com",
424 NaughtyCategory::TlsCertificateInvalid, 574 NaughtyCategory::DnsLookupFailed,
425 "cert error".to_string(), 575 "error".to_string(),
426 ); 576 );
427 577
428 assert!(tracker.is_naughty(url)); 578 // Entry exists but is expired
429 } 579 assert!(!tracker.is_naughty("wss://relay.example.com"));
430 580
431 #[test] 581 std::thread::sleep(std::time::Duration::from_millis(10));
432 fn test_get_all() {
433 let tracker = NaughtyListTracker::with_defaults();
434
435 tracker.record(
436 "wss://relay1.example.com",
437 NaughtyCategory::DnsLookupFailed,
438 "dns error".to_string(),
439 );
440 tracker.record(
441 "wss://relay2.example.com",
442 NaughtyCategory::TlsCertificateInvalid,
443 "tls error".to_string(),
444 );
445 582
446 let all = tracker.get_all(); 583 let expired = tracker.expire_old_entries();
447 assert_eq!(all.len(), 2); 584 assert_eq!(expired.len(), 1);
585 assert_eq!(tracker.total_count(), 0);
448 } 586 }
449 587
450 #[test] 588 #[test]
451 fn test_count_by_category() { 589 fn test_tracker_counts() {
452 let tracker = NaughtyListTracker::with_defaults(); 590 let tracker = NaughtyListTracker::with_defaults();
453 591
592 tracker.record("wss://r1.com", NaughtyCategory::DnsLookupFailed, "e".into());
593 tracker.record("wss://r2.com", NaughtyCategory::DnsLookupFailed, "e".into());
454 tracker.record( 594 tracker.record(
455 "wss://relay1.example.com", 595 "wss://r3.com",
456 NaughtyCategory::DnsLookupFailed,
457 "error".to_string(),
458 );
459 tracker.record(
460 "wss://relay2.example.com",
461 NaughtyCategory::DnsLookupFailed,
462 "error".to_string(),
463 );
464 tracker.record(
465 "wss://relay3.example.com",
466 NaughtyCategory::TlsCertificateInvalid, 596 NaughtyCategory::TlsCertificateInvalid,
467 "error".to_string(), 597 "e".into(),
468 ); 598 );
469 599
600 assert_eq!(tracker.total_count(), 3);
470 assert_eq!( 601 assert_eq!(
471 tracker.count_by_category(NaughtyCategory::DnsLookupFailed), 602 tracker.count_by_category(NaughtyCategory::DnsLookupFailed),
472 2 603 2
@@ -475,82 +606,84 @@ mod tests {
475 tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), 606 tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid),
476 1 607 1
477 ); 608 );
478 assert_eq!(tracker.count_by_category(NaughtyCategory::ProtocolError), 0); 609 assert_eq!(tracker.get_all().len(), 3);
479 } 610 }
480 611
481 #[test] 612 #[test]
482 fn test_total_count() { 613 fn test_category_display() {
483 let tracker = NaughtyListTracker::with_defaults(); 614 assert_eq!(
484 assert_eq!(tracker.total_count(), 0); 615 NaughtyCategory::DnsLookupFailed.as_str(),
485 616 "dns_lookup_failed"
486 tracker.record(
487 "wss://relay1.example.com",
488 NaughtyCategory::DnsLookupFailed,
489 "error".to_string(),
490 ); 617 );
491 assert_eq!(tracker.total_count(), 1); 618 assert_eq!(
492 619 NaughtyCategory::TlsCertificateInvalid.as_str(),
493 tracker.record( 620 "tls_certificate_invalid"
494 "wss://relay2.example.com",
495 NaughtyCategory::TlsCertificateInvalid,
496 "error".to_string(),
497 ); 621 );
498 assert_eq!(tracker.total_count(), 2); 622 assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error");
499 } 623 }
624}
500 625
501 #[test] 626#[cfg(test)]
502 fn test_expire_old_entries() { 627mod production_tests {
503 // Use very short expiration for testing 628 use super::*;
504 let tracker = NaughtyListTracker::new(0); // Expire immediately (0 hours)
505
506 tracker.record(
507 "wss://relay1.example.com",
508 NaughtyCategory::DnsLookupFailed,
509 "error".to_string(),
510 );
511
512 // Entry should exist in the map
513 assert_eq!(tracker.total_count(), 1);
514
515 // But is_naughty should return false since it's already expired (0 hours)
516 assert!(!tracker.is_naughty("wss://relay1.example.com"));
517
518 // Sleep to ensure time passes
519 std::thread::sleep(std::time::Duration::from_millis(10));
520 629
521 // Expire old entries (should remove the 0-hour expired entry) 630 /// Production case from relay.ngit.dev - remote warning should not be classified
522 let expired = tracker.expire_old_entries(); 631 #[test]
523 assert_eq!(expired.len(), 1); 632 fn test_classify_production_relay_ngit_dev_warning() {
524 assert_eq!(expired[0], "wss://relay1.example.com"); 633 let error =
634 "remote: warning: unable to access '/root/.config/git/attributes': Permission denied";
635 assert_eq!(NaughtyListTracker::classify_error(error), None);
636 }
525 637
526 // Entry should be gone 638 /// Git protocol errors are transient, not persistent infrastructure issues
527 assert!(!tracker.is_naughty("wss://relay1.example.com")); 639 #[test]
528 assert_eq!(tracker.total_count(), 0); 640 fn test_git_protocol_errors_not_naughty() {
641 // These are common git protocol errors that should NOT be classified as naughty
642 let git_protocol_errors = [
643 "fatal: protocol error: bad line length character: remo",
644 "fatal: protocol error: expected old/new/ref, got 'shallow",
645 "fatal: git upload-pack: protocol error",
646 "error: protocol error: bad pack header",
647 "fatal: protocol error: bad band #3",
648 ];
649
650 for error in git_protocol_errors {
651 assert_eq!(
652 NaughtyListTracker::classify_error(error),
653 None,
654 "Git protocol error should not be classified as naughty: {}",
655 error
656 );
657 }
529 } 658 }
530 659
660 /// Remote warning followed by git protocol error - both should be filtered/ignored
531 #[test] 661 #[test]
532 fn test_category_display() { 662 fn test_warning_with_git_protocol_error() {
663 let error = "remote: warning: unable to access '/root/.config/git/attributes': Permission denied\nfatal: protocol error: bad line length character: remo";
533 assert_eq!( 664 assert_eq!(
534 NaughtyCategory::DnsLookupFailed.to_string(), 665 NaughtyListTracker::classify_error(error),
535 "dns_lookup_failed" 666 None,
667 "Warning + git protocol error should not be classified as naughty"
536 ); 668 );
537 assert_eq!(
538 NaughtyCategory::TlsCertificateInvalid.to_string(),
539 "tls_certificate_invalid"
540 );
541 assert_eq!(NaughtyCategory::ProtocolError.to_string(), "protocol_error");
542 } 669 }
543 670
671 /// WebSocket protocol errors ARE naughty (persistent infrastructure issues)
544 #[test] 672 #[test]
545 fn test_category_as_str() { 673 fn test_websocket_errors_still_naughty() {
546 assert_eq!( 674 let websocket_errors = [
547 NaughtyCategory::DnsLookupFailed.as_str(), 675 "websocket protocol error",
548 "dns_lookup_failed" 676 "websocket handshake failed",
549 ); 677 "invalid frame received",
550 assert_eq!( 678 ];
551 NaughtyCategory::TlsCertificateInvalid.as_str(), 679
552 "tls_certificate_invalid" 680 for error in websocket_errors {
553 ); 681 assert_eq!(
554 assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); 682 NaughtyListTracker::classify_error(error),
683 Some(NaughtyCategory::ProtocolError),
684 "WebSocket error should be classified as protocol_error: {}",
685 error
686 );
687 }
555 } 688 }
556} 689}
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index 3cc408d..86e4583 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -16,6 +16,8 @@ use nostr_sdk::Timestamp;
16use tokio::sync::broadcast::error::RecvError; 16use tokio::sync::broadcast::error::RecvError;
17use tokio::sync::{broadcast, mpsc}; 17use tokio::sync::{broadcast, mpsc};
18 18
19use crate::nostr::builder::SharedDatabase;
20
19use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; 21use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds};
20 22
21// ============================================================================= 23// =============================================================================
@@ -98,6 +100,8 @@ pub struct SelfSubscriber {
98 action_tx: mpsc::Sender<AddFilters>, 100 action_tx: mpsc::Sender<AddFilters>,
99 /// Last time we connected - used for since filter on reconnect 101 /// Last time we connected - used for since filter on reconnect
100 last_connected: Option<Timestamp>, 102 last_connected: Option<Timestamp>,
103 /// Database for querying existing events on startup
104 database: SharedDatabase,
101} 105}
102 106
103impl SelfSubscriber { 107impl SelfSubscriber {
@@ -108,11 +112,13 @@ impl SelfSubscriber {
108 /// * `relay_domain` - Our service domain (used for filtering relevant repos) 112 /// * `relay_domain` - Our service domain (used for filtering relevant repos)
109 /// * `repo_sync_index` - Shared index to update with discovered repos 113 /// * `repo_sync_index` - Shared index to update with discovered repos
110 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager 114 /// * `action_tx` - Channel to send AddFilters actions to the SyncManager
115 /// * `database` - Database for querying existing events on startup
111 pub fn new( 116 pub fn new(
112 own_relay_url: String, 117 own_relay_url: String,
113 relay_domain: String, 118 relay_domain: String,
114 repo_sync_index: RepoSyncIndex, 119 repo_sync_index: RepoSyncIndex,
115 action_tx: mpsc::Sender<AddFilters>, 120 action_tx: mpsc::Sender<AddFilters>,
121 database: SharedDatabase,
116 ) -> Self { 122 ) -> Self {
117 Self { 123 Self {
118 own_relay_url, 124 own_relay_url,
@@ -120,6 +126,7 @@ impl SelfSubscriber {
120 repo_sync_index, 126 repo_sync_index,
121 action_tx, 127 action_tx,
122 last_connected: None, 128 last_connected: None,
129 database,
123 } 130 }
124 } 131 }
125 132
@@ -135,6 +142,101 @@ impl SelfSubscriber {
135 .unwrap_or(Duration::from_millis(5000)) 142 .unwrap_or(Duration::from_millis(5000))
136 } 143 }
137 144
145 /// Load existing events from database on startup
146 ///
147 /// Queries the database with two separate queries to build the initial
148 /// PendingUpdates state. This ensures all repos get Layer 2/3 filters
149 /// created, not just those returned by the WebSocket subscription
150 /// (which has limits on the number of events returned).
151 ///
152 /// Query order:
153 /// 1. First query: Get announcements (30617) to populate repo_sync_index
154 /// with repos and their relays
155 /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event()
156 /// to add root event IDs for Layer 3 filter creation
157 ///
158 /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters.
159 async fn load_existing_events(&self) -> PendingUpdates {
160 let mut pending = PendingUpdates::new();
161
162 tracing::info!("Loading all events from database");
163
164 // First query: Get all announcements to populate repo_sync_index
165 let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement);
166
167 let announcements = match self.database.query(announcement_filter).await {
168 Ok(events) => {
169 tracing::info!(count = events.len(), "Loaded announcements from database");
170 events
171 }
172 Err(e) => {
173 tracing::error!(
174 error = %e,
175 "Failed to query announcements from database"
176 );
177 return pending;
178 }
179 };
180
181 // Process announcements
182 let mut announcements_loaded = 0;
183 for event in announcements.iter() {
184 if let Some(repo_id) = Self::extract_repo_id(event) {
185 let relays = Self::extract_relay_urls(event);
186 pending.add_repo(repo_id, relays, HashSet::new());
187 announcements_loaded += 1;
188 }
189 }
190
191 // Update repo_sync_index with announcements BEFORE querying root events
192 {
193 let mut index = self.repo_sync_index.write().await;
194 for (repo_id, needs) in &pending.repos {
195 let entry = index
196 .entry(repo_id.clone())
197 .or_insert_with(|| RepoSyncNeeds {
198 relays: HashSet::new(),
199 root_events: HashSet::new(),
200 });
201 entry.relays.extend(needs.relays.clone());
202 }
203 }
204
205 // Second query: Get all root events for handle_root_event()
206 let root_filter =
207 Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]);
208
209 let root_events = match self.database.query(root_filter).await {
210 Ok(events) => {
211 tracing::info!(count = events.len(), "Loaded root events from database");
212 events
213 }
214 Err(e) => {
215 tracing::error!(
216 error = %e,
217 "Failed to query root events from database"
218 );
219 // Continue with just announcements
220 return pending;
221 }
222 };
223
224 // Process root events
225 let mut root_events_processed = 0;
226 for event in root_events.iter() {
227 self.handle_root_event(event, &mut pending).await;
228 root_events_processed += 1;
229 }
230
231 tracing::info!(
232 announcements_loaded = announcements_loaded,
233 root_events_processed = root_events_processed,
234 "Processed existing events from database"
235 );
236
237 pending
238 }
239
138 /// Process a relay pool notification 240 /// Process a relay pool notification
139 /// 241 ///
140 /// Handles incoming events from the subscription, queueing 30617 announcements 242 /// Handles incoming events from the subscription, queueing 30617 announcements
@@ -276,33 +378,22 @@ impl SelfSubscriber {
276 // Subscribe to announcement and root event kinds 378 // Subscribe to announcement and root event kinds
277 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) 379 // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618)
278 // Plus kind 10317 (User Grasp List) for GRASP discovery 380 // Plus kind 10317 (User Grasp List) for GRASP discovery
279 // Check if we have a last_connected time for reconnect filtering 381 let mut filter = Filter::new().kinds(vec![
280 let filter = if let Some(last) = self.last_connected { 382 Kind::GitRepoAnnouncement,
383 Kind::GitPatch,
384 Kind::GitIssue,
385 Kind::GitPullRequest,
386 Kind::GitUserGraspList,
387 ]);
388 if let Some(timestamp) = self.last_connected {
281 // Quick reconnect - use since filter (15 min buffer) 389 // Quick reconnect - use since filter (15 min buffer)
282 let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); 390 let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60));
283 tracing::debug!( 391 tracing::debug!(
284 since = %since, 392 since = %since,
285 "Using since filter for reconnect" 393 "Using since filter for reconnect"
286 ); 394 );
287 Filter::new() 395 filter = filter.since(since);
288 .kinds(vec![ 396 }
289 Kind::GitRepoAnnouncement, // Repository Announcements
290 Kind::GitPatch, // Patches
291 Kind::GitIssue, // Issues
292 Kind::GitPullRequest, // Pull Requests
293 Kind::GitUserGraspList, // User Grasp List
294 ])
295 .since(since)
296 } else {
297 // First connection - no since filter
298 Filter::new().kinds(vec![
299 Kind::GitRepoAnnouncement, // Repository Announcements
300 Kind::GitPatch, // Patches
301 Kind::GitIssue, // Issues
302 Kind::GitPullRequest, // Pull Requests
303 Kind::GitUserGraspList, // User Grasp List
304 ])
305 };
306 397
307 // Update last_connected AFTER creating filter but BEFORE subscribing 398 // Update last_connected AFTER creating filter but BEFORE subscribing
308 self.last_connected = Some(Timestamp::now()); 399 self.last_connected = Some(Timestamp::now());
@@ -323,7 +414,11 @@ impl SelfSubscriber {
323 414
324 let mut notifications = client.notifications(); 415 let mut notifications = client.notifications();
325 let batch_window = Self::get_batch_window(); 416 let batch_window = Self::get_batch_window();
326 let mut pending = PendingUpdates::new(); 417
418 // Load existing events from database on startup
419 // This ensures all repos get Layer 2/3 filters created, not just those
420 // returned by the WebSocket subscription (which has limits)
421 let mut pending = self.load_existing_events().await;
327 422
328 // Timer does NOT reset on new events - use interval 423 // Timer does NOT reset on new events - use interval
329 let mut timer = tokio::time::interval(batch_window); 424 let mut timer = tokio::time::interval(batch_window);