upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src/sync
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-09 16:27:38 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-09 16:27:38 +0000
commit: 895359aeb6746b98ff82944e4fca503f4a6e5439 (patch)
tree: ea01bf45433282a365748dd3ba102879946d2426 /src/sync
parent: 83d29a446d96f87e5c947faf49fb33f18db4fc17 (diff)
feat(sync): add cleanup loops and metrics for rejected events index
Add automatic cleanup and Prometheus metrics for the two-tier rejected events index that caches rejected announcements for re-processing.

Cleanup loops:
- Hot cache: Every 60 seconds (events expire after 2 minutes)
- Cold index: Every 24 hours (metadata expires after 7 days)
- Background task with graceful shutdown support

New Prometheus metrics (7):
- Gauges: hot_cache_current, cold_index_current
- Counters: hits, misses, hot_expired, cold_expired, invalidated

This completes the maintainer announcement re-processing feature, reducing wait time from 24 hours to <1 second when a maintainer's announcement arrives before the repository owner's announcement. Memory is bounded through automatic cleanup, and comprehensive metrics enable monitoring of hit rates, memory usage, and cleanup effectiveness.

Changes:
- src/sync/metrics.rs: Added 7 metrics with recording methods
- src/sync/rejected_index.rs: Added optional metrics support
- src/sync/mod.rs: Added cleanup background task

Tests: 248 library tests passing, 3 integration tests passing
Diffstat (limited to 'src/sync')
-rw-r--r-- src/sync/metrics.rs 124
-rw-r--r-- src/sync/mod.rs 86
-rw-r--r-- src/sync/rejected_index.rs 80
3 files changed, 284 insertions, 6 deletions
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index 7907d8e..a175210 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -40,6 +40,22 @@ pub struct SyncMetrics {
40 relays_connected_total: IntGauge, 40 relays_connected_total: IntGauge,
41 /// Relays marked as dead 41 /// Relays marked as dead
42 relays_dead_total: IntGauge, 42 relays_dead_total: IntGauge,
43
44 // === Rejected Announcements Index Metrics ===
45 /// Current number of entries in hot cache
46 rejected_announcements_hot_cache_current: IntGauge,
47 /// Total hot cache hits (events re-processed from cache)
48 rejected_announcements_hot_cache_hits_total: IntCounter,
49 /// Total hot cache misses (events not in cache)
50 rejected_announcements_hot_cache_misses_total: IntCounter,
51 /// Total expired entries removed from hot cache
52 rejected_announcements_hot_cache_expired_total: IntCounter,
53 /// Current number of entries in cold index
54 rejected_announcements_cold_index_current: IntGauge,
55 /// Total cold index entries expired and removed
56 rejected_announcements_cold_index_expired_total: IntCounter,
57 /// Total invalidations (maintainer announcements invalidated)
58 rejected_announcements_invalidated_total: IntCounter,
43} 59}
44 60
45impl SyncMetrics { 61impl SyncMetrics {
@@ -113,6 +129,49 @@ impl SyncMetrics {
113 ))?; 129 ))?;
114 registry.register(Box::new(relays_dead_total.clone()))?; 130 registry.register(Box::new(relays_dead_total.clone()))?;
115 131
132 // Rejected announcements metrics
133 let rejected_announcements_hot_cache_current = IntGauge::with_opts(Opts::new(
134 "ngit_sync_rejected_announcements_hot_cache_current",
135 "Current number of entries in hot cache (full events, 2 min expiry)",
136 ))?;
137 registry.register(Box::new(rejected_announcements_hot_cache_current.clone()))?;
138
139 let rejected_announcements_hot_cache_hits_total = IntCounter::with_opts(Opts::new(
140 "ngit_sync_rejected_announcements_hot_cache_hits_total",
141 "Total hot cache hits (events re-processed from cache)",
142 ))?;
143 registry.register(Box::new(rejected_announcements_hot_cache_hits_total.clone()))?;
144
145 let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new(
146 "ngit_sync_rejected_announcements_hot_cache_misses_total",
147 "Total hot cache misses (events not in cache when invalidated)",
148 ))?;
149 registry.register(Box::new(rejected_announcements_hot_cache_misses_total.clone()))?;
150
151 let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new(
152 "ngit_sync_rejected_announcements_hot_cache_expired_total",
153 "Total expired entries removed from hot cache",
154 ))?;
155 registry.register(Box::new(rejected_announcements_hot_cache_expired_total.clone()))?;
156
157 let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new(
158 "ngit_sync_rejected_announcements_cold_index_current",
159 "Current number of entries in cold index (metadata only, 7 day expiry)",
160 ))?;
161 registry.register(Box::new(rejected_announcements_cold_index_current.clone()))?;
162
163 let rejected_announcements_cold_index_expired_total = IntCounter::with_opts(Opts::new(
164 "ngit_sync_rejected_announcements_cold_index_expired_total",
165 "Total expired entries removed from cold index",
166 ))?;
167 registry.register(Box::new(rejected_announcements_cold_index_expired_total.clone()))?;
168
169 let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new(
170 "ngit_sync_rejected_announcements_invalidated_total",
171 "Total invalidations (maintainer announcements invalidated when owner accepted)",
172 ))?;
173 registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?;
174
116 Ok(Self { 175 Ok(Self {
117 relay_connected, 176 relay_connected,
118 connection_attempts_total, 177 connection_attempts_total,
@@ -122,6 +181,13 @@ impl SyncMetrics {
122 relays_tracked_total, 181 relays_tracked_total,
123 relays_connected_total, 182 relays_connected_total,
124 relays_dead_total, 183 relays_dead_total,
184 rejected_announcements_hot_cache_current,
185 rejected_announcements_hot_cache_hits_total,
186 rejected_announcements_hot_cache_misses_total,
187 rejected_announcements_hot_cache_expired_total,
188 rejected_announcements_cold_index_current,
189 rejected_announcements_cold_index_expired_total,
190 rejected_announcements_invalidated_total,
125 }) 191 })
126 } 192 }
127 193
@@ -293,6 +359,43 @@ impl SyncMetrics {
293 pub fn get_dead_count(&self) -> i64 { 359 pub fn get_dead_count(&self) -> i64 {
294 self.relays_dead_total.get() 360 self.relays_dead_total.get()
295 } 361 }
362
363 // === Rejected Announcements Recording Methods ===
364
365 /// Update hot cache current size gauge.
366 pub fn update_hot_cache_size(&self, size: usize) {
367 self.rejected_announcements_hot_cache_current.set(size as i64);
368 }
369
370 /// Record hot cache hit (event re-processed from cache).
371 pub fn record_hot_cache_hit(&self) {
372 self.rejected_announcements_hot_cache_hits_total.inc();
373 }
374
375 /// Record hot cache miss (event not in cache when invalidated).
376 pub fn record_hot_cache_miss(&self) {
377 self.rejected_announcements_hot_cache_misses_total.inc();
378 }
379
380 /// Record hot cache expired entries.
381 pub fn record_hot_cache_expired(&self, count: usize) {
382 self.rejected_announcements_hot_cache_expired_total.inc_by(count as u64);
383 }
384
385 /// Update cold index current size gauge.
386 pub fn update_cold_index_size(&self, size: usize) {
387 self.rejected_announcements_cold_index_current.set(size as i64);
388 }
389
390 /// Record cold index expired entries.
391 pub fn record_cold_index_expired(&self, count: usize) {
392 self.rejected_announcements_cold_index_expired_total.inc_by(count as u64);
393 }
394
395 /// Record invalidation (maintainer announcement invalidated).
396 pub fn record_invalidation(&self, count: usize) {
397 self.rejected_announcements_invalidated_total.inc_by(count as u64);
398 }
296} 399}
297 400
298#[cfg(test)] 401#[cfg(test)]
@@ -395,4 +498,25 @@ mod tests {
395 let metrics2 = SyncMetrics::register(&registry); 498 let metrics2 = SyncMetrics::register(&registry);
396 assert!(metrics2.is_err()); 499 assert!(metrics2.is_err());
397 } 500 }
501
502 #[test]
503 fn test_rejected_announcements_metrics() {
504 let registry = create_test_registry();
505 let metrics = SyncMetrics::register(&registry).unwrap();
506
507 // Test hot cache metrics
508 metrics.update_hot_cache_size(10);
509 metrics.record_hot_cache_hit();
510 metrics.record_hot_cache_hit();
511 metrics.record_hot_cache_miss();
512 metrics.record_hot_cache_expired(5);
513
514 // Test cold index metrics
515 metrics.update_cold_index_size(100);
516 metrics.record_cold_index_expired(10);
517
518 // Test invalidation metrics
519 metrics.record_invalidation(3);
520 metrics.record_invalidation(2);
521 }
398} 522}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 35a8afb..f296c0f 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -367,6 +367,66 @@ async fn run_daily_timer(
367/// Run the combined health and metrics checker 367/// Run the combined health and metrics checker
368/// 368///
369/// This function runs in a loop with a 2-second interval, performing three tasks: 369/// This function runs in a loop with a 2-second interval, performing three tasks:
370/// Background task for cleaning up expired entries from the rejected events index
371///
372/// This task runs two cleanup operations at different intervals:
373/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache
374/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index
375///
376/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly).
377/// The cold index cleanup runs daily since metadata is small and expires slowly.
378async fn run_rejected_index_cleanup(
379 sync_manager: Arc<Mutex<SyncManager>>,
380 mut shutdown_rx: broadcast::Receiver<()>,
381) {
382 let hot_cache_interval = Duration::from_secs(60);
383 let cold_index_interval = Duration::from_secs(86400); // 24 hours
384
385 tracing::info!(
386 "Rejected index cleanup started (hot cache: 60s, cold index: daily)"
387 );
388
389 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval);
390 let mut cold_index_timer = tokio::time::interval(cold_index_interval);
391
392 // Tick immediately to set the initial delay
393 hot_cache_timer.tick().await;
394 cold_index_timer.tick().await;
395
396 loop {
397 tokio::select! {
398 _ = hot_cache_timer.tick() => {
399 let manager = sync_manager.lock().await;
400 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
401 if hot_expired > 0 {
402 tracing::debug!(
403 "Cleaned up {} expired entries from rejected announcements hot cache",
404 hot_expired
405 );
406 }
407 }
408 _ = cold_index_timer.tick() => {
409 let manager = sync_manager.lock().await;
410 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
411 if cold_expired > 0 {
412 tracing::info!(
413 "Cleaned up {} expired entries from rejected announcements cold index",
414 cold_expired
415 );
416 }
417 }
418 _ = shutdown_rx.recv() => {
419 tracing::info!("Rejected index cleanup received shutdown signal");
420 break;
421 }
422 }
423 }
424}
425
426/// Background task for checking relay health and updating metrics
427///
428/// This task runs every 2 seconds and performs three operations:
429///
370/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones 430/// 1. **Disconnect checking**: Check for empty relays and disconnect non-bootstrap ones
371/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired 431/// 2. **Rate limit recovery**: Check for relays whose rate limit cooldown has expired
372/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker 432/// 3. **Metrics update**: Update Prometheus metrics with current health states from health_tracker
@@ -499,10 +559,18 @@ impl SyncManager {
499 repo_sync_index: Arc::new(RwLock::new(HashMap::new())), 559 repo_sync_index: Arc::new(RwLock::new(HashMap::new())),
500 relay_sync_index: Arc::new(RwLock::new(HashMap::new())), 560 relay_sync_index: Arc::new(RwLock::new(HashMap::new())),
501 pending_sync_index: Arc::new(RwLock::new(HashMap::new())), 561 pending_sync_index: Arc::new(RwLock::new(HashMap::new())),
502 rejected_events_index: Arc::new(RejectedEventsIndex::new( 562 rejected_events_index: Arc::new(if let Some(ref metrics) = sync_metrics {
503 Duration::from_secs(config.rejected_hot_cache_duration_secs), 563 RejectedEventsIndex::with_metrics(
504 Duration::from_secs(config.rejected_cold_index_expiry_secs), 564 Duration::from_secs(config.rejected_hot_cache_duration_secs),
505 )), 565 Duration::from_secs(config.rejected_cold_index_expiry_secs),
566 metrics.clone(),
567 )
568 } else {
569 RejectedEventsIndex::new(
570 Duration::from_secs(config.rejected_hot_cache_duration_secs),
571 Duration::from_secs(config.rejected_cold_index_expiry_secs),
572 )
573 }),
506 connections: HashMap::new(), 574 connections: HashMap::new(),
507 health_tracker: Arc::new(RelayHealthTracker::new(config)), 575 health_tracker: Arc::new(RelayHealthTracker::new(config)),
508 next_batch_id: 0, 576 next_batch_id: 0,
@@ -1154,7 +1222,15 @@ impl SyncManager {
1154 run_health_and_metrics_checker(checker_manager, checker_shutdown).await; 1222 run_health_and_metrics_checker(checker_manager, checker_shutdown).await;
1155 }); 1223 });
1156 1224
1157 // 10. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications 1225 // 10. Spawn rejected events index cleanup task
1226 // Hot cache cleanup every 60s, cold index cleanup daily
1227 let cleanup_manager = Arc::clone(&sync_manager);
1228 let cleanup_shutdown = shutdown_tx.subscribe();
1229 tokio::spawn(async move {
1230 run_rejected_index_cleanup(cleanup_manager, cleanup_shutdown).await;
1231 });
1232
1233 // 11. Main loop - handle actions from self-subscriber, disconnect, EOSE, and connect notifications
1158 loop { 1234 loop {
1159 // Wait for an event without holding the lock 1235 // Wait for an event without holding the lock
1160 tokio::select! { 1236 tokio::select! {
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index 80d6b5b..4733d80 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -299,10 +299,22 @@ impl ColdIndex {
299/// 299///
300/// Combines hot cache (full events, short duration) with cold index 300/// Combines hot cache (full events, short duration) with cold index
301/// (metadata only, long duration) for efficient re-processing and deduplication. 301/// (metadata only, long duration) for efficient re-processing and deduplication.
302#[derive(Debug, Clone)] 302#[derive(Clone)]
303pub struct RejectedEventsIndex { 303pub struct RejectedEventsIndex {
304 hot_cache: HotCache, 304 hot_cache: HotCache,
305 cold_index: ColdIndex, 305 cold_index: ColdIndex,
306 metrics: Option<super::metrics::SyncMetrics>,
307}
308
309// Manual Debug impl to avoid requiring Debug on SyncMetrics
310impl std::fmt::Debug for RejectedEventsIndex {
311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 f.debug_struct("RejectedEventsIndex")
313 .field("hot_cache", &self.hot_cache)
314 .field("cold_index", &self.cold_index)
315 .field("metrics", &self.metrics.is_some())
316 .finish()
317 }
306} 318}
307 319
308impl RejectedEventsIndex { 320impl RejectedEventsIndex {
@@ -316,6 +328,38 @@ impl RejectedEventsIndex {
316 Self { 328 Self {
317 hot_cache: HotCache::new(hot_cache_duration), 329 hot_cache: HotCache::new(hot_cache_duration),
318 cold_index: ColdIndex::new(cold_index_duration), 330 cold_index: ColdIndex::new(cold_index_duration),
331 metrics: None,
332 }
333 }
334
335 /// Create new rejected events index with metrics
336 ///
337 /// # Arguments
338 ///
339 /// * `hot_cache_duration` - How long to keep full events in hot cache (default: 2 minutes)
340 /// * `cold_index_duration` - How long to keep metadata in cold index (default: 7 days)
341 /// * `metrics` - Prometheus metrics for tracking index operations
342 pub fn with_metrics(
343 hot_cache_duration: Duration,
344 cold_index_duration: Duration,
345 metrics: super::metrics::SyncMetrics,
346 ) -> Self {
347 let index = Self {
348 hot_cache: HotCache::new(hot_cache_duration),
349 cold_index: ColdIndex::new(cold_index_duration),
350 metrics: Some(metrics),
351 };
352
353 // Initialize metrics with current sizes
354 index.update_metrics();
355 index
356 }
357
358 /// Update metrics with current sizes
359 fn update_metrics(&self) {
360 if let Some(ref metrics) = self.metrics {
361 metrics.update_hot_cache_size(self.hot_cache.len());
362 metrics.update_cold_index_size(self.cold_index.len());
319 } 363 }
320 } 364 }
321 365
@@ -344,6 +388,9 @@ impl RejectedEventsIndex {
344 388
345 // Add to cold index (metadata only) 389 // Add to cold index (metadata only)
346 self.cold_index.add(event.id, pubkey, identifier, reason); 390 self.cold_index.add(event.id, pubkey, identifier, reason);
391
392 // Update metrics
393 self.update_metrics();
347 } 394 }
348 395
349 /// Check if event is already rejected (in either tier) 396 /// Check if event is already rejected (in either tier)
@@ -375,6 +422,23 @@ impl RejectedEventsIndex {
375 .hot_cache 422 .hot_cache
376 .get_maintainer_events(maintainer_pubkey, identifier); 423 .get_maintainer_events(maintainer_pubkey, identifier);
377 424
425 // Track metrics
426 if let Some(ref metrics) = self.metrics {
427 if removed > 0 {
428 metrics.record_invalidation(removed);
429 }
430 if events.is_empty() {
431 metrics.record_hot_cache_miss();
432 } else {
433 for _ in &events {
434 metrics.record_hot_cache_hit();
435 }
436 }
437 }
438
439 // Update size metrics
440 self.update_metrics();
441
378 (removed, events) 442 (removed, events)
379 } 443 }
380 444
@@ -384,6 +448,20 @@ impl RejectedEventsIndex {
384 pub fn cleanup_expired(&self) -> (usize, usize) { 448 pub fn cleanup_expired(&self) -> (usize, usize) {
385 let hot_expired = self.hot_cache.cleanup_expired(); 449 let hot_expired = self.hot_cache.cleanup_expired();
386 let cold_expired = self.cold_index.cleanup_expired(); 450 let cold_expired = self.cold_index.cleanup_expired();
451
452 // Track metrics
453 if let Some(ref metrics) = self.metrics {
454 if hot_expired > 0 {
455 metrics.record_hot_cache_expired(hot_expired);
456 }
457 if cold_expired > 0 {
458 metrics.record_cold_index_expired(cold_expired);
459 }
460 }
461
462 // Update size metrics
463 self.update_metrics();
464
387 (hot_expired, cold_expired) 465 (hot_expired, cold_expired)
388 } 466 }
389 467