diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 20:52:07 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 20:52:07 +0000 |
| commit | a68e23733e78d33ca1d48b83414a8db63ca3d5fd (patch) | |
| tree | 6193aa978c361581dc0f64c6f65e4ef8343a2ff7 | |
| parent | 506829156784e87fd482b0b102540ea4a3c9f777 (diff) | |
refactor(sync): consolidate to single rejected index with helper extraction
Remove rejected_states_index and use single rejected_events_index for both
announcement and state events. Extract duplicate re-processing logic into
a consolidated helper function.
Changes:
- Eliminate duplicate RepositoryAnnouncement::from_event() call
- Remove rejected_states_index field from SyncManager
- Update cleanup loop to process both event types via single index
- Add ReprocessingStats struct to track re-processing outcomes
- Add reprocess_events_from_hot_cache() helper that handles:
- Logging re-processing attempts with context
- Calling process_event_static recursively
- Tracking saved/duplicate/purgatory/rejected counts
- Replace three nearly-identical re-processing loops with helper calls
Consolidates phases 1, 5, and 6 of rejected events index refactoring.
| -rw-r--r-- | src/sync/mod.rs | 389 |
1 files changed, 181 insertions, 208 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 07527c7..8b5e1c3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds}; | |||
| 27 | pub use metrics::SyncMetrics; | 27 | pub use metrics::SyncMetrics; |
| 28 | 28 | ||
| 29 | // Re-export rejected index types | 29 | // Re-export rejected index types |
| 30 | pub use rejected_index::RejectionReason; | 30 | pub use rejected_index::{EventType, RejectionReason}; |
| 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used | 31 | // Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used |
| 32 | // Current code still uses the simple HashSet type alias below | 32 | // Current code still uses the simple HashSet type alias below |
| 33 | 33 | ||
| @@ -211,6 +211,19 @@ pub enum ProcessResult { | |||
| 211 | Rejected, | 211 | Rejected, |
| 212 | } | 212 | } |
| 213 | 213 | ||
| 214 | /// Statistics from re-processing events from hot cache | ||
| 215 | #[derive(Debug, Clone, Default)] | ||
| 216 | pub struct ReprocessingStats { | ||
| 217 | /// Number of events successfully saved | ||
| 218 | pub saved: usize, | ||
| 219 | /// Number of events that were duplicates | ||
| 220 | pub duplicate: usize, | ||
| 221 | /// Number of events added to purgatory | ||
| 222 | pub purgatory: usize, | ||
| 223 | /// Number of events still rejected | ||
| 224 | pub rejected: usize, | ||
| 225 | } | ||
| 226 | |||
| 214 | /// Pagination state for a subscription in non-Negentropy historic sync | 227 | /// Pagination state for a subscription in non-Negentropy historic sync |
| 215 | #[derive(Debug, Clone)] | 228 | #[derive(Debug, Clone)] |
| 216 | pub struct PaginationState { | 229 | pub struct PaginationState { |
| @@ -366,16 +379,14 @@ async fn run_daily_timer( | |||
| 366 | 379 | ||
| 367 | // Combined Health and Metrics Checker | 380 | // Combined Health and Metrics Checker |
| 368 | 381 | ||
| 369 | /// Run the combined health and metrics checker | 382 | /// Background task for cleaning up expired entries from the rejected events index |
| 370 | /// | ||
| 371 | /// This function runs in a loop with a 2-second interval, performing three tasks: | ||
| 372 | /// Background task for cleaning up expired entries from the rejected events indexes | ||
| 373 | /// | 383 | /// |
| 374 | /// This task runs two cleanup operations at different intervals: | 384 | /// This task runs two cleanup operations at different intervals: |
| 375 | /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache | 385 | /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache |
| 376 | /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index | 386 | /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index |
| 377 | /// | 387 | /// |
| 378 | /// Cleans up both the announcements index and the states index. | 388 | /// A single `RejectedEventsIndex` handles both announcement and state events, |
| 389 | /// differentiated by `EventType`. Each cleanup pass processes both types. | ||
| 379 | /// | 390 | /// |
| 380 | /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). | 391 | /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). |
| 381 | /// The cold index cleanup runs daily since metadata is small and expires slowly. | 392 | /// The cold index cleanup runs daily since metadata is small and expires slowly. |
| @@ -400,42 +411,31 @@ async fn run_rejected_index_cleanup( | |||
| 400 | _ = hot_cache_timer.tick() => { | 411 | _ = hot_cache_timer.tick() => { |
| 401 | let manager = sync_manager.lock().await; | 412 | let manager = sync_manager.lock().await; |
| 402 | 413 | ||
| 403 | // Clean up announcements index | 414 | // Clean up hot cache for both event types (single index handles both) |
| 404 | let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); | 415 | // Note: cleanup_expired_for_type updates metrics with type label |
| 405 | if hot_expired > 0 { | 416 | let (ann_hot_expired, _) = manager.rejected_events_index.cleanup_expired_for_type("announcement"); |
| 406 | tracing::debug!( | 417 | let (state_hot_expired, _) = manager.rejected_events_index.cleanup_expired_for_type("state"); |
| 407 | "Cleaned up {} expired entries from rejected announcements hot cache", | ||
| 408 | hot_expired | ||
| 409 | ); | ||
| 410 | } | ||
| 411 | 418 | ||
| 412 | // Clean up states index | 419 | if ann_hot_expired + state_hot_expired > 0 { |
| 413 | let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); | ||
| 414 | if states_hot_expired > 0 { | ||
| 415 | tracing::debug!( | 420 | tracing::debug!( |
| 416 | "Cleaned up {} expired entries from rejected states hot cache", | 421 | announcements = ann_hot_expired, |
| 417 | states_hot_expired | 422 | states = state_hot_expired, |
| 423 | "Cleaned up expired entries from rejected events hot cache" | ||
| 418 | ); | 424 | ); |
| 419 | } | 425 | } |
| 420 | } | 426 | } |
| 421 | _ = cold_index_timer.tick() => { | 427 | _ = cold_index_timer.tick() => { |
| 422 | let manager = sync_manager.lock().await; | 428 | let manager = sync_manager.lock().await; |
| 423 | 429 | ||
| 424 | // Clean up announcements index | 430 | // Clean up cold index for both event types (single index handles both) |
| 425 | let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); | 431 | let (_, ann_cold_expired) = manager.rejected_events_index.cleanup_expired_for_type("announcement"); |
| 426 | if cold_expired > 0 { | 432 | let (_, state_cold_expired) = manager.rejected_events_index.cleanup_expired_for_type("state"); |
| 427 | tracing::info!( | ||
| 428 | "Cleaned up {} expired entries from rejected announcements cold index", | ||
| 429 | cold_expired | ||
| 430 | ); | ||
| 431 | } | ||
| 432 | 433 | ||
| 433 | // Clean up states index | 434 | if ann_cold_expired + state_cold_expired > 0 { |
| 434 | let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); | ||
| 435 | if states_cold_expired > 0 { | ||
| 436 | tracing::info!( | 435 | tracing::info!( |
| 437 | "Cleaned up {} expired entries from rejected states cold index", | 436 | announcements = ann_cold_expired, |
| 438 | states_cold_expired | 437 | states = state_cold_expired, |
| 438 | "Cleaned up expired entries from rejected events cold index" | ||
| 439 | ); | 439 | ); |
| 440 | } | 440 | } |
| 441 | } | 441 | } |
| @@ -529,10 +529,9 @@ pub struct SyncManager { | |||
| 529 | relay_sync_index: RelaySyncIndex, | 529 | relay_sync_index: RelaySyncIndex, |
| 530 | /// In-flight subscription batches | 530 | /// In-flight subscription batches |
| 531 | pending_sync_index: PendingSyncIndex, | 531 | pending_sync_index: PendingSyncIndex, |
| 532 | /// Rejected announcement events (30617/30618) - two-tier storage for re-processing | 532 | /// Rejected events (30617/30618) - two-tier storage for re-processing |
| 533 | /// Handles both announcement and state events via EventType discriminator | ||
| 533 | rejected_events_index: Arc<RejectedEventsIndex>, | 534 | rejected_events_index: Arc<RejectedEventsIndex>, |
| 534 | /// Rejected state events (30618) - two-tier storage for re-processing | ||
| 535 | rejected_states_index: Arc<RejectedEventsIndex>, | ||
| 536 | /// Active relay connections - keyed by relay URL | 535 | /// Active relay connections - keyed by relay URL |
| 537 | connections: HashMap<String, RelayConnection>, | 536 | connections: HashMap<String, RelayConnection>, |
| 538 | /// Health tracker for relay connection state | 537 | /// Health tracker for relay connection state |
| @@ -597,18 +596,6 @@ impl SyncManager { | |||
| 597 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | 596 | Duration::from_secs(config.rejected_cold_index_expiry_secs), |
| 598 | ) | 597 | ) |
| 599 | }), | 598 | }), |
| 600 | rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics { | ||
| 601 | RejectedEventsIndex::with_metrics( | ||
| 602 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 603 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 604 | metrics.clone(), | ||
| 605 | ) | ||
| 606 | } else { | ||
| 607 | RejectedEventsIndex::new( | ||
| 608 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 609 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 610 | ) | ||
| 611 | }), | ||
| 612 | connections: HashMap::new(), | 599 | connections: HashMap::new(), |
| 613 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 600 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 614 | next_batch_id: 0, | 601 | next_batch_id: 0, |
| @@ -2111,6 +2098,107 @@ impl SyncManager { | |||
| 2111 | ); | 2098 | ); |
| 2112 | } | 2099 | } |
| 2113 | 2100 | ||
| 2101 | /// Re-process events from hot cache after their dependencies become available | ||
| 2102 | /// | ||
| 2103 | /// This helper consolidates the common pattern of re-processing rejected events | ||
| 2104 | /// when their missing dependencies (owner announcements, git data, etc.) arrive. | ||
| 2105 | /// | ||
| 2106 | /// # Arguments | ||
| 2107 | /// * `events` - Events to re-process from hot cache | ||
| 2108 | /// * `context` - Description for logging (e.g., "maintainer announcement", "state event") | ||
| 2109 | /// * `pubkey` - Public key for logging context | ||
| 2110 | /// * `identifier` - Repository identifier for logging context | ||
| 2111 | /// * `relay_url` - Relay URL for process_event_static | ||
| 2112 | /// * `database` - Shared database for event storage | ||
| 2113 | /// * `write_policy` - Policy for validating events | ||
| 2114 | /// * `local_relay` - Local relay for broadcasting events | ||
| 2115 | /// * `rejected_events_index` - Index for tracking rejected events | ||
| 2116 | /// | ||
| 2117 | /// # Returns | ||
| 2118 | /// Statistics about re-processing outcomes | ||
| 2119 | #[allow(clippy::too_many_arguments)] | ||
| 2120 | async fn reprocess_events_from_hot_cache( | ||
| 2121 | events: Vec<Event>, | ||
| 2122 | context: &str, | ||
| 2123 | pubkey: &PublicKey, | ||
| 2124 | identifier: &str, | ||
| 2125 | relay_url: &str, | ||
| 2126 | database: &SharedDatabase, | ||
| 2127 | write_policy: &Nip34WritePolicy, | ||
| 2128 | local_relay: &LocalRelay, | ||
| 2129 | rejected_events_index: &Arc<RejectedEventsIndex>, | ||
| 2130 | ) -> ReprocessingStats { | ||
| 2131 | let mut stats = ReprocessingStats::default(); | ||
| 2132 | |||
| 2133 | for event in events { | ||
| 2134 | tracing::info!( | ||
| 2135 | event_id = %event.id, | ||
| 2136 | pubkey = %pubkey, | ||
| 2137 | identifier = %identifier, | ||
| 2138 | context = %context, | ||
| 2139 | "Re-processing {} from hot cache", | ||
| 2140 | context | ||
| 2141 | ); | ||
| 2142 | |||
| 2143 | // Recursive call to process_event_static | ||
| 2144 | // This is safe because: | ||
| 2145 | // 1. Event was removed from hot cache before this call | ||
| 2146 | // 2. Second attempt uses new context (different code path) | ||
| 2147 | // 3. If second attempt fails, stays in cold index only (no third attempt) | ||
| 2148 | // Use Box::pin to avoid infinitely sized future | ||
| 2149 | let reprocess_result = Box::pin(Self::process_event_static( | ||
| 2150 | &event, | ||
| 2151 | relay_url, | ||
| 2152 | database, | ||
| 2153 | write_policy, | ||
| 2154 | local_relay, | ||
| 2155 | rejected_events_index, | ||
| 2156 | )) | ||
| 2157 | .await; | ||
| 2158 | |||
| 2159 | match reprocess_result { | ||
| 2160 | ProcessResult::Saved => { | ||
| 2161 | stats.saved += 1; | ||
| 2162 | tracing::info!( | ||
| 2163 | event_id = %event.id, | ||
| 2164 | pubkey = %pubkey, | ||
| 2165 | identifier = %identifier, | ||
| 2166 | "{} accepted on re-processing", | ||
| 2167 | context | ||
| 2168 | ); | ||
| 2169 | } | ||
| 2170 | ProcessResult::Duplicate => { | ||
| 2171 | stats.duplicate += 1; | ||
| 2172 | tracing::debug!( | ||
| 2173 | event_id = %event.id, | ||
| 2174 | "{} already exists (duplicate)", | ||
| 2175 | context | ||
| 2176 | ); | ||
| 2177 | } | ||
| 2178 | ProcessResult::Purgatory => { | ||
| 2179 | stats.purgatory += 1; | ||
| 2180 | tracing::debug!( | ||
| 2181 | event_id = %event.id, | ||
| 2182 | "{} added to purgatory (waiting for git data)", | ||
| 2183 | context | ||
| 2184 | ); | ||
| 2185 | } | ||
| 2186 | ProcessResult::Rejected => { | ||
| 2187 | stats.rejected += 1; | ||
| 2188 | tracing::warn!( | ||
| 2189 | event_id = %event.id, | ||
| 2190 | pubkey = %pubkey, | ||
| 2191 | identifier = %identifier, | ||
| 2192 | "{} still rejected on re-processing", | ||
| 2193 | context | ||
| 2194 | ); | ||
| 2195 | } | ||
| 2196 | } | ||
| 2197 | } | ||
| 2198 | |||
| 2199 | stats | ||
| 2200 | } | ||
| 2201 | |||
| 2114 | /// Process a single event from a relay (static version for spawned tasks) | 2202 | /// Process a single event from a relay (static version for spawned tasks) |
| 2115 | /// | 2203 | /// |
| 2116 | /// Processes events with dedup, policy check, database save, and broadcast: | 2204 | /// Processes events with dedup, policy check, database save, and broadcast: |
| @@ -2173,16 +2261,16 @@ impl SyncManager { | |||
| 2173 | "Synced event saved and broadcast" | 2261 | "Synced event saved and broadcast" |
| 2174 | ); | 2262 | ); |
| 2175 | 2263 | ||
| 2176 | // When an owner announcement is accepted, re-process any maintainer announcements | 2264 | // When a repository announcement is accepted, re-process any rejected events: |
| 2177 | // that were previously rejected because the owner announcement didn't exist yet. | 2265 | // 1. Maintainer announcements that were rejected because the owner announcement didn't exist yet |
| 2178 | // This handles the race condition where maintainer events arrive before owner events | 2266 | // 2. State events that were rejected because no announcement existed |
| 2179 | // during relay synchronization. Maintainer events in the hot cache are re-processed | 2267 | // This handles race conditions where events arrive before their dependencies during relay sync. |
| 2180 | // immediately and should now pass validation. | ||
| 2181 | if event.kind == Kind::GitRepoAnnouncement { | 2268 | if event.kind == Kind::GitRepoAnnouncement { |
| 2182 | use crate::nostr::events::RepositoryAnnouncement; | 2269 | use crate::nostr::events::RepositoryAnnouncement; |
| 2183 | 2270 | ||
| 2184 | match RepositoryAnnouncement::from_event(event.clone()) { | 2271 | match RepositoryAnnouncement::from_event(event.clone()) { |
| 2185 | Ok(announcement) => { | 2272 | Ok(announcement) => { |
| 2273 | // Re-process rejected maintainer announcements | ||
| 2186 | if !announcement.maintainers.is_empty() { | 2274 | if !announcement.maintainers.is_empty() { |
| 2187 | tracing::debug!( | 2275 | tracing::debug!( |
| 2188 | event_id = %event.id, | 2276 | event_id = %event.id, |
| @@ -2197,9 +2285,10 @@ impl SyncManager { | |||
| 2197 | match PublicKey::from_hex(maintainer_hex) { | 2285 | match PublicKey::from_hex(maintainer_hex) { |
| 2198 | Ok(maintainer_pubkey) => { | 2286 | Ok(maintainer_pubkey) => { |
| 2199 | let (removed, hot_events) = rejected_events_index | 2287 | let (removed, hot_events) = rejected_events_index |
| 2200 | .invalidate_and_get_events( | 2288 | .invalidate_and_get( |
| 2201 | &maintainer_pubkey, | 2289 | &maintainer_pubkey, |
| 2202 | &announcement.identifier, | 2290 | &announcement.identifier, |
| 2291 | Some(rejected_index::EventType::Announcement), | ||
| 2203 | ); | 2292 | ); |
| 2204 | 2293 | ||
| 2205 | if removed > 0 { | 2294 | if removed > 0 { |
| @@ -2213,56 +2302,19 @@ impl SyncManager { | |||
| 2213 | } | 2302 | } |
| 2214 | 2303 | ||
| 2215 | // Re-process events from hot cache immediately | 2304 | // Re-process events from hot cache immediately |
| 2216 | for maintainer_event in hot_events { | 2305 | if !hot_events.is_empty() { |
| 2217 | tracing::info!( | 2306 | let _stats = Self::reprocess_events_from_hot_cache( |
| 2218 | event_id = %maintainer_event.id, | 2307 | hot_events, |
| 2219 | maintainer = %maintainer_hex, | 2308 | "maintainer announcement", |
| 2220 | identifier = %announcement.identifier, | 2309 | &maintainer_pubkey, |
| 2221 | "Re-processing maintainer announcement from hot cache" | 2310 | &announcement.identifier, |
| 2222 | ); | 2311 | relay_url, |
| 2223 | 2312 | database, | |
| 2224 | // Recursive call to process_event_static | 2313 | write_policy, |
| 2225 | // This is safe because: | 2314 | local_relay, |
| 2226 | // 1. Event was removed from hot cache before this call | 2315 | rejected_events_index, |
| 2227 | // 2. Second attempt uses maintainer exception (different code path) | 2316 | ) |
| 2228 | // 3. If second attempt fails, stays in cold index only (no third attempt) | 2317 | .await; |
| 2229 | // Use Box::pin to avoid infinitely sized future | ||
| 2230 | let reprocess_result = | ||
| 2231 | Box::pin(Self::process_event_static( | ||
| 2232 | &maintainer_event, | ||
| 2233 | relay_url, | ||
| 2234 | database, | ||
| 2235 | write_policy, | ||
| 2236 | local_relay, | ||
| 2237 | rejected_events_index, | ||
| 2238 | )) | ||
| 2239 | .await; | ||
| 2240 | |||
| 2241 | match reprocess_result { | ||
| 2242 | ProcessResult::Saved => { | ||
| 2243 | tracing::info!( | ||
| 2244 | event_id = %maintainer_event.id, | ||
| 2245 | maintainer = %maintainer_hex, | ||
| 2246 | identifier = %announcement.identifier, | ||
| 2247 | "Maintainer announcement accepted on re-processing" | ||
| 2248 | ); | ||
| 2249 | } | ||
| 2250 | ProcessResult::Duplicate => { | ||
| 2251 | tracing::debug!( | ||
| 2252 | event_id = %maintainer_event.id, | ||
| 2253 | "Maintainer announcement already exists (duplicate)" | ||
| 2254 | ); | ||
| 2255 | } | ||
| 2256 | other => { | ||
| 2257 | tracing::warn!( | ||
| 2258 | event_id = %maintainer_event.id, | ||
| 2259 | maintainer = %maintainer_hex, | ||
| 2260 | identifier = %announcement.identifier, | ||
| 2261 | result = ?other, | ||
| 2262 | "Maintainer announcement still rejected on re-processing" | ||
| 2263 | ); | ||
| 2264 | } | ||
| 2265 | } | ||
| 2266 | } | 2318 | } |
| 2267 | } | 2319 | } |
| 2268 | Err(e) => { | 2320 | Err(e) => { |
| @@ -2275,28 +2327,13 @@ impl SyncManager { | |||
| 2275 | } | 2327 | } |
| 2276 | } | 2328 | } |
| 2277 | } | 2329 | } |
| 2278 | } | ||
| 2279 | Err(e) => { | ||
| 2280 | tracing::warn!( | ||
| 2281 | event_id = %event.id, | ||
| 2282 | error = %e, | ||
| 2283 | "Failed to parse repository announcement for maintainer invalidation" | ||
| 2284 | ); | ||
| 2285 | } | ||
| 2286 | } | ||
| 2287 | 2330 | ||
| 2288 | // When a repository announcement is accepted, re-process any state events | 2331 | // Re-process rejected state events for this announcement |
| 2289 | // that were previously rejected because no announcement existed. | 2332 | let (removed, hot_events) = rejected_events_index.invalidate_and_get( |
| 2290 | // This handles the race condition where state events arrive before their | 2333 | &event.pubkey, |
| 2291 | // announcements during relay synchronization. | 2334 | &announcement.identifier, |
| 2292 | match RepositoryAnnouncement::from_event(event.clone()) { | 2335 | Some(rejected_index::EventType::State), |
| 2293 | Ok(announcement) => { | 2336 | ); |
| 2294 | // Get the announcement author's state events that were rejected | ||
| 2295 | let (removed, hot_events) = rejected_events_index | ||
| 2296 | .invalidate_and_get_state_events( | ||
| 2297 | &event.pubkey, | ||
| 2298 | &announcement.identifier, | ||
| 2299 | ); | ||
| 2300 | 2337 | ||
| 2301 | if removed > 0 { | 2338 | if removed > 0 { |
| 2302 | tracing::info!( | 2339 | tracing::info!( |
| @@ -2309,62 +2346,26 @@ impl SyncManager { | |||
| 2309 | } | 2346 | } |
| 2310 | 2347 | ||
| 2311 | // Re-process state events from hot cache immediately | 2348 | // Re-process state events from hot cache immediately |
| 2312 | for state_event in hot_events { | 2349 | if !hot_events.is_empty() { |
| 2313 | tracing::info!( | 2350 | let _stats = Self::reprocess_events_from_hot_cache( |
| 2314 | event_id = %state_event.id, | 2351 | hot_events, |
| 2315 | pubkey = %event.pubkey, | 2352 | "state event", |
| 2316 | identifier = %announcement.identifier, | 2353 | &event.pubkey, |
| 2317 | "Re-processing state event from hot cache (announcement now exists)" | 2354 | &announcement.identifier, |
| 2318 | ); | ||
| 2319 | |||
| 2320 | let reprocess_result = Box::pin(Self::process_event_static( | ||
| 2321 | &state_event, | ||
| 2322 | relay_url, | 2355 | relay_url, |
| 2323 | database, | 2356 | database, |
| 2324 | write_policy, | 2357 | write_policy, |
| 2325 | local_relay, | 2358 | local_relay, |
| 2326 | rejected_events_index, | 2359 | rejected_events_index, |
| 2327 | )) | 2360 | ) |
| 2328 | .await; | 2361 | .await; |
| 2329 | |||
| 2330 | match reprocess_result { | ||
| 2331 | ProcessResult::Saved => { | ||
| 2332 | tracing::info!( | ||
| 2333 | event_id = %state_event.id, | ||
| 2334 | pubkey = %event.pubkey, | ||
| 2335 | identifier = %announcement.identifier, | ||
| 2336 | "State event accepted on re-processing (announcement now exists)" | ||
| 2337 | ); | ||
| 2338 | } | ||
| 2339 | ProcessResult::Purgatory => { | ||
| 2340 | tracing::debug!( | ||
| 2341 | event_id = %state_event.id, | ||
| 2342 | "State event added to purgatory (waiting for git data)" | ||
| 2343 | ); | ||
| 2344 | } | ||
| 2345 | ProcessResult::Duplicate => { | ||
| 2346 | tracing::debug!( | ||
| 2347 | event_id = %state_event.id, | ||
| 2348 | "State event already exists (duplicate)" | ||
| 2349 | ); | ||
| 2350 | } | ||
| 2351 | other => { | ||
| 2352 | tracing::warn!( | ||
| 2353 | event_id = %state_event.id, | ||
| 2354 | pubkey = %event.pubkey, | ||
| 2355 | identifier = %announcement.identifier, | ||
| 2356 | result = ?other, | ||
| 2357 | "State event still rejected on re-processing" | ||
| 2358 | ); | ||
| 2359 | } | ||
| 2360 | } | ||
| 2361 | } | 2362 | } |
| 2362 | } | 2363 | } |
| 2363 | Err(e) => { | 2364 | Err(e) => { |
| 2364 | tracing::warn!( | 2365 | tracing::warn!( |
| 2365 | event_id = %event.id, | 2366 | event_id = %event.id, |
| 2366 | error = %e, | 2367 | error = %e, |
| 2367 | "Failed to parse repository announcement for state event invalidation" | 2368 | "Failed to parse repository announcement for rejected event invalidation" |
| 2368 | ); | 2369 | ); |
| 2369 | } | 2370 | } |
| 2370 | } | 2371 | } |
| @@ -2383,8 +2384,11 @@ impl SyncManager { | |||
| 2383 | .and_then(|t| t.content()) | 2384 | .and_then(|t| t.content()) |
| 2384 | { | 2385 | { |
| 2385 | // Get rejected state events for this pubkey + identifier | 2386 | // Get rejected state events for this pubkey + identifier |
| 2386 | let (removed, hot_events) = rejected_events_index | 2387 | let (removed, hot_events) = rejected_events_index.invalidate_and_get( |
| 2387 | .invalidate_and_get_state_events(&event.pubkey, identifier); | 2388 | &event.pubkey, |
| 2389 | identifier, | ||
| 2390 | Some(rejected_index::EventType::State), | ||
| 2391 | ); | ||
| 2388 | 2392 | ||
| 2389 | if removed > 0 { | 2393 | if removed > 0 { |
| 2390 | tracing::info!( | 2394 | tracing::info!( |
| @@ -2397,50 +2401,19 @@ impl SyncManager { | |||
| 2397 | } | 2401 | } |
| 2398 | 2402 | ||
| 2399 | // Re-process events from hot cache immediately | 2403 | // Re-process events from hot cache immediately |
| 2400 | for state_event in hot_events { | 2404 | if !hot_events.is_empty() { |
| 2401 | tracing::info!( | 2405 | let _stats = Self::reprocess_events_from_hot_cache( |
| 2402 | event_id = %state_event.id, | 2406 | hot_events, |
| 2403 | pubkey = %event.pubkey, | 2407 | "state event", |
| 2404 | identifier = %identifier, | 2408 | &event.pubkey, |
| 2405 | "Re-processing state event from hot cache" | 2409 | identifier, |
| 2406 | ); | ||
| 2407 | |||
| 2408 | // Recursive call to process_event_static | ||
| 2409 | let reprocess_result = Box::pin(Self::process_event_static( | ||
| 2410 | &state_event, | ||
| 2411 | relay_url, | 2410 | relay_url, |
| 2412 | database, | 2411 | database, |
| 2413 | write_policy, | 2412 | write_policy, |
| 2414 | local_relay, | 2413 | local_relay, |
| 2415 | rejected_events_index, | 2414 | rejected_events_index, |
| 2416 | )) | 2415 | ) |
| 2417 | .await; | 2416 | .await; |
| 2418 | |||
| 2419 | match reprocess_result { | ||
| 2420 | ProcessResult::Saved => { | ||
| 2421 | tracing::info!( | ||
| 2422 | event_id = %state_event.id, | ||
| 2423 | pubkey = %event.pubkey, | ||
| 2424 | identifier = %identifier, | ||
| 2425 | "State event accepted on re-processing" | ||
| 2426 | ); | ||
| 2427 | } | ||
| 2428 | ProcessResult::Duplicate => { | ||
| 2429 | tracing::debug!( | ||
| 2430 | event_id = %state_event.id, | ||
| 2431 | "State event already exists (duplicate)" | ||
| 2432 | ); | ||
| 2433 | } | ||
| 2434 | other => { | ||
| 2435 | tracing::warn!( | ||
| 2436 | event_id = %state_event.id, | ||
| 2437 | pubkey = %event.pubkey, | ||
| 2438 | identifier = %identifier, | ||
| 2439 | result = ?other, | ||
| 2440 | "State event still rejected on re-processing" | ||
| 2441 | ); | ||
| 2442 | } | ||
| 2443 | } | ||
| 2444 | } | 2417 | } |
| 2445 | } | 2418 | } |
| 2446 | } | 2419 | } |