upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-09 20:52:07 +0000
committer: DanConwayDev <DanConwayDev@protonmail.com> 2026-01-09 20:52:07 +0000
commit: a68e23733e78d33ca1d48b83414a8db63ca3d5fd (patch)
tree: 6193aa978c361581dc0f64c6f65e4ef8343a2ff7 /src
parent: 506829156784e87fd482b0b102540ea4a3c9f777 (diff)
refactor(sync): consolidate to single rejected index with helper extraction
Remove rejected_states_index and use a single rejected_events_index for both announcement and state events. Extract the duplicated re-processing logic into a consolidated helper function.

Changes:
- Eliminate duplicate RepositoryAnnouncement::from_event() call
- Remove rejected_states_index field from SyncManager
- Update cleanup loop to process both event types via single index
- Add ReprocessingStats struct to track re-processing outcomes
- Add reprocess_events_from_hot_cache() helper that handles:
  - Logging re-processing attempts with context
  - Calling process_event_static recursively
  - Tracking saved/duplicate/purgatory/rejected counts
- Replace three nearly-identical re-processing loops with helper calls

Consolidates phases 1, 5, and 6 of the rejected events index refactoring.
Diffstat (limited to 'src')
-rw-r--r-- src/sync/mod.rs 389
1 file changed, 181 insertions, 208 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 07527c7..8b5e1c3 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds};
27pub use metrics::SyncMetrics; 27pub use metrics::SyncMetrics;
28 28
29// Re-export rejected index types 29// Re-export rejected index types
30pub use rejected_index::RejectionReason; 30pub use rejected_index::{EventType, RejectionReason};
31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used 31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used
32// Current code still uses the simple HashSet type alias below 32// Current code still uses the simple HashSet type alias below
33 33
@@ -211,6 +211,19 @@ pub enum ProcessResult {
211 Rejected, 211 Rejected,
212} 212}
213 213
214/// Statistics from re-processing events from hot cache
215#[derive(Debug, Clone, Default)]
216pub struct ReprocessingStats {
217 /// Number of events successfully saved
218 pub saved: usize,
219 /// Number of events that were duplicates
220 pub duplicate: usize,
221 /// Number of events added to purgatory
222 pub purgatory: usize,
223 /// Number of events still rejected
224 pub rejected: usize,
225}
226
214/// Pagination state for a subscription in non-Negentropy historic sync 227/// Pagination state for a subscription in non-Negentropy historic sync
215#[derive(Debug, Clone)] 228#[derive(Debug, Clone)]
216pub struct PaginationState { 229pub struct PaginationState {
@@ -366,16 +379,14 @@ async fn run_daily_timer(
366 379
367// Combined Health and Metrics Checker 380// Combined Health and Metrics Checker
368 381
369/// Run the combined health and metrics checker 382/// Background task for cleaning up expired entries from the rejected events index
370///
371/// This function runs in a loop with a 2-second interval, performing three tasks:
372/// Background task for cleaning up expired entries from the rejected events indexes
373/// 383///
374/// This task runs two cleanup operations at different intervals: 384/// This task runs two cleanup operations at different intervals:
375/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache 385/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache
376/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index 386/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index
377/// 387///
378/// Cleans up both the announcements index and the states index. 388/// A single `RejectedEventsIndex` handles both announcement and state events,
389/// differentiated by `EventType`. Each cleanup pass processes both types.
379/// 390///
380/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). 391/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly).
381/// The cold index cleanup runs daily since metadata is small and expires slowly. 392/// The cold index cleanup runs daily since metadata is small and expires slowly.
@@ -400,42 +411,31 @@ async fn run_rejected_index_cleanup(
400 _ = hot_cache_timer.tick() => { 411 _ = hot_cache_timer.tick() => {
401 let manager = sync_manager.lock().await; 412 let manager = sync_manager.lock().await;
402 413
403 // Clean up announcements index 414 // Clean up hot cache for both event types (single index handles both)
404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); 415 // Note: cleanup_expired_for_type updates metrics with type label
405 if hot_expired > 0 { 416 let (ann_hot_expired, _) = manager.rejected_events_index.cleanup_expired_for_type("announcement");
406 tracing::debug!( 417 let (state_hot_expired, _) = manager.rejected_events_index.cleanup_expired_for_type("state");
407 "Cleaned up {} expired entries from rejected announcements hot cache",
408 hot_expired
409 );
410 }
411 418
412 // Clean up states index 419 if ann_hot_expired + state_hot_expired > 0 {
413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired();
414 if states_hot_expired > 0 {
415 tracing::debug!( 420 tracing::debug!(
416 "Cleaned up {} expired entries from rejected states hot cache", 421 announcements = ann_hot_expired,
417 states_hot_expired 422 states = state_hot_expired,
423 "Cleaned up expired entries from rejected events hot cache"
418 ); 424 );
419 } 425 }
420 } 426 }
421 _ = cold_index_timer.tick() => { 427 _ = cold_index_timer.tick() => {
422 let manager = sync_manager.lock().await; 428 let manager = sync_manager.lock().await;
423 429
424 // Clean up announcements index 430 // Clean up cold index for both event types (single index handles both)
425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); 431 let (_, ann_cold_expired) = manager.rejected_events_index.cleanup_expired_for_type("announcement");
426 if cold_expired > 0 { 432 let (_, state_cold_expired) = manager.rejected_events_index.cleanup_expired_for_type("state");
427 tracing::info!(
428 "Cleaned up {} expired entries from rejected announcements cold index",
429 cold_expired
430 );
431 }
432 433
433 // Clean up states index 434 if ann_cold_expired + state_cold_expired > 0 {
434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired();
435 if states_cold_expired > 0 {
436 tracing::info!( 435 tracing::info!(
437 "Cleaned up {} expired entries from rejected states cold index", 436 announcements = ann_cold_expired,
438 states_cold_expired 437 states = state_cold_expired,
438 "Cleaned up expired entries from rejected events cold index"
439 ); 439 );
440 } 440 }
441 } 441 }
@@ -529,10 +529,9 @@ pub struct SyncManager {
529 relay_sync_index: RelaySyncIndex, 529 relay_sync_index: RelaySyncIndex,
530 /// In-flight subscription batches 530 /// In-flight subscription batches
531 pending_sync_index: PendingSyncIndex, 531 pending_sync_index: PendingSyncIndex,
532 /// Rejected announcement events (30617/30618) - two-tier storage for re-processing 532 /// Rejected events (30617/30618) - two-tier storage for re-processing
533 /// Handles both announcement and state events via EventType discriminator
533 rejected_events_index: Arc<RejectedEventsIndex>, 534 rejected_events_index: Arc<RejectedEventsIndex>,
534 /// Rejected state events (30618) - two-tier storage for re-processing
535 rejected_states_index: Arc<RejectedEventsIndex>,
536 /// Active relay connections - keyed by relay URL 535 /// Active relay connections - keyed by relay URL
537 connections: HashMap<String, RelayConnection>, 536 connections: HashMap<String, RelayConnection>,
538 /// Health tracker for relay connection state 537 /// Health tracker for relay connection state
@@ -597,18 +596,6 @@ impl SyncManager {
597 Duration::from_secs(config.rejected_cold_index_expiry_secs), 596 Duration::from_secs(config.rejected_cold_index_expiry_secs),
598 ) 597 )
599 }), 598 }),
600 rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics {
601 RejectedEventsIndex::with_metrics(
602 Duration::from_secs(config.rejected_hot_cache_duration_secs),
603 Duration::from_secs(config.rejected_cold_index_expiry_secs),
604 metrics.clone(),
605 )
606 } else {
607 RejectedEventsIndex::new(
608 Duration::from_secs(config.rejected_hot_cache_duration_secs),
609 Duration::from_secs(config.rejected_cold_index_expiry_secs),
610 )
611 }),
612 connections: HashMap::new(), 599 connections: HashMap::new(),
613 health_tracker: Arc::new(RelayHealthTracker::new(config)), 600 health_tracker: Arc::new(RelayHealthTracker::new(config)),
614 next_batch_id: 0, 601 next_batch_id: 0,
@@ -2111,6 +2098,107 @@ impl SyncManager {
2111 ); 2098 );
2112 } 2099 }
2113 2100
2101 /// Re-process events from hot cache after their dependencies become available
2102 ///
2103 /// This helper consolidates the common pattern of re-processing rejected events
2104 /// when their missing dependencies (owner announcements, git data, etc.) arrive.
2105 ///
2106 /// # Arguments
2107 /// * `events` - Events to re-process from hot cache
2108 /// * `context` - Description for logging (e.g., "maintainer announcement", "state event")
2109 /// * `pubkey` - Public key for logging context
2110 /// * `identifier` - Repository identifier for logging context
2111 /// * `relay_url` - Relay URL for process_event_static
2112 /// * `database` - Shared database for event storage
2113 /// * `write_policy` - Policy for validating events
2114 /// * `local_relay` - Local relay for broadcasting events
2115 /// * `rejected_events_index` - Index for tracking rejected events
2116 ///
2117 /// # Returns
2118 /// Statistics about re-processing outcomes
2119 #[allow(clippy::too_many_arguments)]
2120 async fn reprocess_events_from_hot_cache(
2121 events: Vec<Event>,
2122 context: &str,
2123 pubkey: &PublicKey,
2124 identifier: &str,
2125 relay_url: &str,
2126 database: &SharedDatabase,
2127 write_policy: &Nip34WritePolicy,
2128 local_relay: &LocalRelay,
2129 rejected_events_index: &Arc<RejectedEventsIndex>,
2130 ) -> ReprocessingStats {
2131 let mut stats = ReprocessingStats::default();
2132
2133 for event in events {
2134 tracing::info!(
2135 event_id = %event.id,
2136 pubkey = %pubkey,
2137 identifier = %identifier,
2138 context = %context,
2139 "Re-processing {} from hot cache",
2140 context
2141 );
2142
2143 // Recursive call to process_event_static
2144 // This is safe because:
2145 // 1. Event was removed from hot cache before this call
2146 // 2. Second attempt uses new context (different code path)
2147 // 3. If second attempt fails, stays in cold index only (no third attempt)
2148 // Use Box::pin to avoid infinitely sized future
2149 let reprocess_result = Box::pin(Self::process_event_static(
2150 &event,
2151 relay_url,
2152 database,
2153 write_policy,
2154 local_relay,
2155 rejected_events_index,
2156 ))
2157 .await;
2158
2159 match reprocess_result {
2160 ProcessResult::Saved => {
2161 stats.saved += 1;
2162 tracing::info!(
2163 event_id = %event.id,
2164 pubkey = %pubkey,
2165 identifier = %identifier,
2166 "{} accepted on re-processing",
2167 context
2168 );
2169 }
2170 ProcessResult::Duplicate => {
2171 stats.duplicate += 1;
2172 tracing::debug!(
2173 event_id = %event.id,
2174 "{} already exists (duplicate)",
2175 context
2176 );
2177 }
2178 ProcessResult::Purgatory => {
2179 stats.purgatory += 1;
2180 tracing::debug!(
2181 event_id = %event.id,
2182 "{} added to purgatory (waiting for git data)",
2183 context
2184 );
2185 }
2186 ProcessResult::Rejected => {
2187 stats.rejected += 1;
2188 tracing::warn!(
2189 event_id = %event.id,
2190 pubkey = %pubkey,
2191 identifier = %identifier,
2192 "{} still rejected on re-processing",
2193 context
2194 );
2195 }
2196 }
2197 }
2198
2199 stats
2200 }
2201
2114 /// Process a single event from a relay (static version for spawned tasks) 2202 /// Process a single event from a relay (static version for spawned tasks)
2115 /// 2203 ///
2116 /// Processes events with dedup, policy check, database save, and broadcast: 2204 /// Processes events with dedup, policy check, database save, and broadcast:
@@ -2173,16 +2261,16 @@ impl SyncManager {
2173 "Synced event saved and broadcast" 2261 "Synced event saved and broadcast"
2174 ); 2262 );
2175 2263
2176 // When an owner announcement is accepted, re-process any maintainer announcements 2264 // When a repository announcement is accepted, re-process any rejected events:
2177 // that were previously rejected because the owner announcement didn't exist yet. 2265 // 1. Maintainer announcements that were rejected because the owner announcement didn't exist yet
2178 // This handles the race condition where maintainer events arrive before owner events 2266 // 2. State events that were rejected because no announcement existed
2179 // during relay synchronization. Maintainer events in the hot cache are re-processed 2267 // This handles race conditions where events arrive before their dependencies during relay sync.
2180 // immediately and should now pass validation.
2181 if event.kind == Kind::GitRepoAnnouncement { 2268 if event.kind == Kind::GitRepoAnnouncement {
2182 use crate::nostr::events::RepositoryAnnouncement; 2269 use crate::nostr::events::RepositoryAnnouncement;
2183 2270
2184 match RepositoryAnnouncement::from_event(event.clone()) { 2271 match RepositoryAnnouncement::from_event(event.clone()) {
2185 Ok(announcement) => { 2272 Ok(announcement) => {
2273 // Re-process rejected maintainer announcements
2186 if !announcement.maintainers.is_empty() { 2274 if !announcement.maintainers.is_empty() {
2187 tracing::debug!( 2275 tracing::debug!(
2188 event_id = %event.id, 2276 event_id = %event.id,
@@ -2197,9 +2285,10 @@ impl SyncManager {
2197 match PublicKey::from_hex(maintainer_hex) { 2285 match PublicKey::from_hex(maintainer_hex) {
2198 Ok(maintainer_pubkey) => { 2286 Ok(maintainer_pubkey) => {
2199 let (removed, hot_events) = rejected_events_index 2287 let (removed, hot_events) = rejected_events_index
2200 .invalidate_and_get_events( 2288 .invalidate_and_get(
2201 &maintainer_pubkey, 2289 &maintainer_pubkey,
2202 &announcement.identifier, 2290 &announcement.identifier,
2291 Some(rejected_index::EventType::Announcement),
2203 ); 2292 );
2204 2293
2205 if removed > 0 { 2294 if removed > 0 {
@@ -2213,56 +2302,19 @@ impl SyncManager {
2213 } 2302 }
2214 2303
2215 // Re-process events from hot cache immediately 2304 // Re-process events from hot cache immediately
2216 for maintainer_event in hot_events { 2305 if !hot_events.is_empty() {
2217 tracing::info!( 2306 let _stats = Self::reprocess_events_from_hot_cache(
2218 event_id = %maintainer_event.id, 2307 hot_events,
2219 maintainer = %maintainer_hex, 2308 "maintainer announcement",
2220 identifier = %announcement.identifier, 2309 &maintainer_pubkey,
2221 "Re-processing maintainer announcement from hot cache" 2310 &announcement.identifier,
2222 ); 2311 relay_url,
2223 2312 database,
2224 // Recursive call to process_event_static 2313 write_policy,
2225 // This is safe because: 2314 local_relay,
2226 // 1. Event was removed from hot cache before this call 2315 rejected_events_index,
2227 // 2. Second attempt uses maintainer exception (different code path) 2316 )
2228 // 3. If second attempt fails, stays in cold index only (no third attempt) 2317 .await;
2229 // Use Box::pin to avoid infinitely sized future
2230 let reprocess_result =
2231 Box::pin(Self::process_event_static(
2232 &maintainer_event,
2233 relay_url,
2234 database,
2235 write_policy,
2236 local_relay,
2237 rejected_events_index,
2238 ))
2239 .await;
2240
2241 match reprocess_result {
2242 ProcessResult::Saved => {
2243 tracing::info!(
2244 event_id = %maintainer_event.id,
2245 maintainer = %maintainer_hex,
2246 identifier = %announcement.identifier,
2247 "Maintainer announcement accepted on re-processing"
2248 );
2249 }
2250 ProcessResult::Duplicate => {
2251 tracing::debug!(
2252 event_id = %maintainer_event.id,
2253 "Maintainer announcement already exists (duplicate)"
2254 );
2255 }
2256 other => {
2257 tracing::warn!(
2258 event_id = %maintainer_event.id,
2259 maintainer = %maintainer_hex,
2260 identifier = %announcement.identifier,
2261 result = ?other,
2262 "Maintainer announcement still rejected on re-processing"
2263 );
2264 }
2265 }
2266 } 2318 }
2267 } 2319 }
2268 Err(e) => { 2320 Err(e) => {
@@ -2275,28 +2327,13 @@ impl SyncManager {
2275 } 2327 }
2276 } 2328 }
2277 } 2329 }
2278 }
2279 Err(e) => {
2280 tracing::warn!(
2281 event_id = %event.id,
2282 error = %e,
2283 "Failed to parse repository announcement for maintainer invalidation"
2284 );
2285 }
2286 }
2287 2330
2288 // When a repository announcement is accepted, re-process any state events 2331 // Re-process rejected state events for this announcement
2289 // that were previously rejected because no announcement existed. 2332 let (removed, hot_events) = rejected_events_index.invalidate_and_get(
2290 // This handles the race condition where state events arrive before their 2333 &event.pubkey,
2291 // announcements during relay synchronization. 2334 &announcement.identifier,
2292 match RepositoryAnnouncement::from_event(event.clone()) { 2335 Some(rejected_index::EventType::State),
2293 Ok(announcement) => { 2336 );
2294 // Get the announcement author's state events that were rejected
2295 let (removed, hot_events) = rejected_events_index
2296 .invalidate_and_get_state_events(
2297 &event.pubkey,
2298 &announcement.identifier,
2299 );
2300 2337
2301 if removed > 0 { 2338 if removed > 0 {
2302 tracing::info!( 2339 tracing::info!(
@@ -2309,62 +2346,26 @@ impl SyncManager {
2309 } 2346 }
2310 2347
2311 // Re-process state events from hot cache immediately 2348 // Re-process state events from hot cache immediately
2312 for state_event in hot_events { 2349 if !hot_events.is_empty() {
2313 tracing::info!( 2350 let _stats = Self::reprocess_events_from_hot_cache(
2314 event_id = %state_event.id, 2351 hot_events,
2315 pubkey = %event.pubkey, 2352 "state event",
2316 identifier = %announcement.identifier, 2353 &event.pubkey,
2317 "Re-processing state event from hot cache (announcement now exists)" 2354 &announcement.identifier,
2318 );
2319
2320 let reprocess_result = Box::pin(Self::process_event_static(
2321 &state_event,
2322 relay_url, 2355 relay_url,
2323 database, 2356 database,
2324 write_policy, 2357 write_policy,
2325 local_relay, 2358 local_relay,
2326 rejected_events_index, 2359 rejected_events_index,
2327 )) 2360 )
2328 .await; 2361 .await;
2329
2330 match reprocess_result {
2331 ProcessResult::Saved => {
2332 tracing::info!(
2333 event_id = %state_event.id,
2334 pubkey = %event.pubkey,
2335 identifier = %announcement.identifier,
2336 "State event accepted on re-processing (announcement now exists)"
2337 );
2338 }
2339 ProcessResult::Purgatory => {
2340 tracing::debug!(
2341 event_id = %state_event.id,
2342 "State event added to purgatory (waiting for git data)"
2343 );
2344 }
2345 ProcessResult::Duplicate => {
2346 tracing::debug!(
2347 event_id = %state_event.id,
2348 "State event already exists (duplicate)"
2349 );
2350 }
2351 other => {
2352 tracing::warn!(
2353 event_id = %state_event.id,
2354 pubkey = %event.pubkey,
2355 identifier = %announcement.identifier,
2356 result = ?other,
2357 "State event still rejected on re-processing"
2358 );
2359 }
2360 }
2361 } 2362 }
2362 } 2363 }
2363 Err(e) => { 2364 Err(e) => {
2364 tracing::warn!( 2365 tracing::warn!(
2365 event_id = %event.id, 2366 event_id = %event.id,
2366 error = %e, 2367 error = %e,
2367 "Failed to parse repository announcement for state event invalidation" 2368 "Failed to parse repository announcement for rejected event invalidation"
2368 ); 2369 );
2369 } 2370 }
2370 } 2371 }
@@ -2383,8 +2384,11 @@ impl SyncManager {
2383 .and_then(|t| t.content()) 2384 .and_then(|t| t.content())
2384 { 2385 {
2385 // Get rejected state events for this pubkey + identifier 2386 // Get rejected state events for this pubkey + identifier
2386 let (removed, hot_events) = rejected_events_index 2387 let (removed, hot_events) = rejected_events_index.invalidate_and_get(
2387 .invalidate_and_get_state_events(&event.pubkey, identifier); 2388 &event.pubkey,
2389 identifier,
2390 Some(rejected_index::EventType::State),
2391 );
2388 2392
2389 if removed > 0 { 2393 if removed > 0 {
2390 tracing::info!( 2394 tracing::info!(
@@ -2397,50 +2401,19 @@ impl SyncManager {
2397 } 2401 }
2398 2402
2399 // Re-process events from hot cache immediately 2403 // Re-process events from hot cache immediately
2400 for state_event in hot_events { 2404 if !hot_events.is_empty() {
2401 tracing::info!( 2405 let _stats = Self::reprocess_events_from_hot_cache(
2402 event_id = %state_event.id, 2406 hot_events,
2403 pubkey = %event.pubkey, 2407 "state event",
2404 identifier = %identifier, 2408 &event.pubkey,
2405 "Re-processing state event from hot cache" 2409 identifier,
2406 );
2407
2408 // Recursive call to process_event_static
2409 let reprocess_result = Box::pin(Self::process_event_static(
2410 &state_event,
2411 relay_url, 2410 relay_url,
2412 database, 2411 database,
2413 write_policy, 2412 write_policy,
2414 local_relay, 2413 local_relay,
2415 rejected_events_index, 2414 rejected_events_index,
2416 )) 2415 )
2417 .await; 2416 .await;
2418
2419 match reprocess_result {
2420 ProcessResult::Saved => {
2421 tracing::info!(
2422 event_id = %state_event.id,
2423 pubkey = %event.pubkey,
2424 identifier = %identifier,
2425 "State event accepted on re-processing"
2426 );
2427 }
2428 ProcessResult::Duplicate => {
2429 tracing::debug!(
2430 event_id = %state_event.id,
2431 "State event already exists (duplicate)"
2432 );
2433 }
2434 other => {
2435 tracing::warn!(
2436 event_id = %state_event.id,
2437 pubkey = %event.pubkey,
2438 identifier = %identifier,
2439 result = ?other,
2440 "State event still rejected on re-processing"
2441 );
2442 }
2443 }
2444 } 2417 }
2445 } 2418 }
2446 } 2419 }