diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/git/sync.rs | 32 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 72 | ||||
| -rw-r--r-- | src/nostr/policy/state.rs | 40 | ||||
| -rw-r--r-- | src/sync/metrics.rs | 103 | ||||
| -rw-r--r-- | src/sync/mod.rs | 40 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 109 |
6 files changed, 393 insertions, 3 deletions
diff --git a/src/git/sync.rs b/src/git/sync.rs index b05e2d0..e8e9655 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs | |||
| @@ -951,6 +951,38 @@ async fn process_purgatory_state_events( | |||
| 951 | } | 951 | } |
| 952 | }; | 952 | }; |
| 953 | 953 | ||
| 954 | // CRITICAL: Check authorization before processing | ||
| 955 | // State events MUST be rejected if author is not in maintainer set | ||
| 956 | let authorized_owners = crate::git::authorization::pubkey_authorised_for_repo_owners( | ||
| 957 | &entry.event.pubkey, | ||
| 958 | &db_repo_data, | ||
| 959 | ); | ||
| 960 | |||
| 961 | if authorized_owners.is_empty() { | ||
| 962 | warn!( | ||
| 963 | identifier = %identifier, | ||
| 964 | event_id = %entry.event.id, | ||
| 965 | author = %entry.event.pubkey.to_hex(), | ||
| 966 | "Rejecting state event from purgatory: author not in maintainer set" | ||
| 967 | ); | ||
| 968 | // Remove from purgatory - this event will never be authorized | ||
| 969 | purgatory.remove_state_event(identifier, &entry.event.id); | ||
| 970 | result.errors.push(format!( | ||
| 971 | "State event {} rejected: author {} not in maintainer set", | ||
| 972 | entry.event.id, | ||
| 973 | entry.event.pubkey.to_hex() | ||
| 974 | )); | ||
| 975 | continue; | ||
| 976 | } | ||
| 977 | |||
| 978 | debug!( | ||
| 979 | identifier = %identifier, | ||
| 980 | event_id = %entry.event.id, | ||
| 981 | author = %entry.event.pubkey.to_hex(), | ||
| 982 | authorized_for_owners = ?authorized_owners, | ||
| 983 | "State event author authorized via maintainer set" | ||
| 984 | ); | ||
| 985 | |||
| 954 | // Use unified processing function | 986 | // Use unified processing function |
| 955 | let process_result = crate::git::process::process_state_with_git_data( | 987 | let process_result = crate::git::process::process_state_with_git_data( |
| 956 | &state, | 988 | &state, |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 939ccef..acaac71 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -102,6 +102,11 @@ impl Nip34WritePolicy { | |||
| 102 | } | 102 | } |
| 103 | 103 | ||
| 104 | tracing::debug!("Accepted repository announcement: {}", event_id_str); | 104 | tracing::debug!("Accepted repository announcement: {}", event_id_str); |
| 105 | |||
| 106 | // Check purgatory for state events that might now be authorized | ||
| 107 | self.check_purgatory_state_events_for_identifier(&announcement.identifier) | ||
| 108 | .await; | ||
| 109 | |||
| 105 | WritePolicyResult::Accept | 110 | WritePolicyResult::Accept |
| 106 | } | 111 | } |
| 107 | Err(e) => { | 112 | Err(e) => { |
| @@ -125,6 +130,11 @@ impl Nip34WritePolicy { | |||
| 125 | announcement.identifier | 130 | announcement.identifier |
| 126 | ); | 131 | ); |
| 127 | // Don't create bare repository for external announcements | 132 | // Don't create bare repository for external announcements |
| 133 | |||
| 134 | // Check purgatory for state events that might now be authorized | ||
| 135 | self.check_purgatory_state_events_for_identifier(&announcement.identifier) | ||
| 136 | .await; | ||
| 137 | |||
| 128 | WritePolicyResult::Accept | 138 | WritePolicyResult::Accept |
| 129 | } | 139 | } |
| 130 | Err(e) => { | 140 | Err(e) => { |
| @@ -304,6 +314,68 @@ impl Nip34WritePolicy { | |||
| 304 | } | 314 | } |
| 305 | } | 315 | } |
| 306 | 316 | ||
| 317 | /// Check purgatory for state events that might now be authorized by a new announcement | ||
| 318 | /// | ||
| 319 | /// When an announcement is accepted, state events in purgatory that were previously | ||
| 320 | /// rejected due to missing announcements might now be authorized. This method: | ||
| 321 | /// 1. Finds all state events in purgatory for the identifier | ||
| 322 | /// 2. Re-evaluates authorization for each event | ||
| 323 | /// 3. Processes authorized events (releases from purgatory) | ||
| 324 | /// 4. Keeps unauthorized events in purgatory (will expire naturally) | ||
| 325 | async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) { | ||
| 326 | let state_events = self.ctx.purgatory.find_state(identifier); | ||
| 327 | |||
| 328 | if state_events.is_empty() { | ||
| 329 | return; | ||
| 330 | } | ||
| 331 | |||
| 332 | tracing::debug!( | ||
| 333 | identifier = %identifier, | ||
| 334 | count = state_events.len(), | ||
| 335 | "Checking purgatory state events after announcement acceptance" | ||
| 336 | ); | ||
| 337 | |||
| 338 | for entry in state_events { | ||
| 339 | // Re-evaluate authorization with the new announcement | ||
| 340 | match self.state_policy.process_state_event(&entry.event, false).await { | ||
| 341 | Ok(WritePolicyResult::Accept) => { | ||
| 342 | tracing::info!( | ||
| 343 | event_id = %entry.event.id, | ||
| 344 | identifier = %identifier, | ||
| 345 | "State event in purgatory now authorized, will be processed" | ||
| 346 | ); | ||
| 347 | // Event will be automatically removed from purgatory by process_state_event | ||
| 348 | // and broadcast to subscribers | ||
| 349 | } | ||
| 350 | Ok(WritePolicyResult::Reject { message, .. }) => { | ||
| 351 | if message.contains("not authorized") { | ||
| 352 | tracing::debug!( | ||
| 353 | event_id = %entry.event.id, | ||
| 354 | identifier = %identifier, | ||
| 355 | "State event in purgatory still not authorized, keeping in purgatory" | ||
| 356 | ); | ||
| 357 | // Keep in purgatory - will expire naturally after 30 minutes | ||
| 358 | } else { | ||
| 359 | tracing::debug!( | ||
| 360 | event_id = %entry.event.id, | ||
| 361 | identifier = %identifier, | ||
| 362 | reason = %message, | ||
| 363 | "State event in purgatory rejected for other reason" | ||
| 364 | ); | ||
| 365 | } | ||
| 366 | } | ||
| 367 | Err(e) => { | ||
| 368 | tracing::warn!( | ||
| 369 | event_id = %entry.event.id, | ||
| 370 | identifier = %identifier, | ||
| 371 | error = %e, | ||
| 372 | "Error re-evaluating state event in purgatory" | ||
| 373 | ); | ||
| 374 | } | ||
| 375 | } | ||
| 376 | } | ||
| 377 | } | ||
| 378 | |||
| 307 | /// Handle events that must reference accepted repositories or events | 379 | /// Handle events that must reference accepted repositories or events |
| 308 | async fn handle_related_event(&self, event: &Event, event_type: &str) -> WritePolicyResult { | 380 | async fn handle_related_event(&self, event: &Event, event_type: &str) -> WritePolicyResult { |
| 309 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | 381 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); |
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index acb76a3..d26b5ec 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs | |||
| @@ -78,6 +78,46 @@ impl StatePolicy { | |||
| 78 | // Get all repositories and state events from db with identifier | 78 | // Get all repositories and state events from db with identifier |
| 79 | let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; | 79 | let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; |
| 80 | 80 | ||
| 81 | // CRITICAL: Check if author is authorized via maintainer set | ||
| 82 | // State events MUST be rejected if author is not in maintainer set of any accepted announcement | ||
| 83 | if db_repo_data.announcements.is_empty() { | ||
| 84 | tracing::warn!( | ||
| 85 | event_id = %event.id, | ||
| 86 | identifier = %state.identifier, | ||
| 87 | author = %event.pubkey.to_hex(), | ||
| 88 | "Rejecting state event: no announcement exists for this repository" | ||
| 89 | ); | ||
| 90 | return Ok(WritePolicyResult::Reject { | ||
| 91 | status: false, | ||
| 92 | message: "invalid: no announcement exists for this repository".into(), | ||
| 93 | }); | ||
| 94 | } | ||
| 95 | |||
| 96 | let authorized_owners = | ||
| 97 | crate::git::authorization::pubkey_authorised_for_repo_owners(&event.pubkey, &db_repo_data); | ||
| 98 | |||
| 99 | if authorized_owners.is_empty() { | ||
| 100 | tracing::warn!( | ||
| 101 | event_id = %event.id, | ||
| 102 | identifier = %state.identifier, | ||
| 103 | author = %event.pubkey.to_hex(), | ||
| 104 | announcements_count = db_repo_data.announcements.len(), | ||
| 105 | "Rejecting state event: author not in maintainer set of any announcement" | ||
| 106 | ); | ||
| 107 | return Ok(WritePolicyResult::Reject { | ||
| 108 | status: false, | ||
| 109 | message: "invalid: author not authorized for this repository".into(), | ||
| 110 | }); | ||
| 111 | } | ||
| 112 | |||
| 113 | tracing::debug!( | ||
| 114 | event_id = %event.id, | ||
| 115 | identifier = %state.identifier, | ||
| 116 | author = %event.pubkey.to_hex(), | ||
| 117 | authorized_for_owners = ?authorized_owners, | ||
| 118 | "State event author authorized via maintainer set" | ||
| 119 | ); | ||
| 120 | |||
| 81 | // Duplicate check in db | 121 | // Duplicate check in db |
| 82 | if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { | 122 | if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { |
| 83 | tracing::debug!("processed state event duplicate (in db): {}", event.id); | 123 | tracing::debug!("processed state event duplicate (in db): {}", event.id); |
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index a175210..7d6d42d 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs | |||
| @@ -56,6 +56,22 @@ pub struct SyncMetrics { | |||
| 56 | rejected_announcements_cold_index_expired_total: IntCounter, | 56 | rejected_announcements_cold_index_expired_total: IntCounter, |
| 57 | /// Total invalidations (maintainer announcements invalidated) | 57 | /// Total invalidations (maintainer announcements invalidated) |
| 58 | rejected_announcements_invalidated_total: IntCounter, | 58 | rejected_announcements_invalidated_total: IntCounter, |
| 59 | |||
| 60 | // === Rejected States Index Metrics === | ||
| 61 | /// Current number of state events in hot cache | ||
| 62 | rejected_states_hot_cache_current: IntGauge, | ||
| 63 | /// Total hot cache hits (state events re-processed from cache) | ||
| 64 | rejected_states_hot_cache_hits_total: IntCounter, | ||
| 65 | /// Total hot cache misses (state events not in cache) | ||
| 66 | rejected_states_hot_cache_misses_total: IntCounter, | ||
| 67 | /// Total expired state events removed from hot cache | ||
| 68 | rejected_states_hot_cache_expired_total: IntCounter, | ||
| 69 | /// Current number of state event entries in cold index | ||
| 70 | rejected_states_cold_index_current: IntGauge, | ||
| 71 | /// Total state event cold index entries expired and removed | ||
| 72 | rejected_states_cold_index_expired_total: IntCounter, | ||
| 73 | /// Total state event invalidations | ||
| 74 | rejected_states_invalidated_total: IntCounter, | ||
| 59 | } | 75 | } |
| 60 | 76 | ||
| 61 | impl SyncMetrics { | 77 | impl SyncMetrics { |
| @@ -172,6 +188,49 @@ impl SyncMetrics { | |||
| 172 | ))?; | 188 | ))?; |
| 173 | registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?; | 189 | registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?; |
| 174 | 190 | ||
| 191 | // Rejected states metrics | ||
| 192 | let rejected_states_hot_cache_current = IntGauge::with_opts(Opts::new( | ||
| 193 | "ngit_sync_rejected_states_hot_cache_current", | ||
| 194 | "Current number of state events in hot cache (full events, 2 min expiry)", | ||
| 195 | ))?; | ||
| 196 | registry.register(Box::new(rejected_states_hot_cache_current.clone()))?; | ||
| 197 | |||
| 198 | let rejected_states_hot_cache_hits_total = IntCounter::with_opts(Opts::new( | ||
| 199 | "ngit_sync_rejected_states_hot_cache_hits_total", | ||
| 200 | "Total hot cache hits (state events re-processed from cache)", | ||
| 201 | ))?; | ||
| 202 | registry.register(Box::new(rejected_states_hot_cache_hits_total.clone()))?; | ||
| 203 | |||
| 204 | let rejected_states_hot_cache_misses_total = IntCounter::with_opts(Opts::new( | ||
| 205 | "ngit_sync_rejected_states_hot_cache_misses_total", | ||
| 206 | "Total hot cache misses (state events not in cache when invalidated)", | ||
| 207 | ))?; | ||
| 208 | registry.register(Box::new(rejected_states_hot_cache_misses_total.clone()))?; | ||
| 209 | |||
| 210 | let rejected_states_hot_cache_expired_total = IntCounter::with_opts(Opts::new( | ||
| 211 | "ngit_sync_rejected_states_hot_cache_expired_total", | ||
| 212 | "Total expired state events removed from hot cache", | ||
| 213 | ))?; | ||
| 214 | registry.register(Box::new(rejected_states_hot_cache_expired_total.clone()))?; | ||
| 215 | |||
| 216 | let rejected_states_cold_index_current = IntGauge::with_opts(Opts::new( | ||
| 217 | "ngit_sync_rejected_states_cold_index_current", | ||
| 218 | "Current number of state event entries in cold index (metadata only, 7 day expiry)", | ||
| 219 | ))?; | ||
| 220 | registry.register(Box::new(rejected_states_cold_index_current.clone()))?; | ||
| 221 | |||
| 222 | let rejected_states_cold_index_expired_total = IntCounter::with_opts(Opts::new( | ||
| 223 | "ngit_sync_rejected_states_cold_index_expired_total", | ||
| 224 | "Total state event cold index entries expired and removed", | ||
| 225 | ))?; | ||
| 226 | registry.register(Box::new(rejected_states_cold_index_expired_total.clone()))?; | ||
| 227 | |||
| 228 | let rejected_states_invalidated_total = IntCounter::with_opts(Opts::new( | ||
| 229 | "ngit_sync_rejected_states_invalidated_total", | ||
| 230 | "Total state event invalidations (when announcements accepted)", | ||
| 231 | ))?; | ||
| 232 | registry.register(Box::new(rejected_states_invalidated_total.clone()))?; | ||
| 233 | |||
| 175 | Ok(Self { | 234 | Ok(Self { |
| 176 | relay_connected, | 235 | relay_connected, |
| 177 | connection_attempts_total, | 236 | connection_attempts_total, |
| @@ -188,6 +247,13 @@ impl SyncMetrics { | |||
| 188 | rejected_announcements_cold_index_current, | 247 | rejected_announcements_cold_index_current, |
| 189 | rejected_announcements_cold_index_expired_total, | 248 | rejected_announcements_cold_index_expired_total, |
| 190 | rejected_announcements_invalidated_total, | 249 | rejected_announcements_invalidated_total, |
| 250 | rejected_states_hot_cache_current, | ||
| 251 | rejected_states_hot_cache_hits_total, | ||
| 252 | rejected_states_hot_cache_misses_total, | ||
| 253 | rejected_states_hot_cache_expired_total, | ||
| 254 | rejected_states_cold_index_current, | ||
| 255 | rejected_states_cold_index_expired_total, | ||
| 256 | rejected_states_invalidated_total, | ||
| 191 | }) | 257 | }) |
| 192 | } | 258 | } |
| 193 | 259 | ||
| @@ -396,6 +462,43 @@ impl SyncMetrics { | |||
| 396 | pub fn record_invalidation(&self, count: usize) { | 462 | pub fn record_invalidation(&self, count: usize) { |
| 397 | self.rejected_announcements_invalidated_total.inc_by(count as u64); | 463 | self.rejected_announcements_invalidated_total.inc_by(count as u64); |
| 398 | } | 464 | } |
| 465 | |||
| 466 | // === Rejected States Recording Methods === | ||
| 467 | |||
| 468 | /// Update state events hot cache current size gauge. | ||
| 469 | pub fn update_states_hot_cache_size(&self, size: usize) { | ||
| 470 | self.rejected_states_hot_cache_current.set(size as i64); | ||
| 471 | } | ||
| 472 | |||
| 473 | /// Record state event hot cache hit (event re-processed from cache). | ||
| 474 | pub fn record_states_hot_cache_hit(&self) { | ||
| 475 | self.rejected_states_hot_cache_hits_total.inc(); | ||
| 476 | } | ||
| 477 | |||
| 478 | /// Record state event hot cache miss (event not in cache when invalidated). | ||
| 479 | pub fn record_states_hot_cache_miss(&self) { | ||
| 480 | self.rejected_states_hot_cache_misses_total.inc(); | ||
| 481 | } | ||
| 482 | |||
| 483 | /// Record state event hot cache expired entries. | ||
| 484 | pub fn record_states_hot_cache_expired(&self, count: usize) { | ||
| 485 | self.rejected_states_hot_cache_expired_total.inc_by(count as u64); | ||
| 486 | } | ||
| 487 | |||
| 488 | /// Update state events cold index current size gauge. | ||
| 489 | pub fn update_states_cold_index_size(&self, size: usize) { | ||
| 490 | self.rejected_states_cold_index_current.set(size as i64); | ||
| 491 | } | ||
| 492 | |||
| 493 | /// Record state event cold index expired entries. | ||
| 494 | pub fn record_states_cold_index_expired(&self, count: usize) { | ||
| 495 | self.rejected_states_cold_index_expired_total.inc_by(count as u64); | ||
| 496 | } | ||
| 497 | |||
| 498 | /// Record state event invalidation. | ||
| 499 | pub fn record_states_invalidation(&self, count: usize) { | ||
| 500 | self.rejected_states_invalidated_total.inc_by(count as u64); | ||
| 501 | } | ||
| 399 | } | 502 | } |
| 400 | 503 | ||
| 401 | #[cfg(test)] | 504 | #[cfg(test)] |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f296c0f..93b0e38 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -367,12 +367,14 @@ async fn run_daily_timer( | |||
| 367 | /// Run the combined health and metrics checker | 367 | /// Run the combined health and metrics checker |
| 368 | /// | 368 | /// |
| 369 | /// This function runs in a loop with a 2-second interval, performing three tasks: | 369 | /// This function runs in a loop with a 2-second interval, performing three tasks: |
| 370 | /// Background task for cleaning up expired entries from the rejected events index | 370 | /// Background task for cleaning up expired entries from the rejected events indexes |
| 371 | /// | 371 | /// |
| 372 | /// This task runs two cleanup operations at different intervals: | 372 | /// This task runs two cleanup operations at different intervals: |
| 373 | /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache | 373 | /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache |
| 374 | /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index | 374 | /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index |
| 375 | /// | 375 | /// |
| 376 | /// Cleans up both the announcements index and the states index. | ||
| 377 | /// | ||
| 376 | /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). | 378 | /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). |
| 377 | /// The cold index cleanup runs daily since metadata is small and expires slowly. | 379 | /// The cold index cleanup runs daily since metadata is small and expires slowly. |
| 378 | async fn run_rejected_index_cleanup( | 380 | async fn run_rejected_index_cleanup( |
| @@ -397,6 +399,8 @@ async fn run_rejected_index_cleanup( | |||
| 397 | tokio::select! { | 399 | tokio::select! { |
| 398 | _ = hot_cache_timer.tick() => { | 400 | _ = hot_cache_timer.tick() => { |
| 399 | let manager = sync_manager.lock().await; | 401 | let manager = sync_manager.lock().await; |
| 402 | |||
| 403 | // Clean up announcements index | ||
| 400 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); | 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); |
| 401 | if hot_expired > 0 { | 405 | if hot_expired > 0 { |
| 402 | tracing::debug!( | 406 | tracing::debug!( |
| @@ -404,9 +408,20 @@ async fn run_rejected_index_cleanup( | |||
| 404 | hot_expired | 408 | hot_expired |
| 405 | ); | 409 | ); |
| 406 | } | 410 | } |
| 411 | |||
| 412 | // Clean up states index | ||
| 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); | ||
| 414 | if states_hot_expired > 0 { | ||
| 415 | tracing::debug!( | ||
| 416 | "Cleaned up {} expired entries from rejected states hot cache", | ||
| 417 | states_hot_expired | ||
| 418 | ); | ||
| 419 | } | ||
| 407 | } | 420 | } |
| 408 | _ = cold_index_timer.tick() => { | 421 | _ = cold_index_timer.tick() => { |
| 409 | let manager = sync_manager.lock().await; | 422 | let manager = sync_manager.lock().await; |
| 423 | |||
| 424 | // Clean up announcements index | ||
| 410 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); | 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); |
| 411 | if cold_expired > 0 { | 426 | if cold_expired > 0 { |
| 412 | tracing::info!( | 427 | tracing::info!( |
| @@ -414,6 +429,15 @@ async fn run_rejected_index_cleanup( | |||
| 414 | cold_expired | 429 | cold_expired |
| 415 | ); | 430 | ); |
| 416 | } | 431 | } |
| 432 | |||
| 433 | // Clean up states index | ||
| 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); | ||
| 435 | if states_cold_expired > 0 { | ||
| 436 | tracing::info!( | ||
| 437 | "Cleaned up {} expired entries from rejected states cold index", | ||
| 438 | states_cold_expired | ||
| 439 | ); | ||
| 440 | } | ||
| 417 | } | 441 | } |
| 418 | _ = shutdown_rx.recv() => { | 442 | _ = shutdown_rx.recv() => { |
| 419 | tracing::info!("Rejected index cleanup received shutdown signal"); | 443 | tracing::info!("Rejected index cleanup received shutdown signal"); |
| @@ -507,6 +531,8 @@ pub struct SyncManager { | |||
| 507 | pending_sync_index: PendingSyncIndex, | 531 | pending_sync_index: PendingSyncIndex, |
| 508 | /// Rejected announcement events (30617/30618) - two-tier storage for re-processing | 532 | /// Rejected announcement events (30617/30618) - two-tier storage for re-processing |
| 509 | rejected_events_index: Arc<RejectedEventsIndex>, | 533 | rejected_events_index: Arc<RejectedEventsIndex>, |
| 534 | /// Rejected state events (30618) - two-tier storage for re-processing | ||
| 535 | rejected_states_index: Arc<RejectedEventsIndex>, | ||
| 510 | /// Active relay connections - keyed by relay URL | 536 | /// Active relay connections - keyed by relay URL |
| 511 | connections: HashMap<String, RelayConnection>, | 537 | connections: HashMap<String, RelayConnection>, |
| 512 | /// Health tracker for relay connection state | 538 | /// Health tracker for relay connection state |
| @@ -571,6 +597,18 @@ impl SyncManager { | |||
| 571 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | 597 | Duration::from_secs(config.rejected_cold_index_expiry_secs), |
| 572 | ) | 598 | ) |
| 573 | }), | 599 | }), |
| 600 | rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 601 | RejectedEventsIndex::with_metrics( | ||
| 602 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 603 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 604 | metrics.clone(), | ||
| 605 | ) | ||
| 606 | } else { | ||
| 607 | RejectedEventsIndex::new( | ||
| 608 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 609 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 610 | ) | ||
| 611 | }), | ||
| 574 | connections: HashMap::new(), | 612 | connections: HashMap::new(), |
| 575 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 613 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 576 | next_batch_id: 0, | 614 | next_batch_id: 0, |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4733d80..f5ffef4 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -355,7 +355,7 @@ impl RejectedEventsIndex { | |||
| 355 | index | 355 | index |
| 356 | } | 356 | } |
| 357 | 357 | ||
| 358 | /// Update metrics with current sizes | 358 | /// Update metrics with current sizes (for announcements) |
| 359 | fn update_metrics(&self) { | 359 | fn update_metrics(&self) { |
| 360 | if let Some(ref metrics) = self.metrics { | 360 | if let Some(ref metrics) = self.metrics { |
| 361 | metrics.update_hot_cache_size(self.hot_cache.len()); | 361 | metrics.update_hot_cache_size(self.hot_cache.len()); |
| @@ -363,6 +363,14 @@ impl RejectedEventsIndex { | |||
| 363 | } | 363 | } |
| 364 | } | 364 | } |
| 365 | 365 | ||
| 366 | /// Update metrics with current sizes (for states) | ||
| 367 | fn update_states_metrics(&self) { | ||
| 368 | if let Some(ref metrics) = self.metrics { | ||
| 369 | metrics.update_states_hot_cache_size(self.hot_cache.len()); | ||
| 370 | metrics.update_states_cold_index_size(self.cold_index.len()); | ||
| 371 | } | ||
| 372 | } | ||
| 373 | |||
| 366 | /// Add rejected announcement to both tiers | 374 | /// Add rejected announcement to both tiers |
| 367 | /// | 375 | /// |
| 368 | /// # Arguments | 376 | /// # Arguments |
| @@ -393,6 +401,36 @@ impl RejectedEventsIndex { | |||
| 393 | self.update_metrics(); | 401 | self.update_metrics(); |
| 394 | } | 402 | } |
| 395 | 403 | ||
| 404 | /// Add rejected state event to both tiers | ||
| 405 | /// | ||
| 406 | /// # Arguments | ||
| 407 | /// | ||
| 408 | /// * `event` - Full event object (stored in hot cache) | ||
| 409 | /// * `pubkey` - Author's public key | ||
| 410 | /// * `identifier` - Repository identifier (d tag) | ||
| 411 | /// * `reason` - Why the state event was rejected | ||
| 412 | pub fn add_state( | ||
| 413 | &self, | ||
| 414 | event: Event, | ||
| 415 | pubkey: PublicKey, | ||
| 416 | identifier: String, | ||
| 417 | reason: RejectionReason, | ||
| 418 | ) { | ||
| 419 | // Add to hot cache (full event) | ||
| 420 | self.hot_cache.add( | ||
| 421 | event.clone(), | ||
| 422 | pubkey, | ||
| 423 | identifier.clone(), | ||
| 424 | reason, | ||
| 425 | ); | ||
| 426 | |||
| 427 | // Add to cold index (metadata only) | ||
| 428 | self.cold_index.add(event.id, pubkey, identifier, reason); | ||
| 429 | |||
| 430 | // Update metrics (using states metrics) | ||
| 431 | self.update_states_metrics(); | ||
| 432 | } | ||
| 433 | |||
| 396 | /// Check if event is already rejected (in either tier) | 434 | /// Check if event is already rejected (in either tier) |
| 397 | pub fn contains(&self, event_id: &EventId) -> bool { | 435 | pub fn contains(&self, event_id: &EventId) -> bool { |
| 398 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) | 436 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) |
| @@ -442,7 +480,51 @@ impl RejectedEventsIndex { | |||
| 442 | (removed, events) | 480 | (removed, events) |
| 443 | } | 481 | } |
| 444 | 482 | ||
| 445 | /// Clean up expired entries from both tiers | 483 | /// Invalidate state events and get events for immediate re-processing |
| 484 | /// | ||
| 485 | /// This is called when an announcement is accepted that authorizes state events. | ||
| 486 | /// It removes the cold index entries (so they can be re-fetched on next sync) and | ||
| 487 | /// returns any events still in the hot cache for immediate re-processing. | ||
| 488 | /// | ||
| 489 | /// # Returns | ||
| 490 | /// | ||
| 491 | /// Tuple of (number of cold index entries removed, events from hot cache) | ||
| 492 | pub fn invalidate_and_get_state_events( | ||
| 493 | &self, | ||
| 494 | maintainer_pubkey: &PublicKey, | ||
| 495 | identifier: &str, | ||
| 496 | ) -> (usize, Vec<Event>) { | ||
| 497 | // Remove from cold index (prevents re-fetch) | ||
| 498 | let removed = self | ||
| 499 | .cold_index | ||
| 500 | .invalidate_maintainer_announcements(maintainer_pubkey, identifier); | ||
| 501 | |||
| 502 | // Get from hot cache (for immediate re-processing) | ||
| 503 | let events = self | ||
| 504 | .hot_cache | ||
| 505 | .get_maintainer_events(maintainer_pubkey, identifier); | ||
| 506 | |||
| 507 | // Track metrics (using states metrics) | ||
| 508 | if let Some(ref metrics) = self.metrics { | ||
| 509 | if removed > 0 { | ||
| 510 | metrics.record_states_invalidation(removed); | ||
| 511 | } | ||
| 512 | if events.is_empty() { | ||
| 513 | metrics.record_states_hot_cache_miss(); | ||
| 514 | } else { | ||
| 515 | for _ in &events { | ||
| 516 | metrics.record_states_hot_cache_hit(); | ||
| 517 | } | ||
| 518 | } | ||
| 519 | } | ||
| 520 | |||
| 521 | // Update size metrics (using states metrics) | ||
| 522 | self.update_states_metrics(); | ||
| 523 | |||
| 524 | (removed, events) | ||
| 525 | } | ||
| 526 | |||
| 527 | /// Clean up expired entries from both tiers (for announcements) | ||
| 446 | /// | 528 | /// |
| 447 | /// Returns tuple of (hot cache expired, cold index expired) | 529 | /// Returns tuple of (hot cache expired, cold index expired) |
| 448 | pub fn cleanup_expired(&self) -> (usize, usize) { | 530 | pub fn cleanup_expired(&self) -> (usize, usize) { |
| @@ -465,6 +547,29 @@ impl RejectedEventsIndex { | |||
| 465 | (hot_expired, cold_expired) | 547 | (hot_expired, cold_expired) |
| 466 | } | 548 | } |
| 467 | 549 | ||
| 550 | /// Clean up expired entries from both tiers (for states) | ||
| 551 | /// | ||
| 552 | /// Returns tuple of (hot cache expired, cold index expired) | ||
| 553 | pub fn cleanup_states_expired(&self) -> (usize, usize) { | ||
| 554 | let hot_expired = self.hot_cache.cleanup_expired(); | ||
| 555 | let cold_expired = self.cold_index.cleanup_expired(); | ||
| 556 | |||
| 557 | // Track metrics (using states metrics) | ||
| 558 | if let Some(ref metrics) = self.metrics { | ||
| 559 | if hot_expired > 0 { | ||
| 560 | metrics.record_states_hot_cache_expired(hot_expired); | ||
| 561 | } | ||
| 562 | if cold_expired > 0 { | ||
| 563 | metrics.record_states_cold_index_expired(cold_expired); | ||
| 564 | } | ||
| 565 | } | ||
| 566 | |||
| 567 | // Update size metrics (using states metrics) | ||
| 568 | self.update_states_metrics(); | ||
| 569 | |||
| 570 | (hot_expired, cold_expired) | ||
| 571 | } | ||
| 572 | |||
| 468 | /// Get current number of entries in hot cache | 573 | /// Get current number of entries in hot cache |
| 469 | pub fn hot_cache_len(&self) -> usize { | 574 | pub fn hot_cache_len(&self) -> usize { |
| 470 | self.hot_cache.len() | 575 | self.hot_cache.len() |