diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 226 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 12 |
2 files changed, 206 insertions, 32 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 93b0e38..d5c4856 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -1468,16 +1468,16 @@ impl SyncManager { | |||
| 1468 | match relay_event { | 1468 | match relay_event { |
| 1469 | RelayEvent::Event(event, subscription_id) => { | 1469 | RelayEvent::Event(event, subscription_id) => { |
| 1470 | // Skip events we've already rejected (announcements only) | 1470 | // Skip events we've already rejected (announcements only) |
| 1471 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | 1471 | if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) |
| 1472 | if rejected_events_index.contains(&event.id) { | 1472 | && rejected_events_index.contains(&event.id) |
| 1473 | tracing::trace!( | 1473 | { |
| 1474 | event_id = %event.id, | 1474 | tracing::trace!( |
| 1475 | kind = %event.kind.as_u16(), | 1475 | event_id = %event.id, |
| 1476 | relay = %relay_url_clone, | 1476 | kind = %event.kind.as_u16(), |
| 1477 | "Skipping previously rejected announcement event" | 1477 | relay = %relay_url_clone, |
| 1478 | ); | 1478 | "Skipping previously rejected announcement event" |
| 1479 | continue; | 1479 | ); |
| 1480 | } | 1480 | continue; |
| 1481 | } | 1481 | } |
| 1482 | 1482 | ||
| 1483 | let result = Self::process_event_static( | 1483 | let result = Self::process_event_static( |
| @@ -2275,6 +2275,161 @@ impl SyncManager { | |||
| 2275 | ); | 2275 | ); |
| 2276 | } | 2276 | } |
| 2277 | } | 2277 | } |
| 2278 | |||
| 2279 | // GRASP-02 PR4.1: Re-process state events that were rejected because no announcement existed | ||
| 2280 | // When an announcement is accepted, check for state events that were rejected | ||
| 2281 | // because "no announcement exists for this repository". These should now pass. | ||
| 2282 | match RepositoryAnnouncement::from_event(event.clone()) { | ||
| 2283 | Ok(announcement) => { | ||
| 2284 | // Get the announcement author's state events that were rejected | ||
| 2285 | let (removed, hot_events) = rejected_events_index | ||
| 2286 | .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); | ||
| 2287 | |||
| 2288 | if removed > 0 { | ||
| 2289 | tracing::info!( | ||
| 2290 | pubkey = %event.pubkey, | ||
| 2291 | identifier = %announcement.identifier, | ||
| 2292 | removed_from_cold_index = removed, | ||
| 2293 | hot_cache_events = hot_events.len(), | ||
| 2294 | "Invalidated rejected state events (announcement now exists)" | ||
| 2295 | ); | ||
| 2296 | } | ||
| 2297 | |||
| 2298 | // Re-process state events from hot cache immediately | ||
| 2299 | for state_event in hot_events { | ||
| 2300 | tracing::info!( | ||
| 2301 | event_id = %state_event.id, | ||
| 2302 | pubkey = %event.pubkey, | ||
| 2303 | identifier = %announcement.identifier, | ||
| 2304 | "Re-processing state event from hot cache (announcement now exists)" | ||
| 2305 | ); | ||
| 2306 | |||
| 2307 | let reprocess_result = Box::pin(Self::process_event_static( | ||
| 2308 | &state_event, | ||
| 2309 | relay_url, | ||
| 2310 | database, | ||
| 2311 | write_policy, | ||
| 2312 | local_relay, | ||
| 2313 | rejected_events_index, | ||
| 2314 | )) | ||
| 2315 | .await; | ||
| 2316 | |||
| 2317 | match reprocess_result { | ||
| 2318 | ProcessResult::Saved => { | ||
| 2319 | tracing::info!( | ||
| 2320 | event_id = %state_event.id, | ||
| 2321 | pubkey = %event.pubkey, | ||
| 2322 | identifier = %announcement.identifier, | ||
| 2323 | "State event accepted on re-processing (announcement now exists)" | ||
| 2324 | ); | ||
| 2325 | } | ||
| 2326 | ProcessResult::Purgatory => { | ||
| 2327 | tracing::debug!( | ||
| 2328 | event_id = %state_event.id, | ||
| 2329 | "State event added to purgatory (waiting for git data)" | ||
| 2330 | ); | ||
| 2331 | } | ||
| 2332 | ProcessResult::Duplicate => { | ||
| 2333 | tracing::debug!( | ||
| 2334 | event_id = %state_event.id, | ||
| 2335 | "State event already exists (duplicate)" | ||
| 2336 | ); | ||
| 2337 | } | ||
| 2338 | other => { | ||
| 2339 | tracing::warn!( | ||
| 2340 | event_id = %state_event.id, | ||
| 2341 | pubkey = %event.pubkey, | ||
| 2342 | identifier = %announcement.identifier, | ||
| 2343 | result = ?other, | ||
| 2344 | "State event still rejected on re-processing" | ||
| 2345 | ); | ||
| 2346 | } | ||
| 2347 | } | ||
| 2348 | } | ||
| 2349 | } | ||
| 2350 | Err(e) => { | ||
| 2351 | tracing::warn!( | ||
| 2352 | event_id = %event.id, | ||
| 2353 | error = %e, | ||
| 2354 | "Failed to parse repository announcement for state event invalidation" | ||
| 2355 | ); | ||
| 2356 | } | ||
| 2357 | } | ||
| 2358 | } | ||
| 2359 | |||
| 2360 | // GRASP-02 PR4.2: Invalidate and re-process state events | ||
| 2361 | // When a state event is accepted (git data arrived), check if there are any | ||
| 2362 | // other rejected state events for the same repository in the hot cache. | ||
| 2363 | // Re-process them immediately since git data is now available. | ||
| 2364 | if event.kind == Kind::RepoState { | ||
| 2365 | // Extract identifier from 'd' tag | ||
| 2366 | if let Some(identifier) = event | ||
| 2367 | .tags | ||
| 2368 | .iter() | ||
| 2369 | .find(|t| t.kind() == nostr_sdk::TagKind::d()) | ||
| 2370 | .and_then(|t| t.content()) | ||
| 2371 | { | ||
| 2372 | // Get rejected state events for this pubkey + identifier | ||
| 2373 | let (removed, hot_events) = rejected_events_index | ||
| 2374 | .invalidate_and_get_state_events(&event.pubkey, identifier); | ||
| 2375 | |||
| 2376 | if removed > 0 { | ||
| 2377 | tracing::info!( | ||
| 2378 | pubkey = %event.pubkey, | ||
| 2379 | identifier = %identifier, | ||
| 2380 | removed_from_cold_index = removed, | ||
| 2381 | hot_cache_events = hot_events.len(), | ||
| 2382 | "Invalidated rejected state events (git data now available)" | ||
| 2383 | ); | ||
| 2384 | } | ||
| 2385 | |||
| 2386 | // Re-process events from hot cache immediately | ||
| 2387 | for state_event in hot_events { | ||
| 2388 | tracing::info!( | ||
| 2389 | event_id = %state_event.id, | ||
| 2390 | pubkey = %event.pubkey, | ||
| 2391 | identifier = %identifier, | ||
| 2392 | "Re-processing state event from hot cache" | ||
| 2393 | ); | ||
| 2394 | |||
| 2395 | // Recursive call to process_event_static | ||
| 2396 | let reprocess_result = Box::pin(Self::process_event_static( | ||
| 2397 | &state_event, | ||
| 2398 | relay_url, | ||
| 2399 | database, | ||
| 2400 | write_policy, | ||
| 2401 | local_relay, | ||
| 2402 | rejected_events_index, | ||
| 2403 | )) | ||
| 2404 | .await; | ||
| 2405 | |||
| 2406 | match reprocess_result { | ||
| 2407 | ProcessResult::Saved => { | ||
| 2408 | tracing::info!( | ||
| 2409 | event_id = %state_event.id, | ||
| 2410 | pubkey = %event.pubkey, | ||
| 2411 | identifier = %identifier, | ||
| 2412 | "State event accepted on re-processing" | ||
| 2413 | ); | ||
| 2414 | } | ||
| 2415 | ProcessResult::Duplicate => { | ||
| 2416 | tracing::debug!( | ||
| 2417 | event_id = %state_event.id, | ||
| 2418 | "State event already exists (duplicate)" | ||
| 2419 | ); | ||
| 2420 | } | ||
| 2421 | other => { | ||
| 2422 | tracing::warn!( | ||
| 2423 | event_id = %state_event.id, | ||
| 2424 | pubkey = %event.pubkey, | ||
| 2425 | identifier = %identifier, | ||
| 2426 | result = ?other, | ||
| 2427 | "State event still rejected on re-processing" | ||
| 2428 | ); | ||
| 2429 | } | ||
| 2430 | } | ||
| 2431 | } | ||
| 2432 | } | ||
| 2278 | } | 2433 | } |
| 2279 | 2434 | ||
| 2280 | ProcessResult::Saved | 2435 | ProcessResult::Saved |
| @@ -2299,7 +2454,7 @@ impl SyncManager { | |||
| 2299 | "Event rejected by write policy" | 2454 | "Event rejected by write policy" |
| 2300 | ); | 2455 | ); |
| 2301 | 2456 | ||
| 2302 | // Track rejected announcement events to avoid re-fetching them | 2457 | // Track rejected announcement and state events to avoid re-fetching them |
| 2303 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | 2458 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { |
| 2304 | // Extract identifier from 'd' tag | 2459 | // Extract identifier from 'd' tag |
| 2305 | if let Some(identifier) = event | 2460 | if let Some(identifier) = event |
| @@ -2312,30 +2467,47 @@ impl SyncManager { | |||
| 2312 | let reason = if message.contains("doesn't list this service") | 2467 | let reason = if message.contains("doesn't list this service") |
| 2313 | || message.contains("Announcement must list service") { | 2468 | || message.contains("Announcement must list service") { |
| 2314 | rejected_index::RejectionReason::DoesNotListService | 2469 | rejected_index::RejectionReason::DoesNotListService |
| 2315 | } else if message.contains("maintainer") { | 2470 | } else if message.contains("maintainer") |
| 2471 | || message.contains("no announcement exists") | ||
| 2472 | || message.contains("not authorized") { | ||
| 2316 | rejected_index::RejectionReason::MaintainerNotYetValid | 2473 | rejected_index::RejectionReason::MaintainerNotYetValid |
| 2317 | } else { | 2474 | } else { |
| 2318 | rejected_index::RejectionReason::Other | 2475 | rejected_index::RejectionReason::Other |
| 2319 | }; | 2476 | }; |
| 2320 | 2477 | ||
| 2321 | rejected_events_index.add_announcement( | 2478 | // Use appropriate method based on event kind |
| 2322 | event.clone(), | 2479 | if event.kind == Kind::RepoState { |
| 2323 | event.pubkey, | 2480 | rejected_events_index.add_state( |
| 2324 | identifier.to_string(), | 2481 | event.clone(), |
| 2325 | reason, | 2482 | event.pubkey, |
| 2326 | ); | 2483 | identifier.to_string(), |
| 2327 | 2484 | reason, | |
| 2328 | tracing::debug!( | 2485 | ); |
| 2329 | event_id = %event.id, | 2486 | tracing::debug!( |
| 2330 | kind = %event.kind.as_u16(), | 2487 | event_id = %event.id, |
| 2331 | identifier = %identifier, | 2488 | kind = %event.kind.as_u16(), |
| 2332 | "Added rejected announcement to two-tier index" | 2489 | identifier = %identifier, |
| 2333 | ); | 2490 | "Added rejected state event to two-tier index" |
| 2491 | ); | ||
| 2492 | } else { | ||
| 2493 | rejected_events_index.add_announcement( | ||
| 2494 | event.clone(), | ||
| 2495 | event.pubkey, | ||
| 2496 | identifier.to_string(), | ||
| 2497 | reason, | ||
| 2498 | ); | ||
| 2499 | tracing::debug!( | ||
| 2500 | event_id = %event.id, | ||
| 2501 | kind = %event.kind.as_u16(), | ||
| 2502 | identifier = %identifier, | ||
| 2503 | "Added rejected announcement to two-tier index" | ||
| 2504 | ); | ||
| 2505 | } | ||
| 2334 | } else { | 2506 | } else { |
| 2335 | tracing::warn!( | 2507 | tracing::warn!( |
| 2336 | event_id = %event.id, | 2508 | event_id = %event.id, |
| 2337 | kind = %event.kind.as_u16(), | 2509 | kind = %event.kind.as_u16(), |
| 2338 | "Announcement missing 'd' tag, cannot track in rejected index" | 2510 | "Event missing 'd' tag, cannot track in rejected index" |
| 2339 | ); | 2511 | ); |
| 2340 | } | 2512 | } |
| 2341 | } | 2513 | } |
| @@ -3192,7 +3364,7 @@ mod tests { | |||
| 3192 | ); | 3364 | ); |
| 3193 | 3365 | ||
| 3194 | // Create test event IDs | 3366 | // Create test event IDs |
| 3195 | let rejected_id = EventId::from_hex( | 3367 | let _rejected_id = EventId::from_hex( |
| 3196 | "0000000000000000000000000000000000000000000000000000000000000001", | 3368 | "0000000000000000000000000000000000000000000000000000000000000001", |
| 3197 | ) | 3369 | ) |
| 3198 | .unwrap(); | 3370 | .unwrap(); |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index f5ffef4..a9d7a4d 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -54,9 +54,9 @@ | |||
| 54 | //! | 54 | //! |
| 55 | //! # Usage | 55 | //! # Usage |
| 56 | //! | 56 | //! |
| 57 | //! ```rust,no_run | 57 | //! ```rust,ignore |
| 58 | //! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; | 58 | //! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; |
| 59 | //! use nostr_sdk::Event; | 59 | //! use nostr_sdk::{Event, PublicKey}; |
| 60 | //! use std::time::Duration; | 60 | //! use std::time::Duration; |
| 61 | //! | 61 | //! |
| 62 | //! let index = RejectedEventsIndex::new( | 62 | //! let index = RejectedEventsIndex::new( |
| @@ -64,7 +64,7 @@ | |||
| 64 | //! Duration::from_secs(604800), // cold index: 7 days | 64 | //! Duration::from_secs(604800), // cold index: 7 days |
| 65 | //! ); | 65 | //! ); |
| 66 | //! | 66 | //! |
| 67 | //! // Add rejected announcement | 67 | //! // Add rejected announcement (event is a nostr_sdk::Event) |
| 68 | //! index.add_announcement( | 68 | //! index.add_announcement( |
| 69 | //! event.clone(), | 69 | //! event.clone(), |
| 70 | //! event.pubkey, | 70 | //! event.pubkey, |
| @@ -116,16 +116,19 @@ struct HotCacheEntry { | |||
| 116 | event: Event, | 116 | event: Event, |
| 117 | pubkey: PublicKey, | 117 | pubkey: PublicKey, |
| 118 | identifier: String, | 118 | identifier: String, |
| 119 | #[allow(dead_code)] // Used for metrics/debugging in future | ||
| 119 | reason: RejectionReason, | 120 | reason: RejectionReason, |
| 120 | cached_at: Instant, | 121 | cached_at: Instant, |
| 121 | } | 122 | } |
| 122 | 123 | ||
| 123 | /// Entry in the cold index (metadata only) | 124 | /// Entry in the cold index (metadata only) |
| 125 | /// | ||
| 126 | /// Note: event_id is stored as the HashMap key, not in this struct | ||
| 124 | #[derive(Debug, Clone)] | 127 | #[derive(Debug, Clone)] |
| 125 | struct ColdIndexEntry { | 128 | struct ColdIndexEntry { |
| 126 | event_id: EventId, | ||
| 127 | pubkey: PublicKey, | 129 | pubkey: PublicKey, |
| 128 | identifier: String, | 130 | identifier: String, |
| 131 | #[allow(dead_code)] // Used for metrics/debugging in future | ||
| 129 | reason: RejectionReason, | 132 | reason: RejectionReason, |
| 130 | rejected_at: Instant, | 133 | rejected_at: Instant, |
| 131 | } | 134 | } |
| @@ -235,7 +238,6 @@ impl ColdIndex { | |||
| 235 | reason: RejectionReason, | 238 | reason: RejectionReason, |
| 236 | ) { | 239 | ) { |
| 237 | let entry = ColdIndexEntry { | 240 | let entry = ColdIndexEntry { |
| 238 | event_id, | ||
| 239 | pubkey, | 241 | pubkey, |
| 240 | identifier, | 242 | identifier, |
| 241 | reason, | 243 | reason, |