From 02e957ec97c9a9e6e37eca9c9d4aa6aef4bcd363 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 15:49:17 +0000 Subject: feat: Switch SyncManager to use two-tier RejectedEventsIndex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the simple HashSet with the sophisticated two-tier RejectedEventsIndex from PR1, enabling future immediate re-processing when maintainer dependencies resolve. ## Changes ### Config (src/config.rs) - Add `rejected_hot_cache_duration_secs` (default: 120 = 2 minutes) - Add `rejected_cold_index_expiry_secs` (default: 604800 = 7 days) - Both configurable via CLI flags or environment variables ### SyncManager (src/sync/mod.rs) **Type Change:** - Before: `Arc>>` (simple event ID set) - After: `Arc` (two-tier storage) **Initialization:** - Pass config durations to RejectedEventsIndex::new() - Creates hot cache (2 min) + cold index (7 days) **Event Processing (process_event_static):** - Extract identifier from 'd' tag - Determine rejection reason from error message - Call `add_announcement()` with full event + metadata - Stores in both hot cache and cold index **Negentropy Sync (derive_relay_targets):** - Call `get_all_event_ids()` to get rejected IDs - Returns union of hot cache + cold index event IDs - Excludes from negentropy reconciliation **Event Loop (relay_connection):** - Use `contains()` method instead of direct HashSet access - Simpler API, same skip-rejected behavior ### RejectedEventsIndex (src/sync/rejected_index.rs) **New Method:** - `get_all_event_ids()`: Returns HashSet from both tiers - Used for negentropy exclusion (replaces direct HashSet access) ### Tests Updated **test_rejected_events_index_tracks_announcements:** - Create RejectedEventsIndex with config durations - Add 'd' tag to test announcement - Use `add_announcement()` with full event - Verify both hot cache and cold index populated - Check lengths with `hot_cache_len()` and `cold_index_len()` **test_rejected_events_excluded_from_negentropy:** - Create RejectedEventsIndex instead of HashSet - Build full event with 'd' tag - Add to index with `add_announcement()` - Get IDs with `get_all_event_ids()` - Verify excluded from reconciliation ## Architecture ``` ┌─────────────────────────────────────────────────────────────┐ │ SyncManager │ │ │ │ rejected_events_index: Arc │ │ ├─ Hot Cache (2 min): Full events for re-processing │ │ └─ Cold Index (7 days): Metadata for dedup │ └─────────────────────────────────────────────────────────────┘ │ │ On rejection ▼ ┌─────────────────────────────────────────────────────────────┐ │ add_announcement(event, pubkey, identifier, reason) │ │ ├─ Store full event in hot cache │ │ └─ Store metadata in cold index │ └─────────────────────────────────────────────────────────────┘ │ │ On negentropy sync ▼ ┌─────────────────────────────────────────────────────────────┐ │ get_all_event_ids() → HashSet │ │ ├─ Union of hot cache IDs │ │ └─ Union of cold index IDs │ └─────────────────────────────────────────────────────────────┘ ``` ## Benefits ### Immediate - **Better tracking**: Store rejection reason + metadata - **Configurable**: Tune cache/index durations per deployment - **Observable**: Separate hot/cold metrics (future PR4) ### Future (PR3) - **Immediate re-processing**: Get events from hot cache when valid - **No 24h delay**: Maintainer announcements accepted in <1 second - **Automatic recovery**: Hot cache for immediate, cold index for later ## Backward Compatibility **No breaking changes:** - Same rejection behavior (skip events in index) - Same negentropy exclusion (union with purgatory IDs) - Default config values match previous implicit behavior **Migration:** - Existing deployments continue working with defaults - Optional: Tune durations via new config flags ## Testing All tests passing: - ✅ 9 rejected_index tests (hot cache, cold index, two-tier) - ✅ 139 sync module tests (including updated integration tests) - ✅ 247 total library tests ## Next Steps **PR3: Add invalidation + immediate re-processing** - Invalidate cold index when owner announcement accepted - Get events from hot cache for re-processing - Recursive call to process_event_static - Integration tests for <1s maintainer acceptance **PR4: Add cleanup + metrics** - Hot cache cleanup task (every 60s) - Cold index cleanup task (daily) - Prometheus metrics for both tiers - Monitor hot cache hits vs misses ## Configuration Examples ```bash # Default (2 min hot cache, 7 day cold index) ngit-grasp # Longer hot cache for slow relays ngit-grasp --rejected-hot-cache-duration-secs 300 # Shorter cold index for memory-constrained systems ngit-grasp --rejected-cold-index-expiry-secs 86400 # Environment variables export NGIT_REJECTED_HOT_CACHE_DURATION_SECS=180 export NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS=259200 ngit-grasp ``` Part of: Maintainer chain discovery fix See: work/SOLUTION-SUMMARY-V2.md for full design Previous: PR1 (rejected_index.rs implementation) Next: PR3 (invalidation + re-processing) --- src/sync/mod.rs | 133 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 97 insertions(+), 36 deletions(-) (limited to 'src/sync/mod.rs') diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 55bea17..fe336d1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -74,9 +74,10 @@ pub type PendingSyncIndex = Arc>>>; /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing /// to avoid repeatedly fetching and rejecting the same events. /// -/// NOTE: This is a temporary simple implementation. PR2 will replace this with the -/// two-tier RejectedEventsIndex from rejected_index.rs (hot cache + cold index). -type RejectedEventsIndexSimple = Arc>>; +/// Uses the two-tier RejectedEventsIndex from rejected_index.rs: +/// - Hot cache: Full events for 2 minutes (enables immediate re-processing) +/// - Cold index: Metadata for 7 days (prevents repeated downloads) +use rejected_index::RejectedEventsIndex; // ============================================================================= // Supporting Data Structures @@ -444,8 +445,8 @@ pub struct SyncManager { relay_sync_index: RelaySyncIndex, /// In-flight subscription batches pending_sync_index: PendingSyncIndex, - /// Rejected announcement event IDs (30617/30618) - excluded from sync - rejected_events_index: RejectedEventsIndexSimple, + /// Rejected announcement events (30617/30618) - two-tier storage for re-processing + rejected_events_index: Arc, /// Active relay connections - keyed by relay URL connections: HashMap, /// Health tracker for relay connection state @@ -498,7 +499,10 @@ impl SyncManager { repo_sync_index: Arc::new(RwLock::new(HashMap::new())), relay_sync_index: Arc::new(RwLock::new(HashMap::new())), pending_sync_index: Arc::new(RwLock::new(HashMap::new())), - rejected_events_index: Arc::new(RwLock::new(HashSet::new())), + rejected_events_index: Arc::new(RejectedEventsIndex::new( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + )), connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, @@ -1351,8 +1355,7 @@ impl SyncManager { RelayEvent::Event(event, subscription_id) => { // Skip events we've already rejected (announcements only) if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { - let rejected = rejected_events_index.read().await; - if rejected.contains(&event.id) { + if rejected_events_index.contains(&event.id) { tracing::trace!( event_id = %event.id, kind = %event.kind.as_u16(), @@ -2001,7 +2004,7 @@ impl SyncManager { database: &SharedDatabase, write_policy: &Nip34WritePolicy, local_relay: &LocalRelay, - rejected_events_index: &RejectedEventsIndexSimple, + rejected_events_index: &Arc, ) -> ProcessResult { use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -2071,13 +2074,42 @@ impl SyncManager { // Track rejected announcement events to avoid re-fetching them if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { - let mut rejected = rejected_events_index.write().await; - rejected.insert(event.id); - tracing::debug!( - event_id = %event.id, - kind = %event.kind.as_u16(), - "Added rejected announcement to exclusion list" - ); + // Extract identifier from 'd' tag + if let Some(identifier) = event + .tags + .iter() + .find(|t| t.kind() == nostr_sdk::TagKind::d()) + .and_then(|t| t.content()) + { + // Determine rejection reason based on message + let reason = if message.contains("doesn't list this service") { + rejected_index::RejectionReason::DoesNotListService + } else if message.contains("maintainer") { + rejected_index::RejectionReason::MaintainerNotYetValid + } else { + rejected_index::RejectionReason::Other + }; + + rejected_events_index.add_announcement( + event.clone(), + event.pubkey, + identifier.to_string(), + reason, + ); + + tracing::debug!( + event_id = %event.id, + kind = %event.kind.as_u16(), + identifier = %identifier, + "Added rejected announcement to two-tier index" + ); + } else { + tracing::warn!( + event_id = %event.id, + kind = %event.kind.as_u16(), + "Announcement missing 'd' tag, cannot track in rejected index" + ); + } } ProcessResult::Rejected @@ -2608,7 +2640,7 @@ impl SyncManager { // Get event IDs to exclude: purgatory + rejected announcements let purgatory_ids = self.purgatory.event_ids(); - let rejected_ids = self.rejected_events_index.read().await.clone(); + let rejected_ids = self.rejected_events_index.get_all_event_ids(); let excluded_ids: HashSet = purgatory_ids.union(&rejected_ids).cloned().collect(); for (idx, result) in diff_results { @@ -2888,40 +2920,48 @@ mod tests { #[tokio::test] async fn test_rejected_events_index_tracks_announcements() { - // Create a rejected events index - let rejected_index: RejectedEventsIndexSimple = Arc::new(RwLock::new(HashSet::new())); + // Create a rejected events index with 2 minute hot cache, 7 day cold index + let rejected_index = Arc::new(RejectedEventsIndex::new( + Duration::from_secs(120), + Duration::from_secs(604800), + )); - // Create test announcement event (kind 30617) + // Create test announcement event (kind 30617) with 'd' tag let keys = Keys::generate(); let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content") + .tag(nostr_sdk::Tag::custom( + nostr_sdk::TagKind::d(), + vec!["test-repo"], + )) .sign_with_keys(&keys) .unwrap(); // Verify index is empty - { - let rejected = rejected_index.read().await; - assert_eq!(rejected.len(), 0); - } + assert_eq!(rejected_index.hot_cache_len(), 0); + assert_eq!(rejected_index.cold_index_len(), 0); // Simulate rejection by adding to index - { - let mut rejected = rejected_index.write().await; - rejected.insert(announcement.id); - } + rejected_index.add_announcement( + announcement.clone(), + announcement.pubkey, + "test-repo".to_string(), + rejected_index::RejectionReason::DoesNotListService, + ); - // Verify event is tracked - { - let rejected = rejected_index.read().await; - assert!(rejected.contains(&announcement.id)); - assert_eq!(rejected.len(), 1); - } + // Verify event is tracked in both tiers + assert!(rejected_index.contains(&announcement.id)); + assert_eq!(rejected_index.hot_cache_len(), 1); + assert_eq!(rejected_index.cold_index_len(), 1); } #[tokio::test] async fn test_rejected_events_excluded_from_negentropy() { // Create indices let purgatory_ids: HashSet = HashSet::new(); - let mut rejected_ids = HashSet::new(); + let rejected_index = RejectedEventsIndex::new( + Duration::from_secs(120), + Duration::from_secs(604800), + ); // Create test event IDs let rejected_id = EventId::from_hex( @@ -2933,7 +2973,28 @@ mod tests { ) .unwrap(); - rejected_ids.insert(rejected_id); + // Add rejected event to index + let keys = Keys::generate(); + let rejected_event = EventBuilder::new(Kind::GitRepoAnnouncement, "rejected") + .tag(nostr_sdk::Tag::custom( + nostr_sdk::TagKind::d(), + vec!["rejected-repo"], + )) + .sign_with_keys(&keys) + .unwrap(); + + // Override the event ID for testing (we need a specific ID) + // Since we can't override the ID, let's use the actual event ID + let rejected_id = rejected_event.id; + rejected_index.add_announcement( + rejected_event, + keys.public_key(), + "rejected-repo".to_string(), + rejected_index::RejectionReason::DoesNotListService, + ); + + // Get rejected IDs from index + let rejected_ids = rejected_index.get_all_event_ids(); // Simulate negentropy reconciliation result let mut remote_ids = HashSet::new(); -- cgit v1.2.3