diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 74 | ||||
| -rw-r--r-- | src/git/handlers.rs | 66 | ||||
| -rw-r--r-- | src/main.rs | 20 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 159 | ||||
| -rw-r--r-- | src/nostr/events.rs | 16 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 9 | ||||
| -rw-r--r-- | src/purgatory/mod.rs | 339 | ||||
| -rw-r--r-- | src/purgatory/sync/context.rs | 163 | ||||
| -rw-r--r-- | src/purgatory/sync/functions.rs | 22 | ||||
| -rw-r--r-- | src/purgatory/types.rs | 30 | ||||
| -rw-r--r-- | src/sync/mod.rs | 3 | ||||
| -rw-r--r-- | src/sync/naughty_list.rs | 485 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 141 |
13 files changed, 1116 insertions, 411 deletions
diff --git a/src/config.rs b/src/config.rs index 271a340..dd7b1e3 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -109,22 +109,25 @@ impl WhitelistEntry { | |||
| 109 | } | 109 | } |
| 110 | 110 | ||
| 111 | /// GRASP-05 Archive mode configuration | 111 | /// GRASP-05 Archive mode configuration |
| 112 | #[derive(Debug, Clone, Serialize, Deserialize)] | 112 | #[derive(Debug, Clone, Serialize, Deserialize, Default)] |
| 113 | pub struct ArchiveConfig { | 113 | pub struct ArchiveConfig { |
| 114 | /// Accept all repository announcements (no filtering) | 114 | /// Accept all repository announcements (no filtering) |
| 115 | /// | 115 | /// |
| 116 | /// WARNING: Setting this to true allows anyone to mirror any repository | 116 | /// WARNING: Setting this to true allows anyone to mirror any repository |
| 117 | /// to this relay, potentially causing storage/bandwidth exhaustion. | 117 | /// to this relay, potentially causing storage/bandwidth exhaustion. |
| 118 | #[serde(default)] | ||
| 118 | pub archive_all: bool, | 119 | pub archive_all: bool, |
| 119 | 120 | ||
| 120 | /// Whitelist entries for selective archiving | 121 | /// Whitelist entries for selective archiving |
| 121 | /// | 122 | /// |
| 122 | /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode). | 123 | /// If empty and archive_all is false, GRASP-05 is disabled (GRASP-01 strict mode). |
| 124 | #[serde(default)] | ||
| 123 | pub whitelist: Vec<WhitelistEntry>, | 125 | pub whitelist: Vec<WhitelistEntry>, |
| 124 | 126 | ||
| 125 | /// GRASP server domains to archive (archive all repositories from these domains) | 127 | /// GRASP server domains to archive (archive all repositories from these domains) |
| 126 | /// | 128 | /// |
| 127 | /// If non-empty, archives all repositories from the specified GRASP server domains. | 129 | /// If non-empty, archives all repositories from the specified GRASP server domains. |
| 130 | #[serde(default)] | ||
| 128 | pub grasp_services: Vec<String>, | 131 | pub grasp_services: Vec<String>, |
| 129 | 132 | ||
| 130 | /// Read-only archive mode: relay is a read-only sync of archived repositories | 133 | /// Read-only archive mode: relay is a read-only sync of archived repositories |
| @@ -132,6 +135,7 @@ pub struct ArchiveConfig { | |||
| 132 | /// When true, the relay ONLY accepts announcements matching the archive whitelist/all. | 135 | /// When true, the relay ONLY accepts announcements matching the archive whitelist/all. |
| 133 | /// Announcements listing the relay but not in the whitelist are rejected. | 136 | /// Announcements listing the relay but not in the whitelist are rejected. |
| 134 | /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos. | 137 | /// When false, the relay operates in GRASP-01 mode for unwhitelisted repos. |
| 138 | #[serde(default)] | ||
| 135 | pub read_only: bool, | 139 | pub read_only: bool, |
| 136 | } | 140 | } |
| 137 | 141 | ||
| @@ -146,6 +150,7 @@ impl ArchiveConfig { | |||
| 146 | /// Returns true if: | 150 | /// Returns true if: |
| 147 | /// - archive_all is true, OR | 151 | /// - archive_all is true, OR |
| 148 | /// - announcement matches any whitelist entry | 152 | /// - announcement matches any whitelist entry |
| 153 | /// | ||
| 149 | /// Note: grasp_services matching is handled via matches_grasp_services() | 154 | /// Note: grasp_services matching is handled via matches_grasp_services() |
| 150 | pub fn matches(&self, npub: &str, identifier: &str) -> bool { | 155 | pub fn matches(&self, npub: &str, identifier: &str) -> bool { |
| 151 | if self.archive_all { | 156 | if self.archive_all { |
| @@ -171,23 +176,13 @@ impl ArchiveConfig { | |||
| 171 | } | 176 | } |
| 172 | } | 177 | } |
| 173 | 178 | ||
| 174 | impl Default for ArchiveConfig { | ||
| 175 | fn default() -> Self { | ||
| 176 | Self { | ||
| 177 | archive_all: false, | ||
| 178 | whitelist: Vec::new(), | ||
| 179 | grasp_services: Vec::new(), | ||
| 180 | read_only: false, | ||
| 181 | } | ||
| 182 | } | ||
| 183 | } | ||
| 184 | |||
| 185 | /// Repository whitelist configuration | 179 | /// Repository whitelist configuration |
| 186 | #[derive(Debug, Clone, Serialize, Deserialize)] | 180 | #[derive(Debug, Clone, Serialize, Deserialize, Default)] |
| 187 | pub struct RepositoryConfig { | 181 | pub struct RepositoryConfig { |
| 188 | /// Whitelist entries for selective repository acceptance | 182 | /// Whitelist entries for selective repository acceptance |
| 189 | /// | 183 | /// |
| 190 | /// If empty, all repositories listing the service are accepted (GRASP-01 mode). | 184 | /// If empty, all repositories listing the service are accepted (GRASP-01 mode). |
| 185 | #[serde(default)] | ||
| 191 | pub whitelist: Vec<WhitelistEntry>, | 186 | pub whitelist: Vec<WhitelistEntry>, |
| 192 | } | 187 | } |
| 193 | 188 | ||
| @@ -207,21 +202,14 @@ impl RepositoryConfig { | |||
| 207 | } | 202 | } |
| 208 | } | 203 | } |
| 209 | 204 | ||
| 210 | impl Default for RepositoryConfig { | ||
| 211 | fn default() -> Self { | ||
| 212 | Self { | ||
| 213 | whitelist: Vec::new(), | ||
| 214 | } | ||
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 218 | /// Repository blacklist configuration | 205 | /// Repository blacklist configuration |
| 219 | #[derive(Debug, Clone, Serialize, Deserialize)] | 206 | #[derive(Debug, Clone, Serialize, Deserialize, Default)] |
| 220 | pub struct BlacklistConfig { | 207 | pub struct BlacklistConfig { |
| 221 | /// Blacklist entries for blocking specific repositories | 208 | /// Blacklist entries for blocking specific repositories |
| 222 | /// | 209 | /// |
| 223 | /// If empty, no repositories are blacklisted. | 210 | /// If empty, no repositories are blacklisted. |
| 224 | /// Blacklist takes precedence over both archive and repository whitelists. | 211 | /// Blacklist takes precedence over both archive and repository whitelists. |
| 212 | #[serde(default)] | ||
| 225 | pub blacklist: Vec<WhitelistEntry>, | 213 | pub blacklist: Vec<WhitelistEntry>, |
| 226 | } | 214 | } |
| 227 | 215 | ||
| @@ -256,21 +244,14 @@ impl BlacklistConfig { | |||
| 256 | } | 244 | } |
| 257 | } | 245 | } |
| 258 | 246 | ||
| 259 | impl Default for BlacklistConfig { | ||
| 260 | fn default() -> Self { | ||
| 261 | Self { | ||
| 262 | blacklist: Vec::new(), | ||
| 263 | } | ||
| 264 | } | ||
| 265 | } | ||
| 266 | |||
| 267 | /// Event blacklist configuration for blocking events by author npub | 247 | /// Event blacklist configuration for blocking events by author npub |
| 268 | #[derive(Debug, Clone, Serialize, Deserialize)] | 248 | #[derive(Debug, Clone, Serialize, Deserialize, Default)] |
| 269 | pub struct EventBlacklistConfig { | 249 | pub struct EventBlacklistConfig { |
| 270 | /// Blacklisted npubs - events from these authors are rejected | 250 | /// Blacklisted npubs - events from these authors are rejected |
| 271 | /// | 251 | /// |
| 272 | /// If empty, no events are blacklisted by author. | 252 | /// If empty, no events are blacklisted by author. |
| 273 | /// Applies to ALL event types, preventing events from reaching both the relay and purgatory. | 253 | /// Applies to ALL event types, preventing events from reaching both the relay and purgatory. |
| 254 | #[serde(default)] | ||
| 274 | pub blacklisted_npubs: Vec<String>, | 255 | pub blacklisted_npubs: Vec<String>, |
| 275 | } | 256 | } |
| 276 | 257 | ||
| @@ -292,14 +273,6 @@ impl EventBlacklistConfig { | |||
| 292 | } | 273 | } |
| 293 | } | 274 | } |
| 294 | 275 | ||
| 295 | impl Default for EventBlacklistConfig { | ||
| 296 | fn default() -> Self { | ||
| 297 | Self { | ||
| 298 | blacklisted_npubs: Vec::new(), | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
| 302 | |||
| 303 | /// Database backend type for the relay | 276 | /// Database backend type for the relay |
| 304 | #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, ValueEnum)] | 277 | #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, ValueEnum)] |
| 305 | #[serde(rename_all = "lowercase")] | 278 | #[serde(rename_all = "lowercase")] |
| @@ -500,6 +473,10 @@ pub struct Config { | |||
| 500 | /// Prevents connection exhaustion DoS attacks | 473 | /// Prevents connection exhaustion DoS attacks |
| 501 | #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)] | 474 | #[arg(long, env = "NGIT_MAX_CONNECTIONS", default_value_t = 4096)] |
| 502 | pub max_connections: usize, | 475 | pub max_connections: usize, |
| 476 | |||
| 477 | /// Log level for application logging | ||
| 478 | #[arg(long, env = "NGIT_LOG_LEVEL", default_value = "info")] | ||
| 479 | pub log_level: String, | ||
| 503 | } | 480 | } |
| 504 | 481 | ||
| 505 | impl Config { | 482 | impl Config { |
| @@ -782,6 +759,7 @@ impl Config { | |||
| 782 | repository_blacklist: String::new(), | 759 | repository_blacklist: String::new(), |
| 783 | event_blacklist: String::new(), | 760 | event_blacklist: String::new(), |
| 784 | max_connections: 500, | 761 | max_connections: 500, |
| 762 | log_level: "debug".to_string(), | ||
| 785 | } | 763 | } |
| 786 | } | 764 | } |
| 787 | } | 765 | } |
| @@ -1103,14 +1081,14 @@ mod tests { | |||
| 1103 | fn test_archive_read_only_defaults() { | 1081 | fn test_archive_read_only_defaults() { |
| 1104 | // Default: false when no archive mode | 1082 | // Default: false when no archive mode |
| 1105 | let config = Config::for_testing(); | 1083 | let config = Config::for_testing(); |
| 1106 | assert_eq!(config.archive_config().read_only, false); | 1084 | assert!(!config.archive_config().read_only); |
| 1107 | 1085 | ||
| 1108 | // Default: true when archive_all is set | 1086 | // Default: true when archive_all is set |
| 1109 | let config = Config { | 1087 | let config = Config { |
| 1110 | archive_all: true, | 1088 | archive_all: true, |
| 1111 | ..Config::for_testing() | 1089 | ..Config::for_testing() |
| 1112 | }; | 1090 | }; |
| 1113 | assert_eq!(config.archive_config().read_only, true); | 1091 | assert!(config.archive_config().read_only); |
| 1114 | 1092 | ||
| 1115 | // Default: true when archive_whitelist is set | 1093 | // Default: true when archive_whitelist is set |
| 1116 | let keys = Keys::generate(); | 1094 | let keys = Keys::generate(); |
| @@ -1119,7 +1097,7 @@ mod tests { | |||
| 1119 | archive_whitelist: test_npub, | 1097 | archive_whitelist: test_npub, |
| 1120 | ..Config::for_testing() | 1098 | ..Config::for_testing() |
| 1121 | }; | 1099 | }; |
| 1122 | assert_eq!(config.archive_config().read_only, true); | 1100 | assert!(config.archive_config().read_only); |
| 1123 | } | 1101 | } |
| 1124 | 1102 | ||
| 1125 | #[test] | 1103 | #[test] |
| @@ -1130,7 +1108,7 @@ mod tests { | |||
| 1130 | archive_read_only: Some(true), | 1108 | archive_read_only: Some(true), |
| 1131 | ..Config::for_testing() | 1109 | ..Config::for_testing() |
| 1132 | }; | 1110 | }; |
| 1133 | assert_eq!(config.archive_config().read_only, true); | 1111 | assert!(config.archive_config().read_only); |
| 1134 | 1112 | ||
| 1135 | // Explicit false with archive_all (unusual but allowed) | 1113 | // Explicit false with archive_all (unusual but allowed) |
| 1136 | let config = Config { | 1114 | let config = Config { |
| @@ -1138,14 +1116,14 @@ mod tests { | |||
| 1138 | archive_read_only: Some(false), | 1116 | archive_read_only: Some(false), |
| 1139 | ..Config::for_testing() | 1117 | ..Config::for_testing() |
| 1140 | }; | 1118 | }; |
| 1141 | assert_eq!(config.archive_config().read_only, false); | 1119 | assert!(!config.archive_config().read_only); |
| 1142 | 1120 | ||
| 1143 | // Explicit false without archive mode | 1121 | // Explicit false without archive mode |
| 1144 | let config = Config { | 1122 | let config = Config { |
| 1145 | archive_read_only: Some(false), | 1123 | archive_read_only: Some(false), |
| 1146 | ..Config::for_testing() | 1124 | ..Config::for_testing() |
| 1147 | }; | 1125 | }; |
| 1148 | assert_eq!(config.archive_config().read_only, false); | 1126 | assert!(!config.archive_config().read_only); |
| 1149 | } | 1127 | } |
| 1150 | 1128 | ||
| 1151 | #[test] | 1129 | #[test] |
| @@ -1548,7 +1526,7 @@ mod tests { | |||
| 1548 | }; | 1526 | }; |
| 1549 | let archive_config = config.archive_config(); | 1527 | let archive_config = config.archive_config(); |
| 1550 | assert!(archive_config.enabled()); | 1528 | assert!(archive_config.enabled()); |
| 1551 | assert_eq!(archive_config.read_only, true); // Default to true | 1529 | assert!(archive_config.read_only); // Default to true |
| 1552 | } | 1530 | } |
| 1553 | 1531 | ||
| 1554 | #[test] | 1532 | #[test] |
| @@ -1558,7 +1536,7 @@ mod tests { | |||
| 1558 | archive_grasp_services: "git.example.com".to_string(), | 1536 | archive_grasp_services: "git.example.com".to_string(), |
| 1559 | ..Config::for_testing() | 1537 | ..Config::for_testing() |
| 1560 | }; | 1538 | }; |
| 1561 | assert_eq!(config.archive_config().read_only, true); | 1539 | assert!(config.archive_config().read_only); |
| 1562 | } | 1540 | } |
| 1563 | 1541 | ||
| 1564 | #[test] | 1542 | #[test] |
| @@ -1569,7 +1547,7 @@ mod tests { | |||
| 1569 | archive_read_only: Some(false), | 1547 | archive_read_only: Some(false), |
| 1570 | ..Config::for_testing() | 1548 | ..Config::for_testing() |
| 1571 | }; | 1549 | }; |
| 1572 | assert_eq!(config.archive_config().read_only, false); | 1550 | assert!(!config.archive_config().read_only); |
| 1573 | } | 1551 | } |
| 1574 | 1552 | ||
| 1575 | #[test] | 1553 | #[test] |
diff --git a/src/git/handlers.rs b/src/git/handlers.rs index 017eee4..e3a6ad4 100644 --- a/src/git/handlers.rs +++ b/src/git/handlers.rs | |||
| @@ -99,6 +99,42 @@ pub async fn handle_info_refs( | |||
| 99 | .unwrap()) | 99 | .unwrap()) |
| 100 | } | 100 | } |
| 101 | 101 | ||
| 102 | /// Build an HTTP 200 OK response with an ERR pkt-line for git protocol errors. | ||
| 103 | /// | ||
| 104 | /// Per the git smart HTTP protocol spec, protocol-level errors (like "not our ref") | ||
| 105 | /// should be returned as HTTP 200 OK with the error message in pkt-line format: | ||
| 106 | /// `PKT-LINE("ERR" SP explanation-text)` | ||
| 107 | /// | ||
| 108 | /// This allows git clients to properly parse and display the error message. | ||
| 109 | fn build_git_protocol_error_response( | ||
| 110 | service: GitService, | ||
| 111 | error_message: &str, | ||
| 112 | ) -> Response<Full<Bytes>> { | ||
| 113 | // Format: "ERR <message>\n" | ||
| 114 | let err_content = format!("ERR {}\n", error_message.trim()); | ||
| 115 | let err_pktline = PktLine::data(err_content.as_bytes()).encode(); | ||
| 116 | |||
| 117 | Response::builder() | ||
| 118 | .status(StatusCode::OK) | ||
| 119 | .header("content-type", service.result_content_type()) | ||
| 120 | .header("cache-control", "no-cache") | ||
| 121 | .body(Full::new(Bytes::from(err_pktline))) | ||
| 122 | .unwrap() | ||
| 123 | } | ||
| 124 | |||
| 125 | /// Check if a git process failure is a protocol error (vs transport error). | ||
| 126 | /// | ||
| 127 | /// Protocol errors are communicated via stderr when git exits with code 128. | ||
| 128 | /// These should be returned to the client as HTTP 200 with ERR pkt-line. | ||
| 129 | /// | ||
| 130 | /// Transport errors (process spawn failures, I/O errors, signals) should | ||
| 131 | /// remain as HTTP 500 errors. | ||
| 132 | fn is_git_protocol_error(exit_code: Option<i32>, stderr: &[u8]) -> bool { | ||
| 133 | // Git uses exit code 128 for protocol/usage errors | ||
| 134 | // If there's stderr content, it's a protocol error message | ||
| 135 | exit_code == Some(128) && !stderr.is_empty() | ||
| 136 | } | ||
| 137 | |||
| 102 | /// Handle POST /git-upload-pack (clone/fetch) | 138 | /// Handle POST /git-upload-pack (clone/fetch) |
| 103 | pub async fn handle_upload_pack( | 139 | pub async fn handle_upload_pack( |
| 104 | repo_path: PathBuf, | 140 | repo_path: PathBuf, |
| @@ -150,6 +186,21 @@ pub async fn handle_upload_pack( | |||
| 150 | 186 | ||
| 151 | if !status.success() { | 187 | if !status.success() { |
| 152 | let stderr_str = String::from_utf8_lossy(&stderr_output); | 188 | let stderr_str = String::from_utf8_lossy(&stderr_output); |
| 189 | |||
| 190 | // Check if this is a git protocol error (exit code 128 with stderr) | ||
| 191 | // Protocol errors should be returned as HTTP 200 with ERR pkt-line | ||
| 192 | if is_git_protocol_error(status.code(), &stderr_output) { | ||
| 193 | warn!( | ||
| 194 | "Git upload-pack protocol error (returning ERR pkt-line): {}", | ||
| 195 | stderr_str | ||
| 196 | ); | ||
| 197 | return Ok(build_git_protocol_error_response( | ||
| 198 | GitService::UploadPack, | ||
| 199 | &stderr_str, | ||
| 200 | )); | ||
| 201 | } | ||
| 202 | |||
| 203 | // Transport errors (spawn failures, signals, etc.) remain as HTTP 500 | ||
| 153 | error!("Git upload-pack failed: {}", stderr_str); | 204 | error!("Git upload-pack failed: {}", stderr_str); |
| 154 | return Err(GitError::GitFailed(status.code())); | 205 | return Err(GitError::GitFailed(status.code())); |
| 155 | } | 206 | } |
| @@ -277,6 +328,21 @@ pub async fn handle_receive_pack( | |||
| 277 | 328 | ||
| 278 | if !status.success() { | 329 | if !status.success() { |
| 279 | let stderr_str = String::from_utf8_lossy(&stderr_output); | 330 | let stderr_str = String::from_utf8_lossy(&stderr_output); |
| 331 | |||
| 332 | // Check if this is a git protocol error (exit code 128 with stderr) | ||
| 333 | // Protocol errors should be returned as HTTP 200 with ERR pkt-line | ||
| 334 | if is_git_protocol_error(status.code(), &stderr_output) { | ||
| 335 | warn!( | ||
| 336 | "Git receive-pack protocol error (returning ERR pkt-line): {}", | ||
| 337 | stderr_str | ||
| 338 | ); | ||
| 339 | return Ok(build_git_protocol_error_response( | ||
| 340 | GitService::ReceivePack, | ||
| 341 | &stderr_str, | ||
| 342 | )); | ||
| 343 | } | ||
| 344 | |||
| 345 | // Transport errors (spawn failures, signals, etc.) remain as HTTP 500 | ||
| 280 | error!("Git receive-pack failed: {}", stderr_str); | 346 | error!("Git receive-pack failed: {}", stderr_str); |
| 281 | return Err(GitError::GitFailed(status.code())); | 347 | return Err(GitError::GitFailed(status.code())); |
| 282 | } | 348 | } |
diff --git a/src/main.rs b/src/main.rs index 5e5b83a..6c9da05 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,8 +3,8 @@ use std::{path::PathBuf, sync::Arc}; | |||
| 3 | 3 | ||
| 4 | use anyhow::Result; | 4 | use anyhow::Result; |
| 5 | use tokio::signal; | 5 | use tokio::signal; |
| 6 | use tracing::{error, info, warn, Level}; | 6 | use tracing::{error, info, warn}; |
| 7 | use tracing_subscriber::FmtSubscriber; | 7 | use tracing_subscriber::{EnvFilter, FmtSubscriber}; |
| 8 | 8 | ||
| 9 | use ngit_grasp::{ | 9 | use ngit_grasp::{ |
| 10 | config::{Config, DatabaseBackend}, | 10 | config::{Config, DatabaseBackend}, |
| @@ -17,16 +17,16 @@ use ngit_grasp::{ | |||
| 17 | 17 | ||
| 18 | #[tokio::main] | 18 | #[tokio::main] |
| 19 | async fn main() -> Result<()> { | 19 | async fn main() -> Result<()> { |
| 20 | // Initialize tracing | 20 | // Load configuration first (priority: CLI flags > env vars > .env file > defaults) |
| 21 | let config = Config::load()?; | ||
| 22 | |||
| 23 | // Initialize tracing with configured log level | ||
| 21 | let subscriber = FmtSubscriber::builder() | 24 | let subscriber = FmtSubscriber::builder() |
| 22 | .with_max_level(Level::DEBUG) | 25 | .with_env_filter(EnvFilter::new(&config.log_level)) |
| 23 | .finish(); | 26 | .finish(); |
| 24 | tracing::subscriber::set_global_default(subscriber)?; | 27 | tracing::subscriber::set_global_default(subscriber)?; |
| 25 | 28 | ||
| 26 | info!("Starting ngit-grasp with nostr-relay-builder..."); | 29 | info!("Starting ngit-grasp with log level: {}", config.log_level); |
| 27 | |||
| 28 | // Load configuration (priority: CLI flags > env vars > .env file > defaults) | ||
| 29 | let config = Config::load()?; | ||
| 30 | 30 | ||
| 31 | // Validate configuration and fail fast on fatal errors | 31 | // Validate configuration and fail fast on fatal errors |
| 32 | // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings | 32 | // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings |
| @@ -187,8 +187,8 @@ async fn main() -> Result<()> { | |||
| 187 | )); | 187 | )); |
| 188 | 188 | ||
| 189 | // Create throttle manager for rate limiting remote git servers | 189 | // Create throttle manager for rate limiting remote git servers |
| 190 | // Default: 5 concurrent requests per domain, 30 requests per minute per domain | 190 | // Default: 5 concurrent requests per domain, 60 requests per minute per domain |
| 191 | let throttle_manager = Arc::new(ThrottleManager::new(5, 30)); | 191 | let throttle_manager = Arc::new(ThrottleManager::new(5, 60)); |
| 192 | throttle_manager.set_context(sync_ctx.clone()); | 192 | throttle_manager.set_context(sync_ctx.clone()); |
| 193 | throttle_manager.set_git_naughty_list(git_naughty_list.clone()); | 193 | throttle_manager.set_git_naughty_list(git_naughty_list.clone()); |
| 194 | 194 | ||
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 34014db..3baa2ff 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -98,6 +98,62 @@ impl Nip34WritePolicy { | |||
| 98 | self.ctx.set_local_relay(relay); | 98 | self.ctx.set_local_relay(relay); |
| 99 | } | 99 | } |
| 100 | 100 | ||
| 101 | /// Extract repository identifier from event's 'd' tag. | ||
| 102 | /// | ||
| 103 | /// Used for structured logging when parsing fails - we try to extract | ||
| 104 | /// the identifier even if full parsing failed. | ||
| 105 | fn extract_identifier_from_event(event: &Event) -> String { | ||
| 106 | use nostr_relay_builder::prelude::TagKind; | ||
| 107 | event | ||
| 108 | .tags | ||
| 109 | .iter() | ||
| 110 | .find(|t| t.kind() == TagKind::d()) | ||
| 111 | .and_then(|t| t.content()) | ||
| 112 | .map(|s| s.to_string()) | ||
| 113 | .unwrap_or_else(|| "unknown".to_string()) | ||
| 114 | } | ||
| 115 | |||
| 116 | /// Extract ALL repository identifiers from PR event's 'a' tags. | ||
| 117 | /// | ||
| 118 | /// PR events can reference multiple repositories via multiple 'a' tags | ||
| 119 | /// (e.g., when there are multiple maintainers). Each tag has format | ||
| 120 | /// `30617:<owner_pubkey>:<identifier>`. | ||
| 121 | /// | ||
| 122 | /// Returns a vector of unique identifiers, or `["unknown"]` if none found. | ||
| 123 | fn extract_repos_from_pr_event(event: &Event) -> Vec<String> { | ||
| 124 | let repos: Vec<String> = event | ||
| 125 | .tags | ||
| 126 | .iter() | ||
| 127 | .filter_map(|tag| { | ||
| 128 | let tag_vec = tag.clone().to_vec(); | ||
| 129 | if tag_vec.len() >= 2 && tag_vec[0] == "a" && tag_vec[1].starts_with("30617:") { | ||
| 130 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 131 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 132 | if parts.len() >= 3 { | ||
| 133 | Some(parts[2].to_string()) | ||
| 134 | } else { | ||
| 135 | None | ||
| 136 | } | ||
| 137 | } else { | ||
| 138 | None | ||
| 139 | } | ||
| 140 | }) | ||
| 141 | .collect(); | ||
| 142 | |||
| 143 | // Deduplicate while preserving order | ||
| 144 | let mut seen = std::collections::HashSet::new(); | ||
| 145 | let unique_repos: Vec<String> = repos | ||
| 146 | .into_iter() | ||
| 147 | .filter(|r| seen.insert(r.clone())) | ||
| 148 | .collect(); | ||
| 149 | |||
| 150 | if unique_repos.is_empty() { | ||
| 151 | vec!["unknown".to_string()] | ||
| 152 | } else { | ||
| 153 | unique_repos | ||
| 154 | } | ||
| 155 | } | ||
| 156 | |||
| 101 | /// Handle repository announcement event | 157 | /// Handle repository announcement event |
| 102 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { | 158 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { |
| 103 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | 159 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); |
| @@ -129,10 +185,21 @@ impl Nip34WritePolicy { | |||
| 129 | WritePolicyResult::Accept | 185 | WritePolicyResult::Accept |
| 130 | } | 186 | } |
| 131 | Err(e) => { | 187 | Err(e) => { |
| 188 | let npub = event | ||
| 189 | .pubkey | ||
| 190 | .to_bech32() | ||
| 191 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 192 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 193 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 194 | let repo = Self::extract_identifier_from_event(event); | ||
| 195 | // Structured log for migration scripts | ||
| 132 | tracing::warn!( | 196 | tracing::warn!( |
| 133 | "Failed to parse repository announcement {}: {}", | 197 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", |
| 134 | event_id_str, | 198 | event.kind.as_u16(), |
| 135 | e | 199 | event_id_short, |
| 200 | e, | ||
| 201 | repo, | ||
| 202 | npub | ||
| 136 | ); | 203 | ); |
| 137 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) | 204 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) |
| 138 | } | 205 | } |
| @@ -157,10 +224,21 @@ impl Nip34WritePolicy { | |||
| 157 | WritePolicyResult::Accept | 224 | WritePolicyResult::Accept |
| 158 | } | 225 | } |
| 159 | Err(e) => { | 226 | Err(e) => { |
| 227 | let npub = event | ||
| 228 | .pubkey | ||
| 229 | .to_bech32() | ||
| 230 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 231 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 232 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 233 | let repo = Self::extract_identifier_from_event(event); | ||
| 234 | // Structured log for migration scripts | ||
| 160 | tracing::warn!( | 235 | tracing::warn!( |
| 161 | "Failed to parse maintainer announcement {}: {}", | 236 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", |
| 162 | event_id_str, | 237 | event.kind.as_u16(), |
| 163 | e | 238 | event_id_short, |
| 239 | e, | ||
| 240 | repo, | ||
| 241 | npub | ||
| 164 | ); | 242 | ); |
| 165 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) | 243 | WritePolicyResult::reject(format!("Failed to parse announcement: {}", e)) |
| 166 | } | 244 | } |
| @@ -183,8 +261,6 @@ impl Nip34WritePolicy { | |||
| 183 | /// * `event` - The state event to validate | 261 | /// * `event` - The state event to validate |
| 184 | /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) | 262 | /// * `is_synced` - True if this event came from proactive sync (vs user-submitted) |
| 185 | async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult { | 263 | async fn handle_state(&self, event: &Event, is_synced: bool) -> WritePolicyResult { |
| 186 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | ||
| 187 | |||
| 188 | match self.state_policy.validate(event) { | 264 | match self.state_policy.validate(event) { |
| 189 | StateResult::Accept => { | 265 | StateResult::Accept => { |
| 190 | // Process state alignment asynchronously | 266 | // Process state alignment asynchronously |
| @@ -195,7 +271,22 @@ impl Nip34WritePolicy { | |||
| 195 | { | 271 | { |
| 196 | Ok(poilicy_result) => poilicy_result, | 272 | Ok(poilicy_result) => poilicy_result, |
| 197 | Err(e) => { | 273 | Err(e) => { |
| 198 | tracing::warn!("Failed to process state event {}: {}", event_id_str, e); | 274 | let npub = event |
| 275 | .pubkey | ||
| 276 | .to_bech32() | ||
| 277 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 278 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 279 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 280 | let repo = Self::extract_identifier_from_event(event); | ||
| 281 | // Structured log for migration scripts | ||
| 282 | tracing::warn!( | ||
| 283 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", | ||
| 284 | event.kind.as_u16(), | ||
| 285 | event_id_short, | ||
| 286 | e, | ||
| 287 | repo, | ||
| 288 | npub | ||
| 289 | ); | ||
| 199 | // reject if processing failed | 290 | // reject if processing failed |
| 200 | WritePolicyResult::Reject { | 291 | WritePolicyResult::Reject { |
| 201 | status: false, | 292 | status: false, |
| @@ -205,7 +296,22 @@ impl Nip34WritePolicy { | |||
| 205 | } | 296 | } |
| 206 | } | 297 | } |
| 207 | StateResult::Reject(reason) => { | 298 | StateResult::Reject(reason) => { |
| 208 | tracing::warn!("Rejected repository state {}: {}", event_id_str, reason); | 299 | let npub = event |
| 300 | .pubkey | ||
| 301 | .to_bech32() | ||
| 302 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 303 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 304 | // Try to extract repo identifier from 'd' tag even if parsing failed | ||
| 305 | let repo = Self::extract_identifier_from_event(event); | ||
| 306 | // Structured log for migration scripts | ||
| 307 | tracing::warn!( | ||
| 308 | "[PARSE_FAIL] kind={} event_id={}... reason=\"{}\" repo={} npub={}", | ||
| 309 | event.kind.as_u16(), | ||
| 310 | event_id_short, | ||
| 311 | reason, | ||
| 312 | repo, | ||
| 313 | npub | ||
| 314 | ); | ||
| 209 | WritePolicyResult::reject(reason) | 315 | WritePolicyResult::reject(reason) |
| 210 | } | 316 | } |
| 211 | } | 317 | } |
| @@ -303,9 +409,12 @@ impl Nip34WritePolicy { | |||
| 303 | ); | 409 | ); |
| 304 | 410 | ||
| 305 | // Add to purgatory | 411 | // Add to purgatory |
| 306 | self.ctx | 412 | self.ctx.purgatory.add_pr( |
| 307 | .purgatory | 413 | event.clone(), |
| 308 | .add_pr(event.clone(), event.id.to_hex(), commit.clone()); | 414 | event.id.to_hex(), |
| 415 | commit.clone(), | ||
| 416 | is_synced, | ||
| 417 | ); | ||
| 309 | 418 | ||
| 310 | WritePolicyResult::Reject { | 419 | WritePolicyResult::Reject { |
| 311 | status: true, // Client sees OK | 420 | status: true, // Client sees OK |
| @@ -323,11 +432,25 @@ impl Nip34WritePolicy { | |||
| 323 | } | 432 | } |
| 324 | Err(e) => { | 433 | Err(e) => { |
| 325 | // Error checking git data - reject event | 434 | // Error checking git data - reject event |
| 326 | tracing::warn!( | 435 | let npub = event |
| 327 | "Failed to check git data for PR event {}: {}", | 436 | .pubkey |
| 328 | event_id_str, | 437 | .to_bech32() |
| 329 | e | 438 | .unwrap_or_else(|_| event.pubkey.to_hex()); |
| 330 | ); | 439 | let event_id_short = &event.id.to_hex()[..12]; |
| 440 | // Extract ALL repo identifiers from 'a' tags for PR events | ||
| 441 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 442 | let repos = Self::extract_repos_from_pr_event(event); | ||
| 443 | // Structured log for migration scripts - log once per repo | ||
| 444 | for repo in &repos { | ||
| 445 | tracing::warn!( | ||
| 446 | "[PARSE_FAIL] kind={} event_id={}... reason=\"git data check failed: {}\" repo={} npub={}", | ||
| 447 | event.kind.as_u16(), | ||
| 448 | event_id_short, | ||
| 449 | e, | ||
| 450 | repo, | ||
| 451 | npub | ||
| 452 | ); | ||
| 453 | } | ||
| 331 | WritePolicyResult::reject(format!("Failed to check git data: {}", e)) | 454 | WritePolicyResult::reject(format!("Failed to check git data: {}", e)) |
| 332 | } | 455 | } |
| 333 | } | 456 | } |
diff --git a/src/nostr/events.rs b/src/nostr/events.rs index 718633e..a441742 100644 --- a/src/nostr/events.rs +++ b/src/nostr/events.rs | |||
| @@ -419,14 +419,14 @@ pub fn validate_announcement( | |||
| 419 | // GRASP-01: Normal mode - accept if announcement lists our service AND matches repository whitelist (if enabled) | 419 | // GRASP-01: Normal mode - accept if announcement lists our service AND matches repository whitelist (if enabled) |
| 420 | if lists_service && !archive_config.read_only { | 420 | if lists_service && !archive_config.read_only { |
| 421 | // Check repository whitelist if enabled | 421 | // Check repository whitelist if enabled |
| 422 | if repository_config.enabled() { | 422 | if repository_config.enabled() |
| 423 | if !repository_config.matches(&npub, &announcement.identifier) { | 423 | && !repository_config.matches(&npub, &announcement.identifier) |
| 424 | return AnnouncementResult::Reject(format!( | 424 | { |
| 425 | "Announcement lists service but does not match repository whitelist. \ | 425 | return AnnouncementResult::Reject(format!( |
| 426 | Repository {}/{} not in whitelist", | 426 | "Announcement lists service but does not match repository whitelist. \ |
| 427 | npub, announcement.identifier | 427 | Repository {}/{} not in whitelist", |
| 428 | )); | 428 | npub, announcement.identifier |
| 429 | } | 429 | )); |
| 430 | } | 430 | } |
| 431 | return AnnouncementResult::Accept; | 431 | return AnnouncementResult::Accept; |
| 432 | } | 432 | } |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index f94f004..3411077 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -205,9 +205,12 @@ impl StatePolicy { | |||
| 205 | 205 | ||
| 206 | // If no git data - add to purgatory | 206 | // If no git data - add to purgatory |
| 207 | // (add_state automatically enqueues for background sync) | 207 | // (add_state automatically enqueues for background sync) |
| 208 | self.ctx | 208 | self.ctx.purgatory.add_state( |
| 209 | .purgatory | 209 | event.clone(), |
| 210 | .add_state(event.clone(), state.identifier.clone(), event.pubkey); | 210 | state.identifier.clone(), |
| 211 | event.pubkey, | ||
| 212 | is_synced, | ||
| 213 | ); | ||
| 211 | 214 | ||
| 212 | tracing::info!( | 215 | tracing::info!( |
| 213 | "state event added to purgatory: eventid: {}, identifier: {}", | 216 | "state event added to purgatory: eventid: {}, identifier: {}", |
diff --git a/src/purgatory/mod.rs b/src/purgatory/mod.rs index 47798a6..8094450 100644 --- a/src/purgatory/mod.rs +++ b/src/purgatory/mod.rs | |||
| @@ -17,10 +17,11 @@ pub mod sync; | |||
| 17 | mod types; | 17 | mod types; |
| 18 | 18 | ||
| 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; | 19 | pub use helpers::{can_apply_state, can_satisfy_state, extract_refs_from_state, get_unpushed_refs}; |
| 20 | pub use types::{PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; | 20 | pub use types::{EventSource, PrPurgatoryEntry, RefPair, RefUpdate, StatePurgatoryEntry}; |
| 21 | 21 | ||
| 22 | use dashmap::DashMap; | 22 | use dashmap::DashMap; |
| 23 | use nostr_sdk::prelude::*; | 23 | use nostr_sdk::prelude::*; |
| 24 | use nostr_sdk::ToBech32; | ||
| 24 | use serde::{Deserialize, Serialize}; | 25 | use serde::{Deserialize, Serialize}; |
| 25 | use std::collections::HashMap; | 26 | use std::collections::HashMap; |
| 26 | use std::collections::HashSet; | 27 | use std::collections::HashSet; |
| @@ -57,6 +58,9 @@ struct SerializableStatePurgatoryEntry { | |||
| 57 | created_at_offset_secs: u64, | 58 | created_at_offset_secs: u64, |
| 58 | /// Duration offset from saved_at for expires_at | 59 | /// Duration offset from saved_at for expires_at |
| 59 | expires_at_offset_secs: u64, | 60 | expires_at_offset_secs: u64, |
| 61 | /// Source of this event (direct submission vs sync) | ||
| 62 | #[serde(default)] | ||
| 63 | source: types::EventSource, | ||
| 60 | } | 64 | } |
| 61 | 65 | ||
| 62 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. | 66 | /// Serializable wrapper for `PrPurgatoryEntry` with time offsets. |
| @@ -74,6 +78,9 @@ struct SerializablePrPurgatoryEntry { | |||
| 74 | created_at_offset_secs: u64, | 78 | created_at_offset_secs: u64, |
| 75 | /// Duration offset from saved_at for expires_at | 79 | /// Duration offset from saved_at for expires_at |
| 76 | expires_at_offset_secs: u64, | 80 | expires_at_offset_secs: u64, |
| 81 | /// Source of this event (direct submission vs sync) | ||
| 82 | #[serde(default)] | ||
| 83 | source: types::EventSource, | ||
| 77 | } | 84 | } |
| 78 | 85 | ||
| 79 | /// Serializable purgatory state for disk persistence. | 86 | /// Serializable purgatory state for disk persistence. |
| @@ -270,11 +277,38 @@ impl Purgatory { | |||
| 270 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately | 277 | /// For sync-triggered events, the SyncManager calls `enqueue_sync_immediate` separately |
| 271 | /// to override this delay. | 278 | /// to override this delay. |
| 272 | /// | 279 | /// |
| 280 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 281 | /// is direct (`!from_sync`), the source is upgraded to `Direct` and the expiry is reset to `DEFAULT_EXPIRY` from now. | ||
| 282 | /// | ||
| 273 | /// # Arguments | 283 | /// # Arguments |
| 274 | /// * `event` - The state event (kind 30618) to hold | 284 | /// * `event` - The state event (kind 30618) to hold |
| 275 | /// * `identifier` - The repository identifier from the 'd' tag | 285 | /// * `identifier` - The repository identifier from the 'd' tag |
| 276 | /// * `author` - The event author's public key | 286 | /// * `author` - The event author's public key |
| 277 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey) { | 287 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 288 | pub fn add_state(&self, event: Event, identifier: String, author: PublicKey, from_sync: bool) { | ||
| 289 | let source = if from_sync { | ||
| 290 | types::EventSource::Sync | ||
| 291 | } else { | ||
| 292 | types::EventSource::Direct | ||
| 293 | }; | ||
| 294 | |||
| 295 | // Check if event already exists - if so, potentially upgrade source | ||
| 296 | if let Some(mut entries) = self.state_events.get_mut(&identifier) { | ||
| 297 | if let Some(existing) = entries.iter_mut().find(|e| e.event.id == event.id) { | ||
| 298 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 299 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 300 | existing.source = types::EventSource::Direct; | ||
| 301 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 302 | tracing::debug!( | ||
| 303 | event_id = %event.id, | ||
| 304 | identifier = %identifier, | ||
| 305 | "Upgraded purgatory entry source from Sync to Direct, reset expiry" | ||
| 306 | ); | ||
| 307 | } | ||
| 308 | return; // Event already exists, don't add duplicate | ||
| 309 | } | ||
| 310 | } | ||
| 311 | |||
| 278 | let now = Instant::now(); | 312 | let now = Instant::now(); |
| 279 | let entry = StatePurgatoryEntry { | 313 | let entry = StatePurgatoryEntry { |
| 280 | event, | 314 | event, |
| @@ -282,6 +316,7 @@ impl Purgatory { | |||
| 282 | author, | 316 | author, |
| 283 | created_at: now, | 317 | created_at: now, |
| 284 | expires_at: now + DEFAULT_EXPIRY, | 318 | expires_at: now + DEFAULT_EXPIRY, |
| 319 | source, | ||
| 285 | }; | 320 | }; |
| 286 | 321 | ||
| 287 | self.state_events | 322 | self.state_events |
| @@ -301,11 +336,35 @@ impl Purgatory { | |||
| 301 | /// Automatically enqueues the referenced repository identifier for background sync | 336 | /// Automatically enqueues the referenced repository identifier for background sync |
| 302 | /// with the default delay (3 minutes), giving time for a git push to arrive. | 337 | /// with the default delay (3 minutes), giving time for a git push to arrive. |
| 303 | /// | 338 | /// |
| 339 | /// If an event already exists in purgatory with `Sync` source and the new submission | ||
| 340 | /// is direct (`!from_sync`), the source is upgraded to `Direct` and the expiry is reset to `DEFAULT_EXPIRY` from now. | ||
| 341 | /// | ||
| 304 | /// # Arguments | 342 | /// # Arguments |
| 305 | /// * `event` - The PR event (kind 1617/1618) to hold | 343 | /// * `event` - The PR event (kind 1617/1618) to hold |
| 306 | /// * `event_id` - The event ID (hex string) from the 'e' tag | 344 | /// * `event_id` - The event ID (hex string) from the 'e' tag |
| 307 | /// * `commit` - The commit SHA from the 'c' tag | 345 | /// * `commit` - The commit SHA from the 'c' tag |
| 308 | pub fn add_pr(&self, event: Event, event_id: String, commit: String) { | 346 | /// * `from_sync` - True if this event came from proactive sync (vs user-submitted) |
| 347 | pub fn add_pr(&self, event: Event, event_id: String, commit: String, from_sync: bool) { | ||
| 348 | let source = if from_sync { | ||
| 349 | types::EventSource::Sync | ||
| 350 | } else { | ||
| 351 | types::EventSource::Direct | ||
| 352 | }; | ||
| 353 | |||
| 354 | // Check if event already exists - if so, potentially upgrade source | ||
| 355 | if let Some(mut existing) = self.pr_events.get_mut(&event_id) { | ||
| 356 | // Upgrade source from Sync to Direct if new submission is direct | ||
| 357 | if existing.source == types::EventSource::Sync && !from_sync { | ||
| 358 | existing.source = types::EventSource::Direct; | ||
| 359 | existing.expires_at = Instant::now() + DEFAULT_EXPIRY; | ||
| 360 | tracing::debug!( | ||
| 361 | event_id = %event_id, | ||
| 362 | "Upgraded PR purgatory entry source from Sync to Direct, reset expiry" | ||
| 363 | ); | ||
| 364 | } | ||
| 365 | return; // Event already exists, don't add duplicate | ||
| 366 | } | ||
| 367 | |||
| 309 | // Extract identifier from the event's `a` tag for sync enqueueing | 368 | // Extract identifier from the event's `a` tag for sync enqueueing |
| 310 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); | 369 | let identifier = crate::git::sync::extract_identifier_from_pr_event(&event); |
| 311 | 370 | ||
| @@ -315,6 +374,7 @@ impl Purgatory { | |||
| 315 | commit, | 374 | commit, |
| 316 | created_at: now, | 375 | created_at: now, |
| 317 | expires_at: now + DEFAULT_EXPIRY, | 376 | expires_at: now + DEFAULT_EXPIRY, |
| 377 | source, | ||
| 318 | }; | 378 | }; |
| 319 | 379 | ||
| 320 | self.pr_events.insert(event_id, entry); | 380 | self.pr_events.insert(event_id, entry); |
| @@ -328,6 +388,8 @@ impl Purgatory { | |||
| 328 | /// Add a PR placeholder (git data arrived before PR event). | 388 | /// Add a PR placeholder (git data arrived before PR event). |
| 329 | /// | 389 | /// |
| 330 | /// Creates a placeholder entry waiting for the corresponding PR event. | 390 | /// Creates a placeholder entry waiting for the corresponding PR event. |
| 391 | /// Placeholders are always marked as `Direct` source since they originate | ||
| 392 | /// from git pushes (direct user action). | ||
| 331 | /// | 393 | /// |
| 332 | /// # Arguments | 394 | /// # Arguments |
| 333 | /// * `event_id` - The expected event ID (from git ref name) | 395 | /// * `event_id` - The expected event ID (from git ref name) |
| @@ -339,6 +401,7 @@ impl Purgatory { | |||
| 339 | commit, | 401 | commit, |
| 340 | created_at: now, | 402 | created_at: now, |
| 341 | expires_at: now + DEFAULT_EXPIRY, | 403 | expires_at: now + DEFAULT_EXPIRY, |
| 404 | source: types::EventSource::Direct, // Git pushes are direct user actions | ||
| 342 | }; | 405 | }; |
| 343 | 406 | ||
| 344 | self.pr_events.insert(event_id, entry); | 407 | self.pr_events.insert(event_id, entry); |
| @@ -608,6 +671,9 @@ impl Purgatory { | |||
| 608 | /// prevent infinite re-sync loops. Events that expire without finding git data | 671 | /// prevent infinite re-sync loops. Events that expire without finding git data |
| 609 | /// will be filtered out during future negentropy/REQ sync operations. | 672 | /// will be filtered out during future negentropy/REQ sync operations. |
| 610 | /// | 673 | /// |
| 674 | /// Emits structured `[PURGATORY_EXPIRED]` log entries for each expired event | ||
| 675 | /// to support migration scripts and operational monitoring. | ||
| 676 | /// | ||
| 611 | /// # Returns | 677 | /// # Returns |
| 612 | /// Tuple of (num_state_removed, num_pr_removed) | 678 | /// Tuple of (num_state_removed, num_pr_removed) |
| 613 | pub fn cleanup(&self) -> (usize, usize) { | 679 | pub fn cleanup(&self) -> (usize, usize) { |
| @@ -615,18 +681,38 @@ impl Purgatory { | |||
| 615 | let mut state_removed = 0; | 681 | let mut state_removed = 0; |
| 616 | 682 | ||
| 617 | // Remove expired state events and mark them as expired | 683 | // Remove expired state events and mark them as expired |
| 618 | self.state_events.retain(|_, entries| { | 684 | self.state_events.retain(|identifier, entries| { |
| 619 | let original_len = entries.len(); | 685 | let original_len = entries.len(); |
| 620 | // Collect event IDs before removing | ||
| 621 | let expired_ids: Vec<EventId> = entries | ||
| 622 | .iter() | ||
| 623 | .filter(|entry| entry.expires_at <= now) | ||
| 624 | .map(|entry| entry.event.id) | ||
| 625 | .collect(); | ||
| 626 | 686 | ||
| 627 | // Mark as expired to prevent re-sync | 687 | // Log each expired entry and mark it as expired before removing |
| 628 | for event_id in expired_ids { | 688 | for entry in entries.iter().filter(|e| e.expires_at <= now) { |
| 629 | self.mark_expired(event_id); | 689 | let npub = entry.author.to_bech32().unwrap_or_else(|_| entry.author.to_hex()); |
| 690 | let event_id_short = &entry.event.id.to_hex()[..12]; | ||
| 691 | let source_str = if entry.source.is_direct() { "direct" } else { "sync" }; | ||
| 692 | |||
| 693 | // Structured log for migration scripts | ||
| 694 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 695 | if entry.source.is_direct() { | ||
| 696 | tracing::warn!( | ||
| 697 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 698 | identifier, | ||
| 699 | npub, | ||
| 700 | event_id_short, | ||
| 701 | entry.event.kind.as_u16(), | ||
| 702 | source_str | ||
| 703 | ); | ||
| 704 | } else { | ||
| 705 | tracing::debug!( | ||
| 706 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 707 | identifier, | ||
| 708 | npub, | ||
| 709 | event_id_short, | ||
| 710 | entry.event.kind.as_u16(), | ||
| 711 | source_str | ||
| 712 | ); | ||
| 713 | } | ||
| 714 | |||
| 715 | self.mark_expired(entry.event.id); | ||
| 630 | } | 716 | } |
| 631 | 717 | ||
| 632 | // Remove expired entries | 718 | // Remove expired entries |
| @@ -636,21 +722,103 @@ impl Purgatory { | |||
| 636 | }); | 722 | }); |
| 637 | 723 | ||
| 638 | // Remove expired PR events and mark them as expired | 724 | // Remove expired PR events and mark them as expired |
| 639 | let expired_prs: Vec<(String, Option<EventId>)> = self | 725 | let expired_prs: Vec<_> = self |
| 640 | .pr_events | 726 | .pr_events |
| 641 | .iter() | 727 | .iter() |
| 642 | .filter(|entry| entry.value().expires_at <= now) | 728 | .filter(|entry| entry.value().expires_at <= now) |
| 643 | .map(|entry| { | 729 | .map(|entry| { |
| 644 | let event_id = entry.value().event.as_ref().map(|e| e.id); | 730 | let pr_entry = entry.value(); |
| 645 | (entry.key().clone(), event_id) | 731 | let event_id_str = entry.key().clone(); |
| 732 | let event_opt = pr_entry.event.clone(); | ||
| 733 | let commit = pr_entry.commit.clone(); | ||
| 734 | let source = pr_entry.source; | ||
| 735 | (event_id_str, event_opt, commit, source) | ||
| 646 | }) | 736 | }) |
| 647 | .collect(); | 737 | .collect(); |
| 648 | 738 | ||
| 649 | let pr_removed = expired_prs.len(); | 739 | let pr_removed = expired_prs.len(); |
| 650 | for (event_id_str, event_id_opt) in expired_prs { | 740 | for (event_id_str, event_opt, commit, source) in expired_prs { |
| 651 | // Mark actual PR events as expired (not placeholders) | 741 | // Log structured entry for PR events (not placeholders) |
| 652 | if let Some(event_id) = event_id_opt { | 742 | if let Some(ref event) = event_opt { |
| 653 | self.mark_expired(event_id); | 743 | let npub = event |
| 744 | .pubkey | ||
| 745 | .to_bech32() | ||
| 746 | .unwrap_or_else(|_| event.pubkey.to_hex()); | ||
| 747 | let event_id_short = &event.id.to_hex()[..12]; | ||
| 748 | let source_str = if source.is_direct() { "direct" } else { "sync" }; | ||
| 749 | |||
| 750 | // Extract ALL repo identifiers from 'a' tags | ||
| 751 | // (PR events can reference multiple repos when there are multiple maintainers) | ||
| 752 | let repos: Vec<String> = event | ||
| 753 | .tags | ||
| 754 | .iter() | ||
| 755 | .filter_map(|tag| { | ||
| 756 | let tag_vec = tag.clone().to_vec(); | ||
| 757 | if tag_vec.len() >= 2 | ||
| 758 | && tag_vec[0] == "a" | ||
| 759 | && tag_vec[1].starts_with("30617:") | ||
| 760 | { | ||
| 761 | // Format: 30617:<owner_pubkey>:<identifier> | ||
| 762 | let parts: Vec<&str> = tag_vec[1].split(':').collect(); | ||
| 763 | if parts.len() >= 3 { | ||
| 764 | Some(parts[2].to_string()) | ||
| 765 | } else { | ||
| 766 | None | ||
| 767 | } | ||
| 768 | } else { | ||
| 769 | None | ||
| 770 | } | ||
| 771 | }) | ||
| 772 | .collect(); | ||
| 773 | |||
| 774 | // Deduplicate while preserving order | ||
| 775 | let mut seen = std::collections::HashSet::new(); | ||
| 776 | let unique_repos: Vec<String> = repos | ||
| 777 | .into_iter() | ||
| 778 | .filter(|r| seen.insert(r.clone())) | ||
| 779 | .collect(); | ||
| 780 | |||
| 781 | let repos_to_log = if unique_repos.is_empty() { | ||
| 782 | vec!["unknown".to_string()] | ||
| 783 | } else { | ||
| 784 | unique_repos | ||
| 785 | }; | ||
| 786 | |||
| 787 | // Structured log for migration scripts - log once per repo | ||
| 788 | // Direct submissions log at WARN, synced events at DEBUG | ||
| 789 | for repo in &repos_to_log { | ||
| 790 | if source.is_direct() { | ||
| 791 | tracing::warn!( | ||
| 792 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 793 | repo, | ||
| 794 | npub, | ||
| 795 | event_id_short, | ||
| 796 | event.kind.as_u16(), | ||
| 797 | &commit[..commit.len().min(12)], | ||
| 798 | source_str | ||
| 799 | ); | ||
| 800 | } else { | ||
| 801 | tracing::debug!( | ||
| 802 | "[PURGATORY_EXPIRED] repo={} npub={} event_id={}... kind={} commit={} source={} reason=\"git data not received within 30 minutes\"", | ||
| 803 | repo, | ||
| 804 | npub, | ||
| 805 | event_id_short, | ||
| 806 | event.kind.as_u16(), | ||
| 807 | &commit[..commit.len().min(12)], | ||
| 808 | source_str | ||
| 809 | ); | ||
| 810 | } | ||
| 811 | } | ||
| 812 | |||
| 813 | self.mark_expired(event.id); | ||
| 814 | } else { | ||
| 815 | // Placeholder (git data arrived first, but PR event never came) | ||
| 816 | // Placeholders are always Direct source (from git push) | ||
| 817 | tracing::debug!( | ||
| 818 | "[PURGATORY_EXPIRED] placeholder event_id={} commit={} source=direct reason=\"PR event not received within 30 minutes\"", | ||
| 819 | &event_id_str[..event_id_str.len().min(12)], | ||
| 820 | &commit[..commit.len().min(12)] | ||
| 821 | ); | ||
| 654 | } | 822 | } |
| 655 | self.pr_events.remove(&event_id_str); | 823 | self.pr_events.remove(&event_id_str); |
| 656 | } | 824 | } |
| @@ -800,6 +968,7 @@ impl Purgatory { | |||
| 800 | author: e.author, | 968 | author: e.author, |
| 801 | created_at_offset_secs: created_offset.as_secs(), | 969 | created_at_offset_secs: created_offset.as_secs(), |
| 802 | expires_at_offset_secs: expires_offset.as_secs(), | 970 | expires_at_offset_secs: expires_offset.as_secs(), |
| 971 | source: e.source, | ||
| 803 | } | 972 | } |
| 804 | }) | 973 | }) |
| 805 | .collect(); | 974 | .collect(); |
| @@ -822,6 +991,7 @@ impl Purgatory { | |||
| 822 | commit: e.commit.clone(), | 991 | commit: e.commit.clone(), |
| 823 | created_at_offset_secs: created_offset.as_secs(), | 992 | created_at_offset_secs: created_offset.as_secs(), |
| 824 | expires_at_offset_secs: expires_offset.as_secs(), | 993 | expires_at_offset_secs: expires_offset.as_secs(), |
| 994 | source: e.source, | ||
| 825 | }; | 995 | }; |
| 826 | pr_events.insert(event_id, serializable); | 996 | pr_events.insert(event_id, serializable); |
| 827 | } | 997 | } |
| @@ -923,6 +1093,7 @@ impl Purgatory { | |||
| 923 | author: e.author, | 1093 | author: e.author, |
| 924 | created_at, | 1094 | created_at, |
| 925 | expires_at, | 1095 | expires_at, |
| 1096 | source: e.source, | ||
| 926 | } | 1097 | } |
| 927 | }) | 1098 | }) |
| 928 | .collect(); | 1099 | .collect(); |
| @@ -948,6 +1119,7 @@ impl Purgatory { | |||
| 948 | commit: e.commit, | 1119 | commit: e.commit, |
| 949 | created_at, | 1120 | created_at, |
| 950 | expires_at, | 1121 | expires_at, |
| 1122 | source: e.source, | ||
| 951 | }; | 1123 | }; |
| 952 | 1124 | ||
| 953 | self.pr_events.insert(event_id, entry); | 1125 | self.pr_events.insert(event_id, entry); |
| @@ -1005,8 +1177,18 @@ mod tests { | |||
| 1005 | .sign_with_keys(&keys) | 1177 | .sign_with_keys(&keys) |
| 1006 | .unwrap(); | 1178 | .unwrap(); |
| 1007 | 1179 | ||
| 1008 | purgatory.add_state(event.clone(), "test-repo".to_string(), keys.public_key()); | 1180 | purgatory.add_state( |
| 1009 | purgatory.add_pr(event, "test-event-id".to_string(), "abc123".to_string()); | 1181 | event.clone(), |
| 1182 | "test-repo".to_string(), | ||
| 1183 | keys.public_key(), | ||
| 1184 | false, | ||
| 1185 | ); | ||
| 1186 | purgatory.add_pr( | ||
| 1187 | event, | ||
| 1188 | "test-event-id".to_string(), | ||
| 1189 | "abc123".to_string(), | ||
| 1190 | false, | ||
| 1191 | ); | ||
| 1010 | 1192 | ||
| 1011 | let (state_count, pr_count) = purgatory.count(); | 1193 | let (state_count, pr_count) = purgatory.count(); |
| 1012 | assert_eq!(state_count, 1); | 1194 | assert_eq!(state_count, 1); |
| @@ -1057,7 +1239,7 @@ mod tests { | |||
| 1057 | let event = EventBuilder::text_note("state") | 1239 | let event = EventBuilder::text_note("state") |
| 1058 | .sign_with_keys(&keys) | 1240 | .sign_with_keys(&keys) |
| 1059 | .unwrap(); | 1241 | .unwrap(); |
| 1060 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key()); | 1242 | purgatory.add_state(event, "test-repo".to_string(), keys.public_key(), false); |
| 1061 | 1243 | ||
| 1062 | // Now should have pending events | 1244 | // Now should have pending events |
| 1063 | assert!(purgatory.has_pending_events("test-repo")); | 1245 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1087,7 +1269,12 @@ mod tests { | |||
| 1087 | .sign_with_keys(&keys) | 1269 | .sign_with_keys(&keys) |
| 1088 | .unwrap(); | 1270 | .unwrap(); |
| 1089 | 1271 | ||
| 1090 | purgatory.add_pr(event, "pr-event-id".to_string(), "commit123".to_string()); | 1272 | purgatory.add_pr( |
| 1273 | event, | ||
| 1274 | "pr-event-id".to_string(), | ||
| 1275 | "commit123".to_string(), | ||
| 1276 | false, | ||
| 1277 | ); | ||
| 1091 | 1278 | ||
| 1092 | // Now should have pending events for test-repo | 1279 | // Now should have pending events for test-repo |
| 1093 | assert!(purgatory.has_pending_events("test-repo")); | 1280 | assert!(purgatory.has_pending_events("test-repo")); |
| @@ -1152,6 +1339,7 @@ fn test_pr_event_vs_placeholder() { | |||
| 1152 | event.clone(), | 1339 | event.clone(), |
| 1153 | "event-id-1".to_string(), | 1340 | "event-id-1".to_string(), |
| 1154 | "commit-abc".to_string(), | 1341 | "commit-abc".to_string(), |
| 1342 | false, | ||
| 1155 | ); | 1343 | ); |
| 1156 | 1344 | ||
| 1157 | // Add a placeholder (no event) | 1345 | // Add a placeholder (no event) |
| @@ -1208,8 +1396,14 @@ fn test_cleanup_removes_expired_entries() { | |||
| 1208 | state_event.clone(), | 1396 | state_event.clone(), |
| 1209 | "test-repo".to_string(), | 1397 | "test-repo".to_string(), |
| 1210 | keys.public_key(), | 1398 | keys.public_key(), |
| 1399 | false, | ||
| 1400 | ); | ||
| 1401 | purgatory.add_pr( | ||
| 1402 | pr_event, | ||
| 1403 | "pr-123".to_string(), | ||
| 1404 | "commit-abc".to_string(), | ||
| 1405 | false, | ||
| 1211 | ); | 1406 | ); |
| 1212 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | ||
| 1213 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); | 1407 | purgatory.add_pr_placeholder("pr-456".to_string(), "commit-def".to_string()); |
| 1214 | 1408 | ||
| 1215 | // Verify entries are there | 1409 | // Verify entries are there |
| @@ -1256,8 +1450,18 @@ fn test_cleanup_preserves_non_expired_entries() { | |||
| 1256 | .unwrap(); | 1450 | .unwrap(); |
| 1257 | 1451 | ||
| 1258 | // Add fresh entries | 1452 | // Add fresh entries |
| 1259 | purgatory.add_state(state_event, "test-repo".to_string(), keys.public_key()); | 1453 | purgatory.add_state( |
| 1260 | purgatory.add_pr(pr_event, "pr-123".to_string(), "commit-abc".to_string()); | 1454 | state_event, |
| 1455 | "test-repo".to_string(), | ||
| 1456 | keys.public_key(), | ||
| 1457 | false, | ||
| 1458 | ); | ||
| 1459 | purgatory.add_pr( | ||
| 1460 | pr_event, | ||
| 1461 | "pr-123".to_string(), | ||
| 1462 | "commit-abc".to_string(), | ||
| 1463 | false, | ||
| 1464 | ); | ||
| 1261 | 1465 | ||
| 1262 | // Run cleanup | 1466 | // Run cleanup |
| 1263 | let (state_removed, pr_removed) = purgatory.cleanup(); | 1467 | let (state_removed, pr_removed) = purgatory.cleanup(); |
| @@ -1287,8 +1491,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1287 | .sign_with_keys(&keys) | 1491 | .sign_with_keys(&keys) |
| 1288 | .unwrap(); | 1492 | .unwrap(); |
| 1289 | 1493 | ||
| 1290 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key()); | 1494 | purgatory.add_state(event1, "test-repo".to_string(), keys.public_key(), false); |
| 1291 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key()); | 1495 | purgatory.add_state(event2, "test-repo".to_string(), keys.public_key(), false); |
| 1292 | 1496 | ||
| 1293 | // Expire only the first one | 1497 | // Expire only the first one |
| 1294 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { | 1498 | if let Some(mut entries) = purgatory.state_events.get_mut("test-repo") { |
| @@ -1305,8 +1509,8 @@ fn test_cleanup_mixed_expired_and_fresh() { | |||
| 1305 | .sign_with_keys(&keys) | 1509 | .sign_with_keys(&keys) |
| 1306 | .unwrap(); | 1510 | .unwrap(); |
| 1307 | 1511 | ||
| 1308 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string()); | 1512 | purgatory.add_pr(pr1, "pr-1".to_string(), "commit-1".to_string(), false); |
| 1309 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string()); | 1513 | purgatory.add_pr(pr2, "pr-2".to_string(), "commit-2".to_string(), false); |
| 1310 | 1514 | ||
| 1311 | // Expire only first PR | 1515 | // Expire only first PR |
| 1312 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { | 1516 | if let Some(mut entry) = purgatory.pr_events.get_mut("pr-1") { |
| @@ -1338,8 +1542,8 @@ fn test_remove_expired_legacy_method() { | |||
| 1338 | .unwrap(); | 1542 | .unwrap(); |
| 1339 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); | 1543 | let pr_event = EventBuilder::text_note("pr").sign_with_keys(&keys).unwrap(); |
| 1340 | 1544 | ||
| 1341 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1545 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1342 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1546 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1343 | 1547 | ||
| 1344 | // Expire both | 1548 | // Expire both |
| 1345 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1549 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1373,8 +1577,8 @@ fn test_expired_event_tracking() { | |||
| 1373 | let pr_event_id = pr_event.id; | 1577 | let pr_event_id = pr_event.id; |
| 1374 | 1578 | ||
| 1375 | // Add events to purgatory | 1579 | // Add events to purgatory |
| 1376 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key()); | 1580 | purgatory.add_state(state_event, "repo".to_string(), keys.public_key(), false); |
| 1377 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string()); | 1581 | purgatory.add_pr(pr_event, "pr-id".to_string(), "commit".to_string(), false); |
| 1378 | 1582 | ||
| 1379 | // Events should not be marked as expired yet | 1583 | // Events should not be marked as expired yet |
| 1380 | assert!(!purgatory.is_expired(&state_event_id)); | 1584 | assert!(!purgatory.is_expired(&state_event_id)); |
| @@ -1426,7 +1630,7 @@ fn test_cleanup_expired_events() { | |||
| 1426 | let event2_id = event2.id; | 1630 | let event2_id = event2.id; |
| 1427 | 1631 | ||
| 1428 | // Add and immediately expire event1 | 1632 | // Add and immediately expire event1 |
| 1429 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key()); | 1633 | purgatory.add_state(event1, "repo1".to_string(), keys.public_key(), false); |
| 1430 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { | 1634 | if let Some(mut entries) = purgatory.state_events.get_mut("repo1") { |
| 1431 | for entry in entries.iter_mut() { | 1635 | for entry in entries.iter_mut() { |
| 1432 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1636 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1435,7 +1639,7 @@ fn test_cleanup_expired_events() { | |||
| 1435 | purgatory.cleanup(); | 1639 | purgatory.cleanup(); |
| 1436 | 1640 | ||
| 1437 | // Add and expire event2 (will be more recent) | 1641 | // Add and expire event2 (will be more recent) |
| 1438 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key()); | 1642 | purgatory.add_state(event2, "repo2".to_string(), keys.public_key(), false); |
| 1439 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { | 1643 | if let Some(mut entries) = purgatory.state_events.get_mut("repo2") { |
| 1440 | for entry in entries.iter_mut() { | 1644 | for entry in entries.iter_mut() { |
| 1441 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1645 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1477,7 +1681,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1477 | let event_id = event.id; | 1681 | let event_id = event.id; |
| 1478 | 1682 | ||
| 1479 | // Add event to purgatory | 1683 | // Add event to purgatory |
| 1480 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1684 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1481 | 1685 | ||
| 1482 | // Expire it | 1686 | // Expire it |
| 1483 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1687 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1497,7 +1701,7 @@ fn test_expired_events_prevent_readdition() { | |||
| 1497 | // This simulates what negentropy/REQ+EOSE should do: | 1701 | // This simulates what negentropy/REQ+EOSE should do: |
| 1498 | // Check if event is in event_ids() before adding | 1702 | // Check if event is in event_ids() before adding |
| 1499 | if !ids.contains(&event_id) { | 1703 | if !ids.contains(&event_id) { |
| 1500 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1704 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1501 | } | 1705 | } |
| 1502 | 1706 | ||
| 1503 | // Event should NOT be re-added | 1707 | // Event should NOT be re-added |
| @@ -1540,7 +1744,7 @@ fn test_user_can_resubmit_expired_event() { | |||
| 1540 | let event_id = event.id; | 1744 | let event_id = event.id; |
| 1541 | 1745 | ||
| 1542 | // Add event to purgatory | 1746 | // Add event to purgatory |
| 1543 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 1747 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1544 | 1748 | ||
| 1545 | // Expire it | 1749 | // Expire it |
| 1546 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1750 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| @@ -1589,8 +1793,18 @@ async fn test_save_and_restore_state_events() { | |||
| 1589 | let event1_id = event1.id; | 1793 | let event1_id = event1.id; |
| 1590 | let event2_id = event2.id; | 1794 | let event2_id = event2.id; |
| 1591 | 1795 | ||
| 1592 | purgatory.add_state(event1.clone(), "test-repo".to_string(), keys.public_key()); | 1796 | purgatory.add_state( |
| 1593 | purgatory.add_state(event2.clone(), "test-repo".to_string(), keys.public_key()); | 1797 | event1.clone(), |
| 1798 | "test-repo".to_string(), | ||
| 1799 | keys.public_key(), | ||
| 1800 | false, | ||
| 1801 | ); | ||
| 1802 | purgatory.add_state( | ||
| 1803 | event2.clone(), | ||
| 1804 | "test-repo".to_string(), | ||
| 1805 | keys.public_key(), | ||
| 1806 | false, | ||
| 1807 | ); | ||
| 1594 | 1808 | ||
| 1595 | // Save to disk | 1809 | // Save to disk |
| 1596 | purgatory.save_to_disk(&state_file).unwrap(); | 1810 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -1652,6 +1866,7 @@ async fn test_save_and_restore_pr_events() { | |||
| 1652 | pr_event.clone(), | 1866 | pr_event.clone(), |
| 1653 | "pr-event-id".to_string(), | 1867 | "pr-event-id".to_string(), |
| 1654 | "commit-abc".to_string(), | 1868 | "commit-abc".to_string(), |
| 1869 | false, | ||
| 1655 | ); | 1870 | ); |
| 1656 | 1871 | ||
| 1657 | // Save to disk | 1872 | // Save to disk |
| @@ -1721,7 +1936,7 @@ async fn test_save_and_restore_expired_events() { | |||
| 1721 | let event_id = event.id; | 1936 | let event_id = event.id; |
| 1722 | 1937 | ||
| 1723 | // Add and expire event | 1938 | // Add and expire event |
| 1724 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 1939 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 1725 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { | 1940 | if let Some(mut entries) = purgatory.state_events.get_mut("repo") { |
| 1726 | for entry in entries.iter_mut() { | 1941 | for entry in entries.iter_mut() { |
| 1727 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 1942 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
| @@ -1860,7 +2075,7 @@ async fn test_downtime_calculation() { | |||
| 1860 | .sign_with_keys(&keys) | 2075 | .sign_with_keys(&keys) |
| 1861 | .unwrap(); | 2076 | .unwrap(); |
| 1862 | 2077 | ||
| 1863 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2078 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1864 | 2079 | ||
| 1865 | // Get original expiry time | 2080 | // Get original expiry time |
| 1866 | let original_entries = purgatory.find_state("repo"); | 2081 | let original_entries = purgatory.find_state("repo"); |
| @@ -1916,7 +2131,7 @@ async fn test_expiry_times_preserved() { | |||
| 1916 | .sign_with_keys(&keys) | 2131 | .sign_with_keys(&keys) |
| 1917 | .unwrap(); | 2132 | .unwrap(); |
| 1918 | 2133 | ||
| 1919 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key()); | 2134 | purgatory.add_state(event.clone(), "repo".to_string(), keys.public_key(), false); |
| 1920 | 2135 | ||
| 1921 | // Manually set expiry to a specific time in the future | 2136 | // Manually set expiry to a specific time in the future |
| 1922 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes | 2137 | let custom_expiry = Instant::now() + Duration::from_secs(600); // 10 minutes |
| @@ -1975,16 +2190,19 @@ async fn test_multiple_state_events_same_identifier() { | |||
| 1975 | event1.clone(), | 2190 | event1.clone(), |
| 1976 | "shared-repo".to_string(), | 2191 | "shared-repo".to_string(), |
| 1977 | keys1.public_key(), | 2192 | keys1.public_key(), |
| 2193 | false, | ||
| 1978 | ); | 2194 | ); |
| 1979 | purgatory.add_state( | 2195 | purgatory.add_state( |
| 1980 | event2.clone(), | 2196 | event2.clone(), |
| 1981 | "shared-repo".to_string(), | 2197 | "shared-repo".to_string(), |
| 1982 | keys2.public_key(), | 2198 | keys2.public_key(), |
| 2199 | false, | ||
| 1983 | ); | 2200 | ); |
| 1984 | purgatory.add_state( | 2201 | purgatory.add_state( |
| 1985 | event3.clone(), | 2202 | event3.clone(), |
| 1986 | "shared-repo".to_string(), | 2203 | "shared-repo".to_string(), |
| 1987 | keys3.public_key(), | 2204 | keys3.public_key(), |
| 2205 | false, | ||
| 1988 | ); | 2206 | ); |
| 1989 | 2207 | ||
| 1990 | // Save to disk | 2208 | // Save to disk |
| @@ -2031,6 +2249,7 @@ async fn test_mixed_pr_events_and_placeholders() { | |||
| 2031 | pr_event.clone(), | 2249 | pr_event.clone(), |
| 2032 | "pr-with-event".to_string(), | 2250 | "pr-with-event".to_string(), |
| 2033 | "commit-abc".to_string(), | 2251 | "commit-abc".to_string(), |
| 2252 | false, | ||
| 2034 | ); | 2253 | ); |
| 2035 | 2254 | ||
| 2036 | // Add PR placeholder | 2255 | // Add PR placeholder |
| @@ -2076,7 +2295,7 @@ async fn test_file_cleanup_after_successful_restore() { | |||
| 2076 | let event = EventBuilder::text_note("test") | 2295 | let event = EventBuilder::text_note("test") |
| 2077 | .sign_with_keys(&keys) | 2296 | .sign_with_keys(&keys) |
| 2078 | .unwrap(); | 2297 | .unwrap(); |
| 2079 | purgatory.add_state(event, "repo".to_string(), keys.public_key()); | 2298 | purgatory.add_state(event, "repo".to_string(), keys.public_key(), false); |
| 2080 | 2299 | ||
| 2081 | // Save to disk | 2300 | // Save to disk |
| 2082 | purgatory.save_to_disk(&state_file).unwrap(); | 2301 | purgatory.save_to_disk(&state_file).unwrap(); |
| @@ -2110,8 +2329,18 @@ async fn test_comprehensive_roundtrip() { | |||
| 2110 | .sign_with_keys(&keys2) | 2329 | .sign_with_keys(&keys2) |
| 2111 | .unwrap(); | 2330 | .unwrap(); |
| 2112 | 2331 | ||
| 2113 | purgatory.add_state(state1.clone(), "repo1".to_string(), keys1.public_key()); | 2332 | purgatory.add_state( |
| 2114 | purgatory.add_state(state2.clone(), "repo2".to_string(), keys2.public_key()); | 2333 | state1.clone(), |
| 2334 | "repo1".to_string(), | ||
| 2335 | keys1.public_key(), | ||
| 2336 | false, | ||
| 2337 | ); | ||
| 2338 | purgatory.add_state( | ||
| 2339 | state2.clone(), | ||
| 2340 | "repo2".to_string(), | ||
| 2341 | keys2.public_key(), | ||
| 2342 | false, | ||
| 2343 | ); | ||
| 2115 | 2344 | ||
| 2116 | // Add PR event | 2345 | // Add PR event |
| 2117 | let tags = vec![Tag::custom( | 2346 | let tags = vec![Tag::custom( |
| @@ -2122,7 +2351,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2122 | .tags(tags) | 2351 | .tags(tags) |
| 2123 | .sign_with_keys(&keys1) | 2352 | .sign_with_keys(&keys1) |
| 2124 | .unwrap(); | 2353 | .unwrap(); |
| 2125 | purgatory.add_pr(pr_event.clone(), "pr-1".to_string(), "commit-1".to_string()); | 2354 | purgatory.add_pr( |
| 2355 | pr_event.clone(), | ||
| 2356 | "pr-1".to_string(), | ||
| 2357 | "commit-1".to_string(), | ||
| 2358 | false, | ||
| 2359 | ); | ||
| 2126 | 2360 | ||
| 2127 | // Add PR placeholder | 2361 | // Add PR placeholder |
| 2128 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); | 2362 | purgatory.add_pr_placeholder("pr-2".to_string(), "commit-2".to_string()); |
| @@ -2132,7 +2366,12 @@ async fn test_comprehensive_roundtrip() { | |||
| 2132 | .sign_with_keys(&keys1) | 2366 | .sign_with_keys(&keys1) |
| 2133 | .unwrap(); | 2367 | .unwrap(); |
| 2134 | let expired_id = expired_event.id; | 2368 | let expired_id = expired_event.id; |
| 2135 | purgatory.add_state(expired_event, "repo3".to_string(), keys1.public_key()); | 2369 | purgatory.add_state( |
| 2370 | expired_event, | ||
| 2371 | "repo3".to_string(), | ||
| 2372 | keys1.public_key(), | ||
| 2373 | false, | ||
| 2374 | ); | ||
| 2136 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { | 2375 | if let Some(mut entries) = purgatory.state_events.get_mut("repo3") { |
| 2137 | for entry in entries.iter_mut() { | 2376 | for entry in entries.iter_mut() { |
| 2138 | entry.expires_at = Instant::now() - Duration::from_secs(1); | 2377 | entry.expires_at = Instant::now() - Duration::from_secs(1); |
diff --git a/src/purgatory/sync/context.rs b/src/purgatory/sync/context.rs index 33c2d12..904f8af 100644 --- a/src/purgatory/sync/context.rs +++ b/src/purgatory/sync/context.rs | |||
| @@ -361,94 +361,121 @@ impl SyncContext for RealSyncContext { | |||
| 361 | let naughty_list = self.git_naughty_list.clone(); | 361 | let naughty_list = self.git_naughty_list.clone(); |
| 362 | 362 | ||
| 363 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { | 363 | tokio::task::spawn_blocking(move || -> Result<Vec<String>> { |
| 364 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history | 364 | let mut remaining_oids = missing_oids.clone(); |
| 365 | let mut args = vec!["fetch", &url]; | 365 | let mut missing_from_remote: Vec<String> = Vec::new(); |
| 366 | args.extend(missing_oids.iter().map(|s| s.as_str())); | 366 | |
| 367 | 367 | // Retry loop: keep fetching until success or no OIDs left | |
| 368 | let output = Command::new("git") | 368 | loop { |
| 369 | .args(&args) | 369 | if remaining_oids.is_empty() { |
| 370 | .current_dir(&repo_path) | 370 | // All OIDs were missing from remote |
| 371 | .output(); | 371 | debug!( |
| 372 | 372 | url = %url, | |
| 373 | match output { | 373 | missing_count = missing_from_remote.len(), |
| 374 | Ok(result) if result.status.success() => { | 374 | "All requested OIDs missing from remote" |
| 375 | // Count how many OIDs we now have | 375 | ); |
| 376 | let fetched: Vec<String> = missing_oids | 376 | return Ok(vec![]); |
| 377 | .iter() | ||
| 378 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) | ||
| 379 | .cloned() | ||
| 380 | .collect(); | ||
| 381 | |||
| 382 | debug!(fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 383 | |||
| 384 | Ok(fetched) | ||
| 385 | } | 377 | } |
| 386 | Ok(result) => { | 378 | |
| 387 | let stderr = String::from_utf8_lossy(&result.stderr); | 379 | // git fetch <remote> <sha1> <sha2> ... - fetch all OIDs with full history |
| 388 | 380 | let mut args = vec!["fetch".to_string(), url.clone()]; | |
| 389 | // Extract domain and classify error for naughty list | 381 | args.extend(remaining_oids.iter().cloned()); |
| 390 | if let Some(domain) = extract_domain(&url) { | 382 | |
| 391 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | 383 | let output = Command::new("git") |
| 392 | let is_new = naughty_list.record(&domain, category, stderr.to_string()); | 384 | .args(&args) |
| 393 | 385 | .current_dir(&repo_path) | |
| 394 | if is_new { | 386 | .output(); |
| 395 | tracing::warn!( | 387 | |
| 396 | domain = %domain, | 388 | match output { |
| 397 | category = %category, | 389 | Ok(result) if result.status.success() => { |
| 398 | error = %stderr, | 390 | // Fetch succeeded - count how many OIDs we now have |
| 399 | "Git remote domain added to naughty list" | 391 | let fetched: Vec<String> = missing_oids |
| 400 | ); | 392 | .iter() |
| 401 | } else { | 393 | .filter(|oid| crate::git::oid_exists(&repo_path, oid)) |
| 402 | debug!( | 394 | .cloned() |
| 403 | domain = %domain, | 395 | .collect(); |
| 404 | category = %category, | 396 | |
| 405 | "Git remote domain still on naughty list" | 397 | if !missing_from_remote.is_empty() { |
| 406 | ); | 398 | debug!( |
| 407 | } | 399 | url = %url, |
| 400 | fetched_count = fetched.len(), | ||
| 401 | missing_count = missing_from_remote.len(), | ||
| 402 | missing_oids = ?missing_from_remote, | ||
| 403 | "Fetch completed after retries - some OIDs were missing from remote" | ||
| 404 | ); | ||
| 405 | } else { | ||
| 406 | debug!(url = %url, fetched_count = fetched.len(), "Successfully fetched OIDs"); | ||
| 408 | } | 407 | } |
| 408 | |||
| 409 | return Ok(fetched); | ||
| 409 | } | 410 | } |
| 411 | Ok(result) => { | ||
| 412 | let stderr = String::from_utf8_lossy(&result.stderr); | ||
| 410 | 413 | ||
| 411 | // Check for "not our ref" errors and provide a clearer error message | 414 | // Check for "not our ref" error - this is retryable |
| 412 | let error_msg = if stderr.contains("upload-pack: not our ref") { | 415 | if stderr.contains("upload-pack: not our ref") { |
| 413 | // Parse out the missing OID from stderr (git only reports one at a time) | 416 | // Parse out the missing OID from stderr |
| 414 | let missing_oid = stderr | 417 | let missing_oid = stderr.lines().find_map(|line| { |
| 415 | .lines() | ||
| 416 | .find_map(|line| { | ||
| 417 | if line.contains("not our ref") { | 418 | if line.contains("not our ref") { |
| 418 | // Extract the OID from lines like: | 419 | // Extract the OID from lines like: |
| 419 | // "fatal: remote error: upload-pack: not our ref <oid>" | 420 | // "fatal: remote error: upload-pack: not our ref <oid>" |
| 420 | line.split("not our ref").nth(1).map(|s| s.trim().to_string()) | 421 | line.split("not our ref") |
| 422 | .nth(1) | ||
| 423 | .map(|s| s.trim().to_string()) | ||
| 421 | } else { | 424 | } else { |
| 422 | None | 425 | None |
| 423 | } | 426 | } |
| 424 | }); | 427 | }); |
| 425 | 428 | ||
| 426 | let total_requested = missing_oids.len(); | 429 | if let Some(ref oid) = missing_oid { |
| 430 | // Remove the missing OID and retry with remaining | ||
| 431 | remaining_oids.retain(|o| o != oid); | ||
| 432 | missing_from_remote.push(oid.clone()); | ||
| 427 | 433 | ||
| 428 | if let Some(oid) = missing_oid { | 434 | debug!( |
| 429 | if total_requested > 1 { | ||
| 430 | // BUG: Git stops at first missing OID, so we don't know if the others exist | ||
| 431 | // We need retry logic to fetch remaining OIDs individually | ||
| 432 | tracing::warn!( | ||
| 433 | url = %url, | 435 | url = %url, |
| 434 | missing_oid = %oid, | 436 | missing_oid = %oid, |
| 435 | total_requested = total_requested, | 437 | remaining_count = remaining_oids.len(), |
| 436 | "Git fetch failed on first missing OID - other requested OIDs may exist but were not fetched. Retry logic needed." | 438 | "OID not found on remote, retrying with remaining OIDs" |
| 437 | ); | 439 | ); |
| 438 | format!("remote missing oid {} (BUG: {} other oids not attempted)", oid, total_requested - 1) | 440 | |
| 439 | } else { | 441 | continue; // Retry with remaining OIDs |
| 440 | format!("remote missing only oid requested: {}", oid) | 442 | } |
| 443 | } | ||
| 444 | |||
| 445 | // Non-retryable error - record to naughty list and return error | ||
| 446 | if let Some(domain) = extract_domain(&url) { | ||
| 447 | if let Some(category) = NaughtyListTracker::classify_error(&stderr) { | ||
| 448 | let is_new = | ||
| 449 | naughty_list.record(&domain, category, stderr.to_string()); | ||
| 450 | |||
| 451 | if is_new { | ||
| 452 | tracing::warn!( | ||
| 453 | domain = %domain, | ||
| 454 | category = %category, | ||
| 455 | error = %stderr, | ||
| 456 | "Git remote domain added to naughty list" | ||
| 457 | ); | ||
| 458 | } else { | ||
| 459 | debug!( | ||
| 460 | domain = %domain, | ||
| 461 | category = %category, | ||
| 462 | error = %stderr, | ||
| 463 | "Git fetch failed (domain on naughty list)" | ||
| 464 | ); | ||
| 465 | } | ||
| 441 | } | 466 | } |
| 442 | } else { | ||
| 443 | format!("git fetch failed: {}", stderr) | ||
| 444 | } | 467 | } |
| 445 | } else { | ||
| 446 | format!("git fetch failed: {}", stderr) | ||
| 447 | }; | ||
| 448 | 468 | ||
| 449 | Err(anyhow::anyhow!("{}", error_msg)) | 469 | return Err(anyhow::anyhow!("git fetch failed for {}: {}", url, stderr)); |
| 470 | } | ||
| 471 | Err(e) => { | ||
| 472 | return Err(anyhow::anyhow!( | ||
| 473 | "git fetch command error for {}: {}", | ||
| 474 | url, | ||
| 475 | e | ||
| 476 | )) | ||
| 477 | } | ||
| 450 | } | 478 | } |
| 451 | Err(e) => Err(anyhow::anyhow!("git fetch command error: {}", e)), | ||
| 452 | } | 479 | } |
| 453 | }) | 480 | }) |
| 454 | .await | 481 | .await |
diff --git a/src/purgatory/sync/functions.rs b/src/purgatory/sync/functions.rs index 65d29af..9207d58 100644 --- a/src/purgatory/sync/functions.rs +++ b/src/purgatory/sync/functions.rs | |||
| @@ -368,15 +368,23 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; | 368 | let fetch_result = ctx.fetch_oids(&target_repo, url, &needed_oids).await; |
| 369 | throttle_manager.complete_request(&domain); | 369 | throttle_manager.complete_request(&domain); |
| 370 | 370 | ||
| 371 | let oids_fetched = match fetch_result { | 371 | let fetched_oids = match fetch_result { |
| 372 | Ok(fetched) => { | 372 | Ok(fetched) if !fetched.is_empty() => { |
| 373 | debug!( | 373 | debug!( |
| 374 | identifier = %identifier, | 374 | identifier = %identifier, |
| 375 | url = %url, | 375 | url = %url, |
| 376 | oids_fetched = fetched.len(), | 376 | oids_fetched = fetched.len(), |
| 377 | "Fetch succeeded" | 377 | "Fetch succeeded" |
| 378 | ); | 378 | ); |
| 379 | fetched.len() | 379 | fetched |
| 380 | } | ||
| 381 | Ok(_) => { | ||
| 382 | debug!( | ||
| 383 | identifier = %identifier, | ||
| 384 | url = %url, | ||
| 385 | "Fetch returned no OIDs (not available on remote)" | ||
| 386 | ); | ||
| 387 | vec![] | ||
| 380 | } | 388 | } |
| 381 | Err(e) => { | 389 | Err(e) => { |
| 382 | debug!( | 390 | debug!( |
| @@ -385,13 +393,13 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 385 | error = %e, | 393 | error = %e, |
| 386 | "Fetch failed" | 394 | "Fetch failed" |
| 387 | ); | 395 | ); |
| 388 | 0 | 396 | vec![] |
| 389 | } | 397 | } |
| 390 | }; | 398 | }; |
| 391 | 399 | ||
| 392 | // Try to process any events that can now be satisfied | 400 | // Try to process any events that can now be satisfied |
| 393 | if oids_fetched > 0 { | 401 | if !fetched_oids.is_empty() { |
| 394 | let new_oids: HashSet<String> = needed_oids.into_iter().collect(); | 402 | let new_oids: HashSet<String> = fetched_oids.iter().cloned().collect(); |
| 395 | if let Err(e) = ctx | 403 | if let Err(e) = ctx |
| 396 | .process_newly_available_git_data(&target_repo, &new_oids) | 404 | .process_newly_available_git_data(&target_repo, &new_oids) |
| 397 | .await | 405 | .await |
| @@ -404,7 +412,7 @@ pub async fn sync_identifier_from_url<C: SyncContext + ?Sized>( | |||
| 404 | } | 412 | } |
| 405 | } | 413 | } |
| 406 | 414 | ||
| 407 | oids_fetched | 415 | fetched_oids.len() |
| 408 | } | 416 | } |
| 409 | 417 | ||
| 410 | /// Sync git data for an identifier. | 418 | /// Sync git data for an identifier. |
diff --git a/src/purgatory/types.rs b/src/purgatory/types.rs index 919504b..e37a3e1 100644 --- a/src/purgatory/types.rs +++ b/src/purgatory/types.rs | |||
| @@ -8,6 +8,28 @@ use nostr_sdk::prelude::*; | |||
| 8 | use serde::{Deserialize, Serialize}; | 8 | use serde::{Deserialize, Serialize}; |
| 9 | use std::time::Instant; | 9 | use std::time::Instant; |
| 10 | 10 | ||
| 11 | /// Source of an event entering purgatory. | ||
| 12 | /// | ||
| 13 | /// Tracks whether an event was submitted directly by a user or fetched via | ||
| 14 | /// proactive sync from another relay. This distinction is used for: | ||
| 15 | /// - Filtered logging: Direct submissions log at WARN level, synced at DEBUG | ||
| 16 | /// - Operational monitoring: Helps identify user-facing issues vs sync noise | ||
| 17 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] | ||
| 18 | pub enum EventSource { | ||
| 19 | /// Event was published directly to this relay by a user | ||
| 20 | #[default] | ||
| 21 | Direct, | ||
| 22 | /// Event was fetched via proactive sync from another relay | ||
| 23 | Sync, | ||
| 24 | } | ||
| 25 | |||
| 26 | impl EventSource { | ||
| 27 | /// Returns true if this is a direct submission (not synced) | ||
| 28 | pub fn is_direct(&self) -> bool { | ||
| 29 | matches!(self, EventSource::Direct) | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 11 | /// Default value for Instant fields during deserialization | 33 | /// Default value for Instant fields during deserialization |
| 12 | fn instant_now() -> Instant { | 34 | fn instant_now() -> Instant { |
| 13 | Instant::now() | 35 | Instant::now() |
| @@ -86,6 +108,10 @@ pub struct StatePurgatoryEntry { | |||
| 86 | /// Expiry deadline (30 min from creation, may be extended) | 108 | /// Expiry deadline (30 min from creation, may be extended) |
| 87 | #[serde(skip, default = "instant_now")] | 109 | #[serde(skip, default = "instant_now")] |
| 88 | pub expires_at: Instant, | 110 | pub expires_at: Instant, |
| 111 | |||
| 112 | /// Source of this event (direct submission vs sync) | ||
| 113 | #[serde(default)] | ||
| 114 | pub source: EventSource, | ||
| 89 | } | 115 | } |
| 90 | 116 | ||
| 91 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. | 117 | /// Entry for a PR event (kind 1617/1618) or placeholder waiting in purgatory. |
| @@ -112,4 +138,8 @@ pub struct PrPurgatoryEntry { | |||
| 112 | /// Expiry deadline (30 min from creation, may be extended) | 138 | /// Expiry deadline (30 min from creation, may be extended) |
| 113 | #[serde(skip, default = "instant_now")] | 139 | #[serde(skip, default = "instant_now")] |
| 114 | pub expires_at: Instant, | 140 | pub expires_at: Instant, |
| 141 | |||
| 142 | /// Source of this event (direct submission vs sync) | ||
| 143 | #[serde(default)] | ||
| 144 | pub source: EventSource, | ||
| 115 | } | 145 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index bc8c428..d6634ff 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -584,6 +584,7 @@ impl SyncManager { | |||
| 584 | /// * `config` - Configuration for sync settings | 584 | /// * `config` - Configuration for sync settings |
| 585 | /// * `data_path` - Path to git data directory (for persistence) | 585 | /// * `data_path` - Path to git data directory (for persistence) |
| 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) | 586 | /// * `sync_metrics` - Optional pre-registered SyncMetrics (passed from Metrics if metrics are enabled) |
| 587 | #[allow(clippy::too_many_arguments)] | ||
| 587 | pub fn new( | 588 | pub fn new( |
| 588 | bootstrap_relay_url: Option<String>, | 589 | bootstrap_relay_url: Option<String>, |
| 589 | service_domain: String, | 590 | service_domain: String, |
| @@ -1442,6 +1443,7 @@ impl SyncManager { | |||
| 1442 | self.service_domain.clone(), | 1443 | self.service_domain.clone(), |
| 1443 | Arc::clone(&self.repo_sync_index), | 1444 | Arc::clone(&self.repo_sync_index), |
| 1444 | action_tx, | 1445 | action_tx, |
| 1446 | self.database.clone(), | ||
| 1445 | ); | 1447 | ); |
| 1446 | let subscriber_shutdown = shutdown_tx.subscribe(); | 1448 | let subscriber_shutdown = shutdown_tx.subscribe(); |
| 1447 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); | 1449 | tokio::spawn(async move { self_subscriber.run(Some(subscriber_shutdown)).await }); |
| @@ -2811,6 +2813,7 @@ impl SyncManager { | |||
| 2811 | event_id = %event.id, | 2813 | event_id = %event.id, |
| 2812 | kind = %event.kind.as_u16(), | 2814 | kind = %event.kind.as_u16(), |
| 2813 | identifier = %identifier, | 2815 | identifier = %identifier, |
| 2816 | pubkey = %event.pubkey, | ||
| 2814 | "Added rejected announcement to two-tier index" | 2817 | "Added rejected announcement to two-tier index" |
| 2815 | ); | 2818 | ); |
| 2816 | } | 2819 | } |
diff --git a/src/sync/naughty_list.rs b/src/sync/naughty_list.rs index 35fcc0f..0abb986 100644 --- a/src/sync/naughty_list.rs +++ b/src/sync/naughty_list.rs | |||
| @@ -101,6 +101,69 @@ impl NaughtyListTracker { | |||
| 101 | Self::new(12) | 101 | Self::new(12) |
| 102 | } | 102 | } |
| 103 | 103 | ||
| 104 | /// Strip URLs from an error message to prevent false positives from URL components. | ||
| 105 | /// | ||
| 106 | /// URLs can contain path components, repository names, or user identifiers that | ||
| 107 | /// accidentally match error patterns (e.g., "my-openssl-project", "ssl-team", | ||
| 108 | /// "certificate-manager"). By stripping URLs before classification, we ensure | ||
| 109 | /// only the actual error message text is analyzed. | ||
| 110 | /// | ||
| 111 | /// Handles: http://, https://, git://, ws://, wss:// | ||
| 112 | fn strip_urls(error: &str) -> String { | ||
| 113 | let mut result = String::with_capacity(error.len()); | ||
| 114 | let mut chars = error.chars().peekable(); | ||
| 115 | |||
| 116 | while let Some(c) = chars.next() { | ||
| 117 | // Check for URL start patterns | ||
| 118 | let potential_url = match c { | ||
| 119 | 'h' => { | ||
| 120 | // Check for http:// or https:// | ||
| 121 | let rest: String = chars.clone().take(7).collect(); | ||
| 122 | rest.starts_with("ttp://") || rest.starts_with("ttps://") | ||
| 123 | } | ||
| 124 | 'g' => { | ||
| 125 | // Check for git:// | ||
| 126 | let rest: String = chars.clone().take(5).collect(); | ||
| 127 | rest.starts_with("it://") | ||
| 128 | } | ||
| 129 | 'w' => { | ||
| 130 | // Check for ws:// or wss:// | ||
| 131 | let rest: String = chars.clone().take(5).collect(); | ||
| 132 | rest.starts_with("s://") || rest.starts_with("ss://") | ||
| 133 | } | ||
| 134 | _ => false, | ||
| 135 | }; | ||
| 136 | |||
| 137 | if potential_url { | ||
| 138 | // Found URL start, consume until URL end | ||
| 139 | result.push_str("[URL]"); | ||
| 140 | |||
| 141 | // Skip until we hit a URL terminator | ||
| 142 | loop { | ||
| 143 | match chars.peek() { | ||
| 144 | Some(&ch) if Self::is_url_char(ch) => { | ||
| 145 | chars.next(); | ||
| 146 | } | ||
| 147 | _ => break, | ||
| 148 | } | ||
| 149 | } | ||
| 150 | } else { | ||
| 151 | result.push(c); | ||
| 152 | } | ||
| 153 | } | ||
| 154 | |||
| 155 | result | ||
| 156 | } | ||
| 157 | |||
| 158 | /// Check if a character can be part of a URL | ||
| 159 | #[inline] | ||
| 160 | fn is_url_char(c: char) -> bool { | ||
| 161 | // URLs end at whitespace, quotes, or certain brackets | ||
| 162 | // This is conservative - real URLs can contain more, but git errors | ||
| 163 | // typically have URLs followed by these terminators | ||
| 164 | !matches!(c, ' ' | '\t' | '\n' | '\r' | '"' | '\'' | '>' | ']' | ')') | ||
| 165 | } | ||
| 166 | |||
| 104 | /// Classify an error string into a naughty category or return None for transient errors | 167 | /// Classify an error string into a naughty category or return None for transient errors |
| 105 | /// | 168 | /// |
| 106 | /// # Arguments | 169 | /// # Arguments |
| @@ -112,21 +175,56 @@ impl NaughtyListTracker { | |||
| 112 | /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue | 175 | /// - `Some(NaughtyCategory)` if the error indicates a persistent infrastructure issue |
| 113 | /// - `None` if the error is a transient network issue (use HealthTracker backoff) | 176 | /// - `None` if the error is a transient network issue (use HealthTracker backoff) |
| 114 | pub fn classify_error(error: &str) -> Option<NaughtyCategory> { | 177 | pub fn classify_error(error: &str) -> Option<NaughtyCategory> { |
| 115 | let error_lower = error.to_lowercase(); | 178 | // Filter out remote warnings - these are informational messages from the remote |
| 179 | // server that don't indicate infrastructure problems with the domain itself. | ||
| 180 | // Example: "remote: warning: unable to access '/root/.config/git/attributes': Permission denied" | ||
| 181 | // These warnings are about the remote server's internal configuration, not connectivity. | ||
| 182 | let filtered_error: String = error | ||
| 183 | .lines() | ||
| 184 | .filter(|line| { | ||
| 185 | let line_lower = line.to_lowercase(); | ||
| 186 | // Keep lines that are NOT remote warnings | ||
| 187 | !(line_lower.starts_with("remote: warning:") | ||
| 188 | || line_lower.starts_with("warning: remote")) | ||
| 189 | }) | ||
| 190 | .collect::<Vec<_>>() | ||
| 191 | .join("\n"); | ||
| 192 | |||
| 193 | // If after filtering we have no content, this was just warnings - not a real error | ||
| 194 | if filtered_error.trim().is_empty() { | ||
| 195 | return None; | ||
| 196 | } | ||
| 197 | |||
| 198 | // Strip URLs to prevent false positives from URL components | ||
| 199 | // (e.g., repository named "openssl-test" or path containing "certificate") | ||
| 200 | let url_stripped = Self::strip_urls(&filtered_error); | ||
| 201 | let error_lower = url_stripped.to_lowercase(); | ||
| 116 | 202 | ||
| 117 | // DNS lookup failures | 203 | // DNS lookup failures |
| 118 | if error_lower.contains("failed to lookup address") | 204 | if error_lower.contains("failed to lookup address") |
| 119 | || error_lower.contains("name or service not known") | 205 | || error_lower.contains("name or service not known") |
| 120 | || error_lower.contains("nodename nor servname provided") | 206 | || error_lower.contains("nodename nor servname provided") |
| 121 | || (error_lower.contains("dns") && !error_lower.contains("timeout")) | 207 | || error_lower.contains("dns error") |
| 208 | || error_lower.contains("dns lookup") | ||
| 209 | || error_lower.contains("dns resolution") | ||
| 210 | || error_lower.contains("getaddrinfo") | ||
| 122 | { | 211 | { |
| 123 | return Some(NaughtyCategory::DnsLookupFailed); | 212 | return Some(NaughtyCategory::DnsLookupFailed); |
| 124 | } | 213 | } |
| 125 | 214 | ||
| 126 | // TLS certificate errors | 215 | // TLS certificate errors |
| 127 | if error_lower.contains("certificate") | 216 | if error_lower.contains("certificate") |
| 128 | || error_lower.contains("ssl") | 217 | || error_lower.contains("ssl error") |
| 129 | || error_lower.contains("tls") | 218 | || error_lower.contains("ssl certificate") |
| 219 | || error_lower.contains("ssl handshake") | ||
| 220 | || error_lower.contains("ssl_error") | ||
| 221 | || error_lower.contains("tls error") | ||
| 222 | || error_lower.contains("tls handshake") | ||
| 223 | || error_lower.contains("tls alert") | ||
| 224 | || error_lower.contains("tls_error") | ||
| 225 | || error_lower.contains("openssl") | ||
| 226 | || error_lower.contains("schannel") | ||
| 227 | || error_lower.contains("secure channel") | ||
| 130 | { | 228 | { |
| 131 | // Exclude timeout errors that mention TLS | 229 | // Exclude timeout errors that mention TLS |
| 132 | if !error_lower.contains("timeout") && !error_lower.contains("timed out") { | 230 | if !error_lower.contains("timeout") && !error_lower.contains("timed out") { |
| @@ -134,12 +232,12 @@ impl NaughtyListTracker { | |||
| 134 | } | 232 | } |
| 135 | } | 233 | } |
| 136 | 234 | ||
| 137 | // Protocol errors | 235 | // Protocol errors - specifically WebSocket/Nostr protocol violations |
| 138 | if error_lower.contains("websocket") | 236 | // Note: We check for "websocket" specifically, NOT generic "protocol" keyword |
| 139 | || error_lower.contains("protocol") | 237 | // because git errors often contain "protocol error" (e.g., "fatal: protocol error: bad line length") |
| 140 | || error_lower.contains("invalid frame") | 238 | // which are transient network issues, not persistent infrastructure problems. |
| 141 | { | 239 | if error_lower.contains("websocket") || error_lower.contains("invalid frame") { |
| 142 | // Exclude connection errors | 240 | // Exclude connection errors (transient) |
| 143 | if !error_lower.contains("connection") | 241 | if !error_lower.contains("connection") |
| 144 | && !error_lower.contains("timeout") | 242 | && !error_lower.contains("timeout") |
| 145 | && !error_lower.contains("refused") | 243 | && !error_lower.contains("refused") |
| @@ -290,183 +388,216 @@ impl NaughtyListTracker { | |||
| 290 | mod tests { | 388 | mod tests { |
| 291 | use super::*; | 389 | use super::*; |
| 292 | 390 | ||
| 391 | // ========================================================================= | ||
| 392 | // URL STRIPPING TESTS | ||
| 393 | // ========================================================================= | ||
| 394 | |||
| 293 | #[test] | 395 | #[test] |
| 294 | fn test_classify_dns_errors() { | 396 | fn test_strip_urls_basic_protocols() { |
| 295 | assert_eq!( | 397 | // HTTP/HTTPS |
| 296 | NaughtyListTracker::classify_error("failed to lookup address information"), | ||
| 297 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 298 | ); | ||
| 299 | assert_eq!( | 398 | assert_eq!( |
| 300 | NaughtyListTracker::classify_error("Name or service not known"), | 399 | NaughtyListTracker::strip_urls("error: https://example.com/repo.git failed"), |
| 301 | Some(NaughtyCategory::DnsLookupFailed) | 400 | "error: [URL] failed" |
| 302 | ); | ||
| 303 | assert_eq!( | ||
| 304 | NaughtyListTracker::classify_error("nodename nor servname provided"), | ||
| 305 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 306 | ); | 401 | ); |
| 307 | assert_eq!( | 402 | assert_eq!( |
| 308 | NaughtyListTracker::classify_error("dns error: NXDOMAIN"), | 403 | NaughtyListTracker::strip_urls("error: http://example.com/path failed"), |
| 309 | Some(NaughtyCategory::DnsLookupFailed) | 404 | "error: [URL] failed" |
| 310 | ); | 405 | ); |
| 311 | } | ||
| 312 | 406 | ||
| 313 | #[test] | 407 | // Git protocol |
| 314 | fn test_classify_tls_errors() { | ||
| 315 | assert_eq!( | 408 | assert_eq!( |
| 316 | NaughtyListTracker::classify_error("certificate not valid for 'example.com'"), | 409 | NaughtyListTracker::strip_urls("fatal: git://github.com/user/repo.git not found"), |
| 317 | Some(NaughtyCategory::TlsCertificateInvalid) | 410 | "fatal: [URL] not found" |
| 318 | ); | 411 | ); |
| 412 | |||
| 413 | // WebSocket protocols (used for relay URLs) | ||
| 319 | assert_eq!( | 414 | assert_eq!( |
| 320 | NaughtyListTracker::classify_error("SSL certificate problem"), | 415 | NaughtyListTracker::strip_urls("error: wss://relay.example.com failed"), |
| 321 | Some(NaughtyCategory::TlsCertificateInvalid) | 416 | "error: [URL] failed" |
| 322 | ); | 417 | ); |
| 323 | assert_eq!( | 418 | assert_eq!( |
| 324 | NaughtyListTracker::classify_error("TLS handshake failed"), | 419 | NaughtyListTracker::strip_urls("error: ws://localhost:8080 failed"), |
| 325 | Some(NaughtyCategory::TlsCertificateInvalid) | 420 | "error: [URL] failed" |
| 326 | ); | 421 | ); |
| 422 | } | ||
| 327 | 423 | ||
| 328 | // TLS timeout should NOT be classified as naughty | 424 | #[test] |
| 329 | assert_eq!( | 425 | fn test_strip_urls_multiple() { |
| 330 | NaughtyListTracker::classify_error("TLS connection timed out"), | 426 | let error = "failed to clone https://a.com/repo.git and wss://relay.com"; |
| 331 | None | 427 | let stripped = NaughtyListTracker::strip_urls(error); |
| 332 | ); | 428 | assert_eq!(stripped, "failed to clone [URL] and [URL]"); |
| 333 | } | 429 | } |
| 334 | 430 | ||
| 335 | #[test] | 431 | #[test] |
| 336 | fn test_classify_protocol_errors() { | 432 | fn test_strip_urls_preserves_error_text() { |
| 337 | assert_eq!( | 433 | let error = |
| 338 | NaughtyListTracker::classify_error("websocket protocol error"), | 434 | "fatal: unable to access 'https://example.com/repo.git/': SSL certificate problem"; |
| 339 | Some(NaughtyCategory::ProtocolError) | 435 | let stripped = NaughtyListTracker::strip_urls(error); |
| 340 | ); | 436 | assert!(stripped.contains("SSL certificate problem")); |
| 437 | assert!(!stripped.contains("example.com")); | ||
| 438 | } | ||
| 439 | |||
| 440 | // ========================================================================= | ||
| 441 | // EDGE CASES: TIMEOUT/CONNECTION EXCEPTIONS | ||
| 442 | // These are the "unusual rules" where a pattern matches but should be excluded | ||
| 443 | // ========================================================================= | ||
| 444 | |||
| 445 | #[test] | ||
| 446 | fn test_tls_timeout_not_naughty() { | ||
| 447 | // TLS errors with timeout should NOT be classified as naughty | ||
| 448 | // (timeout is transient, not a certificate problem) | ||
| 341 | assert_eq!( | 449 | assert_eq!( |
| 342 | NaughtyListTracker::classify_error("invalid frame header"), | 450 | NaughtyListTracker::classify_error("TLS connection timed out"), |
| 343 | Some(NaughtyCategory::ProtocolError) | 451 | None |
| 344 | ); | 452 | ); |
| 345 | |||
| 346 | // WebSocket connection errors should NOT be classified as naughty | ||
| 347 | assert_eq!( | 453 | assert_eq!( |
| 348 | NaughtyListTracker::classify_error("websocket connection refused"), | 454 | NaughtyListTracker::classify_error("SSL handshake timeout"), |
| 349 | None | 455 | None |
| 350 | ); | 456 | ); |
| 351 | } | 457 | } |
| 352 | 458 | ||
| 353 | #[test] | 459 | #[test] |
| 354 | fn test_classify_transient_errors() { | 460 | fn test_websocket_connection_errors_not_naughty() { |
| 355 | // Timeouts are transient | 461 | // WebSocket connection errors are transient, not protocol violations |
| 356 | assert_eq!( | 462 | assert_eq!( |
| 357 | NaughtyListTracker::classify_error("connection timed out"), | 463 | NaughtyListTracker::classify_error("websocket connection refused"), |
| 358 | None | 464 | None |
| 359 | ); | 465 | ); |
| 360 | assert_eq!( | 466 | assert_eq!( |
| 361 | NaughtyListTracker::classify_error("operation timed out"), | 467 | NaughtyListTracker::classify_error("websocket connection timeout"), |
| 362 | None | 468 | None |
| 363 | ); | 469 | ); |
| 470 | } | ||
| 364 | 471 | ||
| 365 | // Connection refused is transient | 472 | #[test] |
| 473 | fn test_remote_warnings_filtered() { | ||
| 474 | // Remote warnings should be filtered out before classification | ||
| 475 | let warning_only = | ||
| 476 | "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"; | ||
| 477 | assert_eq!(NaughtyListTracker::classify_error(warning_only), None); | ||
| 478 | |||
| 479 | // But real errors after warnings should still be classified | ||
| 480 | let warning_with_error = "remote: warning: something\nfatal: failed to lookup address"; | ||
| 366 | assert_eq!( | 481 | assert_eq!( |
| 367 | NaughtyListTracker::classify_error("connection refused"), | 482 | NaughtyListTracker::classify_error(warning_with_error), |
| 368 | None | 483 | Some(NaughtyCategory::DnsLookupFailed) |
| 369 | ); | 484 | ); |
| 485 | } | ||
| 370 | 486 | ||
| 371 | // Generic network errors are transient | 487 | // ========================================================================= |
| 488 | // INTEGRATION: FULL CLASSIFICATION FLOW | ||
| 489 | // Verify URL stripping + classification work together correctly | ||
| 490 | // ========================================================================= | ||
| 491 | |||
| 492 | #[test] | ||
| 493 | fn test_url_with_keywords_not_false_positive() { | ||
| 494 | // URLs containing keywords should NOT trigger classification | ||
| 495 | let cases = [ | ||
| 496 | ("https://example.com/my-openssl-project.git", "not found"), | ||
| 497 | ("https://example.com/ssl-team/repo.git", "not found"), | ||
| 498 | ("https://example.com/certificate-manager.git", "not found"), | ||
| 499 | ("https://example.com/dns-tools.git", "not found"), | ||
| 500 | ("wss://relay-tls-test.example.com", "connection refused"), | ||
| 501 | ]; | ||
| 502 | |||
| 503 | for (url, suffix) in cases { | ||
| 504 | let error = format!("fatal: repository '{}/' {}", url, suffix); | ||
| 505 | assert_eq!( | ||
| 506 | NaughtyListTracker::classify_error(&error), | ||
| 507 | None, | ||
| 508 | "URL '{}' should not trigger false positive", | ||
| 509 | url | ||
| 510 | ); | ||
| 511 | } | ||
| 512 | } | ||
| 513 | |||
| 514 | #[test] | ||
| 515 | fn test_real_errors_still_detected() { | ||
| 516 | // Real errors in the message text (not URL) should still be detected | ||
| 372 | assert_eq!( | 517 | assert_eq!( |
| 373 | NaughtyListTracker::classify_error("network unreachable"), | 518 | NaughtyListTracker::classify_error( |
| 374 | None | 519 | "fatal: 'https://example.com/repo.git': SSL certificate problem" |
| 520 | ), | ||
| 521 | Some(NaughtyCategory::TlsCertificateInvalid) | ||
| 522 | ); | ||
| 523 | assert_eq!( | ||
| 524 | NaughtyListTracker::classify_error( | ||
| 525 | "fatal: 'https://example.com/repo.git': failed to lookup address" | ||
| 526 | ), | ||
| 527 | Some(NaughtyCategory::DnsLookupFailed) | ||
| 528 | ); | ||
| 529 | assert_eq!( | ||
| 530 | NaughtyListTracker::classify_error("websocket protocol error"), | ||
| 531 | Some(NaughtyCategory::ProtocolError) | ||
| 375 | ); | 532 | ); |
| 376 | } | 533 | } |
| 377 | 534 | ||
| 378 | #[test] | 535 | #[test] |
| 379 | fn test_record_new_entry() { | 536 | fn test_url_with_keyword_and_real_error() { |
| 380 | let tracker = NaughtyListTracker::with_defaults(); | 537 | // URL contains keyword AND there's a real error - should detect the error |
| 381 | let url = "wss://bad-relay.example.com"; | 538 | let error = "fatal: 'https://example.com/ssl-tools/repo.git': SSL certificate problem"; |
| 382 | 539 | assert_eq!( | |
| 383 | let is_new = tracker.record( | 540 | NaughtyListTracker::classify_error(error), |
| 384 | url, | 541 | Some(NaughtyCategory::TlsCertificateInvalid) |
| 385 | NaughtyCategory::DnsLookupFailed, | ||
| 386 | "failed to lookup address".to_string(), | ||
| 387 | ); | 542 | ); |
| 388 | |||
| 389 | assert!(is_new); | ||
| 390 | assert!(tracker.is_naughty(url)); | ||
| 391 | |||
| 392 | let entry = tracker.get_entry(url).unwrap(); | ||
| 393 | assert_eq!(entry.category, NaughtyCategory::DnsLookupFailed); | ||
| 394 | assert_eq!(entry.occurrence_count, 1); | ||
| 395 | } | 543 | } |
| 396 | 544 | ||
| 545 | // ========================================================================= | ||
| 546 | // TRACKER FUNCTIONALITY | ||
| 547 | // ========================================================================= | ||
| 548 | |||
| 397 | #[test] | 549 | #[test] |
| 398 | fn test_record_updates_existing() { | 550 | fn test_tracker_record_and_update() { |
| 399 | let tracker = NaughtyListTracker::with_defaults(); | 551 | let tracker = NaughtyListTracker::with_defaults(); |
| 400 | let url = "wss://bad-relay.example.com"; | 552 | let url = "wss://bad-relay.example.com"; |
| 401 | 553 | ||
| 402 | // First occurrence | 554 | // First occurrence |
| 403 | let is_new1 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); | 555 | let is_new = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 1".to_string()); |
| 404 | assert!(is_new1); | 556 | assert!(is_new); |
| 557 | assert!(tracker.is_naughty(url)); | ||
| 405 | 558 | ||
| 406 | // Second occurrence | 559 | // Second occurrence updates existing |
| 407 | let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); | 560 | let is_new2 = tracker.record(url, NaughtyCategory::DnsLookupFailed, "error 2".to_string()); |
| 408 | assert!(!is_new2); | 561 | assert!(!is_new2); |
| 409 | 562 | ||
| 410 | let entry = tracker.get_entry(url).unwrap(); | 563 | let entry = tracker.get_entry(url).unwrap(); |
| 411 | assert_eq!(entry.occurrence_count, 2); | 564 | assert_eq!(entry.occurrence_count, 2); |
| 412 | assert_eq!(entry.reason, "error 2"); // Updated to latest | 565 | assert_eq!(entry.reason, "error 2"); |
| 413 | } | 566 | } |
| 414 | 567 | ||
| 415 | #[test] | 568 | #[test] |
| 416 | fn test_is_naughty() { | 569 | fn test_tracker_expiration() { |
| 417 | let tracker = NaughtyListTracker::with_defaults(); | 570 | let tracker = NaughtyListTracker::new(0); // Expire immediately |
| 418 | let url = "wss://bad-relay.example.com"; | ||
| 419 | |||
| 420 | assert!(!tracker.is_naughty(url)); | ||
| 421 | 571 | ||
| 422 | tracker.record( | 572 | tracker.record( |
| 423 | url, | 573 | "wss://relay.example.com", |
| 424 | NaughtyCategory::TlsCertificateInvalid, | 574 | NaughtyCategory::DnsLookupFailed, |
| 425 | "cert error".to_string(), | 575 | "error".to_string(), |
| 426 | ); | 576 | ); |
| 427 | 577 | ||
| 428 | assert!(tracker.is_naughty(url)); | 578 | // Entry exists but is expired |
| 429 | } | 579 | assert!(!tracker.is_naughty("wss://relay.example.com")); |
| 430 | 580 | ||
| 431 | #[test] | 581 | std::thread::sleep(std::time::Duration::from_millis(10)); |
| 432 | fn test_get_all() { | ||
| 433 | let tracker = NaughtyListTracker::with_defaults(); | ||
| 434 | |||
| 435 | tracker.record( | ||
| 436 | "wss://relay1.example.com", | ||
| 437 | NaughtyCategory::DnsLookupFailed, | ||
| 438 | "dns error".to_string(), | ||
| 439 | ); | ||
| 440 | tracker.record( | ||
| 441 | "wss://relay2.example.com", | ||
| 442 | NaughtyCategory::TlsCertificateInvalid, | ||
| 443 | "tls error".to_string(), | ||
| 444 | ); | ||
| 445 | 582 | ||
| 446 | let all = tracker.get_all(); | 583 | let expired = tracker.expire_old_entries(); |
| 447 | assert_eq!(all.len(), 2); | 584 | assert_eq!(expired.len(), 1); |
| 585 | assert_eq!(tracker.total_count(), 0); | ||
| 448 | } | 586 | } |
| 449 | 587 | ||
| 450 | #[test] | 588 | #[test] |
| 451 | fn test_count_by_category() { | 589 | fn test_tracker_counts() { |
| 452 | let tracker = NaughtyListTracker::with_defaults(); | 590 | let tracker = NaughtyListTracker::with_defaults(); |
| 453 | 591 | ||
| 592 | tracker.record("wss://r1.com", NaughtyCategory::DnsLookupFailed, "e".into()); | ||
| 593 | tracker.record("wss://r2.com", NaughtyCategory::DnsLookupFailed, "e".into()); | ||
| 454 | tracker.record( | 594 | tracker.record( |
| 455 | "wss://relay1.example.com", | 595 | "wss://r3.com", |
| 456 | NaughtyCategory::DnsLookupFailed, | ||
| 457 | "error".to_string(), | ||
| 458 | ); | ||
| 459 | tracker.record( | ||
| 460 | "wss://relay2.example.com", | ||
| 461 | NaughtyCategory::DnsLookupFailed, | ||
| 462 | "error".to_string(), | ||
| 463 | ); | ||
| 464 | tracker.record( | ||
| 465 | "wss://relay3.example.com", | ||
| 466 | NaughtyCategory::TlsCertificateInvalid, | 596 | NaughtyCategory::TlsCertificateInvalid, |
| 467 | "error".to_string(), | 597 | "e".into(), |
| 468 | ); | 598 | ); |
| 469 | 599 | ||
| 600 | assert_eq!(tracker.total_count(), 3); | ||
| 470 | assert_eq!( | 601 | assert_eq!( |
| 471 | tracker.count_by_category(NaughtyCategory::DnsLookupFailed), | 602 | tracker.count_by_category(NaughtyCategory::DnsLookupFailed), |
| 472 | 2 | 603 | 2 |
| @@ -475,82 +606,84 @@ mod tests { | |||
| 475 | tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), | 606 | tracker.count_by_category(NaughtyCategory::TlsCertificateInvalid), |
| 476 | 1 | 607 | 1 |
| 477 | ); | 608 | ); |
| 478 | assert_eq!(tracker.count_by_category(NaughtyCategory::ProtocolError), 0); | 609 | assert_eq!(tracker.get_all().len(), 3); |
| 479 | } | 610 | } |
| 480 | 611 | ||
| 481 | #[test] | 612 | #[test] |
| 482 | fn test_total_count() { | 613 | fn test_category_display() { |
| 483 | let tracker = NaughtyListTracker::with_defaults(); | 614 | assert_eq!( |
| 484 | assert_eq!(tracker.total_count(), 0); | 615 | NaughtyCategory::DnsLookupFailed.as_str(), |
| 485 | 616 | "dns_lookup_failed" | |
| 486 | tracker.record( | ||
| 487 | "wss://relay1.example.com", | ||
| 488 | NaughtyCategory::DnsLookupFailed, | ||
| 489 | "error".to_string(), | ||
| 490 | ); | 617 | ); |
| 491 | assert_eq!(tracker.total_count(), 1); | 618 | assert_eq!( |
| 492 | 619 | NaughtyCategory::TlsCertificateInvalid.as_str(), | |
| 493 | tracker.record( | 620 | "tls_certificate_invalid" |
| 494 | "wss://relay2.example.com", | ||
| 495 | NaughtyCategory::TlsCertificateInvalid, | ||
| 496 | "error".to_string(), | ||
| 497 | ); | 621 | ); |
| 498 | assert_eq!(tracker.total_count(), 2); | 622 | assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); |
| 499 | } | 623 | } |
| 624 | } | ||
| 500 | 625 | ||
| 501 | #[test] | 626 | #[cfg(test)] |
| 502 | fn test_expire_old_entries() { | 627 | mod production_tests { |
| 503 | // Use very short expiration for testing | 628 | use super::*; |
| 504 | let tracker = NaughtyListTracker::new(0); // Expire immediately (0 hours) | ||
| 505 | |||
| 506 | tracker.record( | ||
| 507 | "wss://relay1.example.com", | ||
| 508 | NaughtyCategory::DnsLookupFailed, | ||
| 509 | "error".to_string(), | ||
| 510 | ); | ||
| 511 | |||
| 512 | // Entry should exist in the map | ||
| 513 | assert_eq!(tracker.total_count(), 1); | ||
| 514 | |||
| 515 | // But is_naughty should return false since it's already expired (0 hours) | ||
| 516 | assert!(!tracker.is_naughty("wss://relay1.example.com")); | ||
| 517 | |||
| 518 | // Sleep to ensure time passes | ||
| 519 | std::thread::sleep(std::time::Duration::from_millis(10)); | ||
| 520 | 629 | ||
| 521 | // Expire old entries (should remove the 0-hour expired entry) | 630 | /// Production case from relay.ngit.dev - remote warning should not be classified |
| 522 | let expired = tracker.expire_old_entries(); | 631 | #[test] |
| 523 | assert_eq!(expired.len(), 1); | 632 | fn test_classify_production_relay_ngit_dev_warning() { |
| 524 | assert_eq!(expired[0], "wss://relay1.example.com"); | 633 | let error = |
| 634 | "remote: warning: unable to access '/root/.config/git/attributes': Permission denied"; | ||
| 635 | assert_eq!(NaughtyListTracker::classify_error(error), None); | ||
| 636 | } | ||
| 525 | 637 | ||
| 526 | // Entry should be gone | 638 | /// Git protocol errors are transient, not persistent infrastructure issues |
| 527 | assert!(!tracker.is_naughty("wss://relay1.example.com")); | 639 | #[test] |
| 528 | assert_eq!(tracker.total_count(), 0); | 640 | fn test_git_protocol_errors_not_naughty() { |
| 641 | // These are common git protocol errors that should NOT be classified as naughty | ||
| 642 | let git_protocol_errors = [ | ||
| 643 | "fatal: protocol error: bad line length character: remo", | ||
| 644 | "fatal: protocol error: expected old/new/ref, got 'shallow", | ||
| 645 | "fatal: git upload-pack: protocol error", | ||
| 646 | "error: protocol error: bad pack header", | ||
| 647 | "fatal: protocol error: bad band #3", | ||
| 648 | ]; | ||
| 649 | |||
| 650 | for error in git_protocol_errors { | ||
| 651 | assert_eq!( | ||
| 652 | NaughtyListTracker::classify_error(error), | ||
| 653 | None, | ||
| 654 | "Git protocol error should not be classified as naughty: {}", | ||
| 655 | error | ||
| 656 | ); | ||
| 657 | } | ||
| 529 | } | 658 | } |
| 530 | 659 | ||
| 660 | /// Remote warning followed by git protocol error - both should be filtered/ignored | ||
| 531 | #[test] | 661 | #[test] |
| 532 | fn test_category_display() { | 662 | fn test_warning_with_git_protocol_error() { |
| 663 | let error = "remote: warning: unable to access '/root/.config/git/attributes': Permission denied\nfatal: protocol error: bad line length character: remo"; | ||
| 533 | assert_eq!( | 664 | assert_eq!( |
| 534 | NaughtyCategory::DnsLookupFailed.to_string(), | 665 | NaughtyListTracker::classify_error(error), |
| 535 | "dns_lookup_failed" | 666 | None, |
| 667 | "Warning + git protocol error should not be classified as naughty" | ||
| 536 | ); | 668 | ); |
| 537 | assert_eq!( | ||
| 538 | NaughtyCategory::TlsCertificateInvalid.to_string(), | ||
| 539 | "tls_certificate_invalid" | ||
| 540 | ); | ||
| 541 | assert_eq!(NaughtyCategory::ProtocolError.to_string(), "protocol_error"); | ||
| 542 | } | 669 | } |
| 543 | 670 | ||
| 671 | /// WebSocket protocol errors ARE naughty (persistent infrastructure issues) | ||
| 544 | #[test] | 672 | #[test] |
| 545 | fn test_category_as_str() { | 673 | fn test_websocket_errors_still_naughty() { |
| 546 | assert_eq!( | 674 | let websocket_errors = [ |
| 547 | NaughtyCategory::DnsLookupFailed.as_str(), | 675 | "websocket protocol error", |
| 548 | "dns_lookup_failed" | 676 | "websocket handshake failed", |
| 549 | ); | 677 | "invalid frame received", |
| 550 | assert_eq!( | 678 | ]; |
| 551 | NaughtyCategory::TlsCertificateInvalid.as_str(), | 679 | |
| 552 | "tls_certificate_invalid" | 680 | for error in websocket_errors { |
| 553 | ); | 681 | assert_eq!( |
| 554 | assert_eq!(NaughtyCategory::ProtocolError.as_str(), "protocol_error"); | 682 | NaughtyListTracker::classify_error(error), |
| 683 | Some(NaughtyCategory::ProtocolError), | ||
| 684 | "WebSocket error should be classified as protocol_error: {}", | ||
| 685 | error | ||
| 686 | ); | ||
| 687 | } | ||
| 555 | } | 688 | } |
| 556 | } | 689 | } |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..86e4583 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -98,6 +100,8 @@ pub struct SelfSubscriber { | |||
| 98 | action_tx: mpsc::Sender<AddFilters>, | 100 | action_tx: mpsc::Sender<AddFilters>, |
| 99 | /// Last time we connected - used for since filter on reconnect | 101 | /// Last time we connected - used for since filter on reconnect |
| 100 | last_connected: Option<Timestamp>, | 102 | last_connected: Option<Timestamp>, |
| 103 | /// Database for querying existing events on startup | ||
| 104 | database: SharedDatabase, | ||
| 101 | } | 105 | } |
| 102 | 106 | ||
| 103 | impl SelfSubscriber { | 107 | impl SelfSubscriber { |
| @@ -108,11 +112,13 @@ impl SelfSubscriber { | |||
| 108 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 112 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 109 | /// * `repo_sync_index` - Shared index to update with discovered repos | 113 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 110 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 114 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 115 | /// * `database` - Database for querying existing events on startup | ||
| 111 | pub fn new( | 116 | pub fn new( |
| 112 | own_relay_url: String, | 117 | own_relay_url: String, |
| 113 | relay_domain: String, | 118 | relay_domain: String, |
| 114 | repo_sync_index: RepoSyncIndex, | 119 | repo_sync_index: RepoSyncIndex, |
| 115 | action_tx: mpsc::Sender<AddFilters>, | 120 | action_tx: mpsc::Sender<AddFilters>, |
| 121 | database: SharedDatabase, | ||
| 116 | ) -> Self { | 122 | ) -> Self { |
| 117 | Self { | 123 | Self { |
| 118 | own_relay_url, | 124 | own_relay_url, |
| @@ -120,6 +126,7 @@ impl SelfSubscriber { | |||
| 120 | repo_sync_index, | 126 | repo_sync_index, |
| 121 | action_tx, | 127 | action_tx, |
| 122 | last_connected: None, | 128 | last_connected: None, |
| 129 | database, | ||
| 123 | } | 130 | } |
| 124 | } | 131 | } |
| 125 | 132 | ||
| @@ -135,6 +142,101 @@ impl SelfSubscriber { | |||
| 135 | .unwrap_or(Duration::from_millis(5000)) | 142 | .unwrap_or(Duration::from_millis(5000)) |
| 136 | } | 143 | } |
| 137 | 144 | ||
| 145 | /// Load existing events from database on startup | ||
| 146 | /// | ||
| 147 | /// Queries the database with two separate queries to build the initial | ||
| 148 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 149 | /// created, not just those returned by the WebSocket subscription | ||
| 150 | /// (which has limits on the number of events returned). | ||
| 151 | /// | ||
| 152 | /// Query order: | ||
| 153 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 154 | /// with repos and their relays | ||
| 155 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 156 | /// to add root event IDs for Layer 3 filter creation | ||
| 157 | /// | ||
| 158 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 159 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 160 | let mut pending = PendingUpdates::new(); | ||
| 161 | |||
| 162 | tracing::info!("Loading all events from database"); | ||
| 163 | |||
| 164 | // First query: Get all announcements to populate repo_sync_index | ||
| 165 | let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 166 | |||
| 167 | let announcements = match self.database.query(announcement_filter).await { | ||
| 168 | Ok(events) => { | ||
| 169 | tracing::info!(count = events.len(), "Loaded announcements from database"); | ||
| 170 | events | ||
| 171 | } | ||
| 172 | Err(e) => { | ||
| 173 | tracing::error!( | ||
| 174 | error = %e, | ||
| 175 | "Failed to query announcements from database" | ||
| 176 | ); | ||
| 177 | return pending; | ||
| 178 | } | ||
| 179 | }; | ||
| 180 | |||
| 181 | // Process announcements | ||
| 182 | let mut announcements_loaded = 0; | ||
| 183 | for event in announcements.iter() { | ||
| 184 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 185 | let relays = Self::extract_relay_urls(event); | ||
| 186 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 187 | announcements_loaded += 1; | ||
| 188 | } | ||
| 189 | } | ||
| 190 | |||
| 191 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 192 | { | ||
| 193 | let mut index = self.repo_sync_index.write().await; | ||
| 194 | for (repo_id, needs) in &pending.repos { | ||
| 195 | let entry = index | ||
| 196 | .entry(repo_id.clone()) | ||
| 197 | .or_insert_with(|| RepoSyncNeeds { | ||
| 198 | relays: HashSet::new(), | ||
| 199 | root_events: HashSet::new(), | ||
| 200 | }); | ||
| 201 | entry.relays.extend(needs.relays.clone()); | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | // Second query: Get all root events for handle_root_event() | ||
| 206 | let root_filter = | ||
| 207 | Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]); | ||
| 208 | |||
| 209 | let root_events = match self.database.query(root_filter).await { | ||
| 210 | Ok(events) => { | ||
| 211 | tracing::info!(count = events.len(), "Loaded root events from database"); | ||
| 212 | events | ||
| 213 | } | ||
| 214 | Err(e) => { | ||
| 215 | tracing::error!( | ||
| 216 | error = %e, | ||
| 217 | "Failed to query root events from database" | ||
| 218 | ); | ||
| 219 | // Continue with just announcements | ||
| 220 | return pending; | ||
| 221 | } | ||
| 222 | }; | ||
| 223 | |||
| 224 | // Process root events | ||
| 225 | let mut root_events_processed = 0; | ||
| 226 | for event in root_events.iter() { | ||
| 227 | self.handle_root_event(event, &mut pending).await; | ||
| 228 | root_events_processed += 1; | ||
| 229 | } | ||
| 230 | |||
| 231 | tracing::info!( | ||
| 232 | announcements_loaded = announcements_loaded, | ||
| 233 | root_events_processed = root_events_processed, | ||
| 234 | "Processed existing events from database" | ||
| 235 | ); | ||
| 236 | |||
| 237 | pending | ||
| 238 | } | ||
| 239 | |||
| 138 | /// Process a relay pool notification | 240 | /// Process a relay pool notification |
| 139 | /// | 241 | /// |
| 140 | /// Handles incoming events from the subscription, queueing 30617 announcements | 242 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -276,33 +378,22 @@ impl SelfSubscriber { | |||
| 276 | // Subscribe to announcement and root event kinds | 378 | // Subscribe to announcement and root event kinds |
| 277 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 379 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 278 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 380 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 279 | // Check if we have a last_connected time for reconnect filtering | 381 | let mut filter = Filter::new().kinds(vec![ |
| 280 | let filter = if let Some(last) = self.last_connected { | 382 | Kind::GitRepoAnnouncement, |
| 383 | Kind::GitPatch, | ||
| 384 | Kind::GitIssue, | ||
| 385 | Kind::GitPullRequest, | ||
| 386 | Kind::GitUserGraspList, | ||
| 387 | ]); | ||
| 388 | if let Some(timestamp) = self.last_connected { | ||
| 281 | // Quick reconnect - use since filter (15 min buffer) | 389 | // Quick reconnect - use since filter (15 min buffer) |
| 282 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 390 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 283 | tracing::debug!( | 391 | tracing::debug!( |
| 284 | since = %since, | 392 | since = %since, |
| 285 | "Using since filter for reconnect" | 393 | "Using since filter for reconnect" |
| 286 | ); | 394 | ); |
| 287 | Filter::new() | 395 | filter = filter.since(since); |
| 288 | .kinds(vec![ | 396 | } |
| 289 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 290 | Kind::GitPatch, // Patches | ||
| 291 | Kind::GitIssue, // Issues | ||
| 292 | Kind::GitPullRequest, // Pull Requests | ||
| 293 | Kind::GitUserGraspList, // User Grasp List | ||
| 294 | ]) | ||
| 295 | .since(since) | ||
| 296 | } else { | ||
| 297 | // First connection - no since filter | ||
| 298 | Filter::new().kinds(vec![ | ||
| 299 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 300 | Kind::GitPatch, // Patches | ||
| 301 | Kind::GitIssue, // Issues | ||
| 302 | Kind::GitPullRequest, // Pull Requests | ||
| 303 | Kind::GitUserGraspList, // User Grasp List | ||
| 304 | ]) | ||
| 305 | }; | ||
| 306 | 397 | ||
| 307 | // Update last_connected AFTER creating filter but BEFORE subscribing | 398 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 308 | self.last_connected = Some(Timestamp::now()); | 399 | self.last_connected = Some(Timestamp::now()); |
| @@ -323,7 +414,11 @@ impl SelfSubscriber { | |||
| 323 | 414 | ||
| 324 | let mut notifications = client.notifications(); | 415 | let mut notifications = client.notifications(); |
| 325 | let batch_window = Self::get_batch_window(); | 416 | let batch_window = Self::get_batch_window(); |
| 326 | let mut pending = PendingUpdates::new(); | 417 | |
| 418 | // Load existing events from database on startup | ||
| 419 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 420 | // returned by the WebSocket subscription (which has limits) | ||
| 421 | let mut pending = self.load_existing_events().await; | ||
| 327 | 422 | ||
| 328 | // Timer does NOT reset on new events - use interval | 423 | // Timer does NOT reset on new events - use interval |
| 329 | let mut timer = tokio::time::interval(batch_window); | 424 | let mut timer = tokio::time::interval(batch_window); |