diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 20:51:49 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 20:51:49 +0000 |
| commit | 9bd58faad6be254f0221820fa5e8516b8b15e19d (patch) | |
| tree | b78870832e25f544f796bc80013eb3fea4016646 /src | |
| parent | b28a356cb41077ccee12a9c52f4ef2054e76cac6 (diff) | |
refactor(sync): add EventType enum and unify rejected index methods
Add EventType enum (Announcement, State) to distinguish event types within
RejectedEventsIndex. This consolidates the two-tier index design into a
single unified interface.
Changes:
- Add EventType enum with Announcement and State variants
- Add event_type field to HotCacheEntry and ColdIndexEntry
- Create unified invalidate_and_get() with optional event_type filter
- Update cleanup_expired_for_type() to handle both types
- Remove deprecated wrapper methods (invalidate_and_get_events,
invalidate_and_get_state_events, cleanup_expired, cleanup_states_expired)
Consolidates phases 2, 3, and 7 of rejected events index refactoring.
Diffstat (limited to 'src')
| -rw-r--r-- | src/sync/rejected_index.rs | 382 |
1 files changed, 259 insertions, 123 deletions
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 403792a..4d31901 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -55,7 +55,7 @@ | |||
| 55 | //! # Usage | 55 | //! # Usage |
| 56 | //! | 56 | //! |
| 57 | //! ```rust,ignore | 57 | //! ```rust,ignore |
| 58 | //! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; | 58 | //! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason, EventType}; |
| 59 | //! use nostr_sdk::{Event, PublicKey}; | 59 | //! use nostr_sdk::{Event, PublicKey}; |
| 60 | //! use std::time::Duration; | 60 | //! use std::time::Duration; |
| 61 | //! | 61 | //! |
| @@ -73,9 +73,10 @@ | |||
| 73 | //! ); | 73 | //! ); |
| 74 | //! | 74 | //! |
| 75 | //! // Later, when owner announcement accepted... | 75 | //! // Later, when owner announcement accepted... |
| 76 | //! let (removed, hot_events) = index.invalidate_and_get_events( | 76 | //! let (removed, hot_events) = index.invalidate_and_get( |
| 77 | //! &maintainer_pubkey, | 77 | //! &maintainer_pubkey, |
| 78 | //! "my-repo", | 78 | //! "my-repo", |
| 79 | //! Some(EventType::Announcement), | ||
| 79 | //! ); | 80 | //! ); |
| 80 | //! | 81 | //! |
| 81 | //! // Re-process events from hot cache immediately | 82 | //! // Re-process events from hot cache immediately |
| @@ -89,6 +90,24 @@ use std::collections::{HashMap, HashSet}; | |||
| 89 | use std::sync::{Arc, RwLock}; | 90 | use std::sync::{Arc, RwLock}; |
| 90 | use std::time::{Duration, Instant}; | 91 | use std::time::{Duration, Instant}; |
| 91 | 92 | ||
| 93 | /// Type of event stored in the rejected events index | ||
| 94 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| 95 | pub enum EventType { | ||
| 96 | /// Repository announcement (kind 30617) | ||
| 97 | Announcement, | ||
| 98 | /// Repository state event (kind 30618) | ||
| 99 | State, | ||
| 100 | } | ||
| 101 | |||
| 102 | impl std::fmt::Display for EventType { | ||
| 103 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 104 | match self { | ||
| 105 | Self::Announcement => write!(f, "announcement"), | ||
| 106 | Self::State => write!(f, "state"), | ||
| 107 | } | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 92 | /// Reason why a repository announcement was rejected | 111 | /// Reason why a repository announcement was rejected |
| 93 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] | 112 | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
| 94 | pub enum RejectionReason { | 113 | pub enum RejectionReason { |
| @@ -116,6 +135,7 @@ struct HotCacheEntry { | |||
| 116 | event: Event, | 135 | event: Event, |
| 117 | pubkey: PublicKey, | 136 | pubkey: PublicKey, |
| 118 | identifier: String, | 137 | identifier: String, |
| 138 | event_type: EventType, | ||
| 119 | #[allow(dead_code)] // Used for metrics/debugging in future | 139 | #[allow(dead_code)] // Used for metrics/debugging in future |
| 120 | reason: RejectionReason, | 140 | reason: RejectionReason, |
| 121 | cached_at: Instant, | 141 | cached_at: Instant, |
| @@ -128,6 +148,7 @@ struct HotCacheEntry { | |||
| 128 | struct ColdIndexEntry { | 148 | struct ColdIndexEntry { |
| 129 | pubkey: PublicKey, | 149 | pubkey: PublicKey, |
| 130 | identifier: String, | 150 | identifier: String, |
| 151 | event_type: EventType, | ||
| 131 | #[allow(dead_code)] // Used for metrics/debugging in future | 152 | #[allow(dead_code)] // Used for metrics/debugging in future |
| 132 | reason: RejectionReason, | 153 | reason: RejectionReason, |
| 133 | rejected_at: Instant, | 154 | rejected_at: Instant, |
| @@ -155,11 +176,19 @@ impl HotCache { | |||
| 155 | } | 176 | } |
| 156 | 177 | ||
| 157 | /// Add event to hot cache | 178 | /// Add event to hot cache |
| 158 | fn add(&self, event: Event, pubkey: PublicKey, identifier: String, reason: RejectionReason) { | 179 | fn add( |
| 180 | &self, | ||
| 181 | event: Event, | ||
| 182 | pubkey: PublicKey, | ||
| 183 | identifier: String, | ||
| 184 | event_type: EventType, | ||
| 185 | reason: RejectionReason, | ||
| 186 | ) { | ||
| 159 | let entry = HotCacheEntry { | 187 | let entry = HotCacheEntry { |
| 160 | event, | 188 | event, |
| 161 | pubkey, | 189 | pubkey, |
| 162 | identifier, | 190 | identifier, |
| 191 | event_type, | ||
| 163 | reason, | 192 | reason, |
| 164 | cached_at: Instant::now(), | 193 | cached_at: Instant::now(), |
| 165 | }; | 194 | }; |
| @@ -168,7 +197,15 @@ impl HotCache { | |||
| 168 | } | 197 | } |
| 169 | 198 | ||
| 170 | /// Get events for a specific maintainer/identifier from hot cache | 199 | /// Get events for a specific maintainer/identifier from hot cache |
| 171 | fn get_maintainer_events(&self, pubkey: &PublicKey, identifier: &str) -> Vec<Event> { | 200 | /// |
| 201 | /// If `event_type` is `Some`, only returns events of that type. | ||
| 202 | /// If `event_type` is `None`, returns all event types. | ||
| 203 | fn get_maintainer_events( | ||
| 204 | &self, | ||
| 205 | pubkey: &PublicKey, | ||
| 206 | identifier: &str, | ||
| 207 | event_type: Option<EventType>, | ||
| 208 | ) -> Vec<Event> { | ||
| 172 | let entries = self.entries.read().unwrap(); | 209 | let entries = self.entries.read().unwrap(); |
| 173 | let now = Instant::now(); | 210 | let now = Instant::now(); |
| 174 | 211 | ||
| @@ -176,8 +213,10 @@ impl HotCache { | |||
| 176 | .values() | 213 | .values() |
| 177 | .filter(|entry| { | 214 | .filter(|entry| { |
| 178 | // Check if entry matches and hasn't expired | 215 | // Check if entry matches and hasn't expired |
| 216 | let matches_type = event_type.is_none_or(|et| entry.event_type == et); | ||
| 179 | entry.pubkey == *pubkey | 217 | entry.pubkey == *pubkey |
| 180 | && entry.identifier == identifier | 218 | && entry.identifier == identifier |
| 219 | && matches_type | ||
| 181 | && now.duration_since(entry.cached_at) < self.expiry_duration | 220 | && now.duration_since(entry.cached_at) < self.expiry_duration |
| 182 | }) | 221 | }) |
| 183 | .map(|entry| entry.event.clone()) | 222 | .map(|entry| entry.event.clone()) |
| @@ -233,11 +272,13 @@ impl ColdIndex { | |||
| 233 | event_id: EventId, | 272 | event_id: EventId, |
| 234 | pubkey: PublicKey, | 273 | pubkey: PublicKey, |
| 235 | identifier: String, | 274 | identifier: String, |
| 275 | event_type: EventType, | ||
| 236 | reason: RejectionReason, | 276 | reason: RejectionReason, |
| 237 | ) { | 277 | ) { |
| 238 | let entry = ColdIndexEntry { | 278 | let entry = ColdIndexEntry { |
| 239 | pubkey, | 279 | pubkey, |
| 240 | identifier, | 280 | identifier, |
| 281 | event_type, | ||
| 241 | reason, | 282 | reason, |
| 242 | rejected_at: Instant::now(), | 283 | rejected_at: Instant::now(), |
| 243 | }; | 284 | }; |
| @@ -256,21 +297,26 @@ impl ColdIndex { | |||
| 256 | } | 297 | } |
| 257 | } | 298 | } |
| 258 | 299 | ||
| 259 | /// Invalidate (remove) maintainer announcements from cold index | 300 | /// Invalidate (remove) entries from cold index |
| 260 | /// | 301 | /// |
| 261 | /// Called when an owner announcement is accepted that lists this maintainer. | 302 | /// Called when an owner announcement is accepted that lists this maintainer. |
| 262 | /// Removes the cold index entries so they can be re-fetched on next sync. | 303 | /// Removes the cold index entries so they can be re-fetched on next sync. |
| 304 | /// | ||
| 305 | /// If `event_type` is `Some`, only removes entries of that type. | ||
| 306 | /// If `event_type` is `None`, removes all event types matching pubkey/identifier. | ||
| 263 | fn invalidate_maintainer_announcements( | 307 | fn invalidate_maintainer_announcements( |
| 264 | &self, | 308 | &self, |
| 265 | maintainer_pubkey: &PublicKey, | 309 | maintainer_pubkey: &PublicKey, |
| 266 | identifier: &str, | 310 | identifier: &str, |
| 311 | event_type: Option<EventType>, | ||
| 267 | ) -> usize { | 312 | ) -> usize { |
| 268 | let mut entries = self.entries.write().unwrap(); | 313 | let mut entries = self.entries.write().unwrap(); |
| 269 | let initial_count = entries.len(); | 314 | let initial_count = entries.len(); |
| 270 | 315 | ||
| 271 | entries.retain(|_, entry| { | 316 | entries.retain(|_, entry| { |
| 272 | // Keep entries that DON'T match the maintainer/identifier | 317 | // Keep entries that DON'T match the maintainer/identifier/type |
| 273 | !(entry.pubkey == *maintainer_pubkey && entry.identifier == identifier) | 318 | let matches_type = event_type.is_none_or(|et| entry.event_type == et); |
| 319 | !(entry.pubkey == *maintainer_pubkey && entry.identifier == identifier && matches_type) | ||
| 274 | }); | 320 | }); |
| 275 | 321 | ||
| 276 | initial_count - entries.len() | 322 | initial_count - entries.len() |
| @@ -348,24 +394,21 @@ impl RejectedEventsIndex { | |||
| 348 | metrics: Some(metrics), | 394 | metrics: Some(metrics), |
| 349 | }; | 395 | }; |
| 350 | 396 | ||
| 351 | // Initialize metrics with current sizes | 397 | // Initialize metrics with current sizes for both event types |
| 352 | index.update_metrics(); | 398 | index.update_metrics_for_type("announcement"); |
| 399 | index.update_metrics_for_type("state"); | ||
| 353 | index | 400 | index |
| 354 | } | 401 | } |
| 355 | 402 | ||
| 356 | /// Update metrics with current sizes (for announcements) | 403 | /// Update metrics with current sizes for a specific event type |
| 357 | fn update_metrics(&self) { | 404 | /// |
| 358 | if let Some(ref metrics) = self.metrics { | 405 | /// # Arguments |
| 359 | metrics.update_hot_cache_size(self.hot_cache.len()); | 406 | /// |
| 360 | metrics.update_cold_index_size(self.cold_index.len()); | 407 | /// * `event_type` - The event type label ("announcement" or "state") |
| 361 | } | 408 | fn update_metrics_for_type(&self, event_type: &str) { |
| 362 | } | ||
| 363 | |||
| 364 | /// Update metrics with current sizes (for states) | ||
| 365 | fn update_states_metrics(&self) { | ||
| 366 | if let Some(ref metrics) = self.metrics { | 409 | if let Some(ref metrics) = self.metrics { |
| 367 | metrics.update_states_hot_cache_size(self.hot_cache.len()); | 410 | metrics.update_rejected_hot_cache_size(event_type, self.hot_cache.len()); |
| 368 | metrics.update_states_cold_index_size(self.cold_index.len()); | 411 | metrics.update_rejected_cold_index_size(event_type, self.cold_index.len()); |
| 369 | } | 412 | } |
| 370 | } | 413 | } |
| 371 | 414 | ||
| @@ -385,14 +428,25 @@ impl RejectedEventsIndex { | |||
| 385 | reason: RejectionReason, | 428 | reason: RejectionReason, |
| 386 | ) { | 429 | ) { |
| 387 | // Add to hot cache (full event) | 430 | // Add to hot cache (full event) |
| 388 | self.hot_cache | 431 | self.hot_cache.add( |
| 389 | .add(event.clone(), pubkey, identifier.clone(), reason); | 432 | event.clone(), |
| 433 | pubkey, | ||
| 434 | identifier.clone(), | ||
| 435 | EventType::Announcement, | ||
| 436 | reason, | ||
| 437 | ); | ||
| 390 | 438 | ||
| 391 | // Add to cold index (metadata only) | 439 | // Add to cold index (metadata only) |
| 392 | self.cold_index.add(event.id, pubkey, identifier, reason); | 440 | self.cold_index.add( |
| 441 | event.id, | ||
| 442 | pubkey, | ||
| 443 | identifier, | ||
| 444 | EventType::Announcement, | ||
| 445 | reason, | ||
| 446 | ); | ||
| 393 | 447 | ||
| 394 | // Update metrics | 448 | // Update metrics |
| 395 | self.update_metrics(); | 449 | self.update_metrics_for_type("announcement"); |
| 396 | } | 450 | } |
| 397 | 451 | ||
| 398 | /// Add rejected state event to both tiers | 452 | /// Add rejected state event to both tiers |
| @@ -411,14 +465,20 @@ impl RejectedEventsIndex { | |||
| 411 | reason: RejectionReason, | 465 | reason: RejectionReason, |
| 412 | ) { | 466 | ) { |
| 413 | // Add to hot cache (full event) | 467 | // Add to hot cache (full event) |
| 414 | self.hot_cache | 468 | self.hot_cache.add( |
| 415 | .add(event.clone(), pubkey, identifier.clone(), reason); | 469 | event.clone(), |
| 470 | pubkey, | ||
| 471 | identifier.clone(), | ||
| 472 | EventType::State, | ||
| 473 | reason, | ||
| 474 | ); | ||
| 416 | 475 | ||
| 417 | // Add to cold index (metadata only) | 476 | // Add to cold index (metadata only) |
| 418 | self.cold_index.add(event.id, pubkey, identifier, reason); | 477 | self.cold_index |
| 478 | .add(event.id, pubkey, identifier, EventType::State, reason); | ||
| 419 | 479 | ||
| 420 | // Update metrics (using states metrics) | 480 | // Update metrics |
| 421 | self.update_states_metrics(); | 481 | self.update_metrics_for_type("state"); |
| 422 | } | 482 | } |
| 423 | 483 | ||
| 424 | /// Check if event is already rejected (in either tier) | 484 | /// Check if event is already rejected (in either tier) |
| @@ -426,136 +486,92 @@ impl RejectedEventsIndex { | |||
| 426 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) | 486 | self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) |
| 427 | } | 487 | } |
| 428 | 488 | ||
| 429 | /// Invalidate maintainer announcements and get events for immediate re-processing | 489 | /// Invalidate events and get them for immediate re-processing (unified method) |
| 490 | /// | ||
| 491 | /// This is called when a dependency is satisfied (e.g., owner announcement accepted, | ||
| 492 | /// or announcement accepted for state events). It removes the cold index entries | ||
| 493 | /// (so they can be re-fetched on next sync) and returns any events still in the | ||
| 494 | /// hot cache for immediate re-processing. | ||
| 495 | /// | ||
| 496 | /// # Arguments | ||
| 430 | /// | 497 | /// |
| 431 | /// This is called when an owner announcement is accepted that lists a maintainer. | 498 | /// * `pubkey` - Public key to match (maintainer for announcements, author for states) |
| 432 | /// It removes the cold index entries (so they can be re-fetched on next sync) and | 499 | /// * `identifier` - Repository identifier (d tag) |
| 433 | /// returns any events still in the hot cache for immediate re-processing. | 500 | /// * `event_type` - If `Some`, filter to that event type; if `None`, return all types |
| 434 | /// | 501 | /// |
| 435 | /// # Returns | 502 | /// # Returns |
| 436 | /// | 503 | /// |
| 437 | /// Tuple of (number of cold index entries removed, events from hot cache) | 504 | /// Tuple of (number of cold index entries removed, events from hot cache) |
| 438 | pub fn invalidate_and_get_events( | 505 | pub fn invalidate_and_get( |
| 439 | &self, | 506 | &self, |
| 440 | maintainer_pubkey: &PublicKey, | 507 | pubkey: &PublicKey, |
| 441 | identifier: &str, | 508 | identifier: &str, |
| 509 | event_type: Option<EventType>, | ||
| 442 | ) -> (usize, Vec<Event>) { | 510 | ) -> (usize, Vec<Event>) { |
| 443 | // Remove from cold index (prevents re-fetch) | 511 | // Remove from cold index |
| 444 | let removed = self | 512 | let removed = self |
| 445 | .cold_index | 513 | .cold_index |
| 446 | .invalidate_maintainer_announcements(maintainer_pubkey, identifier); | 514 | .invalidate_maintainer_announcements(pubkey, identifier, event_type); |
| 447 | 515 | ||
| 448 | // Get from hot cache (for immediate re-processing) | 516 | // Get from hot cache (for immediate re-processing) |
| 449 | let events = self | 517 | let events = self |
| 450 | .hot_cache | 518 | .hot_cache |
| 451 | .get_maintainer_events(maintainer_pubkey, identifier); | 519 | .get_maintainer_events(pubkey, identifier, event_type); |
| 452 | 520 | ||
| 453 | // Track metrics | 521 | // Track metrics based on event type |
| 454 | if let Some(ref metrics) = self.metrics { | 522 | if let Some(ref metrics) = self.metrics { |
| 523 | let type_label = match event_type { | ||
| 524 | Some(EventType::State) => "state", | ||
| 525 | Some(EventType::Announcement) | None => "announcement", | ||
| 526 | }; | ||
| 527 | |||
| 455 | if removed > 0 { | 528 | if removed > 0 { |
| 456 | metrics.record_invalidation(removed); | 529 | metrics.record_rejected_invalidation(type_label, removed); |
| 457 | } | 530 | } |
| 458 | if events.is_empty() { | 531 | if events.is_empty() { |
| 459 | metrics.record_hot_cache_miss(); | 532 | metrics.record_rejected_hot_cache_miss(type_label); |
| 460 | } else { | 533 | } else { |
| 461 | for _ in &events { | 534 | for _ in &events { |
| 462 | metrics.record_hot_cache_hit(); | 535 | metrics.record_rejected_hot_cache_hit(type_label); |
| 463 | } | 536 | } |
| 464 | } | 537 | } |
| 465 | } | 538 | } |
| 466 | 539 | ||
| 467 | // Update size metrics | 540 | // Update size metrics based on event type |
| 468 | self.update_metrics(); | 541 | let type_label = match event_type { |
| 542 | Some(EventType::State) => "state", | ||
| 543 | Some(EventType::Announcement) | None => "announcement", | ||
| 544 | }; | ||
| 545 | self.update_metrics_for_type(type_label); | ||
| 469 | 546 | ||
| 470 | (removed, events) | 547 | (removed, events) |
| 471 | } | 548 | } |
| 472 | 549 | ||
| 473 | /// Invalidate state events and get events for immediate re-processing | 550 | /// Clean up expired entries from both tiers |
| 474 | /// | 551 | /// |
| 475 | /// This is called when an announcement is accepted that authorizes state events. | 552 | /// # Arguments |
| 476 | /// It removes the cold index entries (so they can be re-fetched on next sync) and | ||
| 477 | /// returns any events still in the hot cache for immediate re-processing. | ||
| 478 | /// | 553 | /// |
| 479 | /// # Returns | 554 | /// * `event_type` - The event type label for metrics ("announcement" or "state") |
| 480 | /// | 555 | /// |
| 481 | /// Tuple of (number of cold index entries removed, events from hot cache) | 556 | /// # Returns |
| 482 | pub fn invalidate_and_get_state_events( | ||
| 483 | &self, | ||
| 484 | maintainer_pubkey: &PublicKey, | ||
| 485 | identifier: &str, | ||
| 486 | ) -> (usize, Vec<Event>) { | ||
| 487 | // Remove from cold index (prevents re-fetch) | ||
| 488 | let removed = self | ||
| 489 | .cold_index | ||
| 490 | .invalidate_maintainer_announcements(maintainer_pubkey, identifier); | ||
| 491 | |||
| 492 | // Get from hot cache (for immediate re-processing) | ||
| 493 | let events = self | ||
| 494 | .hot_cache | ||
| 495 | .get_maintainer_events(maintainer_pubkey, identifier); | ||
| 496 | |||
| 497 | // Track metrics (using states metrics) | ||
| 498 | if let Some(ref metrics) = self.metrics { | ||
| 499 | if removed > 0 { | ||
| 500 | metrics.record_states_invalidation(removed); | ||
| 501 | } | ||
| 502 | if events.is_empty() { | ||
| 503 | metrics.record_states_hot_cache_miss(); | ||
| 504 | } else { | ||
| 505 | for _ in &events { | ||
| 506 | metrics.record_states_hot_cache_hit(); | ||
| 507 | } | ||
| 508 | } | ||
| 509 | } | ||
| 510 | |||
| 511 | // Update size metrics (using states metrics) | ||
| 512 | self.update_states_metrics(); | ||
| 513 | |||
| 514 | (removed, events) | ||
| 515 | } | ||
| 516 | |||
| 517 | /// Clean up expired entries from both tiers (for announcements) | ||
| 518 | /// | 557 | /// |
| 519 | /// Returns tuple of (hot cache expired, cold index expired) | 558 | /// Tuple of (hot cache expired, cold index expired) |
| 520 | pub fn cleanup_expired(&self) -> (usize, usize) { | 559 | pub fn cleanup_expired_for_type(&self, event_type: &str) -> (usize, usize) { |
| 521 | let hot_expired = self.hot_cache.cleanup_expired(); | 560 | let hot_expired = self.hot_cache.cleanup_expired(); |
| 522 | let cold_expired = self.cold_index.cleanup_expired(); | 561 | let cold_expired = self.cold_index.cleanup_expired(); |
| 523 | 562 | ||
| 524 | // Track metrics | 563 | // Track metrics |
| 525 | if let Some(ref metrics) = self.metrics { | 564 | if let Some(ref metrics) = self.metrics { |
| 526 | if hot_expired > 0 { | 565 | if hot_expired > 0 { |
| 527 | metrics.record_hot_cache_expired(hot_expired); | 566 | metrics.record_rejected_hot_cache_expired(event_type, hot_expired); |
| 528 | } | 567 | } |
| 529 | if cold_expired > 0 { | 568 | if cold_expired > 0 { |
| 530 | metrics.record_cold_index_expired(cold_expired); | 569 | metrics.record_rejected_cold_index_expired(event_type, cold_expired); |
| 531 | } | 570 | } |
| 532 | } | 571 | } |
| 533 | 572 | ||
| 534 | // Update size metrics | 573 | // Update size metrics |
| 535 | self.update_metrics(); | 574 | self.update_metrics_for_type(event_type); |
| 536 | |||
| 537 | (hot_expired, cold_expired) | ||
| 538 | } | ||
| 539 | |||
| 540 | /// Clean up expired entries from both tiers (for states) | ||
| 541 | /// | ||
| 542 | /// Returns tuple of (hot cache expired, cold index expired) | ||
| 543 | pub fn cleanup_states_expired(&self) -> (usize, usize) { | ||
| 544 | let hot_expired = self.hot_cache.cleanup_expired(); | ||
| 545 | let cold_expired = self.cold_index.cleanup_expired(); | ||
| 546 | |||
| 547 | // Track metrics (using states metrics) | ||
| 548 | if let Some(ref metrics) = self.metrics { | ||
| 549 | if hot_expired > 0 { | ||
| 550 | metrics.record_states_hot_cache_expired(hot_expired); | ||
| 551 | } | ||
| 552 | if cold_expired > 0 { | ||
| 553 | metrics.record_states_cold_index_expired(cold_expired); | ||
| 554 | } | ||
| 555 | } | ||
| 556 | |||
| 557 | // Update size metrics (using states metrics) | ||
| 558 | self.update_states_metrics(); | ||
| 559 | 575 | ||
| 560 | (hot_expired, cold_expired) | 576 | (hot_expired, cold_expired) |
| 561 | } | 577 | } |
| @@ -611,12 +627,13 @@ mod tests { | |||
| 611 | event.clone(), | 627 | event.clone(), |
| 612 | pubkey, | 628 | pubkey, |
| 613 | identifier.clone(), | 629 | identifier.clone(), |
| 630 | EventType::Announcement, | ||
| 614 | RejectionReason::DoesNotListService, | 631 | RejectionReason::DoesNotListService, |
| 615 | ); | 632 | ); |
| 616 | 633 | ||
| 617 | assert!(cache.contains(&event.id)); | 634 | assert!(cache.contains(&event.id)); |
| 618 | 635 | ||
| 619 | let retrieved = cache.get_maintainer_events(&pubkey, &identifier); | 636 | let retrieved = cache.get_maintainer_events(&pubkey, &identifier, None); |
| 620 | assert_eq!(retrieved.len(), 1); | 637 | assert_eq!(retrieved.len(), 1); |
| 621 | assert_eq!(retrieved[0].id, event.id); | 638 | assert_eq!(retrieved[0].id, event.id); |
| 622 | } | 639 | } |
| @@ -630,6 +647,7 @@ mod tests { | |||
| 630 | event.clone(), | 647 | event.clone(), |
| 631 | event.pubkey, | 648 | event.pubkey, |
| 632 | "test-repo".to_string(), | 649 | "test-repo".to_string(), |
| 650 | EventType::Announcement, | ||
| 633 | RejectionReason::DoesNotListService, | 651 | RejectionReason::DoesNotListService, |
| 634 | ); | 652 | ); |
| 635 | 653 | ||
| @@ -652,6 +670,7 @@ mod tests { | |||
| 652 | event.id, | 670 | event.id, |
| 653 | event.pubkey, | 671 | event.pubkey, |
| 654 | "test-repo".to_string(), | 672 | "test-repo".to_string(), |
| 673 | EventType::Announcement, | ||
| 655 | RejectionReason::DoesNotListService, | 674 | RejectionReason::DoesNotListService, |
| 656 | ); | 675 | ); |
| 657 | 676 | ||
| @@ -670,12 +689,17 @@ mod tests { | |||
| 670 | event.id, | 689 | event.id, |
| 671 | pubkey, | 690 | pubkey, |
| 672 | identifier.clone(), | 691 | identifier.clone(), |
| 692 | EventType::Announcement, | ||
| 673 | RejectionReason::MaintainerNotYetValid, | 693 | RejectionReason::MaintainerNotYetValid, |
| 674 | ); | 694 | ); |
| 675 | 695 | ||
| 676 | assert!(index.contains(&event.id)); | 696 | assert!(index.contains(&event.id)); |
| 677 | 697 | ||
| 678 | let removed = index.invalidate_maintainer_announcements(&pubkey, &identifier); | 698 | let removed = index.invalidate_maintainer_announcements( |
| 699 | &pubkey, | ||
| 700 | &identifier, | ||
| 701 | Some(EventType::Announcement), | ||
| 702 | ); | ||
| 679 | assert_eq!(removed, 1); | 703 | assert_eq!(removed, 1); |
| 680 | assert!(!index.contains(&event.id)); | 704 | assert!(!index.contains(&event.id)); |
| 681 | } | 705 | } |
| @@ -698,7 +722,7 @@ mod tests { | |||
| 698 | } | 722 | } |
| 699 | 723 | ||
| 700 | #[tokio::test] | 724 | #[tokio::test] |
| 701 | async fn test_invalidate_and_get_events() { | 725 | async fn test_invalidate_and_get_announcements() { |
| 702 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | 726 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); |
| 703 | let event = create_test_event().await; | 727 | let event = create_test_event().await; |
| 704 | let pubkey = event.pubkey; | 728 | let pubkey = event.pubkey; |
| @@ -711,7 +735,8 @@ mod tests { | |||
| 711 | RejectionReason::MaintainerNotYetValid, | 735 | RejectionReason::MaintainerNotYetValid, |
| 712 | ); | 736 | ); |
| 713 | 737 | ||
| 714 | let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); | 738 | let (removed, hot_events) = |
| 739 | index.invalidate_and_get(&pubkey, &identifier, Some(EventType::Announcement)); | ||
| 715 | 740 | ||
| 716 | assert_eq!(removed, 1); // Removed from cold index | 741 | assert_eq!(removed, 1); // Removed from cold index |
| 717 | assert_eq!(hot_events.len(), 1); // Retrieved from hot cache | 742 | assert_eq!(hot_events.len(), 1); // Retrieved from hot cache |
| @@ -740,14 +765,14 @@ mod tests { | |||
| 740 | // Wait for hot cache to expire | 765 | // Wait for hot cache to expire |
| 741 | std::thread::sleep(Duration::from_millis(60)); | 766 | std::thread::sleep(Duration::from_millis(60)); |
| 742 | 767 | ||
| 743 | let (hot_expired, cold_expired) = index.cleanup_expired(); | 768 | let (hot_expired, cold_expired) = index.cleanup_expired_for_type("announcement"); |
| 744 | assert_eq!(hot_expired, 1); | 769 | assert_eq!(hot_expired, 1); |
| 745 | assert_eq!(cold_expired, 0); // Not expired yet | 770 | assert_eq!(cold_expired, 0); // Not expired yet |
| 746 | 771 | ||
| 747 | // Wait for cold index to expire | 772 | // Wait for cold index to expire |
| 748 | std::thread::sleep(Duration::from_millis(50)); | 773 | std::thread::sleep(Duration::from_millis(50)); |
| 749 | 774 | ||
| 750 | let (hot_expired, cold_expired) = index.cleanup_expired(); | 775 | let (hot_expired, cold_expired) = index.cleanup_expired_for_type("announcement"); |
| 751 | assert_eq!(hot_expired, 0); // Already cleaned up | 776 | assert_eq!(hot_expired, 0); // Already cleaned up |
| 752 | assert_eq!(cold_expired, 1); | 777 | assert_eq!(cold_expired, 1); |
| 753 | } | 778 | } |
| @@ -770,7 +795,8 @@ mod tests { | |||
| 770 | // Wait for hot cache to expire | 795 | // Wait for hot cache to expire |
| 771 | std::thread::sleep(Duration::from_millis(60)); | 796 | std::thread::sleep(Duration::from_millis(60)); |
| 772 | 797 | ||
| 773 | let (removed, hot_events) = index.invalidate_and_get_events(&pubkey, &identifier); | 798 | let (removed, hot_events) = |
| 799 | index.invalidate_and_get(&pubkey, &identifier, Some(EventType::Announcement)); | ||
| 774 | 800 | ||
| 775 | assert_eq!(removed, 1); // Removed from cold index | 801 | assert_eq!(removed, 1); // Removed from cold index |
| 776 | assert_eq!(hot_events.len(), 0); // Hot cache expired - miss! | 802 | assert_eq!(hot_events.len(), 0); // Hot cache expired - miss! |
| @@ -810,7 +836,8 @@ mod tests { | |||
| 810 | assert_eq!(index.cold_index_len(), 2); | 836 | assert_eq!(index.cold_index_len(), 2); |
| 811 | 837 | ||
| 812 | // Invalidate only first maintainer | 838 | // Invalidate only first maintainer |
| 813 | let (removed, hot_events) = index.invalidate_and_get_events(&event1.pubkey, "repo1"); | 839 | let (removed, hot_events) = |
| 840 | index.invalidate_and_get(&event1.pubkey, "repo1", Some(EventType::Announcement)); | ||
| 814 | 841 | ||
| 815 | assert_eq!(removed, 1); | 842 | assert_eq!(removed, 1); |
| 816 | assert_eq!(hot_events.len(), 1); | 843 | assert_eq!(hot_events.len(), 1); |
| @@ -820,4 +847,113 @@ mod tests { | |||
| 820 | assert_eq!(index.cold_index_len(), 1); | 847 | assert_eq!(index.cold_index_len(), 1); |
| 821 | assert!(index.contains(&event2.id)); | 848 | assert!(index.contains(&event2.id)); |
| 822 | } | 849 | } |
| 850 | |||
| 851 | #[tokio::test] | ||
| 852 | async fn test_invalidate_and_get_unified_with_event_type_filter() { | ||
| 853 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 854 | let keys = Keys::generate(); | ||
| 855 | |||
| 856 | // Create an announcement event | ||
| 857 | let unsigned_ann = | ||
| 858 | nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); | ||
| 859 | let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); | ||
| 860 | |||
| 861 | // Create a state event | ||
| 862 | let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); | ||
| 863 | let event_state = keys.sign_event(unsigned_state).await.unwrap(); | ||
| 864 | |||
| 865 | let pubkey = event_ann.pubkey; | ||
| 866 | let identifier = "test-repo".to_string(); | ||
| 867 | |||
| 868 | // Add announcement and state for same pubkey/identifier | ||
| 869 | index.add_announcement( | ||
| 870 | event_ann.clone(), | ||
| 871 | pubkey, | ||
| 872 | identifier.clone(), | ||
| 873 | RejectionReason::MaintainerNotYetValid, | ||
| 874 | ); | ||
| 875 | |||
| 876 | index.add_state( | ||
| 877 | event_state.clone(), | ||
| 878 | pubkey, | ||
| 879 | identifier.clone(), | ||
| 880 | RejectionReason::Other, | ||
| 881 | ); | ||
| 882 | |||
| 883 | assert_eq!(index.hot_cache_len(), 2); | ||
| 884 | assert_eq!(index.cold_index_len(), 2); | ||
| 885 | |||
| 886 | // Invalidate only announcements | ||
| 887 | let (removed, hot_events) = | ||
| 888 | index.invalidate_and_get(&pubkey, &identifier, Some(EventType::Announcement)); | ||
| 889 | |||
| 890 | assert_eq!(removed, 1); // Only announcement removed from cold index | ||
| 891 | assert_eq!(hot_events.len(), 1); | ||
| 892 | assert_eq!(hot_events[0].id, event_ann.id); | ||
| 893 | |||
| 894 | // State is still in cold index | ||
| 895 | assert_eq!(index.cold_index_len(), 1); | ||
| 896 | assert!(index.contains(&event_state.id)); | ||
| 897 | |||
| 898 | // Now invalidate states | ||
| 899 | let (removed, hot_events) = | ||
| 900 | index.invalidate_and_get(&pubkey, &identifier, Some(EventType::State)); | ||
| 901 | |||
| 902 | assert_eq!(removed, 1); | ||
| 903 | assert_eq!(hot_events.len(), 1); | ||
| 904 | assert_eq!(hot_events[0].id, event_state.id); | ||
| 905 | |||
| 906 | // Cold index now empty | ||
| 907 | assert_eq!(index.cold_index_len(), 0); | ||
| 908 | } | ||
| 909 | |||
| 910 | #[tokio::test] | ||
| 911 | async fn test_invalidate_and_get_unified_without_filter() { | ||
| 912 | let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800)); | ||
| 913 | let keys = Keys::generate(); | ||
| 914 | |||
| 915 | // Create an announcement event | ||
| 916 | let unsigned_ann = | ||
| 917 | nostr_sdk::EventBuilder::text_note("announcement").build(keys.public_key()); | ||
| 918 | let event_ann = keys.sign_event(unsigned_ann).await.unwrap(); | ||
| 919 | |||
| 920 | // Create a state event | ||
| 921 | let unsigned_state = nostr_sdk::EventBuilder::text_note("state").build(keys.public_key()); | ||
| 922 | let event_state = keys.sign_event(unsigned_state).await.unwrap(); | ||
| 923 | |||
| 924 | let pubkey = event_ann.pubkey; | ||
| 925 | let identifier = "test-repo".to_string(); | ||
| 926 | |||
| 927 | // Add announcement and state for same pubkey/identifier | ||
| 928 | index.add_announcement( | ||
| 929 | event_ann.clone(), | ||
| 930 | pubkey, | ||
| 931 | identifier.clone(), | ||
| 932 | RejectionReason::MaintainerNotYetValid, | ||
| 933 | ); | ||
| 934 | |||
| 935 | index.add_state( | ||
| 936 | event_state.clone(), | ||
| 937 | pubkey, | ||
| 938 | identifier.clone(), | ||
| 939 | RejectionReason::Other, | ||
| 940 | ); | ||
| 941 | |||
| 942 | assert_eq!(index.hot_cache_len(), 2); | ||
| 943 | assert_eq!(index.cold_index_len(), 2); | ||
| 944 | |||
| 945 | // Invalidate all types (None filter) | ||
| 946 | let (removed, hot_events) = index.invalidate_and_get(&pubkey, &identifier, None); | ||
| 947 | |||
| 948 | assert_eq!(removed, 2); // Both removed from cold index | ||
| 949 | assert_eq!(hot_events.len(), 2); // Both returned from hot cache | ||
| 950 | |||
| 951 | // Both should be in the results | ||
| 952 | let event_ids: Vec<_> = hot_events.iter().map(|e| e.id).collect(); | ||
| 953 | assert!(event_ids.contains(&event_ann.id)); | ||
| 954 | assert!(event_ids.contains(&event_state.id)); | ||
| 955 | |||
| 956 | // Cold index now empty | ||
| 957 | assert_eq!(index.cold_index_len(), 0); | ||
| 958 | } | ||
| 823 | } | 959 | } |