diff options
Diffstat (limited to 'src/sync/rejected_index.rs')
| -rw-r--r-- | src/sync/rejected_index.rs | 109 |
1 files changed, 107 insertions, 2 deletions
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4733d80..f5ffef4 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -355,7 +355,7 @@ impl RejectedEventsIndex { | |||
| 355 | index | 355 | index |
| 356 | } | 356 | } |
| 357 | 357 | ||
| 358 | /// Update metrics with current sizes | 358 | /// Update metrics with current sizes (for announcements) |
| 359 | fn update_metrics(&self) { | 359 | fn update_metrics(&self) { |
| 360 | if let Some(ref metrics) = self.metrics { | 360 | if let Some(ref metrics) = self.metrics { |
| 361 | metrics.update_hot_cache_size(self.hot_cache.len()); | 361 | metrics.update_hot_cache_size(self.hot_cache.len()); |
| @@ -363,6 +363,14 @@ impl RejectedEventsIndex { | |||
| 363 | } | 363 | } |
| 364 | } | 364 | } |
| 365 | 365 | ||
| 366 | /// Update metrics with current sizes (for states) | ||
| 367 | fn update_states_metrics(&self) { | ||
| 368 | if let Some(ref metrics) = self.metrics { | ||
| 369 | metrics.update_states_hot_cache_size(self.hot_cache.len()); | ||
| 370 | metrics.update_states_cold_index_size(self.cold_index.len()); | ||
| 371 | } | ||
| 372 | } | ||
| 373 | |||
| 366 | /// Add rejected announcement to both tiers | 374 | /// Add rejected announcement to both tiers |
| 367 | /// | 375 | /// |
| 368 | /// # Arguments | 376 | /// # Arguments |
| @@ -393,6 +401,36 @@ impl RejectedEventsIndex { | |||
| 393 | self.update_metrics(); | 401 | self.update_metrics(); |
| 394 | } | 402 | } |
| 395 | 403 | ||
| 404 | /// Add rejected state event to both tiers | ||
| 405 | /// | ||
| 406 | /// # Arguments | ||
| 407 | /// | ||
| 408 | /// * `event` - Full event object (stored in hot cache) | ||
| 409 | /// * `pubkey` - Author's public key | ||
| 410 | /// * `identifier` - Repository identifier (d tag) | ||
| 411 | /// * `reason` - Why the state event was rejected | ||
| 412 | pub fn add_state( | ||
| 413 | &self, | ||
| 414 | event: Event, | ||
| 415 | pubkey: PublicKey, | ||
| 416 | identifier: String, | ||
| 417 | reason: RejectionReason, | ||
| 418 | ) { | ||
| 419 | // Add to hot cache (full event) | ||
| 420 | self.hot_cache.add( | ||
| 421 | event.clone(), | ||
| 422 | pubkey, | ||
| 423 | identifier.clone(), | ||
| 424 | reason, | ||
| 425 | ); | ||
| 426 | |||
| 427 | // Add to cold index (metadata only) | ||
| 428 | self.cold_index.add(event.id, pubkey, identifier, reason); | ||
| 429 | |||
| 430 | // Update metrics (using states metrics) | ||
| 431 | self.update_states_metrics(); | ||
| 432 | } | ||
| 433 | |||
| 396 | /// Check if event is already rejected (in either tier) | 434 | /// Check if event is already rejected (in either tier) |
| 397 | pub fn contains(&self, event_id: &EventId) -> bool { | 435 | pub fn contains(&self, event_id: &EventId) -> bool { |
| 398 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) | 436 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) |
| @@ -442,7 +480,51 @@ impl RejectedEventsIndex { | |||
| 442 | (removed, events) | 480 | (removed, events) |
| 443 | } | 481 | } |
| 444 | 482 | ||
| 445 | /// Clean up expired entries from both tiers | 483 | /// Invalidate state events and get events for immediate re-processing |
| 484 | /// | ||
| 485 | /// This is called when an announcement is accepted that authorizes state events. | ||
| 486 | /// It removes the cold index entries (so they can be re-fetched on next sync) and | ||
| 487 | /// returns any events still in the hot cache for immediate re-processing. | ||
| 488 | /// | ||
| 489 | /// # Returns | ||
| 490 | /// | ||
| 491 | /// Tuple of (number of cold index entries removed, events from hot cache) | ||
| 492 | pub fn invalidate_and_get_state_events( | ||
| 493 | &self, | ||
| 494 | maintainer_pubkey: &PublicKey, | ||
| 495 | identifier: &str, | ||
| 496 | ) -> (usize, Vec<Event>) { | ||
| 497 | // Remove from cold index (prevents re-fetch) | ||
| 498 | let removed = self | ||
| 499 | .cold_index | ||
| 500 | .invalidate_maintainer_announcements(maintainer_pubkey, identifier); | ||
| 501 | |||
| 502 | // Get from hot cache (for immediate re-processing) | ||
| 503 | let events = self | ||
| 504 | .hot_cache | ||
| 505 | .get_maintainer_events(maintainer_pubkey, identifier); | ||
| 506 | |||
| 507 | // Track metrics (using states metrics) | ||
| 508 | if let Some(ref metrics) = self.metrics { | ||
| 509 | if removed > 0 { | ||
| 510 | metrics.record_states_invalidation(removed); | ||
| 511 | } | ||
| 512 | if events.is_empty() { | ||
| 513 | metrics.record_states_hot_cache_miss(); | ||
| 514 | } else { | ||
| 515 | for _ in &events { | ||
| 516 | metrics.record_states_hot_cache_hit(); | ||
| 517 | } | ||
| 518 | } | ||
| 519 | } | ||
| 520 | |||
| 521 | // Update size metrics (using states metrics) | ||
| 522 | self.update_states_metrics(); | ||
| 523 | |||
| 524 | (removed, events) | ||
| 525 | } | ||
| 526 | |||
| 527 | /// Clean up expired entries from both tiers (for announcements) | ||
| 446 | /// | 528 | /// |
| 447 | /// Returns tuple of (hot cache expired, cold index expired) | 529 | /// Returns tuple of (hot cache expired, cold index expired) |
| 448 | pub fn cleanup_expired(&self) -> (usize, usize) { | 530 | pub fn cleanup_expired(&self) -> (usize, usize) { |
| @@ -465,6 +547,29 @@ impl RejectedEventsIndex { | |||
| 465 | (hot_expired, cold_expired) | 547 | (hot_expired, cold_expired) |
| 466 | } | 548 | } |
| 467 | 549 | ||
| 550 | /// Clean up expired entries from both tiers (for states) | ||
| 551 | /// | ||
| 552 | /// Returns tuple of (hot cache expired, cold index expired) | ||
| 553 | pub fn cleanup_states_expired(&self) -> (usize, usize) { | ||
| 554 | let hot_expired = self.hot_cache.cleanup_expired(); | ||
| 555 | let cold_expired = self.cold_index.cleanup_expired(); | ||
| 556 | |||
| 557 | // Track metrics (using states metrics) | ||
| 558 | if let Some(ref metrics) = self.metrics { | ||
| 559 | if hot_expired > 0 { | ||
| 560 | metrics.record_states_hot_cache_expired(hot_expired); | ||
| 561 | } | ||
| 562 | if cold_expired > 0 { | ||
| 563 | metrics.record_states_cold_index_expired(cold_expired); | ||
| 564 | } | ||
| 565 | } | ||
| 566 | |||
| 567 | // Update size metrics (using states metrics) | ||
| 568 | self.update_states_metrics(); | ||
| 569 | |||
| 570 | (hot_expired, cold_expired) | ||
| 571 | } | ||
| 572 | |||
| 468 | /// Get current number of entries in hot cache | 573 | /// Get current number of entries in hot cache |
| 469 | pub fn hot_cache_len(&self) -> usize { | 574 | pub fn hot_cache_len(&self) -> usize { |
| 470 | self.hot_cache.len() | 575 | self.hot_cache.len() |