diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 19:58:41 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 19:58:41 +0000 |
| commit | b28a356cb41077ccee12a9c52f4ef2054e76cac6 (patch) | |
| tree | 2a0867f1ab0216e86efa062aef90b2b8077e6fb9 | |
| parent | 6dd9fcd5392891b0ddb7894e2c5cb40450eae00e (diff) | |
chore: cargo fmt
| -rw-r--r-- | src/config.rs | 33 | ||||
| -rw-r--r-- | src/http/nip11.rs | 4 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 20 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 8 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 37 | ||||
| -rw-r--r-- | src/sync/mod.rs | 185 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 57 | ||||
| -rw-r--r-- | src/sync/relay_connection.rs | 30 | ||||
| -rw-r--r-- | tests/common/sync_helpers.rs | 2 | ||||
| -rw-r--r-- | tests/state_authorization.rs | 242 | ||||
| -rw-r--r-- | tests/sync/maintainer_reprocessing.rs | 311 |
11 files changed, 462 insertions, 467 deletions
diff --git a/src/config.rs b/src/config.rs index 2343c88..44001d8 100644 --- a/src/config.rs +++ b/src/config.rs | |||
| @@ -141,12 +141,20 @@ pub struct Config { | |||
| 141 | /// Stores full event objects for immediate re-processing when dependencies resolve. | 141 | /// Stores full event objects for immediate re-processing when dependencies resolve. |
| 142 | /// Too short (<30s): Miss events from slow relays | 142 | /// Too short (<30s): Miss events from slow relays |
| 143 | /// Too long (>5min): Waste memory | 143 | /// Too long (>5min): Waste memory |
| 144 | #[arg(long, env = "NGIT_REJECTED_HOT_CACHE_DURATION_SECS", default_value_t = 120)] | 144 | #[arg( |
| 145 | long, | ||
| 146 | env = "NGIT_REJECTED_HOT_CACHE_DURATION_SECS", | ||
| 147 | default_value_t = 120 | ||
| 148 | )] | ||
| 145 | pub rejected_hot_cache_duration_secs: u64, | 149 | pub rejected_hot_cache_duration_secs: u64, |
| 146 | 150 | ||
| 147 | /// Cold index expiry in seconds for rejected announcements (default: 604800 = 7 days) | 151 | /// Cold index expiry in seconds for rejected announcements (default: 604800 = 7 days) |
| 148 | /// Stores metadata only to prevent repeated downloads of rejected events. | 152 | /// Stores metadata only to prevent repeated downloads of rejected events. |
| 149 | #[arg(long, env = "NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS", default_value_t = 604800)] | 153 | #[arg( |
| 154 | long, | ||
| 155 | env = "NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS", | ||
| 156 | default_value_t = 604800 | ||
| 157 | )] | ||
| 150 | pub rejected_cold_index_expiry_secs: u64, | 158 | pub rejected_cold_index_expiry_secs: u64, |
| 151 | } | 159 | } |
| 152 | 160 | ||
| @@ -190,10 +198,7 @@ impl Config { | |||
| 190 | // Validate it's a valid nsec | 198 | // Validate it's a valid nsec |
| 191 | Keys::parse(&nsec).context("Invalid nsec in relay owner key file")?; | 199 | Keys::parse(&nsec).context("Invalid nsec in relay owner key file")?; |
| 192 | 200 | ||
| 193 | tracing::info!( | 201 | tracing::info!("Loaded relay owner key from {}", key_path.display()); |
| 194 | "Loaded relay owner key from {}", | ||
| 195 | key_path.display() | ||
| 196 | ); | ||
| 197 | return Ok(nsec); | 202 | return Ok(nsec); |
| 198 | } | 203 | } |
| 199 | 204 | ||
| @@ -202,8 +207,7 @@ impl Config { | |||
| 202 | let nsec = keys.secret_key().to_bech32()?; | 207 | let nsec = keys.secret_key().to_bech32()?; |
| 203 | 208 | ||
| 204 | // Save to file | 209 | // Save to file |
| 205 | fs::write(&key_path, &nsec) | 210 | fs::write(&key_path, &nsec).context("Failed to write relay owner key file")?; |
| 206 | .context("Failed to write relay owner key file")?; | ||
| 207 | 211 | ||
| 208 | tracing::info!( | 212 | tracing::info!( |
| 209 | "Generated new relay owner key and saved to {}", | 213 | "Generated new relay owner key and saved to {}", |
| @@ -215,7 +219,9 @@ impl Config { | |||
| 215 | 219 | ||
| 216 | /// Get the relay owner's Keys object | 220 | /// Get the relay owner's Keys object |
| 217 | pub fn relay_owner_keys(&self) -> Result<Keys> { | 221 | pub fn relay_owner_keys(&self) -> Result<Keys> { |
| 218 | let nsec = self.relay_owner_nsec.as_ref() | 222 | let nsec = self |
| 223 | .relay_owner_nsec | ||
| 224 | .as_ref() | ||
| 219 | .context("relay_owner_nsec not set (should be set by Config::load())")?; | 225 | .context("relay_owner_nsec not set (should be set by Config::load())")?; |
| 220 | Keys::parse(nsec).context("Invalid relay_owner_nsec") | 226 | Keys::parse(nsec).context("Invalid relay_owner_nsec") |
| 221 | } | 227 | } |
| @@ -251,8 +257,11 @@ impl Config { | |||
| 251 | pub fn for_testing() -> Self { | 257 | pub fn for_testing() -> Self { |
| 252 | // Generate a test key deterministically for consistent tests | 258 | // Generate a test key deterministically for consistent tests |
| 253 | let keys = Keys::generate(); | 259 | let keys = Keys::generate(); |
| 254 | let nsec = keys.secret_key().to_bech32().expect("Failed to generate test nsec"); | 260 | let nsec = keys |
| 255 | 261 | .secret_key() | |
| 262 | .to_bech32() | ||
| 263 | .expect("Failed to generate test nsec"); | ||
| 264 | |||
| 256 | Self { | 265 | Self { |
| 257 | domain: "localhost:8080".to_string(), | 266 | domain: "localhost:8080".to_string(), |
| 258 | relay_owner_nsec: Some(nsec), | 267 | relay_owner_nsec: Some(nsec), |
| @@ -348,7 +357,7 @@ mod tests { | |||
| 348 | let config = Config::for_testing(); | 357 | let config = Config::for_testing(); |
| 349 | let keys = config.relay_owner_keys().expect("Should have valid keys"); | 358 | let keys = config.relay_owner_keys().expect("Should have valid keys"); |
| 350 | let npub = config.relay_owner_npub().expect("Should derive npub"); | 359 | let npub = config.relay_owner_npub().expect("Should derive npub"); |
| 351 | 360 | ||
| 352 | // Verify the npub matches the keys | 361 | // Verify the npub matches the keys |
| 353 | assert_eq!(npub, keys.public_key().to_bech32().unwrap()); | 362 | assert_eq!(npub, keys.public_key().to_bech32().unwrap()); |
| 354 | assert!(npub.starts_with("npub1")); | 363 | assert!(npub.starts_with("npub1")); |
diff --git a/src/http/nip11.rs b/src/http/nip11.rs index cf31cf3..de714cb 100644 --- a/src/http/nip11.rs +++ b/src/http/nip11.rs | |||
| @@ -102,12 +102,12 @@ mod tests { | |||
| 102 | 102 | ||
| 103 | assert_eq!(doc.name, "Test Relay"); | 103 | assert_eq!(doc.name, "Test Relay"); |
| 104 | assert_eq!(doc.description, "A test relay"); | 104 | assert_eq!(doc.description, "A test relay"); |
| 105 | 105 | ||
| 106 | // Verify pubkey is present and is a valid npub | 106 | // Verify pubkey is present and is a valid npub |
| 107 | assert!(doc.pubkey.is_some()); | 107 | assert!(doc.pubkey.is_some()); |
| 108 | let pubkey = doc.pubkey.unwrap(); | 108 | let pubkey = doc.pubkey.unwrap(); |
| 109 | assert!(pubkey.starts_with("npub1")); | 109 | assert!(pubkey.starts_with("npub1")); |
| 110 | 110 | ||
| 111 | assert!(doc.supported_nips.contains(&1)); | 111 | assert!(doc.supported_nips.contains(&1)); |
| 112 | assert!(doc.supported_nips.contains(&11)); | 112 | assert!(doc.supported_nips.contains(&11)); |
| 113 | assert!(doc.supported_nips.contains(&34)); | 113 | assert!(doc.supported_nips.contains(&34)); |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index acaac71..c010854 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -102,11 +102,11 @@ impl Nip34WritePolicy { | |||
| 102 | } | 102 | } |
| 103 | 103 | ||
| 104 | tracing::debug!("Accepted repository announcement: {}", event_id_str); | 104 | tracing::debug!("Accepted repository announcement: {}", event_id_str); |
| 105 | 105 | ||
| 106 | // Check purgatory for state events that might now be authorized | 106 | // Check purgatory for state events that might now be authorized |
| 107 | self.check_purgatory_state_events_for_identifier(&announcement.identifier) | 107 | self.check_purgatory_state_events_for_identifier(&announcement.identifier) |
| 108 | .await; | 108 | .await; |
| 109 | 109 | ||
| 110 | WritePolicyResult::Accept | 110 | WritePolicyResult::Accept |
| 111 | } | 111 | } |
| 112 | Err(e) => { | 112 | Err(e) => { |
| @@ -130,11 +130,11 @@ impl Nip34WritePolicy { | |||
| 130 | announcement.identifier | 130 | announcement.identifier |
| 131 | ); | 131 | ); |
| 132 | // Don't create bare repository for external announcements | 132 | // Don't create bare repository for external announcements |
| 133 | 133 | ||
| 134 | // Check purgatory for state events that might now be authorized | 134 | // Check purgatory for state events that might now be authorized |
| 135 | self.check_purgatory_state_events_for_identifier(&announcement.identifier) | 135 | self.check_purgatory_state_events_for_identifier(&announcement.identifier) |
| 136 | .await; | 136 | .await; |
| 137 | 137 | ||
| 138 | WritePolicyResult::Accept | 138 | WritePolicyResult::Accept |
| 139 | } | 139 | } |
| 140 | Err(e) => { | 140 | Err(e) => { |
| @@ -324,20 +324,24 @@ impl Nip34WritePolicy { | |||
| 324 | /// 4. Keeps unauthorized events in purgatory (will expire naturally) | 324 | /// 4. Keeps unauthorized events in purgatory (will expire naturally) |
| 325 | async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) { | 325 | async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) { |
| 326 | let state_events = self.ctx.purgatory.find_state(identifier); | 326 | let state_events = self.ctx.purgatory.find_state(identifier); |
| 327 | 327 | ||
| 328 | if state_events.is_empty() { | 328 | if state_events.is_empty() { |
| 329 | return; | 329 | return; |
| 330 | } | 330 | } |
| 331 | 331 | ||
| 332 | tracing::debug!( | 332 | tracing::debug!( |
| 333 | identifier = %identifier, | 333 | identifier = %identifier, |
| 334 | count = state_events.len(), | 334 | count = state_events.len(), |
| 335 | "Checking purgatory state events after announcement acceptance" | 335 | "Checking purgatory state events after announcement acceptance" |
| 336 | ); | 336 | ); |
| 337 | 337 | ||
| 338 | for entry in state_events { | 338 | for entry in state_events { |
| 339 | // Re-evaluate authorization with the new announcement | 339 | // Re-evaluate authorization with the new announcement |
| 340 | match self.state_policy.process_state_event(&entry.event, false).await { | 340 | match self |
| 341 | .state_policy | ||
| 342 | .process_state_event(&entry.event, false) | ||
| 343 | .await | ||
| 344 | { | ||
| 341 | Ok(WritePolicyResult::Accept) => { | 345 | Ok(WritePolicyResult::Accept) => { |
| 342 | tracing::info!( | 346 | tracing::info!( |
| 343 | event_id = %entry.event.id, | 347 | event_id = %entry.event.id, |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index d26b5ec..b850e7b 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -93,9 +93,11 @@ impl StatePolicy { | |||
| 93 | }); | 93 | }); |
| 94 | } | 94 | } |
| 95 | 95 | ||
| 96 | let authorized_owners = | 96 | let authorized_owners = crate::git::authorization::pubkey_authorised_for_repo_owners( |
| 97 | crate::git::authorization::pubkey_authorised_for_repo_owners(&event.pubkey, &db_repo_data); | 97 | &event.pubkey, |
| 98 | 98 | &db_repo_data, | |
| 99 | ); | ||
| 100 | |||
| 99 | if authorized_owners.is_empty() { | 101 | if authorized_owners.is_empty() { |
| 100 | tracing::warn!( | 102 | tracing::warn!( |
| 101 | event_id = %event.id, | 103 | event_id = %event.id, |
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index 7d6d42d..2ed983e 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -156,19 +156,25 @@ impl SyncMetrics { | |||
| 156 | "ngit_sync_rejected_announcements_hot_cache_hits_total", | 156 | "ngit_sync_rejected_announcements_hot_cache_hits_total", |
| 157 | "Total hot cache hits (events re-processed from cache)", | 157 | "Total hot cache hits (events re-processed from cache)", |
| 158 | ))?; | 158 | ))?; |
| 159 | registry.register(Box::new(rejected_announcements_hot_cache_hits_total.clone()))?; | 159 | registry.register(Box::new( |
| 160 | rejected_announcements_hot_cache_hits_total.clone(), | ||
| 161 | ))?; | ||
| 160 | 162 | ||
| 161 | let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( | 163 | let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( |
| 162 | "ngit_sync_rejected_announcements_hot_cache_misses_total", | 164 | "ngit_sync_rejected_announcements_hot_cache_misses_total", |
| 163 | "Total hot cache misses (events not in cache when invalidated)", | 165 | "Total hot cache misses (events not in cache when invalidated)", |
| 164 | ))?; | 166 | ))?; |
| 165 | registry.register(Box::new(rejected_announcements_hot_cache_misses_total.clone()))?; | 167 | registry.register(Box::new( |
| 168 | rejected_announcements_hot_cache_misses_total.clone(), | ||
| 169 | ))?; | ||
| 166 | 170 | ||
| 167 | let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( | 171 | let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( |
| 168 | "ngit_sync_rejected_announcements_hot_cache_expired_total", | 172 | "ngit_sync_rejected_announcements_hot_cache_expired_total", |
| 169 | "Total expired entries removed from hot cache", | 173 | "Total expired entries removed from hot cache", |
| 170 | ))?; | 174 | ))?; |
| 171 | registry.register(Box::new(rejected_announcements_hot_cache_expired_total.clone()))?; | 175 | registry.register(Box::new( |
| 176 | rejected_announcements_hot_cache_expired_total.clone(), | ||
| 177 | ))?; | ||
| 172 | 178 | ||
| 173 | let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( | 179 | let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( |
| 174 | "ngit_sync_rejected_announcements_cold_index_current", | 180 | "ngit_sync_rejected_announcements_cold_index_current", |
| @@ -180,7 +186,9 @@ impl SyncMetrics { | |||
| 180 | "ngit_sync_rejected_announcements_cold_index_expired_total", | 186 | "ngit_sync_rejected_announcements_cold_index_expired_total", |
| 181 | "Total expired entries removed from cold index", | 187 | "Total expired entries removed from cold index", |
| 182 | ))?; | 188 | ))?; |
| 183 | registry.register(Box::new(rejected_announcements_cold_index_expired_total.clone()))?; | 189 | registry.register(Box::new( |
| 190 | rejected_announcements_cold_index_expired_total.clone(), | ||
| 191 | ))?; | ||
| 184 | 192 | ||
| 185 | let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( | 193 | let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( |
| 186 | "ngit_sync_rejected_announcements_invalidated_total", | 194 | "ngit_sync_rejected_announcements_invalidated_total", |
| @@ -430,7 +438,8 @@ impl SyncMetrics { | |||
| 430 | 438 | ||
| 431 | /// Update hot cache current size gauge. | 439 | /// Update hot cache current size gauge. |
| 432 | pub fn update_hot_cache_size(&self, size: usize) { | 440 | pub fn update_hot_cache_size(&self, size: usize) { |
| 433 | self.rejected_announcements_hot_cache_current.set(size as i64); | 441 | self.rejected_announcements_hot_cache_current |
| 442 | .set(size as i64); | ||
| 434 | } | 443 | } |
| 435 | 444 | ||
| 436 | /// Record hot cache hit (event re-processed from cache). | 445 | /// Record hot cache hit (event re-processed from cache). |
| @@ -445,22 +454,26 @@ impl SyncMetrics { | |||
| 445 | 454 | ||
| 446 | /// Record hot cache expired entries. | 455 | /// Record hot cache expired entries. |
| 447 | pub fn record_hot_cache_expired(&self, count: usize) { | 456 | pub fn record_hot_cache_expired(&self, count: usize) { |
| 448 | self.rejected_announcements_hot_cache_expired_total.inc_by(count as u64); | 457 | self.rejected_announcements_hot_cache_expired_total |
| 458 | .inc_by(count as u64); | ||
| 449 | } | 459 | } |
| 450 | 460 | ||
| 451 | /// Update cold index current size gauge. | 461 | /// Update cold index current size gauge. |
| 452 | pub fn update_cold_index_size(&self, size: usize) { | 462 | pub fn update_cold_index_size(&self, size: usize) { |
| 453 | self.rejected_announcements_cold_index_current.set(size as i64); | 463 | self.rejected_announcements_cold_index_current |
| 464 | .set(size as i64); | ||
| 454 | } | 465 | } |
| 455 | 466 | ||
| 456 | /// Record cold index expired entries. | 467 | /// Record cold index expired entries. |
| 457 | pub fn record_cold_index_expired(&self, count: usize) { | 468 | pub fn record_cold_index_expired(&self, count: usize) { |
| 458 | self.rejected_announcements_cold_index_expired_total.inc_by(count as u64); | 469 | self.rejected_announcements_cold_index_expired_total |
| 470 | .inc_by(count as u64); | ||
| 459 | } | 471 | } |
| 460 | 472 | ||
| 461 | /// Record invalidation (maintainer announcement invalidated). | 473 | /// Record invalidation (maintainer announcement invalidated). |
| 462 | pub fn record_invalidation(&self, count: usize) { | 474 | pub fn record_invalidation(&self, count: usize) { |
| 463 | self.rejected_announcements_invalidated_total.inc_by(count as u64); | 475 | self.rejected_announcements_invalidated_total |
| 476 | .inc_by(count as u64); | ||
| 464 | } | 477 | } |
| 465 | 478 | ||
| 466 | // === Rejected States Recording Methods === | 479 | // === Rejected States Recording Methods === |
| @@ -482,7 +495,8 @@ impl SyncMetrics { | |||
| 482 | 495 | ||
| 483 | /// Record state event hot cache expired entries. | 496 | /// Record state event hot cache expired entries. |
| 484 | pub fn record_states_hot_cache_expired(&self, count: usize) { | 497 | pub fn record_states_hot_cache_expired(&self, count: usize) { |
| 485 | self.rejected_states_hot_cache_expired_total.inc_by(count as u64); | 498 | self.rejected_states_hot_cache_expired_total |
| 499 | .inc_by(count as u64); | ||
| 486 | } | 500 | } |
| 487 | 501 | ||
| 488 | /// Update state events cold index current size gauge. | 502 | /// Update state events cold index current size gauge. |
| @@ -492,7 +506,8 @@ impl SyncMetrics { | |||
| 492 | 506 | ||
| 493 | /// Record state event cold index expired entries. | 507 | /// Record state event cold index expired entries. |
| 494 | pub fn record_states_cold_index_expired(&self, count: usize) { | 508 | pub fn record_states_cold_index_expired(&self, count: usize) { |
| 495 | self.rejected_states_cold_index_expired_total.inc_by(count as u64); | 509 | self.rejected_states_cold_index_expired_total |
| 510 | .inc_by(count as u64); | ||
| 496 | } | 511 | } |
| 497 | 512 | ||
| 498 | /// Record state event invalidation. | 513 | /// Record state event invalidation. |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ed3b78c..07527c7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; | |||
| 27 | pub use metrics::SyncMetrics; | 27 | pub use metrics::SyncMetrics; |
| 28 | 28 | ||
| 29 | // Re-export rejected index types | 29 | // Re-export rejected index types |
| 30 | pub use rejected_index::{RejectionReason}; | 30 | pub use rejected_index::RejectionReason; |
| 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used | 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used |
| 32 | // Current code still uses the simple HashSet type alias below | 32 | // Current code still uses the simple HashSet type alias below |
| 33 | 33 | ||
| @@ -73,7 +73,7 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | |||
| 73 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. | 73 | /// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. |
| 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing | 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing |
| 75 | /// to avoid repeatedly fetching and rejecting the same events. | 75 | /// to avoid repeatedly fetching and rejecting the same events. |
| 76 | /// | 76 | /// |
| 77 | /// Uses the two-tier RejectedEventsIndex from rejected_index.rs: | 77 | /// Uses the two-tier RejectedEventsIndex from rejected_index.rs: |
| 78 | /// - Hot cache: Full events for 2 minutes (enables immediate re-processing) | 78 | /// - Hot cache: Full events for 2 minutes (enables immediate re-processing) |
| 79 | /// - Cold index: Metadata for 7 days (prevents repeated downloads) | 79 | /// - Cold index: Metadata for 7 days (prevents repeated downloads) |
| @@ -113,7 +113,9 @@ impl ConnectionStatus { | |||
| 113 | pub fn is_live_sync_active(&self) -> bool { | 113 | pub fn is_live_sync_active(&self) -> bool { |
| 114 | matches!( | 114 | matches!( |
| 115 | self, | 115 | self, |
| 116 | ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedHistoricSyncFailures | 116 | ConnectionStatus::Syncing |
| 117 | | ConnectionStatus::Connected | ||
| 118 | | ConnectionStatus::ConnectedHistoricSyncFailures | ||
| 117 | ) | 119 | ) |
| 118 | } | 120 | } |
| 119 | } | 121 | } |
| @@ -384,9 +386,7 @@ async fn run_rejected_index_cleanup( | |||
| 384 | let hot_cache_interval = Duration::from_secs(60); | 386 | let hot_cache_interval = Duration::from_secs(60); |
| 385 | let cold_index_interval = Duration::from_secs(86400); // 24 hours | 387 | let cold_index_interval = Duration::from_secs(86400); // 24 hours |
| 386 | 388 | ||
| 387 | tracing::info!( | 389 | tracing::info!("Rejected index cleanup started (hot cache: 60s, cold index: daily)"); |
| 388 | "Rejected index cleanup started (hot cache: 60s, cold index: daily)" | ||
| 389 | ); | ||
| 390 | 390 | ||
| 391 | let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); | 391 | let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); |
| 392 | let mut cold_index_timer = tokio::time::interval(cold_index_interval); | 392 | let mut cold_index_timer = tokio::time::interval(cold_index_interval); |
| @@ -399,7 +399,7 @@ async fn run_rejected_index_cleanup( | |||
| 399 | tokio::select! { | 399 | tokio::select! { |
| 400 | _ = hot_cache_timer.tick() => { | 400 | _ = hot_cache_timer.tick() => { |
| 401 | let manager = sync_manager.lock().await; | 401 | let manager = sync_manager.lock().await; |
| 402 | 402 | ||
| 403 | // Clean up announcements index | 403 | // Clean up announcements index |
| 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); | 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); |
| 405 | if hot_expired > 0 { | 405 | if hot_expired > 0 { |
| @@ -408,7 +408,7 @@ async fn run_rejected_index_cleanup( | |||
| 408 | hot_expired | 408 | hot_expired |
| 409 | ); | 409 | ); |
| 410 | } | 410 | } |
| 411 | 411 | ||
| 412 | // Clean up states index | 412 | // Clean up states index |
| 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); | 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); |
| 414 | if states_hot_expired > 0 { | 414 | if states_hot_expired > 0 { |
| @@ -420,7 +420,7 @@ async fn run_rejected_index_cleanup( | |||
| 420 | } | 420 | } |
| 421 | _ = cold_index_timer.tick() => { | 421 | _ = cold_index_timer.tick() => { |
| 422 | let manager = sync_manager.lock().await; | 422 | let manager = sync_manager.lock().await; |
| 423 | 423 | ||
| 424 | // Clean up announcements index | 424 | // Clean up announcements index |
| 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); | 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); |
| 426 | if cold_expired > 0 { | 426 | if cold_expired > 0 { |
| @@ -429,7 +429,7 @@ async fn run_rejected_index_cleanup( | |||
| 429 | cold_expired | 429 | cold_expired |
| 430 | ); | 430 | ); |
| 431 | } | 431 | } |
| 432 | 432 | ||
| 433 | // Clean up states index | 433 | // Clean up states index |
| 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); | 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); |
| 435 | if states_cold_expired > 0 { | 435 | if states_cold_expired > 0 { |
| @@ -799,8 +799,7 @@ impl SyncManager { | |||
| 799 | if let (Some(requested), Some(received)) = | 799 | if let (Some(requested), Some(received)) = |
| 800 | (&batch.requested_event_ids, &batch.received_event_ids) | 800 | (&batch.requested_event_ids, &batch.received_event_ids) |
| 801 | { | 801 | { |
| 802 | let missing: Vec<EventId> = | 802 | let missing: Vec<EventId> = requested.difference(received).cloned().collect(); |
| 803 | requested.difference(received).cloned().collect(); | ||
| 804 | 803 | ||
| 805 | if !missing.is_empty() { | 804 | if !missing.is_empty() { |
| 806 | let requested_count = requested.len(); | 805 | let requested_count = requested.len(); |
| @@ -884,13 +883,11 @@ impl SyncManager { | |||
| 884 | // Re-acquire lock and update batch with new subscriptions | 883 | // Re-acquire lock and update batch with new subscriptions |
| 885 | let mut pending = self.pending_sync_index.write().await; | 884 | let mut pending = self.pending_sync_index.write().await; |
| 886 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 885 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 887 | if let Some(batch) = | 886 | if let Some(batch) = batches.iter_mut().find(|b| b.batch_id == batch_id) |
| 888 | batches.iter_mut().find(|b| b.batch_id == batch_id) | ||
| 889 | { | 887 | { |
| 890 | batch.outstanding_subs.extend(new_sub_ids.clone()); | 888 | batch.outstanding_subs.extend(new_sub_ids.clone()); |
| 891 | // Update requested_event_ids to only include missing ones | 889 | // Update requested_event_ids to only include missing ones |
| 892 | batch.requested_event_ids = | 890 | batch.requested_event_ids = Some(missing.iter().cloned().collect()); |
| 893 | Some(missing.iter().cloned().collect()); | ||
| 894 | // Clear received_event_ids for fresh tracking | 891 | // Clear received_event_ids for fresh tracking |
| 895 | batch.received_event_ids = Some(HashSet::new()); | 892 | batch.received_event_ids = Some(HashSet::new()); |
| 896 | // Increment retry counter | 893 | // Increment retry counter |
| @@ -921,14 +918,14 @@ impl SyncManager { | |||
| 921 | // Re-acquire lock to extract the batch | 918 | // Re-acquire lock to extract the batch |
| 922 | let mut pending = self.pending_sync_index.write().await; | 919 | let mut pending = self.pending_sync_index.write().await; |
| 923 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 920 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 924 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) | 921 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { |
| 925 | { | ||
| 926 | let completed_batch = batches.remove(idx); | 922 | let completed_batch = batches.remove(idx); |
| 927 | if batches.is_empty() { | 923 | if batches.is_empty() { |
| 928 | pending.remove(&relay_url_for_retry); | 924 | pending.remove(&relay_url_for_retry); |
| 929 | } | 925 | } |
| 930 | drop(pending); | 926 | drop(pending); |
| 931 | self.confirm_batch(&relay_url_for_retry, completed_batch).await; | 927 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 928 | .await; | ||
| 932 | } | 929 | } |
| 933 | } | 930 | } |
| 934 | return; | 931 | return; |
| @@ -1023,17 +1020,17 @@ impl SyncManager { | |||
| 1023 | "Batch completed but no RelayState found for relay" | 1020 | "Batch completed but no RelayState found for relay" |
| 1024 | ); | 1021 | ); |
| 1025 | } | 1022 | } |
| 1026 | 1023 | ||
| 1027 | // Release lock before checking if historic sync is complete | 1024 | // Release lock before checking if historic sync is complete |
| 1028 | drop(relay_index); | 1025 | drop(relay_index); |
| 1029 | 1026 | ||
| 1030 | // Spawn background task to check if historic sync is complete | 1027 | // Spawn background task to check if historic sync is complete |
| 1031 | // This avoids blocking the confirm_batch flow for 6 seconds | 1028 | // This avoids blocking the confirm_batch flow for 6 seconds |
| 1032 | let relay_url = relay_url.to_string(); | 1029 | let relay_url = relay_url.to_string(); |
| 1033 | let pending_index = self.pending_sync_index.clone(); | 1030 | let pending_index = self.pending_sync_index.clone(); |
| 1034 | let relay_index = self.relay_sync_index.clone(); | 1031 | let relay_index = self.relay_sync_index.clone(); |
| 1035 | let metrics = self.metrics.clone(); | 1032 | let metrics = self.metrics.clone(); |
| 1036 | 1033 | ||
| 1037 | tokio::spawn(async move { | 1034 | tokio::spawn(async move { |
| 1038 | Self::check_and_complete_historic_sync_impl( | 1035 | Self::check_and_complete_historic_sync_impl( |
| 1039 | &relay_url, | 1036 | &relay_url, |
| @@ -1073,29 +1070,33 @@ impl SyncManager { | |||
| 1073 | // First check: Are there any pending batches? | 1070 | // First check: Are there any pending batches? |
| 1074 | let has_pending = { | 1071 | let has_pending = { |
| 1075 | let pending = pending_index.read().await; | 1072 | let pending = pending_index.read().await; |
| 1076 | pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) | 1073 | pending |
| 1074 | .get(relay_url) | ||
| 1075 | .is_some_and(|batches| !batches.is_empty()) | ||
| 1077 | }; | 1076 | }; |
| 1078 | 1077 | ||
| 1079 | if has_pending { | 1078 | if has_pending { |
| 1080 | // Still syncing, don't transition yet | 1079 | // Still syncing, don't transition yet |
| 1081 | return; | 1080 | return; |
| 1082 | } | 1081 | } |
| 1083 | 1082 | ||
| 1084 | // Wait for self-subscriber batch window + buffer to catch any in-flight events | 1083 | // Wait for self-subscriber batch window + buffer to catch any in-flight events |
| 1085 | // that might create new Layer 2/3 filters | 1084 | // that might create new Layer 2/3 filters |
| 1086 | tokio::time::sleep(Duration::from_millis(6000)).await; | 1085 | tokio::time::sleep(Duration::from_millis(6000)).await; |
| 1087 | 1086 | ||
| 1088 | // Second check: Are there still no pending batches? | 1087 | // Second check: Are there still no pending batches? |
| 1089 | let has_pending = { | 1088 | let has_pending = { |
| 1090 | let pending = pending_index.read().await; | 1089 | let pending = pending_index.read().await; |
| 1091 | pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) | 1090 | pending |
| 1091 | .get(relay_url) | ||
| 1092 | .is_some_and(|batches| !batches.is_empty()) | ||
| 1092 | }; | 1093 | }; |
| 1093 | 1094 | ||
| 1094 | if has_pending { | 1095 | if has_pending { |
| 1095 | // New batches appeared during the wait - still syncing | 1096 | // New batches appeared during the wait - still syncing |
| 1096 | return; | 1097 | return; |
| 1097 | } | 1098 | } |
| 1098 | 1099 | ||
| 1099 | // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded | 1100 | // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded |
| 1100 | let mut relay_index_guard = relay_index.write().await; | 1101 | let mut relay_index_guard = relay_index.write().await; |
| 1101 | if let Some(state) = relay_index_guard.get_mut(relay_url) { | 1102 | if let Some(state) = relay_index_guard.get_mut(relay_url) { |
| @@ -1106,11 +1107,11 @@ impl SyncManager { | |||
| 1106 | } else { | 1107 | } else { |
| 1107 | ConnectionStatus::Connected | 1108 | ConnectionStatus::Connected |
| 1108 | }; | 1109 | }; |
| 1109 | 1110 | ||
| 1110 | state.connection_status = new_status; | 1111 | state.connection_status = new_status; |
| 1111 | state.historic_sync_completed = true; | 1112 | state.historic_sync_completed = true; |
| 1112 | state.historic_sync_completed_at = Some(Timestamp::now()); | 1113 | state.historic_sync_completed_at = Some(Timestamp::now()); |
| 1113 | 1114 | ||
| 1114 | tracing::info!( | 1115 | tracing::info!( |
| 1115 | relay = %relay_url, | 1116 | relay = %relay_url, |
| 1116 | repos_synced = state.repos.len(), | 1117 | repos_synced = state.repos.len(), |
| @@ -1120,7 +1121,7 @@ impl SyncManager { | |||
| 1120 | "Historic sync complete - transitioned to {} status", | 1121 | "Historic sync complete - transitioned to {} status", |
| 1121 | if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } | 1122 | if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } |
| 1122 | ); | 1123 | ); |
| 1123 | 1124 | ||
| 1124 | // Update metrics | 1125 | // Update metrics |
| 1125 | if let Some(ref metrics) = metrics { | 1126 | if let Some(ref metrics) = metrics { |
| 1126 | metrics.record_connection_status(relay_url, new_status); | 1127 | metrics.record_connection_status(relay_url, new_status); |
| @@ -1362,8 +1363,8 @@ impl SyncManager { | |||
| 1362 | ); | 1363 | ); |
| 1363 | return; | 1364 | return; |
| 1364 | } | 1365 | } |
| 1365 | Some(ConnectionStatus::Syncing) | 1366 | Some(ConnectionStatus::Syncing) |
| 1366 | | Some(ConnectionStatus::Connected) | 1367 | | Some(ConnectionStatus::Connected) |
| 1367 | | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { | 1368 | | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { |
| 1368 | // Continue to subscribe - live sync is active, can accept new filters | 1369 | // Continue to subscribe - live sync is active, can accept new filters |
| 1369 | } | 1370 | } |
| @@ -1468,7 +1469,8 @@ impl SyncManager { | |||
| 1468 | match relay_event { | 1469 | match relay_event { |
| 1469 | RelayEvent::Event(event, subscription_id) => { | 1470 | RelayEvent::Event(event, subscription_id) => { |
| 1470 | // Skip events we've already rejected (announcements only) | 1471 | // Skip events we've already rejected (announcements only) |
| 1471 | if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) | 1472 | if (event.kind == Kind::GitRepoAnnouncement |
| 1473 | || event.kind == Kind::RepoState) | ||
| 1472 | && rejected_events_index.contains(&event.id) | 1474 | && rejected_events_index.contains(&event.id) |
| 1473 | { | 1475 | { |
| 1474 | tracing::trace!( | 1476 | tracing::trace!( |
| @@ -1479,7 +1481,7 @@ impl SyncManager { | |||
| 1479 | ); | 1481 | ); |
| 1480 | continue; | 1482 | continue; |
| 1481 | } | 1483 | } |
| 1482 | 1484 | ||
| 1483 | let result = Self::process_event_static( | 1485 | let result = Self::process_event_static( |
| 1484 | &event, | 1486 | &event, |
| 1485 | &relay_url_clone, | 1487 | &relay_url_clone, |
| @@ -1863,11 +1865,16 @@ impl SyncManager { | |||
| 1863 | // Create RelayConnection if not exists | 1865 | // Create RelayConnection if not exists |
| 1864 | if !self.connections.contains_key(&relay_url) { | 1866 | if !self.connections.contains_key(&relay_url) { |
| 1865 | // Get relay owner keys for NIP-42 authentication | 1867 | // Get relay owner keys for NIP-42 authentication |
| 1866 | let keys = self.config.relay_owner_keys() | 1868 | let keys = self |
| 1869 | .config | ||
| 1870 | .relay_owner_keys() | ||
| 1867 | .expect("relay_owner_keys should be available"); | 1871 | .expect("relay_owner_keys should be available"); |
| 1868 | 1872 | ||
| 1869 | let connection = | 1873 | let connection = RelayConnection::new_with_database( |
| 1870 | RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database), keys); | 1874 | relay_url.clone(), |
| 1875 | Arc::clone(&self.database), | ||
| 1876 | keys, | ||
| 1877 | ); | ||
| 1871 | self.connections.insert(relay_url.clone(), connection); | 1878 | self.connections.insert(relay_url.clone(), connection); |
| 1872 | tracing::debug!(relay = %relay_url, "Registered new relay connection"); | 1879 | tracing::debug!(relay = %relay_url, "Registered new relay connection"); |
| 1873 | } | 1880 | } |
| @@ -1919,7 +1926,7 @@ impl SyncManager { | |||
| 1919 | state.connection_status = ConnectionStatus::Connecting; | 1926 | state.connection_status = ConnectionStatus::Connecting; |
| 1920 | } | 1927 | } |
| 1921 | } | 1928 | } |
| 1922 | 1929 | ||
| 1923 | // Update metrics to show connecting status | 1930 | // Update metrics to show connecting status |
| 1924 | if let Some(ref metrics) = self.metrics { | 1931 | if let Some(ref metrics) = self.metrics { |
| 1925 | metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); | 1932 | metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); |
| @@ -1974,7 +1981,8 @@ impl SyncManager { | |||
| 1974 | if let Some(ref metrics) = self.metrics { | 1981 | if let Some(ref metrics) = self.metrics { |
| 1975 | metrics.record_connection_attempt(relay_url, false); | 1982 | metrics.record_connection_attempt(relay_url, false); |
| 1976 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); | 1983 | metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); |
| 1977 | metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | 1984 | metrics |
| 1985 | .record_health_state(relay_url, self.health_tracker.get_state(relay_url)); | ||
| 1978 | } | 1986 | } |
| 1979 | } | 1987 | } |
| 1980 | } | 1988 | } |
| @@ -2172,7 +2180,7 @@ impl SyncManager { | |||
| 2172 | // immediately and should now pass validation. | 2180 | // immediately and should now pass validation. |
| 2173 | if event.kind == Kind::GitRepoAnnouncement { | 2181 | if event.kind == Kind::GitRepoAnnouncement { |
| 2174 | use crate::nostr::events::RepositoryAnnouncement; | 2182 | use crate::nostr::events::RepositoryAnnouncement; |
| 2175 | 2183 | ||
| 2176 | match RepositoryAnnouncement::from_event(event.clone()) { | 2184 | match RepositoryAnnouncement::from_event(event.clone()) { |
| 2177 | Ok(announcement) => { | 2185 | Ok(announcement) => { |
| 2178 | if !announcement.maintainers.is_empty() { | 2186 | if !announcement.maintainers.is_empty() { |
| @@ -2219,15 +2227,16 @@ impl SyncManager { | |||
| 2219 | // 2. Second attempt uses maintainer exception (different code path) | 2227 | // 2. Second attempt uses maintainer exception (different code path) |
| 2220 | // 3. If second attempt fails, stays in cold index only (no third attempt) | 2228 | // 3. If second attempt fails, stays in cold index only (no third attempt) |
| 2221 | // Use Box::pin to avoid infinitely sized future | 2229 | // Use Box::pin to avoid infinitely sized future |
| 2222 | let reprocess_result = Box::pin(Self::process_event_static( | 2230 | let reprocess_result = |
| 2223 | &maintainer_event, | 2231 | Box::pin(Self::process_event_static( |
| 2224 | relay_url, | 2232 | &maintainer_event, |
| 2225 | database, | 2233 | relay_url, |
| 2226 | write_policy, | 2234 | database, |
| 2227 | local_relay, | 2235 | write_policy, |
| 2228 | rejected_events_index, | 2236 | local_relay, |
| 2229 | )) | 2237 | rejected_events_index, |
| 2230 | .await; | 2238 | )) |
| 2239 | .await; | ||
| 2231 | 2240 | ||
| 2232 | match reprocess_result { | 2241 | match reprocess_result { |
| 2233 | ProcessResult::Saved => { | 2242 | ProcessResult::Saved => { |
| @@ -2275,7 +2284,7 @@ impl SyncManager { | |||
| 2275 | ); | 2284 | ); |
| 2276 | } | 2285 | } |
| 2277 | } | 2286 | } |
| 2278 | 2287 | ||
| 2279 | // When a repository announcement is accepted, re-process any state events | 2288 | // When a repository announcement is accepted, re-process any state events |
| 2280 | // that were previously rejected because no announcement existed. | 2289 | // that were previously rejected because no announcement existed. |
| 2281 | // This handles the race condition where state events arrive before their | 2290 | // This handles the race condition where state events arrive before their |
| @@ -2284,7 +2293,10 @@ impl SyncManager { | |||
| 2284 | Ok(announcement) => { | 2293 | Ok(announcement) => { |
| 2285 | // Get the announcement author's state events that were rejected | 2294 | // Get the announcement author's state events that were rejected |
| 2286 | let (removed, hot_events) = rejected_events_index | 2295 | let (removed, hot_events) = rejected_events_index |
| 2287 | .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); | 2296 | .invalidate_and_get_state_events( |
| 2297 | &event.pubkey, | ||
| 2298 | &announcement.identifier, | ||
| 2299 | ); | ||
| 2288 | 2300 | ||
| 2289 | if removed > 0 { | 2301 | if removed > 0 { |
| 2290 | tracing::info!( | 2302 | tracing::info!( |
| @@ -2357,7 +2369,7 @@ impl SyncManager { | |||
| 2357 | } | 2369 | } |
| 2358 | } | 2370 | } |
| 2359 | } | 2371 | } |
| 2360 | 2372 | ||
| 2361 | // When a state event is accepted (git data arrived), re-process any other | 2373 | // When a state event is accepted (git data arrived), re-process any other |
| 2362 | // rejected state events for the same repository. This handles the case where | 2374 | // rejected state events for the same repository. This handles the case where |
| 2363 | // multiple state events arrive but only one has git data initially. | 2375 | // multiple state events arrive but only one has git data initially. |
| @@ -2454,7 +2466,7 @@ impl SyncManager { | |||
| 2454 | reason = %message, | 2466 | reason = %message, |
| 2455 | "Event rejected by write policy" | 2467 | "Event rejected by write policy" |
| 2456 | ); | 2468 | ); |
| 2457 | 2469 | ||
| 2458 | // Track rejected announcement and state events to avoid re-fetching them | 2470 | // Track rejected announcement and state events to avoid re-fetching them |
| 2459 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | 2471 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { |
| 2460 | // Extract identifier from 'd' tag | 2472 | // Extract identifier from 'd' tag |
| @@ -2465,12 +2477,14 @@ impl SyncManager { | |||
| 2465 | .and_then(|t| t.content()) | 2477 | .and_then(|t| t.content()) |
| 2466 | { | 2478 | { |
| 2467 | // Determine rejection reason based on message | 2479 | // Determine rejection reason based on message |
| 2468 | let reason = if message.contains("doesn't list this service") | 2480 | let reason = if message.contains("doesn't list this service") |
| 2469 | || message.contains("Announcement must list service") { | 2481 | || message.contains("Announcement must list service") |
| 2482 | { | ||
| 2470 | rejected_index::RejectionReason::DoesNotListService | 2483 | rejected_index::RejectionReason::DoesNotListService |
| 2471 | } else if message.contains("maintainer") | 2484 | } else if message.contains("maintainer") |
| 2472 | || message.contains("no announcement exists") | 2485 | || message.contains("no announcement exists") |
| 2473 | || message.contains("not authorized") { | 2486 | || message.contains("not authorized") |
| 2487 | { | ||
| 2474 | rejected_index::RejectionReason::MaintainerNotYetValid | 2488 | rejected_index::RejectionReason::MaintainerNotYetValid |
| 2475 | } else { | 2489 | } else { |
| 2476 | rejected_index::RejectionReason::Other | 2490 | rejected_index::RejectionReason::Other |
| @@ -2512,7 +2526,7 @@ impl SyncManager { | |||
| 2512 | ); | 2526 | ); |
| 2513 | } | 2527 | } |
| 2514 | } | 2528 | } |
| 2515 | 2529 | ||
| 2516 | ProcessResult::Rejected | 2530 | ProcessResult::Rejected |
| 2517 | } | 2531 | } |
| 2518 | } | 2532 | } |
| @@ -3042,7 +3056,8 @@ impl SyncManager { | |||
| 3042 | // Get event IDs to exclude: purgatory + rejected announcements | 3056 | // Get event IDs to exclude: purgatory + rejected announcements |
| 3043 | let purgatory_ids = self.purgatory.event_ids(); | 3057 | let purgatory_ids = self.purgatory.event_ids(); |
| 3044 | let rejected_ids = self.rejected_events_index.get_all_event_ids(); | 3058 | let rejected_ids = self.rejected_events_index.get_all_event_ids(); |
| 3045 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); | 3059 | let excluded_ids: HashSet<EventId> = |
| 3060 | purgatory_ids.union(&rejected_ids).cloned().collect(); | ||
| 3046 | 3061 | ||
| 3047 | for (idx, result) in diff_results { | 3062 | for (idx, result) in diff_results { |
| 3048 | match result { | 3063 | match result { |
| @@ -3166,8 +3181,7 @@ impl SyncManager { | |||
| 3166 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { | 3181 | if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { |
| 3167 | batch.outstanding_subs.extend(subscription_ids.clone()); | 3182 | batch.outstanding_subs.extend(subscription_ids.clone()); |
| 3168 | // Store requested event IDs for validation after EOSE | 3183 | // Store requested event IDs for validation after EOSE |
| 3169 | batch.requested_event_ids = | 3184 | batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect()); |
| 3170 | Some(all_remote_ids.iter().cloned().collect()); | ||
| 3171 | batch.received_event_ids = Some(HashSet::new()); | 3185 | batch.received_event_ids = Some(HashSet::new()); |
| 3172 | } | 3186 | } |
| 3173 | } | 3187 | } |
| @@ -3359,20 +3373,16 @@ mod tests { | |||
| 3359 | async fn test_rejected_events_excluded_from_negentropy() { | 3373 | async fn test_rejected_events_excluded_from_negentropy() { |
| 3360 | // Create indices | 3374 | // Create indices |
| 3361 | let purgatory_ids: HashSet<EventId> = HashSet::new(); | 3375 | let purgatory_ids: HashSet<EventId> = HashSet::new(); |
| 3362 | let rejected_index = RejectedEventsIndex::new( | 3376 | let rejected_index = |
| 3363 | Duration::from_secs(120), | 3377 | RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 3364 | Duration::from_secs(604800), | ||
| 3365 | ); | ||
| 3366 | 3378 | ||
| 3367 | // Create test event IDs | 3379 | // Create test event IDs |
| 3368 | let _rejected_id = EventId::from_hex( | 3380 | let _rejected_id = |
| 3369 | "0000000000000000000000000000000000000000000000000000000000000001", | 3381 | EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001") |
| 3370 | ) | 3382 | .unwrap(); |
| 3371 | .unwrap(); | 3383 | let valid_id = |
| 3372 | let valid_id = EventId::from_hex( | 3384 | EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000002") |
| 3373 | "0000000000000000000000000000000000000000000000000000000000000002", | 3385 | .unwrap(); |
| 3374 | ) | ||
| 3375 | .unwrap(); | ||
| 3376 | 3386 | ||
| 3377 | // Add rejected event to index | 3387 | // Add rejected event to index |
| 3378 | let keys = Keys::generate(); | 3388 | let keys = Keys::generate(); |
| @@ -3383,7 +3393,7 @@ mod tests { | |||
| 3383 | )) | 3393 | )) |
| 3384 | .sign_with_keys(&keys) | 3394 | .sign_with_keys(&keys) |
| 3385 | .unwrap(); | 3395 | .unwrap(); |
| 3386 | 3396 | ||
| 3387 | // Override the event ID for testing (we need a specific ID) | 3397 | // Override the event ID for testing (we need a specific ID) |
| 3388 | // Since we can't override the ID, let's use the actual event ID | 3398 | // Since we can't override the ID, let's use the actual event ID |
| 3389 | let rejected_id = rejected_event.id; | 3399 | let rejected_id = rejected_event.id; |
| @@ -3403,8 +3413,7 @@ mod tests { | |||
| 3403 | remote_ids.insert(valid_id); | 3413 | remote_ids.insert(valid_id); |
| 3404 | 3414 | ||
| 3405 | // Exclude rejected and purgatory events | 3415 | // Exclude rejected and purgatory events |
| 3406 | let excluded_ids: HashSet<EventId> = | 3416 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); |
| 3407 | purgatory_ids.union(&rejected_ids).cloned().collect(); | ||
| 3408 | let filtered_ids: HashSet<EventId> = | 3417 | let filtered_ids: HashSet<EventId> = |
| 3409 | remote_ids.difference(&excluded_ids).cloned().collect(); | 3418 | remote_ids.difference(&excluded_ids).cloned().collect(); |
| 3410 | 3419 | ||
| @@ -3422,22 +3431,14 @@ mod tests { | |||
| 3422 | // Requested 5 events from negentropy diff | 3431 | // Requested 5 events from negentropy diff |
| 3423 | let mut requested: HashSet<EventId> = HashSet::new(); | 3432 | let mut requested: HashSet<EventId> = HashSet::new(); |
| 3424 | for i in 1u8..=5 { | 3433 | for i in 1u8..=5 { |
| 3425 | let id = EventId::from_hex(&format!( | 3434 | let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); |
| 3426 | "{:0>64}", | ||
| 3427 | format!("{:x}", i) | ||
| 3428 | )) | ||
| 3429 | .unwrap(); | ||
| 3430 | requested.insert(id); | 3435 | requested.insert(id); |
| 3431 | } | 3436 | } |
| 3432 | 3437 | ||
| 3433 | // Only received 3 events (simulating relay limit) | 3438 | // Only received 3 events (simulating relay limit) |
| 3434 | let mut received: HashSet<EventId> = HashSet::new(); | 3439 | let mut received: HashSet<EventId> = HashSet::new(); |
| 3435 | for i in 1u8..=3 { | 3440 | for i in 1u8..=3 { |
| 3436 | let id = EventId::from_hex(&format!( | 3441 | let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); |
| 3437 | "{:0>64}", | ||
| 3438 | format!("{:x}", i) | ||
| 3439 | )) | ||
| 3440 | .unwrap(); | ||
| 3441 | received.insert(id); | 3442 | received.insert(id); |
| 3442 | } | 3443 | } |
| 3443 | 3444 | ||
| @@ -3461,11 +3462,7 @@ mod tests { | |||
| 3461 | // Simulate scenario where all requested events are received | 3462 | // Simulate scenario where all requested events are received |
| 3462 | let mut requested: HashSet<EventId> = HashSet::new(); | 3463 | let mut requested: HashSet<EventId> = HashSet::new(); |
| 3463 | for i in 1u8..=3 { | 3464 | for i in 1u8..=3 { |
| 3464 | let id = EventId::from_hex(&format!( | 3465 | let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap(); |
| 3465 | "{:0>64}", | ||
| 3466 | format!("{:x}", i) | ||
| 3467 | )) | ||
| 3468 | .unwrap(); | ||
| 3469 | requested.insert(id); | 3466 | requested.insert(id); |
| 3470 | } | 3467 | } |
| 3471 | 3468 | ||
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index a9d7a4d..403792a 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -190,9 +190,7 @@ impl HotCache { | |||
| 190 | let now = Instant::now(); | 190 | let now = Instant::now(); |
| 191 | let initial_count = entries.len(); | 191 | let initial_count = entries.len(); |
| 192 | 192 | ||
| 193 | entries.retain(|_, entry| { | 193 | entries.retain(|_, entry| now.duration_since(entry.cached_at) < self.expiry_duration); |
| 194 | now.duration_since(entry.cached_at) < self.expiry_duration | ||
| 195 | }); | ||
| 196 | 194 | ||
| 197 | initial_count - entries.len() | 195 | initial_count - entries.len() |
| 198 | } | 196 | } |
| @@ -284,9 +282,7 @@ impl ColdIndex { | |||
| 284 | let now = Instant::now(); | 282 | let now = Instant::now(); |
| 285 | let initial_count = entries.len(); | 283 | let initial_count = entries.len(); |
| 286 | 284 | ||
| 287 | entries.retain(|_, entry| { | 285 | entries.retain(|_, entry| now.duration_since(entry.rejected_at) < self.expiry_duration); |
| 288 | now.duration_since(entry.rejected_at) < self.expiry_duration | ||
| 289 | }); | ||
| 290 | 286 | ||
| 291 | initial_count - entries.len() | 287 | initial_count - entries.len() |
| 292 | } | 288 | } |
| @@ -389,12 +385,8 @@ impl RejectedEventsIndex { | |||
| 389 | reason: RejectionReason, | 385 | reason: RejectionReason, |
| 390 | ) { | 386 | ) { |
| 391 | // Add to hot cache (full event) | 387 | // Add to hot cache (full event) |
| 392 | self.hot_cache.add( | 388 | self.hot_cache |
| 393 | event.clone(), | 389 | .add(event.clone(), pubkey, identifier.clone(), reason); |
| 394 | pubkey, | ||
| 395 | identifier.clone(), | ||
| 396 | reason, | ||
| 397 | ); | ||
| 398 | 390 | ||
| 399 | // Add to cold index (metadata only) | 391 | // Add to cold index (metadata only) |
| 400 | self.cold_index.add(event.id, pubkey, identifier, reason); | 392 | self.cold_index.add(event.id, pubkey, identifier, reason); |
| @@ -419,12 +411,8 @@ impl RejectedEventsIndex { | |||
| 419 | reason: RejectionReason, | 411 | reason: RejectionReason, |
| 420 | ) { | 412 | ) { |
| 421 | // Add to hot cache (full event) | 413 | // Add to hot cache (full event) |
| 422 | self.hot_cache.add( | 414 | self.hot_cache |
| 423 | event.clone(), | 415 | .add(event.clone(), pubkey, identifier.clone(), reason); |
| 424 | pubkey, | ||
| 425 | identifier.clone(), | ||
| 426 | reason, | ||
| 427 | ); | ||
| 428 | 416 | ||
| 429 | // Add to cold index (metadata only) | 417 | // Add to cold index (metadata only) |
| 430 | self.cold_index.add(event.id, pubkey, identifier, reason); | 418 | self.cold_index.add(event.id, pubkey, identifier, reason); |
| @@ -608,8 +596,7 @@ mod tests { | |||
| 608 | 596 | ||
| 609 | async fn create_test_event() -> Event { | 597 | async fn create_test_event() -> Event { |
| 610 | let keys = Keys::generate(); | 598 | let keys = Keys::generate(); |
| 611 | let unsigned = nostr_sdk::EventBuilder::text_note("test") | 599 | let unsigned = nostr_sdk::EventBuilder::text_note("test").build(keys.public_key()); |
| 612 | .build(keys.public_key()); | ||
| 613 | keys.sign_event(unsigned).await.unwrap() | 600 | keys.sign_event(unsigned).await.unwrap() |
| 614 | } | 601 | } |
| 615 | 602 | ||
| @@ -695,10 +682,7 @@ mod tests { | |||
| 695 | 682 | ||
| 696 | #[tokio::test] | 683 | #[tokio::test] |
| 697 | async fn test_two_tier_index_add_and_contains() { | 684 | async fn test_two_tier_index_add_and_contains() { |
| 698 | let index = RejectedEventsIndex::new( | 685 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 699 | Duration::from_secs(120), | ||
| 700 | Duration::from_secs(604800), | ||
| 701 | ); | ||
| 702 | let event = create_test_event().await; | 686 | let event = create_test_event().await; |
| 703 | 687 | ||
| 704 | index.add_announcement( | 688 | index.add_announcement( |
| @@ -715,10 +699,7 @@ mod tests { | |||
| 715 | 699 | ||
| 716 | #[tokio::test] | 700 | #[tokio::test] |
| 717 | async fn test_invalidate_and_get_events() { | 701 | async fn test_invalidate_and_get_events() { |
| 718 | let index = RejectedEventsIndex::new( | 702 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 719 | Duration::from_secs(120), | ||
| 720 | Duration::from_secs(604800), | ||
| 721 | ); | ||
| 722 | let event = create_test_event().await; | 703 | let event = create_test_event().await; |
| 723 | let pubkey = event.pubkey; | 704 | let pubkey = event.pubkey; |
| 724 | let identifier = "test-repo".to_string(); | 705 | let identifier = "test-repo".to_string(); |
| @@ -773,10 +754,8 @@ mod tests { | |||
| 773 | 754 | ||
| 774 | #[tokio::test] | 755 | #[tokio::test] |
| 775 | async fn test_hot_cache_miss_after_expiry() { | 756 | async fn test_hot_cache_miss_after_expiry() { |
| 776 | let index = RejectedEventsIndex::new( | 757 | let index = |
| 777 | Duration::from_millis(50), | 758 | RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800)); |
| 778 | Duration::from_secs(604800), | ||
| 779 | ); | ||
| 780 | let event = create_test_event().await; | 759 | let event = create_test_event().await; |
| 781 | let pubkey = event.pubkey; | 760 | let pubkey = event.pubkey; |
| 782 | let identifier = "test-repo".to_string(); | 761 | let identifier = "test-repo".to_string(); |
| @@ -801,20 +780,15 @@ mod tests { | |||
| 801 | 780 | ||
| 802 | #[tokio::test] | 781 | #[tokio::test] |
| 803 | async fn test_multiple_maintainer_repos() { | 782 | async fn test_multiple_maintainer_repos() { |
| 804 | let index = RejectedEventsIndex::new( | 783 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 805 | Duration::from_secs(120), | ||
| 806 | Duration::from_secs(604800), | ||
| 807 | ); | ||
| 808 | 784 | ||
| 809 | let keys1 = Keys::generate(); | 785 | let keys1 = Keys::generate(); |
| 810 | let keys2 = Keys::generate(); | 786 | let keys2 = Keys::generate(); |
| 811 | 787 | ||
| 812 | let unsigned1 = nostr_sdk::EventBuilder::text_note("test1") | 788 | let unsigned1 = nostr_sdk::EventBuilder::text_note("test1").build(keys1.public_key()); |
| 813 | .build(keys1.public_key()); | ||
| 814 | let event1 = keys1.sign_event(unsigned1).await.unwrap(); | 789 | let event1 = keys1.sign_event(unsigned1).await.unwrap(); |
| 815 | 790 | ||
| 816 | let unsigned2 = nostr_sdk::EventBuilder::text_note("test2") | 791 | let unsigned2 = nostr_sdk::EventBuilder::text_note("test2").build(keys2.public_key()); |
| 817 | .build(keys2.public_key()); | ||
| 818 | let event2 = keys2.sign_event(unsigned2).await.unwrap(); | 792 | let event2 = keys2.sign_event(unsigned2).await.unwrap(); |
| 819 | 793 | ||
| 820 | // Add two different maintainer repos | 794 | // Add two different maintainer repos |
| @@ -836,8 +810,7 @@ mod tests { | |||
| 836 | assert_eq!(index.cold_index_len(), 2); | 810 | assert_eq!(index.cold_index_len(), 2); |
| 837 | 811 | ||
| 838 | // Invalidate only first maintainer | 812 | // Invalidate only first maintainer |
| 839 | let (removed, hot_events) = | 813 | let (removed, hot_events) = index.invalidate_and_get_events(&event1.pubkey, "repo1"); |
| 840 | index.invalidate_and_get_events(&event1.pubkey, "repo1"); | ||
| 841 | 814 | ||
| 842 | assert_eq!(removed, 1); | 815 | assert_eq!(removed, 1); |
| 843 | assert_eq!(hot_events.len(), 1); | 816 | assert_eq!(hot_events.len(), 1); |
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs index d0090c8..b86d298 100644 --- a/src/sync/relay_connection.rs +++ b/src/sync/relay_connection.rs | |||
| @@ -526,31 +526,46 @@ mod tests { | |||
| 526 | #[test] | 526 | #[test] |
| 527 | fn test_normalize_url_with_wss_scheme() { | 527 | fn test_normalize_url_with_wss_scheme() { |
| 528 | let url = "wss://relay.example.com"; | 528 | let url = "wss://relay.example.com"; |
| 529 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); | 529 | assert_eq!( |
| 530 | RelayConnection::normalize_url(url), | ||
| 531 | "wss://relay.example.com" | ||
| 532 | ); | ||
| 530 | } | 533 | } |
| 531 | 534 | ||
| 532 | #[test] | 535 | #[test] |
| 533 | fn test_normalize_url_with_ws_scheme() { | 536 | fn test_normalize_url_with_ws_scheme() { |
| 534 | let url = "ws://relay.example.com"; | 537 | let url = "ws://relay.example.com"; |
| 535 | assert_eq!(RelayConnection::normalize_url(url), "ws://relay.example.com"); | 538 | assert_eq!( |
| 539 | RelayConnection::normalize_url(url), | ||
| 540 | "ws://relay.example.com" | ||
| 541 | ); | ||
| 536 | } | 542 | } |
| 537 | 543 | ||
| 538 | #[test] | 544 | #[test] |
| 539 | fn test_normalize_url_without_scheme() { | 545 | fn test_normalize_url_without_scheme() { |
| 540 | let url = "relay.example.com"; | 546 | let url = "relay.example.com"; |
| 541 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); | 547 | assert_eq!( |
| 548 | RelayConnection::normalize_url(url), | ||
| 549 | "wss://relay.example.com" | ||
| 550 | ); | ||
| 542 | } | 551 | } |
| 543 | 552 | ||
| 544 | #[test] | 553 | #[test] |
| 545 | fn test_normalize_url_without_scheme_with_port() { | 554 | fn test_normalize_url_without_scheme_with_port() { |
| 546 | let url = "relay.example.com:8080"; | 555 | let url = "relay.example.com:8080"; |
| 547 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com:8080"); | 556 | assert_eq!( |
| 557 | RelayConnection::normalize_url(url), | ||
| 558 | "wss://relay.example.com:8080" | ||
| 559 | ); | ||
| 548 | } | 560 | } |
| 549 | 561 | ||
| 550 | #[test] | 562 | #[test] |
| 551 | fn test_normalize_url_with_path() { | 563 | fn test_normalize_url_with_path() { |
| 552 | let url = "relay.example.com/nostr"; | 564 | let url = "relay.example.com/nostr"; |
| 553 | assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com/nostr"); | 565 | assert_eq!( |
| 566 | RelayConnection::normalize_url(url), | ||
| 567 | "wss://relay.example.com/nostr" | ||
| 568 | ); | ||
| 554 | } | 569 | } |
| 555 | 570 | ||
| 556 | #[test] | 571 | #[test] |
| @@ -587,6 +602,9 @@ mod tests { | |||
| 587 | fn test_normalize_url_real_world_example() { | 602 | fn test_normalize_url_real_world_example() { |
| 588 | // Test the exact case from the bug report | 603 | // Test the exact case from the bug report |
| 589 | let url = "git.shakespeare.diy"; | 604 | let url = "git.shakespeare.diy"; |
| 590 | assert_eq!(RelayConnection::normalize_url(url), "wss://git.shakespeare.diy"); | 605 | assert_eq!( |
| 606 | RelayConnection::normalize_url(url), | ||
| 607 | "wss://git.shakespeare.diy" | ||
| 608 | ); | ||
| 591 | } | 609 | } |
| 592 | } | 610 | } |
diff --git a/tests/common/sync_helpers.rs b/tests/common/sync_helpers.rs index d6a6ee4..5fc2ad7 100644 --- a/tests/common/sync_helpers.rs +++ b/tests/common/sync_helpers.rs | |||
| @@ -708,7 +708,7 @@ impl ParsedMetrics { | |||
| 708 | /// Check if a specific relay is connected | 708 | /// Check if a specific relay is connected |
| 709 | pub fn relay_connected(&self, relay: &str) -> Option<bool> { | 709 | pub fn relay_connected(&self, relay: &str) -> Option<bool> { |
| 710 | self.gauge("ngit_sync_relay_connected", &[("relay", relay)]) | 710 | self.gauge("ngit_sync_relay_connected", &[("relay", relay)]) |
| 711 | .map(|v| v >= 2) // Syncing (2), Connected (3), or ConnectedHistoricSyncFailures (4) | 711 | .map(|v| v >= 2) // Syncing (2), Connected (3), or ConnectedHistoricSyncFailures (4) |
| 712 | } | 712 | } |
| 713 | 713 | ||
| 714 | /// Get total number of connected relays | 714 | /// Get total number of connected relays |
diff --git a/tests/state_authorization.rs b/tests/state_authorization.rs index a5dfa2d..d443005 100644 --- a/tests/state_authorization.rs +++ b/tests/state_authorization.rs | |||
| @@ -13,30 +13,27 @@ use nostr_sdk::prelude::*; | |||
| 13 | async fn test_reject_state_without_announcement() { | 13 | async fn test_reject_state_without_announcement() { |
| 14 | // Start test relay | 14 | // Start test relay |
| 15 | let relay = TestRelay::start().await; | 15 | let relay = TestRelay::start().await; |
| 16 | 16 | ||
| 17 | // Create test keypair | 17 | // Create test keypair |
| 18 | let keys = Keys::generate(); | 18 | let keys = Keys::generate(); |
| 19 | 19 | ||
| 20 | // Create a state event without any announcement | 20 | // Create a state event without any announcement |
| 21 | let state_event = EventBuilder::new( | 21 | let state_event = EventBuilder::new(Kind::RepoState, "") |
| 22 | Kind::RepoState, | 22 | .tags([ |
| 23 | "", | 23 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 24 | ) | 24 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), |
| 25 | .tags([ | 25 | ]) |
| 26 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 26 | .sign_with_keys(&keys) |
| 27 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), | 27 | .unwrap(); |
| 28 | ]) | 28 | |
| 29 | .sign_with_keys(&keys) | ||
| 30 | .unwrap(); | ||
| 31 | |||
| 32 | // Connect to relay | 29 | // Connect to relay |
| 33 | let client = Client::default(); | 30 | let client = Client::default(); |
| 34 | client.add_relay(relay.url()).await.unwrap(); | 31 | client.add_relay(relay.url()).await.unwrap(); |
| 35 | client.connect().await; | 32 | client.connect().await; |
| 36 | 33 | ||
| 37 | // Try to send state event | 34 | // Try to send state event |
| 38 | let result = client.send_event(&state_event).await; | 35 | let result = client.send_event(&state_event).await; |
| 39 | 36 | ||
| 40 | // Should be rejected | 37 | // Should be rejected |
| 41 | match result { | 38 | match result { |
| 42 | Ok(output) => { | 39 | Ok(output) => { |
| @@ -45,22 +42,26 @@ async fn test_reject_state_without_announcement() { | |||
| 45 | "Event should be processed" | 42 | "Event should be processed" |
| 46 | ); | 43 | ); |
| 47 | // Check if any relay rejected it | 44 | // Check if any relay rejected it |
| 48 | let rejected = output.failed.values().any(|err| { | 45 | let rejected = output |
| 49 | err.to_string().contains("no announcement exists") | 46 | .failed |
| 50 | }); | 47 | .values() |
| 51 | assert!(rejected, "Event should be rejected due to missing announcement"); | 48 | .any(|err| err.to_string().contains("no announcement exists")); |
| 49 | assert!( | ||
| 50 | rejected, | ||
| 51 | "Event should be rejected due to missing announcement" | ||
| 52 | ); | ||
| 52 | } | 53 | } |
| 53 | Err(e) => { | 54 | Err(e) => { |
| 54 | // Also acceptable - relay rejected the event | 55 | // Also acceptable - relay rejected the event |
| 55 | assert!( | 56 | assert!( |
| 56 | e.to_string().contains("no announcement exists") || | 57 | e.to_string().contains("no announcement exists") |
| 57 | e.to_string().contains("rejected"), | 58 | || e.to_string().contains("rejected"), |
| 58 | "Error should indicate missing announcement: {}", | 59 | "Error should indicate missing announcement: {}", |
| 59 | e | 60 | e |
| 60 | ); | 61 | ); |
| 61 | } | 62 | } |
| 62 | } | 63 | } |
| 63 | 64 | ||
| 64 | relay.stop().await; | 65 | relay.stop().await; |
| 65 | } | 66 | } |
| 66 | 67 | ||
| @@ -68,67 +69,67 @@ async fn test_reject_state_without_announcement() { | |||
| 68 | async fn test_reject_state_from_unauthorized_author() { | 69 | async fn test_reject_state_from_unauthorized_author() { |
| 69 | // Start test relay | 70 | // Start test relay |
| 70 | let relay = TestRelay::start().await; | 71 | let relay = TestRelay::start().await; |
| 71 | 72 | ||
| 72 | // Create two keypairs: one for announcement, one for unauthorized state | 73 | // Create two keypairs: one for announcement, one for unauthorized state |
| 73 | let announcement_keys = Keys::generate(); | 74 | let announcement_keys = Keys::generate(); |
| 74 | let unauthorized_keys = Keys::generate(); | 75 | let unauthorized_keys = Keys::generate(); |
| 75 | 76 | ||
| 76 | // Create announcement | 77 | // Create announcement |
| 77 | let announcement = EventBuilder::new( | 78 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "") |
| 78 | Kind::GitRepoAnnouncement, | 79 | .tags([ |
| 79 | "", | 80 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 80 | ) | 81 | Tag::custom( |
| 81 | .tags([ | 82 | TagKind::custom("clone"), |
| 82 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 83 | [format!("https://{}/test.git", relay.domain())], |
| 83 | Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]), | 84 | ), |
| 84 | Tag::custom(TagKind::custom("relays"), [relay.url()]), | 85 | Tag::custom(TagKind::custom("relays"), [relay.url()]), |
| 85 | ]) | 86 | ]) |
| 86 | .sign_with_keys(&announcement_keys) | 87 | .sign_with_keys(&announcement_keys) |
| 87 | .unwrap(); | 88 | .unwrap(); |
| 88 | 89 | ||
| 89 | // Connect to relay | 90 | // Connect to relay |
| 90 | let client = Client::default(); | 91 | let client = Client::default(); |
| 91 | client.add_relay(relay.url()).await.unwrap(); | 92 | client.add_relay(relay.url()).await.unwrap(); |
| 92 | client.connect().await; | 93 | client.connect().await; |
| 93 | 94 | ||
| 94 | // Send announcement | 95 | // Send announcement |
| 95 | client.send_event(&announcement).await.unwrap(); | 96 | client.send_event(&announcement).await.unwrap(); |
| 96 | 97 | ||
| 97 | // Wait for announcement to be processed | 98 | // Wait for announcement to be processed |
| 98 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; | 99 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; |
| 99 | 100 | ||
| 100 | // Try to send state event from unauthorized author | 101 | // Try to send state event from unauthorized author |
| 101 | let state_event = EventBuilder::new( | 102 | let state_event = EventBuilder::new(Kind::RepoState, "") |
| 102 | Kind::RepoState, | 103 | .tags([ |
| 103 | "", | 104 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 104 | ) | 105 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), |
| 105 | .tags([ | 106 | ]) |
| 106 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 107 | .sign_with_keys(&unauthorized_keys) |
| 107 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), | 108 | .unwrap(); |
| 108 | ]) | 109 | |
| 109 | .sign_with_keys(&unauthorized_keys) | ||
| 110 | .unwrap(); | ||
| 111 | |||
| 112 | let result = client.send_event(&state_event).await; | 110 | let result = client.send_event(&state_event).await; |
| 113 | 111 | ||
| 114 | // Should be rejected | 112 | // Should be rejected |
| 115 | match result { | 113 | match result { |
| 116 | Ok(output) => { | 114 | Ok(output) => { |
| 117 | let rejected = output.failed.values().any(|err| { | 115 | let rejected = output |
| 118 | err.to_string().contains("not authorized") | 116 | .failed |
| 119 | }); | 117 | .values() |
| 120 | assert!(rejected, "Event should be rejected due to unauthorized author"); | 118 | .any(|err| err.to_string().contains("not authorized")); |
| 119 | assert!( | ||
| 120 | rejected, | ||
| 121 | "Event should be rejected due to unauthorized author" | ||
| 122 | ); | ||
| 121 | } | 123 | } |
| 122 | Err(e) => { | 124 | Err(e) => { |
| 123 | assert!( | 125 | assert!( |
| 124 | e.to_string().contains("not authorized") || | 126 | e.to_string().contains("not authorized") || e.to_string().contains("rejected"), |
| 125 | e.to_string().contains("rejected"), | ||
| 126 | "Error should indicate unauthorized author: {}", | 127 | "Error should indicate unauthorized author: {}", |
| 127 | e | 128 | e |
| 128 | ); | 129 | ); |
| 129 | } | 130 | } |
| 130 | } | 131 | } |
| 131 | 132 | ||
| 132 | relay.stop().await; | 133 | relay.stop().await; |
| 133 | } | 134 | } |
| 134 | 135 | ||
| @@ -136,48 +137,45 @@ async fn test_reject_state_from_unauthorized_author() { | |||
| 136 | async fn test_accept_state_from_announcement_author() { | 137 | async fn test_accept_state_from_announcement_author() { |
| 137 | // Start test relay | 138 | // Start test relay |
| 138 | let relay = TestRelay::start().await; | 139 | let relay = TestRelay::start().await; |
| 139 | 140 | ||
| 140 | // Create keypair | 141 | // Create keypair |
| 141 | let keys = Keys::generate(); | 142 | let keys = Keys::generate(); |
| 142 | 143 | ||
| 143 | // Create announcement | 144 | // Create announcement |
| 144 | let announcement = EventBuilder::new( | 145 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "") |
| 145 | Kind::GitRepoAnnouncement, | 146 | .tags([ |
| 146 | "", | 147 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 147 | ) | 148 | Tag::custom( |
| 148 | .tags([ | 149 | TagKind::custom("clone"), |
| 149 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 150 | [format!("https://{}/test.git", relay.domain())], |
| 150 | Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]), | 151 | ), |
| 151 | Tag::custom(TagKind::custom("relays"), [relay.url()]), | 152 | Tag::custom(TagKind::custom("relays"), [relay.url()]), |
| 152 | ]) | 153 | ]) |
| 153 | .sign_with_keys(&keys) | 154 | .sign_with_keys(&keys) |
| 154 | .unwrap(); | 155 | .unwrap(); |
| 155 | 156 | ||
| 156 | // Connect to relay | 157 | // Connect to relay |
| 157 | let client = Client::default(); | 158 | let client = Client::default(); |
| 158 | client.add_relay(relay.url()).await.unwrap(); | 159 | client.add_relay(relay.url()).await.unwrap(); |
| 159 | client.connect().await; | 160 | client.connect().await; |
| 160 | 161 | ||
| 161 | // Send announcement | 162 | // Send announcement |
| 162 | client.send_event(&announcement).await.unwrap(); | 163 | client.send_event(&announcement).await.unwrap(); |
| 163 | 164 | ||
| 164 | // Wait for announcement to be processed | 165 | // Wait for announcement to be processed |
| 165 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; | 166 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; |
| 166 | 167 | ||
| 167 | // Send state event from same author (should be accepted or go to purgatory) | 168 | // Send state event from same author (should be accepted or go to purgatory) |
| 168 | let state_event = EventBuilder::new( | 169 | let state_event = EventBuilder::new(Kind::RepoState, "") |
| 169 | Kind::RepoState, | 170 | .tags([ |
| 170 | "", | 171 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 171 | ) | 172 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), |
| 172 | .tags([ | 173 | ]) |
| 173 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 174 | .sign_with_keys(&keys) |
| 174 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), | 175 | .unwrap(); |
| 175 | ]) | 176 | |
| 176 | .sign_with_keys(&keys) | ||
| 177 | .unwrap(); | ||
| 178 | |||
| 179 | let result = client.send_event(&state_event).await; | 177 | let result = client.send_event(&state_event).await; |
| 180 | 178 | ||
| 181 | // Should be accepted or go to purgatory (not permanently rejected) | 179 | // Should be accepted or go to purgatory (not permanently rejected) |
| 182 | match result { | 180 | match result { |
| 183 | Ok(output) => { | 181 | Ok(output) => { |
| @@ -194,14 +192,13 @@ async fn test_accept_state_from_announcement_author() { | |||
| 194 | Err(e) => { | 192 | Err(e) => { |
| 195 | // Purgatory is acceptable | 193 | // Purgatory is acceptable |
| 196 | assert!( | 194 | assert!( |
| 197 | e.to_string().contains("purgatory") || | 195 | e.to_string().contains("purgatory") || e.to_string().contains("waiting for git"), |
| 198 | e.to_string().contains("waiting for git"), | ||
| 199 | "Error should be about purgatory, not authorization: {}", | 196 | "Error should be about purgatory, not authorization: {}", |
| 200 | e | 197 | e |
| 201 | ); | 198 | ); |
| 202 | } | 199 | } |
| 203 | } | 200 | } |
| 204 | 201 | ||
| 205 | relay.stop().await; | 202 | relay.stop().await; |
| 206 | } | 203 | } |
| 207 | 204 | ||
| @@ -209,50 +206,50 @@ async fn test_accept_state_from_announcement_author() { | |||
| 209 | async fn test_accept_state_from_maintainer() { | 206 | async fn test_accept_state_from_maintainer() { |
| 210 | // Start test relay | 207 | // Start test relay |
| 211 | let relay = TestRelay::start().await; | 208 | let relay = TestRelay::start().await; |
| 212 | 209 | ||
| 213 | // Create two keypairs: owner and maintainer | 210 | // Create two keypairs: owner and maintainer |
| 214 | let owner_keys = Keys::generate(); | 211 | let owner_keys = Keys::generate(); |
| 215 | let maintainer_keys = Keys::generate(); | 212 | let maintainer_keys = Keys::generate(); |
| 216 | 213 | ||
| 217 | // Create announcement with maintainer | 214 | // Create announcement with maintainer |
| 218 | let announcement = EventBuilder::new( | 215 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "") |
| 219 | Kind::GitRepoAnnouncement, | 216 | .tags([ |
| 220 | "", | 217 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 221 | ) | 218 | Tag::custom( |
| 222 | .tags([ | 219 | TagKind::custom("clone"), |
| 223 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 220 | [format!("https://{}/test.git", relay.domain())], |
| 224 | Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]), | 221 | ), |
| 225 | Tag::custom(TagKind::custom("relays"), [relay.url()]), | 222 | Tag::custom(TagKind::custom("relays"), [relay.url()]), |
| 226 | Tag::custom(TagKind::custom("maintainers"), [maintainer_keys.public_key().to_hex()]), | 223 | Tag::custom( |
| 227 | ]) | 224 | TagKind::custom("maintainers"), |
| 228 | .sign_with_keys(&owner_keys) | 225 | [maintainer_keys.public_key().to_hex()], |
| 229 | .unwrap(); | 226 | ), |
| 230 | 227 | ]) | |
| 228 | .sign_with_keys(&owner_keys) | ||
| 229 | .unwrap(); | ||
| 230 | |||
| 231 | // Connect to relay | 231 | // Connect to relay |
| 232 | let client = Client::default(); | 232 | let client = Client::default(); |
| 233 | client.add_relay(relay.url()).await.unwrap(); | 233 | client.add_relay(relay.url()).await.unwrap(); |
| 234 | client.connect().await; | 234 | client.connect().await; |
| 235 | 235 | ||
| 236 | // Send announcement | 236 | // Send announcement |
| 237 | client.send_event(&announcement).await.unwrap(); | 237 | client.send_event(&announcement).await.unwrap(); |
| 238 | 238 | ||
| 239 | // Wait for announcement to be processed | 239 | // Wait for announcement to be processed |
| 240 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; | 240 | tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; |
| 241 | 241 | ||
| 242 | // Send state event from maintainer | 242 | // Send state event from maintainer |
| 243 | let state_event = EventBuilder::new( | 243 | let state_event = EventBuilder::new(Kind::RepoState, "") |
| 244 | Kind::RepoState, | 244 | .tags([ |
| 245 | "", | 245 | Tag::custom(TagKind::custom("d"), ["test-repo"]), |
| 246 | ) | 246 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), |
| 247 | .tags([ | 247 | ]) |
| 248 | Tag::custom(TagKind::custom("d"), ["test-repo"]), | 248 | .sign_with_keys(&maintainer_keys) |
| 249 | Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), | 249 | .unwrap(); |
| 250 | ]) | 250 | |
| 251 | .sign_with_keys(&maintainer_keys) | ||
| 252 | .unwrap(); | ||
| 253 | |||
| 254 | let result = client.send_event(&state_event).await; | 251 | let result = client.send_event(&state_event).await; |
| 255 | 252 | ||
| 256 | // Should be accepted or go to purgatory (not permanently rejected) | 253 | // Should be accepted or go to purgatory (not permanently rejected) |
| 257 | match result { | 254 | match result { |
| 258 | Ok(output) => { | 255 | Ok(output) => { |
| @@ -268,13 +265,12 @@ async fn test_accept_state_from_maintainer() { | |||
| 268 | Err(e) => { | 265 | Err(e) => { |
| 269 | // Purgatory is acceptable | 266 | // Purgatory is acceptable |
| 270 | assert!( | 267 | assert!( |
| 271 | e.to_string().contains("purgatory") || | 268 | e.to_string().contains("purgatory") || e.to_string().contains("waiting for git"), |
| 272 | e.to_string().contains("waiting for git"), | ||
| 273 | "Error should be about purgatory, not authorization: {}", | 269 | "Error should be about purgatory, not authorization: {}", |
| 274 | e | 270 | e |
| 275 | ); | 271 | ); |
| 276 | } | 272 | } |
| 277 | } | 273 | } |
| 278 | 274 | ||
| 279 | relay.stop().await; | 275 | relay.stop().await; |
| 280 | } | 276 | } |
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index 2b7fb0f..df1bf78 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs | |||
| @@ -42,23 +42,18 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 42 | .await | 42 | .await |
| 43 | .expect("Failed to connect to relay_a"); | 43 | .expect("Failed to connect to relay_a"); |
| 44 | 44 | ||
| 45 | let maintainer_announcement = EventBuilder::new( | 45 | let maintainer_announcement = |
| 46 | Kind::GitRepoAnnouncement, | 46 | EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") |
| 47 | "Maintainer's repository", | 47 | .tags(vec![ |
| 48 | ) | 48 | Tag::identifier(identifier), |
| 49 | .tags(vec![ | 49 | Tag::custom( |
| 50 | Tag::identifier(identifier), | 50 | TagKind::custom("clone"), |
| 51 | Tag::custom( | 51 | vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], |
| 52 | TagKind::custom("clone"), | 52 | ), |
| 53 | vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], | 53 | Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), |
| 54 | ), | 54 | ]) |
| 55 | Tag::custom( | 55 | .sign_with_keys(&maintainer_keys) |
| 56 | TagKind::custom("relays"), | 56 | .unwrap(); |
| 57 | vec![relay_a.url().to_string()], | ||
| 58 | ), | ||
| 59 | ]) | ||
| 60 | .sign_with_keys(&maintainer_keys) | ||
| 61 | .unwrap(); | ||
| 62 | 57 | ||
| 63 | client_a.send_event(&maintainer_announcement).await.unwrap(); | 58 | client_a.send_event(&maintainer_announcement).await.unwrap(); |
| 64 | println!("✓ Maintainer announcement sent to relay_a"); | 59 | println!("✓ Maintainer announcement sent to relay_a"); |
| @@ -68,27 +63,24 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 68 | .await | 63 | .await |
| 69 | .expect("Failed to connect to relay_b"); | 64 | .expect("Failed to connect to relay_b"); |
| 70 | 65 | ||
| 71 | let owner_announcement = EventBuilder::new( | 66 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 72 | Kind::GitRepoAnnouncement, | 67 | .tags(vec![ |
| 73 | "Owner's repository", | 68 | Tag::identifier(identifier), |
| 74 | ) | 69 | Tag::custom( |
| 75 | .tags(vec![ | 70 | TagKind::custom("clone"), |
| 76 | Tag::identifier(identifier), | 71 | vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], |
| 77 | Tag::custom( | 72 | ), |
| 78 | TagKind::custom("clone"), | 73 | Tag::custom( |
| 79 | vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], | 74 | TagKind::custom("relays"), |
| 80 | ), | 75 | vec![relay_a.url().to_string(), relay_b.url().to_string()], |
| 81 | Tag::custom( | 76 | ), |
| 82 | TagKind::custom("relays"), | 77 | Tag::custom( |
| 83 | vec![relay_a.url().to_string(), relay_b.url().to_string()], | 78 | TagKind::custom("maintainers"), |
| 84 | ), | 79 | vec![maintainer_keys.public_key().to_hex()], |
| 85 | Tag::custom( | 80 | ), |
| 86 | TagKind::custom("maintainers"), | 81 | ]) |
| 87 | vec![maintainer_keys.public_key().to_hex()], | 82 | .sign_with_keys(&owner_keys) |
| 88 | ), | 83 | .unwrap(); |
| 89 | ]) | ||
| 90 | .sign_with_keys(&owner_keys) | ||
| 91 | .unwrap(); | ||
| 92 | 84 | ||
| 93 | client_b.send_event(&owner_announcement).await.unwrap(); | 85 | client_b.send_event(&owner_announcement).await.unwrap(); |
| 94 | println!("✓ Owner announcement sent to relay_b"); | 86 | println!("✓ Owner announcement sent to relay_b"); |
| @@ -104,7 +96,8 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 104 | .author(owner_keys.public_key()) | 96 | .author(owner_keys.public_key()) |
| 105 | .identifier(identifier); | 97 | .identifier(identifier); |
| 106 | 98 | ||
| 107 | let owner_found = wait_for_event_on_relay(relay_b.url(), owner_filter, Duration::from_secs(2)).await; | 99 | let owner_found = |
| 100 | wait_for_event_on_relay(relay_b.url(), owner_filter, Duration::from_secs(2)).await; | ||
| 108 | assert!(owner_found, "Owner announcement should be in relay_b"); | 101 | assert!(owner_found, "Owner announcement should be in relay_b"); |
| 109 | 102 | ||
| 110 | let maintainer_filter = Filter::new() | 103 | let maintainer_filter = Filter::new() |
| @@ -112,8 +105,12 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 112 | .author(maintainer_keys.public_key()) | 105 | .author(maintainer_keys.public_key()) |
| 113 | .identifier(identifier); | 106 | .identifier(identifier); |
| 114 | 107 | ||
| 115 | let maintainer_found = wait_for_event_on_relay(relay_b.url(), maintainer_filter, Duration::from_secs(2)).await; | 108 | let maintainer_found = |
| 116 | assert!(maintainer_found, "Maintainer announcement should be re-processed and accepted in relay_b"); | 109 | wait_for_event_on_relay(relay_b.url(), maintainer_filter, Duration::from_secs(2)).await; |
| 110 | assert!( | ||
| 111 | maintainer_found, | ||
| 112 | "Maintainer announcement should be re-processed and accepted in relay_b" | ||
| 113 | ); | ||
| 117 | 114 | ||
| 118 | // Step 5: Verify it happened quickly (not 24 hours!) | 115 | // Step 5: Verify it happened quickly (not 24 hours!) |
| 119 | assert!( | 116 | assert!( |
| @@ -145,36 +142,34 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 145 | #[ignore] // Skip by default due to 2+ minute duration | 142 | #[ignore] // Skip by default due to 2+ minute duration |
| 146 | async fn test_maintainer_announcement_cold_index_prevents_refetch() { | 143 | async fn test_maintainer_announcement_cold_index_prevents_refetch() { |
| 147 | let relay = TestRelay::start().await; | 144 | let relay = TestRelay::start().await; |
| 148 | 145 | ||
| 149 | // Create keys | 146 | // Create keys |
| 150 | let owner_keys = Keys::generate(); | 147 | let owner_keys = Keys::generate(); |
| 151 | let maintainer_keys = Keys::generate(); | 148 | let maintainer_keys = Keys::generate(); |
| 152 | 149 | ||
| 153 | let identifier = "test-repo-cold"; | 150 | let identifier = "test-repo-cold"; |
| 154 | 151 | ||
| 155 | // Create client using TestClient helper | 152 | // Create client using TestClient helper |
| 156 | let client = TestClient::new(relay.url(), maintainer_keys.clone()) | 153 | let client = TestClient::new(relay.url(), maintainer_keys.clone()) |
| 157 | .await | 154 | .await |
| 158 | .expect("Failed to connect to relay"); | 155 | .expect("Failed to connect to relay"); |
| 159 | 156 | ||
| 160 | // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) | 157 | // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) |
| 161 | let maintainer_announcement = EventBuilder::new( | 158 | let maintainer_announcement = |
| 162 | Kind::GitRepoAnnouncement, | 159 | EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") |
| 163 | "Maintainer's repository", | 160 | .tags(vec![ |
| 164 | ) | 161 | Tag::identifier(identifier), |
| 165 | .tags(vec![ | 162 | Tag::custom( |
| 166 | Tag::identifier(identifier), | 163 | TagKind::custom("clone"), |
| 167 | Tag::custom( | 164 | vec![format!("https://example.com/{}.git", identifier)], |
| 168 | TagKind::custom("clone"), | 165 | ), |
| 169 | vec![format!("https://example.com/{}.git", identifier)], | 166 | Tag::custom( |
| 170 | ), | 167 | TagKind::custom("relays"), |
| 171 | Tag::custom( | 168 | vec!["wss://example.com".to_string()], |
| 172 | TagKind::custom("relays"), | 169 | ), |
| 173 | vec!["wss://example.com".to_string()], | 170 | ]) |
| 174 | ), | 171 | .sign_with_keys(&maintainer_keys) |
| 175 | ]) | 172 | .unwrap(); |
| 176 | .sign_with_keys(&maintainer_keys) | ||
| 177 | .unwrap(); | ||
| 178 | 173 | ||
| 179 | // Send maintainer announcement - expect it to be rejected | 174 | // Send maintainer announcement - expect it to be rejected |
| 180 | let _ = client.send_event(&maintainer_announcement).await; | 175 | let _ = client.send_event(&maintainer_announcement).await; |
| @@ -185,27 +180,21 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { | |||
| 185 | tokio::time::sleep(Duration::from_secs(125)).await; | 180 | tokio::time::sleep(Duration::from_secs(125)).await; |
| 186 | 181 | ||
| 187 | // Step 3: Send owner announcement (lists maintainer) | 182 | // Step 3: Send owner announcement (lists maintainer) |
| 188 | let owner_announcement = EventBuilder::new( | 183 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 189 | Kind::GitRepoAnnouncement, | 184 | .tags(vec![ |
| 190 | "Owner's repository", | 185 | Tag::identifier(identifier), |
| 191 | ) | 186 | Tag::custom( |
| 192 | .tags(vec![ | 187 | TagKind::custom("clone"), |
| 193 | Tag::identifier(identifier), | 188 | vec![format!("https://{}/{}.git", relay.domain(), identifier)], |
| 194 | Tag::custom( | 189 | ), |
| 195 | TagKind::custom("clone"), | 190 | Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]), |
| 196 | vec![format!("https://{}/{}.git", relay.domain(), identifier)], | 191 | Tag::custom( |
| 197 | ), | 192 | TagKind::custom("maintainers"), |
| 198 | Tag::custom( | 193 | vec![maintainer_keys.public_key().to_hex()], |
| 199 | TagKind::custom("relays"), | 194 | ), |
| 200 | vec![relay.url().to_string()], | 195 | ]) |
| 201 | ), | 196 | .sign_with_keys(&owner_keys) |
| 202 | Tag::custom( | 197 | .unwrap(); |
| 203 | TagKind::custom("maintainers"), | ||
| 204 | vec![maintainer_keys.public_key().to_hex()], | ||
| 205 | ), | ||
| 206 | ]) | ||
| 207 | .sign_with_keys(&owner_keys) | ||
| 208 | .unwrap(); | ||
| 209 | 198 | ||
| 210 | client.send_event(&owner_announcement).await.unwrap(); | 199 | client.send_event(&owner_announcement).await.unwrap(); |
| 211 | tokio::time::sleep(Duration::from_millis(500)).await; | 200 | tokio::time::sleep(Duration::from_millis(500)).await; |
| @@ -215,16 +204,18 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { | |||
| 215 | .kind(Kind::GitRepoAnnouncement) | 204 | .kind(Kind::GitRepoAnnouncement) |
| 216 | .author(owner_keys.public_key()) | 205 | .author(owner_keys.public_key()) |
| 217 | .identifier(identifier); | 206 | .identifier(identifier); |
| 218 | 207 | ||
| 219 | let owner_found = wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await; | 208 | let owner_found = |
| 209 | wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await; | ||
| 220 | assert!(owner_found, "Owner announcement should be accepted"); | 210 | assert!(owner_found, "Owner announcement should be accepted"); |
| 221 | 211 | ||
| 222 | let maintainer_filter = Filter::new() | 212 | let maintainer_filter = Filter::new() |
| 223 | .kind(Kind::GitRepoAnnouncement) | 213 | .kind(Kind::GitRepoAnnouncement) |
| 224 | .author(maintainer_keys.public_key()) | 214 | .author(maintainer_keys.public_key()) |
| 225 | .identifier(identifier); | 215 | .identifier(identifier); |
| 226 | 216 | ||
| 227 | let maintainer_found = wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await; | 217 | let maintainer_found = |
| 218 | wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await; | ||
| 228 | assert!( | 219 | assert!( |
| 229 | !maintainer_found, | 220 | !maintainer_found, |
| 230 | "Maintainer announcement should NOT be re-processed (hot cache expired)" | 221 | "Maintainer announcement should NOT be re-processed (hot cache expired)" |
| @@ -267,7 +258,10 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 267 | .await | 258 | .await |
| 268 | .expect("Failed to connect to relay_a"); | 259 | .expect("Failed to connect to relay_a"); |
| 269 | 260 | ||
| 270 | for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys].iter().enumerate() { | 261 | for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] |
| 262 | .iter() | ||
| 263 | .enumerate() | ||
| 264 | { | ||
| 271 | let announcement = EventBuilder::new( | 265 | let announcement = EventBuilder::new( |
| 272 | Kind::GitRepoAnnouncement, | 266 | Kind::GitRepoAnnouncement, |
| 273 | format!("Maintainer {} repository", idx + 1), | 267 | format!("Maintainer {} repository", idx + 1), |
| @@ -278,10 +272,7 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 278 | TagKind::custom("clone"), | 272 | TagKind::custom("clone"), |
| 279 | vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], | 273 | vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], |
| 280 | ), | 274 | ), |
| 281 | Tag::custom( | 275 | Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), |
| 282 | TagKind::custom("relays"), | ||
| 283 | vec![relay_a.url().to_string()], | ||
| 284 | ), | ||
| 285 | ]) | 276 | ]) |
| 286 | .sign_with_keys(maintainer_keys) | 277 | .sign_with_keys(maintainer_keys) |
| 287 | .unwrap(); | 278 | .unwrap(); |
| @@ -295,31 +286,28 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 295 | .await | 286 | .await |
| 296 | .expect("Failed to connect to relay_b"); | 287 | .expect("Failed to connect to relay_b"); |
| 297 | 288 | ||
| 298 | let owner_announcement = EventBuilder::new( | 289 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 299 | Kind::GitRepoAnnouncement, | 290 | .tags(vec![ |
| 300 | "Owner's repository", | 291 | Tag::identifier(identifier), |
| 301 | ) | 292 | Tag::custom( |
| 302 | .tags(vec![ | 293 | TagKind::custom("clone"), |
| 303 | Tag::identifier(identifier), | 294 | vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], |
| 304 | Tag::custom( | 295 | ), |
| 305 | TagKind::custom("clone"), | 296 | Tag::custom( |
| 306 | vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], | 297 | TagKind::custom("relays"), |
| 307 | ), | 298 | vec![relay_a.url().to_string(), relay_b.url().to_string()], |
| 308 | Tag::custom( | 299 | ), |
| 309 | TagKind::custom("relays"), | 300 | Tag::custom( |
| 310 | vec![relay_a.url().to_string(), relay_b.url().to_string()], | 301 | TagKind::custom("maintainers"), |
| 311 | ), | 302 | vec![ |
| 312 | Tag::custom( | 303 | maintainer1_keys.public_key().to_hex(), |
| 313 | TagKind::custom("maintainers"), | 304 | maintainer2_keys.public_key().to_hex(), |
| 314 | vec![ | 305 | maintainer3_keys.public_key().to_hex(), |
| 315 | maintainer1_keys.public_key().to_hex(), | 306 | ], |
| 316 | maintainer2_keys.public_key().to_hex(), | 307 | ), |
| 317 | maintainer3_keys.public_key().to_hex(), | 308 | ]) |
| 318 | ], | 309 | .sign_with_keys(&owner_keys) |
| 319 | ), | 310 | .unwrap(); |
| 320 | ]) | ||
| 321 | .sign_with_keys(&owner_keys) | ||
| 322 | .unwrap(); | ||
| 323 | 311 | ||
| 324 | client_b.send_event(&owner_announcement).await.unwrap(); | 312 | client_b.send_event(&owner_announcement).await.unwrap(); |
| 325 | println!("✓ Owner announcement sent to relay_b"); | 313 | println!("✓ Owner announcement sent to relay_b"); |
| @@ -340,11 +328,7 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 340 | .identifier(identifier); | 328 | .identifier(identifier); |
| 341 | 329 | ||
| 342 | let found = wait_for_event_on_relay(relay_b.url(), filter, Duration::from_secs(2)).await; | 330 | let found = wait_for_event_on_relay(relay_b.url(), filter, Duration::from_secs(2)).await; |
| 343 | assert!( | 331 | assert!(found, "{} announcement should be in relay_b", name); |
| 344 | found, | ||
| 345 | "{} announcement should be in relay_b", | ||
| 346 | name | ||
| 347 | ); | ||
| 348 | } | 332 | } |
| 349 | 333 | ||
| 350 | println!("✅ All three maintainer announcements re-processed successfully"); | 334 | println!("✅ All three maintainer announcements re-processed successfully"); |
| @@ -365,63 +349,55 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 365 | #[tokio::test] | 349 | #[tokio::test] |
| 366 | async fn test_invalid_maintainer_pubkey_handled_gracefully() { | 350 | async fn test_invalid_maintainer_pubkey_handled_gracefully() { |
| 367 | let relay = TestRelay::start().await; | 351 | let relay = TestRelay::start().await; |
| 368 | 352 | ||
| 369 | // Create keys | 353 | // Create keys |
| 370 | let owner_keys = Keys::generate(); | 354 | let owner_keys = Keys::generate(); |
| 371 | let maintainer_keys = Keys::generate(); | 355 | let maintainer_keys = Keys::generate(); |
| 372 | 356 | ||
| 373 | let identifier = "invalid-maintainer-repo"; | 357 | let identifier = "invalid-maintainer-repo"; |
| 374 | 358 | ||
| 375 | // Create client using TestClient helper | 359 | // Create client using TestClient helper |
| 376 | let client = TestClient::new(relay.url(), owner_keys.clone()) | 360 | let client = TestClient::new(relay.url(), owner_keys.clone()) |
| 377 | .await | 361 | .await |
| 378 | .expect("Failed to connect to relay"); | 362 | .expect("Failed to connect to relay"); |
| 379 | 363 | ||
| 380 | // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) | 364 | // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) |
| 381 | let maintainer_announcement = EventBuilder::new( | 365 | let maintainer_announcement = |
| 382 | Kind::GitRepoAnnouncement, | 366 | EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") |
| 383 | "Maintainer's repository", | 367 | .tags(vec![ |
| 384 | ) | 368 | Tag::identifier(identifier), |
| 385 | .tags(vec![ | 369 | Tag::custom( |
| 386 | Tag::identifier(identifier), | 370 | TagKind::custom("clone"), |
| 387 | Tag::custom( | 371 | vec![format!("https://example.com/{}.git", identifier)], |
| 388 | TagKind::custom("clone"), | 372 | ), |
| 389 | vec![format!("https://example.com/{}.git", identifier)], | 373 | Tag::custom( |
| 390 | ), | 374 | TagKind::custom("relays"), |
| 391 | Tag::custom( | 375 | vec!["wss://example.com".to_string()], |
| 392 | TagKind::custom("relays"), | 376 | ), |
| 393 | vec!["wss://example.com".to_string()], | 377 | ]) |
| 394 | ), | 378 | .sign_with_keys(&maintainer_keys) |
| 395 | ]) | 379 | .unwrap(); |
| 396 | .sign_with_keys(&maintainer_keys) | ||
| 397 | .unwrap(); | ||
| 398 | 380 | ||
| 399 | // Send maintainer announcement - expect it to be rejected | 381 | // Send maintainer announcement - expect it to be rejected |
| 400 | let _ = client.send_event(&maintainer_announcement).await; | 382 | let _ = client.send_event(&maintainer_announcement).await; |
| 401 | tokio::time::sleep(Duration::from_millis(200)).await; | 383 | tokio::time::sleep(Duration::from_millis(200)).await; |
| 402 | 384 | ||
| 403 | // Step 2: Send owner announcement with INVALID maintainer hex | 385 | // Step 2: Send owner announcement with INVALID maintainer hex |
| 404 | let owner_announcement = EventBuilder::new( | 386 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 405 | Kind::GitRepoAnnouncement, | 387 | .tags(vec![ |
| 406 | "Owner's repository", | 388 | Tag::identifier(identifier), |
| 407 | ) | 389 | Tag::custom( |
| 408 | .tags(vec![ | 390 | TagKind::custom("clone"), |
| 409 | Tag::identifier(identifier), | 391 | vec![format!("https://{}/{}.git", relay.domain(), identifier)], |
| 410 | Tag::custom( | 392 | ), |
| 411 | TagKind::custom("clone"), | 393 | Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]), |
| 412 | vec![format!("https://{}/{}.git", relay.domain(), identifier)], | 394 | Tag::custom( |
| 413 | ), | 395 | TagKind::custom("maintainers"), |
| 414 | Tag::custom( | 396 | vec!["invalid-hex-not-a-pubkey".to_string()], |
| 415 | TagKind::custom("relays"), | 397 | ), |
| 416 | vec![relay.url().to_string()], | 398 | ]) |
| 417 | ), | 399 | .sign_with_keys(&owner_keys) |
| 418 | Tag::custom( | 400 | .unwrap(); |
| 419 | TagKind::custom("maintainers"), | ||
| 420 | vec!["invalid-hex-not-a-pubkey".to_string()], | ||
| 421 | ), | ||
| 422 | ]) | ||
| 423 | .sign_with_keys(&owner_keys) | ||
| 424 | .unwrap(); | ||
| 425 | 401 | ||
| 426 | client.send_event(&owner_announcement).await.unwrap(); | 402 | client.send_event(&owner_announcement).await.unwrap(); |
| 427 | tokio::time::sleep(Duration::from_millis(500)).await; | 403 | tokio::time::sleep(Duration::from_millis(500)).await; |
| @@ -431,16 +407,21 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { | |||
| 431 | .kind(Kind::GitRepoAnnouncement) | 407 | .kind(Kind::GitRepoAnnouncement) |
| 432 | .author(owner_keys.public_key()) | 408 | .author(owner_keys.public_key()) |
| 433 | .identifier(identifier); | 409 | .identifier(identifier); |
| 434 | 410 | ||
| 435 | let owner_found = wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await; | 411 | let owner_found = |
| 436 | assert!(owner_found, "Owner announcement should be accepted despite invalid maintainer"); | 412 | wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await; |
| 413 | assert!( | ||
| 414 | owner_found, | ||
| 415 | "Owner announcement should be accepted despite invalid maintainer" | ||
| 416 | ); | ||
| 437 | 417 | ||
| 438 | let maintainer_filter = Filter::new() | 418 | let maintainer_filter = Filter::new() |
| 439 | .kind(Kind::GitRepoAnnouncement) | 419 | .kind(Kind::GitRepoAnnouncement) |
| 440 | .author(maintainer_keys.public_key()) | 420 | .author(maintainer_keys.public_key()) |
| 441 | .identifier(identifier); | 421 | .identifier(identifier); |
| 442 | 422 | ||
| 443 | let maintainer_found = wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await; | 423 | let maintainer_found = |
| 424 | wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await; | ||
| 444 | assert!( | 425 | assert!( |
| 445 | !maintainer_found, | 426 | !maintainer_found, |
| 446 | "Maintainer announcement should NOT be re-processed (invalid pubkey)" | 427 | "Maintainer announcement should NOT be re-processed (invalid pubkey)" |