upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs226
-rw-r--r--src/sync/rejected_index.rs12
2 files changed, 206 insertions, 32 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 93b0e38..d5c4856 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1468,16 +1468,16 @@ impl SyncManager {
1468 match relay_event { 1468 match relay_event {
1469 RelayEvent::Event(event, subscription_id) => { 1469 RelayEvent::Event(event, subscription_id) => {
1470 // Skip events we've already rejected (announcements only) 1470 // Skip events we've already rejected (announcements only)
1471 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 1471 if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState)
1472 if rejected_events_index.contains(&event.id) { 1472 && rejected_events_index.contains(&event.id)
1473 tracing::trace!( 1473 {
1474 event_id = %event.id, 1474 tracing::trace!(
1475 kind = %event.kind.as_u16(), 1475 event_id = %event.id,
1476 relay = %relay_url_clone, 1476 kind = %event.kind.as_u16(),
1477 "Skipping previously rejected announcement event" 1477 relay = %relay_url_clone,
1478 ); 1478 "Skipping previously rejected announcement event"
1479 continue; 1479 );
1480 } 1480 continue;
1481 } 1481 }
1482 1482
1483 let result = Self::process_event_static( 1483 let result = Self::process_event_static(
@@ -2275,6 +2275,161 @@ impl SyncManager {
2275 ); 2275 );
2276 } 2276 }
2277 } 2277 }
2278
2279 // GRASP-02 PR4.1: Re-process state events that were rejected because no announcement existed
2280 // When an announcement is accepted, check for state events that were rejected
2281 // because "no announcement exists for this repository". These should now pass.
2282 match RepositoryAnnouncement::from_event(event.clone()) {
2283 Ok(announcement) => {
2284 // Get the announcement author's state events that were rejected
2285 let (removed, hot_events) = rejected_events_index
2286 .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier);
2287
2288 if removed > 0 {
2289 tracing::info!(
2290 pubkey = %event.pubkey,
2291 identifier = %announcement.identifier,
2292 removed_from_cold_index = removed,
2293 hot_cache_events = hot_events.len(),
2294 "Invalidated rejected state events (announcement now exists)"
2295 );
2296 }
2297
2298 // Re-process state events from hot cache immediately
2299 for state_event in hot_events {
2300 tracing::info!(
2301 event_id = %state_event.id,
2302 pubkey = %event.pubkey,
2303 identifier = %announcement.identifier,
2304 "Re-processing state event from hot cache (announcement now exists)"
2305 );
2306
2307 let reprocess_result = Box::pin(Self::process_event_static(
2308 &state_event,
2309 relay_url,
2310 database,
2311 write_policy,
2312 local_relay,
2313 rejected_events_index,
2314 ))
2315 .await;
2316
2317 match reprocess_result {
2318 ProcessResult::Saved => {
2319 tracing::info!(
2320 event_id = %state_event.id,
2321 pubkey = %event.pubkey,
2322 identifier = %announcement.identifier,
2323 "State event accepted on re-processing (announcement now exists)"
2324 );
2325 }
2326 ProcessResult::Purgatory => {
2327 tracing::debug!(
2328 event_id = %state_event.id,
2329 "State event added to purgatory (waiting for git data)"
2330 );
2331 }
2332 ProcessResult::Duplicate => {
2333 tracing::debug!(
2334 event_id = %state_event.id,
2335 "State event already exists (duplicate)"
2336 );
2337 }
2338 other => {
2339 tracing::warn!(
2340 event_id = %state_event.id,
2341 pubkey = %event.pubkey,
2342 identifier = %announcement.identifier,
2343 result = ?other,
2344 "State event still rejected on re-processing"
2345 );
2346 }
2347 }
2348 }
2349 }
2350 Err(e) => {
2351 tracing::warn!(
2352 event_id = %event.id,
2353 error = %e,
2354 "Failed to parse repository announcement for state event invalidation"
2355 );
2356 }
2357 }
2358 }
2359
2360 // GRASP-02 PR4.2: Invalidate and re-process state events
2361 // When a state event is accepted (git data arrived), check if there are any
2362 // other rejected state events for the same repository in the hot cache.
2363 // Re-process them immediately since git data is now available.
2364 if event.kind == Kind::RepoState {
2365 // Extract identifier from 'd' tag
2366 if let Some(identifier) = event
2367 .tags
2368 .iter()
2369 .find(|t| t.kind() == nostr_sdk::TagKind::d())
2370 .and_then(|t| t.content())
2371 {
2372 // Get rejected state events for this pubkey + identifier
2373 let (removed, hot_events) = rejected_events_index
2374 .invalidate_and_get_state_events(&event.pubkey, identifier);
2375
2376 if removed > 0 {
2377 tracing::info!(
2378 pubkey = %event.pubkey,
2379 identifier = %identifier,
2380 removed_from_cold_index = removed,
2381 hot_cache_events = hot_events.len(),
2382 "Invalidated rejected state events (git data now available)"
2383 );
2384 }
2385
2386 // Re-process events from hot cache immediately
2387 for state_event in hot_events {
2388 tracing::info!(
2389 event_id = %state_event.id,
2390 pubkey = %event.pubkey,
2391 identifier = %identifier,
2392 "Re-processing state event from hot cache"
2393 );
2394
2395 // Recursive call to process_event_static
2396 let reprocess_result = Box::pin(Self::process_event_static(
2397 &state_event,
2398 relay_url,
2399 database,
2400 write_policy,
2401 local_relay,
2402 rejected_events_index,
2403 ))
2404 .await;
2405
2406 match reprocess_result {
2407 ProcessResult::Saved => {
2408 tracing::info!(
2409 event_id = %state_event.id,
2410 pubkey = %event.pubkey,
2411 identifier = %identifier,
2412 "State event accepted on re-processing"
2413 );
2414 }
2415 ProcessResult::Duplicate => {
2416 tracing::debug!(
2417 event_id = %state_event.id,
2418 "State event already exists (duplicate)"
2419 );
2420 }
2421 other => {
2422 tracing::warn!(
2423 event_id = %state_event.id,
2424 pubkey = %event.pubkey,
2425 identifier = %identifier,
2426 result = ?other,
2427 "State event still rejected on re-processing"
2428 );
2429 }
2430 }
2431 }
2432 }
2278 } 2433 }
2279 2434
2280 ProcessResult::Saved 2435 ProcessResult::Saved
@@ -2299,7 +2454,7 @@ impl SyncManager {
2299 "Event rejected by write policy" 2454 "Event rejected by write policy"
2300 ); 2455 );
2301 2456
2302 // Track rejected announcement events to avoid re-fetching them 2457 // Track rejected announcement and state events to avoid re-fetching them
2303 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 2458 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
2304 // Extract identifier from 'd' tag 2459 // Extract identifier from 'd' tag
2305 if let Some(identifier) = event 2460 if let Some(identifier) = event
@@ -2312,30 +2467,47 @@ impl SyncManager {
2312 let reason = if message.contains("doesn't list this service") 2467 let reason = if message.contains("doesn't list this service")
2313 || message.contains("Announcement must list service") { 2468 || message.contains("Announcement must list service") {
2314 rejected_index::RejectionReason::DoesNotListService 2469 rejected_index::RejectionReason::DoesNotListService
2315 } else if message.contains("maintainer") { 2470 } else if message.contains("maintainer")
2471 || message.contains("no announcement exists")
2472 || message.contains("not authorized") {
2316 rejected_index::RejectionReason::MaintainerNotYetValid 2473 rejected_index::RejectionReason::MaintainerNotYetValid
2317 } else { 2474 } else {
2318 rejected_index::RejectionReason::Other 2475 rejected_index::RejectionReason::Other
2319 }; 2476 };
2320 2477
2321 rejected_events_index.add_announcement( 2478 // Use appropriate method based on event kind
2322 event.clone(), 2479 if event.kind == Kind::RepoState {
2323 event.pubkey, 2480 rejected_events_index.add_state(
2324 identifier.to_string(), 2481 event.clone(),
2325 reason, 2482 event.pubkey,
2326 ); 2483 identifier.to_string(),
2327 2484 reason,
2328 tracing::debug!( 2485 );
2329 event_id = %event.id, 2486 tracing::debug!(
2330 kind = %event.kind.as_u16(), 2487 event_id = %event.id,
2331 identifier = %identifier, 2488 kind = %event.kind.as_u16(),
2332 "Added rejected announcement to two-tier index" 2489 identifier = %identifier,
2333 ); 2490 "Added rejected state event to two-tier index"
2491 );
2492 } else {
2493 rejected_events_index.add_announcement(
2494 event.clone(),
2495 event.pubkey,
2496 identifier.to_string(),
2497 reason,
2498 );
2499 tracing::debug!(
2500 event_id = %event.id,
2501 kind = %event.kind.as_u16(),
2502 identifier = %identifier,
2503 "Added rejected announcement to two-tier index"
2504 );
2505 }
2334 } else { 2506 } else {
2335 tracing::warn!( 2507 tracing::warn!(
2336 event_id = %event.id, 2508 event_id = %event.id,
2337 kind = %event.kind.as_u16(), 2509 kind = %event.kind.as_u16(),
2338 "Announcement missing 'd' tag, cannot track in rejected index" 2510 "Event missing 'd' tag, cannot track in rejected index"
2339 ); 2511 );
2340 } 2512 }
2341 } 2513 }
@@ -3192,7 +3364,7 @@ mod tests {
3192 ); 3364 );
3193 3365
3194 // Create test event IDs 3366 // Create test event IDs
3195 let rejected_id = EventId::from_hex( 3367 let _rejected_id = EventId::from_hex(
3196 "0000000000000000000000000000000000000000000000000000000000000001", 3368 "0000000000000000000000000000000000000000000000000000000000000001",
3197 ) 3369 )
3198 .unwrap(); 3370 .unwrap();
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index f5ffef4..a9d7a4d 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -54,9 +54,9 @@
54//! 54//!
55//! # Usage 55//! # Usage
56//! 56//!
57//! ```rust,no_run 57//! ```rust,ignore
58//! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason}; 58//! use ngit_grasp::sync::rejected_index::{RejectedEventsIndex, RejectionReason};
59//! use nostr_sdk::Event; 59//! use nostr_sdk::{Event, PublicKey};
60//! use std::time::Duration; 60//! use std::time::Duration;
61//! 61//!
62//! let index = RejectedEventsIndex::new( 62//! let index = RejectedEventsIndex::new(
@@ -64,7 +64,7 @@
64//! Duration::from_secs(604800), // cold index: 7 days 64//! Duration::from_secs(604800), // cold index: 7 days
65//! ); 65//! );
66//! 66//!
67//! // Add rejected announcement 67//! // Add rejected announcement (event is a nostr_sdk::Event)
68//! index.add_announcement( 68//! index.add_announcement(
69//! event.clone(), 69//! event.clone(),
70//! event.pubkey, 70//! event.pubkey,
@@ -116,16 +116,19 @@ struct HotCacheEntry {
116 event: Event, 116 event: Event,
117 pubkey: PublicKey, 117 pubkey: PublicKey,
118 identifier: String, 118 identifier: String,
119 #[allow(dead_code)] // Used for metrics/debugging in future
119 reason: RejectionReason, 120 reason: RejectionReason,
120 cached_at: Instant, 121 cached_at: Instant,
121} 122}
122 123
123/// Entry in the cold index (metadata only) 124/// Entry in the cold index (metadata only)
125///
126/// Note: event_id is stored as the HashMap key, not in this struct
124#[derive(Debug, Clone)] 127#[derive(Debug, Clone)]
125struct ColdIndexEntry { 128struct ColdIndexEntry {
126 event_id: EventId,
127 pubkey: PublicKey, 129 pubkey: PublicKey,
128 identifier: String, 130 identifier: String,
131 #[allow(dead_code)] // Used for metrics/debugging in future
129 reason: RejectionReason, 132 reason: RejectionReason,
130 rejected_at: Instant, 133 rejected_at: Instant,
131} 134}
@@ -235,7 +238,6 @@ impl ColdIndex {
235 reason: RejectionReason, 238 reason: RejectionReason,
236 ) { 239 ) {
237 let entry = ColdIndexEntry { 240 let entry = ColdIndexEntry {
238 event_id,
239 pubkey, 241 pubkey,
240 identifier, 242 identifier,
241 reason, 243 reason,