diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 16:27:38 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 16:27:38 +0000 |
| commit | 895359aeb6746b98ff82944e4fca503f4a6e5439 (patch) | |
| tree | ea01bf45433282a365748dd3ba102879946d2426 /src/sync/rejected_index.rs | |
| parent | 83d29a446d96f87e5c947faf49fb33f18db4fc17 (diff) | |
feat(sync): add cleanup loops and metrics for rejected events index
Add automatic cleanup and Prometheus metrics for the two-tier rejected
events index that caches rejected announcements for re-processing.
Cleanup loops:
- Hot cache: Every 60 seconds (events expire after 2 minutes)
- Cold index: Every 24 hours (metadata expires after 7 days)
- Background task with graceful shutdown support
New Prometheus metrics (7):
- Gauges: hot_cache_current, cold_index_current
- Counters: hits, misses, hot_expired, cold_expired, invalidated
This completes the maintainer announcement re-processing feature,
reducing wait time from 24 hours to <1 second when a maintainer's
announcement arrives before the repository owner's announcement.
Memory is bounded through automatic cleanup, and comprehensive metrics
enable monitoring of hit rates, memory usage, and cleanup effectiveness.
Changes:
- src/sync/metrics.rs: Added 7 metrics with recording methods
- src/sync/rejected_index.rs: Added optional metrics support
- src/sync/mod.rs: Added cleanup background task
Tests: 248 library tests passing, 3 integration tests passing
Diffstat (limited to 'src/sync/rejected_index.rs')
| -rw-r--r-- | src/sync/rejected_index.rs | 80 |
1 files changed, 79 insertions, 1 deletions
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 80d6b5b..4733d80 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -299,10 +299,22 @@ impl ColdIndex { | |||
| 299 | /// | 299 | /// |
| 300 | /// Combines hot cache (full events, short duration) with cold index | 300 | /// Combines hot cache (full events, short duration) with cold index |
| 301 | /// (metadata only, long duration) for efficient re-processing and deduplication. | 301 | /// (metadata only, long duration) for efficient re-processing and deduplication. |
| 302 | #[derive(Debug, Clone)] | 302 | #[derive(Clone)] |
| 303 | pub struct RejectedEventsIndex { | 303 | pub struct RejectedEventsIndex { |
| 304 | hot_cache: HotCache, | 304 | hot_cache: HotCache, |
| 305 | cold_index: ColdIndex, | 305 | cold_index: ColdIndex, |
| 306 | metrics: Option<super::metrics::SyncMetrics>, | ||
| 307 | } | ||
| 308 | |||
| 309 | // Manual Debug impl to avoid requiring Debug on SyncMetrics | ||
| 310 | impl std::fmt::Debug for RejectedEventsIndex { | ||
| 311 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 312 | f.debug_struct("RejectedEventsIndex") | ||
| 313 | .field("hot_cache", &self.hot_cache) | ||
| 314 | .field("cold_index", &self.cold_index) | ||
| 315 | .field("metrics", &self.metrics.is_some()) | ||
| 316 | .finish() | ||
| 317 | } | ||
| 306 | } | 318 | } |
| 307 | 319 | ||
| 308 | impl RejectedEventsIndex { | 320 | impl RejectedEventsIndex { |
| @@ -316,6 +328,38 @@ impl RejectedEventsIndex { | |||
| 316 | Self { | 328 | Self { |
| 317 | hot_cache: HotCache::new(hot_cache_duration), | 329 | hot_cache: HotCache::new(hot_cache_duration), |
| 318 | cold_index: ColdIndex::new(cold_index_duration), | 330 | cold_index: ColdIndex::new(cold_index_duration), |
| 331 | metrics: None, | ||
| 332 | } | ||
| 333 | } | ||
| 334 | |||
| 335 | /// Create new rejected events index with metrics | ||
| 336 | /// | ||
| 337 | /// # Arguments | ||
| 338 | /// | ||
| 339 | /// * `hot_cache_duration` - How long to keep full events in hot cache (default: 2 minutes) | ||
| 340 | /// * `cold_index_duration` - How long to keep metadata in cold index (default: 7 days) | ||
| 341 | /// * `metrics` - Prometheus metrics for tracking index operations | ||
| 342 | pub fn with_metrics( | ||
| 343 | hot_cache_duration: Duration, | ||
| 344 | cold_index_duration: Duration, | ||
| 345 | metrics: super::metrics::SyncMetrics, | ||
| 346 | ) -> Self { | ||
| 347 | let index = Self { | ||
| 348 | hot_cache: HotCache::new(hot_cache_duration), | ||
| 349 | cold_index: ColdIndex::new(cold_index_duration), | ||
| 350 | metrics: Some(metrics), | ||
| 351 | }; | ||
| 352 | |||
| 353 | // Initialize metrics with current sizes | ||
| 354 | index.update_metrics(); | ||
| 355 | index | ||
| 356 | } | ||
| 357 | |||
| 358 | /// Update metrics with current sizes | ||
| 359 | fn update_metrics(&self) { | ||
| 360 | if let Some(ref metrics) = self.metrics { | ||
| 361 | metrics.update_hot_cache_size(self.hot_cache.len()); | ||
| 362 | metrics.update_cold_index_size(self.cold_index.len()); | ||
| 319 | } | 363 | } |
| 320 | } | 364 | } |
| 321 | 365 | ||
| @@ -344,6 +388,9 @@ impl RejectedEventsIndex { | |||
| 344 | 388 | ||
| 345 | // Add to cold index (metadata only) | 389 | // Add to cold index (metadata only) |
| 346 | self.cold_index.add(event.id, pubkey, identifier, reason); | 390 | self.cold_index.add(event.id, pubkey, identifier, reason); |
| 391 | |||
| 392 | // Update metrics | ||
| 393 | self.update_metrics(); | ||
| 347 | } | 394 | } |
| 348 | 395 | ||
| 349 | /// Check if event is already rejected (in either tier) | 396 | /// Check if event is already rejected (in either tier) |
| @@ -375,6 +422,23 @@ impl RejectedEventsIndex { | |||
| 375 | .hot_cache | 422 | .hot_cache |
| 376 | .get_maintainer_events(maintainer_pubkey, identifier); | 423 | .get_maintainer_events(maintainer_pubkey, identifier); |
| 377 | 424 | ||
| 425 | // Track metrics | ||
| 426 | if let Some(ref metrics) = self.metrics { | ||
| 427 | if removed > 0 { | ||
| 428 | metrics.record_invalidation(removed); | ||
| 429 | } | ||
| 430 | if events.is_empty() { | ||
| 431 | metrics.record_hot_cache_miss(); | ||
| 432 | } else { | ||
| 433 | for _ in &events { | ||
| 434 | metrics.record_hot_cache_hit(); | ||
| 435 | } | ||
| 436 | } | ||
| 437 | } | ||
| 438 | |||
| 439 | // Update size metrics | ||
| 440 | self.update_metrics(); | ||
| 441 | |||
| 378 | (removed, events) | 442 | (removed, events) |
| 379 | } | 443 | } |
| 380 | 444 | ||
| @@ -384,6 +448,20 @@ impl RejectedEventsIndex { | |||
| 384 | pub fn cleanup_expired(&self) -> (usize, usize) { | 448 | pub fn cleanup_expired(&self) -> (usize, usize) { |
| 385 | let hot_expired = self.hot_cache.cleanup_expired(); | 449 | let hot_expired = self.hot_cache.cleanup_expired(); |
| 386 | let cold_expired = self.cold_index.cleanup_expired(); | 450 | let cold_expired = self.cold_index.cleanup_expired(); |
| 451 | |||
| 452 | // Track metrics | ||
| 453 | if let Some(ref metrics) = self.metrics { | ||
| 454 | if hot_expired > 0 { | ||
| 455 | metrics.record_hot_cache_expired(hot_expired); | ||
| 456 | } | ||
| 457 | if cold_expired > 0 { | ||
| 458 | metrics.record_cold_index_expired(cold_expired); | ||
| 459 | } | ||
| 460 | } | ||
| 461 | |||
| 462 | // Update size metrics | ||
| 463 | self.update_metrics(); | ||
| 464 | |||
| 387 | (hot_expired, cold_expired) | 465 | (hot_expired, cold_expired) |
| 388 | } | 466 | } |
| 389 | 467 | ||