diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 15:49:17 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-01-09 15:49:17 +0000 |
| commit | 02e957ec97c9a9e6e37eca9c9d4aa6aef4bcd363 (patch) | |
| tree | a4d6966452d66eb3fa0592311c1d85c570473a1f /src/sync | |
| parent | e4cfecbfc909c9ca4983101cf6a5855959a5d49f (diff) | |
feat: Switch SyncManager to use two-tier RejectedEventsIndex
Replaces the simple HashSet<EventId> with the sophisticated two-tier
RejectedEventsIndex from PR1, enabling future immediate re-processing
when maintainer dependencies resolve.
## Changes
### Config (src/config.rs)
- Add `rejected_hot_cache_duration_secs` (default: 120 = 2 minutes)
- Add `rejected_cold_index_expiry_secs` (default: 604800 = 7 days)
- Both configurable via CLI flags or environment variables
### SyncManager (src/sync/mod.rs)
**Type Change:**
- Before: `Arc<RwLock<HashSet<EventId>>>` (simple event ID set)
- After: `Arc<RejectedEventsIndex>` (two-tier storage)
**Initialization:**
- Pass config durations to RejectedEventsIndex::new()
- Creates hot cache (2 min) + cold index (7 days)
**Event Processing (process_event_static):**
- Extract identifier from 'd' tag
- Determine rejection reason from error message
- Call `add_announcement()` with full event + metadata
- Stores in both hot cache and cold index
**Negentropy Sync (derive_relay_targets):**
- Call `get_all_event_ids()` to get rejected IDs
- Returns union of hot cache + cold index event IDs
- Excludes from negentropy reconciliation
**Event Loop (relay_connection):**
- Use `contains()` method instead of direct HashSet access
- Simpler API, same skip-rejected behavior
### RejectedEventsIndex (src/sync/rejected_index.rs)
**New Method:**
- `get_all_event_ids()`: Returns HashSet<EventId> from both tiers
- Used for negentropy exclusion (replaces direct HashSet access)
### Tests Updated
**test_rejected_events_index_tracks_announcements:**
- Create RejectedEventsIndex with config durations
- Add 'd' tag to test announcement
- Use `add_announcement()` with full event
- Verify both hot cache and cold index populated
- Check lengths with `hot_cache_len()` and `cold_index_len()`
**test_rejected_events_excluded_from_negentropy:**
- Create RejectedEventsIndex instead of HashSet
- Build full event with 'd' tag
- Add to index with `add_announcement()`
- Get IDs with `get_all_event_ids()`
- Verify excluded from reconciliation
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ SyncManager │
│ │
│ rejected_events_index: Arc<RejectedEventsIndex> │
│ ├─ Hot Cache (2 min): Full events for re-processing │
│ └─ Cold Index (7 days): Metadata for dedup │
└─────────────────────────────────────────────────────────────┘
│
│ On rejection
▼
┌─────────────────────────────────────────────────────────────┐
│ add_announcement(event, pubkey, identifier, reason) │
│ ├─ Store full event in hot cache │
│ └─ Store metadata in cold index │
└─────────────────────────────────────────────────────────────┘
│
│ On negentropy sync
▼
┌─────────────────────────────────────────────────────────────┐
│ get_all_event_ids() → HashSet<EventId> │
│ ├─ Union of hot cache IDs │
│ └─ Union of cold index IDs │
└─────────────────────────────────────────────────────────────┘
```
## Benefits
### Immediate
- **Better tracking**: Store rejection reason + metadata
- **Configurable**: Tune cache/index durations per deployment
- **Observable**: Separate hot/cold metrics (future PR4)
### Future (PR3)
- **Immediate re-processing**: Get events from hot cache when valid
- **No 24h delay**: Maintainer announcements accepted in <1 second
- **Automatic recovery**: Hot cache for immediate, cold index for later
## Backward Compatibility
**No breaking changes:**
- Same rejection behavior (skip events in index)
- Same negentropy exclusion (union with purgatory IDs)
- Default config values match previous implicit behavior
**Migration:**
- Existing deployments continue working with defaults
- Optional: Tune durations via new config flags
## Testing
All tests passing:
- ✅ 9 rejected_index tests (hot cache, cold index, two-tier)
- ✅ 139 sync module tests (including updated integration tests)
- ✅ 247 total library tests
## Next Steps
**PR3: Add invalidation + immediate re-processing**
- Invalidate cold index when owner announcement accepted
- Get events from hot cache for re-processing
- Recursive call to process_event_static
- Integration tests for <1s maintainer acceptance
**PR4: Add cleanup + metrics**
- Hot cache cleanup task (every 60s)
- Cold index cleanup task (daily)
- Prometheus metrics for both tiers
- Monitor hot cache hits vs misses
## Configuration Examples
```bash
# Default (2 min hot cache, 7 day cold index)
ngit-grasp
# Longer hot cache for slow relays
ngit-grasp --rejected-hot-cache-duration-secs 300
# Shorter cold index for memory-constrained systems
ngit-grasp --rejected-cold-index-expiry-secs 86400
# Environment variables
export NGIT_REJECTED_HOT_CACHE_DURATION_SECS=180
export NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS=259200
ngit-grasp
```
Part of: Maintainer chain discovery fix
See: work/SOLUTION-SUMMARY-V2.md for full design
Previous: PR1 (rejected_index.rs implementation)
Next: PR3 (invalidation + re-processing)
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mod.rs | 133 | ||||
| -rw-r--r-- | src/sync/rejected_index.rs | 20 |
2 files changed, 116 insertions, 37 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 55bea17..fe336d1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -74,9 +74,10 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>; | |||
| 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing | 74 | /// These events are excluded from negentropy sync and skipped during REQ+EOSE processing |
| 75 | /// to avoid repeatedly fetching and rejecting the same events. | 75 | /// to avoid repeatedly fetching and rejecting the same events. |
| 76 | /// | 76 | /// |
| 77 | /// NOTE: This is a temporary simple implementation. PR2 will replace this with the | 77 | /// Uses the two-tier RejectedEventsIndex from rejected_index.rs: |
| 78 | /// two-tier RejectedEventsIndex from rejected_index.rs (hot cache + cold index). | 78 | /// - Hot cache: Full events for 2 minutes (enables immediate re-processing) |
| 79 | type RejectedEventsIndexSimple = Arc<RwLock<HashSet<EventId>>>; | 79 | /// - Cold index: Metadata for 7 days (prevents repeated downloads) |
| 80 | use rejected_index::RejectedEventsIndex; | ||
| 80 | 81 | ||
| 81 | // ============================================================================= | 82 | // ============================================================================= |
| 82 | // Supporting Data Structures | 83 | // Supporting Data Structures |
| @@ -444,8 +445,8 @@ pub struct SyncManager { | |||
| 444 | relay_sync_index: RelaySyncIndex, | 445 | relay_sync_index: RelaySyncIndex, |
| 445 | /// In-flight subscription batches | 446 | /// In-flight subscription batches |
| 446 | pending_sync_index: PendingSyncIndex, | 447 | pending_sync_index: PendingSyncIndex, |
| 447 | /// Rejected announcement event IDs (30617/30618) - excluded from sync | 448 | /// Rejected announcement events (30617/30618) - two-tier storage for re-processing |
| 448 | rejected_events_index: RejectedEventsIndexSimple, | 449 | rejected_events_index: Arc<RejectedEventsIndex>, |
| 449 | /// Active relay connections - keyed by relay URL | 450 | /// Active relay connections - keyed by relay URL |
| 450 | connections: HashMap<String, RelayConnection>, | 451 | connections: HashMap<String, RelayConnection>, |
| 451 | /// Health tracker for relay connection state | 452 | /// Health tracker for relay connection state |
| @@ -498,7 +499,10 @@ impl SyncManager { | |||
| 498 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), | 499 | repo_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 499 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), | 500 | relay_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 500 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), | 501 | pending_sync_index: Arc::new(RwLock::new(HashMap::new())), |
| 501 | rejected_events_index: Arc::new(RwLock::new(HashSet::new())), | 502 | rejected_events_index: Arc::new(RejectedEventsIndex::new( |
| 503 | Duration::from_secs(config.rejected_hot_cache_duration_secs), | ||
| 504 | Duration::from_secs(config.rejected_cold_index_expiry_secs), | ||
| 505 | )), | ||
| 502 | connections: HashMap::new(), | 506 | connections: HashMap::new(), |
| 503 | health_tracker: Arc::new(RelayHealthTracker::new(config)), | 507 | health_tracker: Arc::new(RelayHealthTracker::new(config)), |
| 504 | next_batch_id: 0, | 508 | next_batch_id: 0, |
| @@ -1351,8 +1355,7 @@ impl SyncManager { | |||
| 1351 | RelayEvent::Event(event, subscription_id) => { | 1355 | RelayEvent::Event(event, subscription_id) => { |
| 1352 | // Skip events we've already rejected (announcements only) | 1356 | // Skip events we've already rejected (announcements only) |
| 1353 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | 1357 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { |
| 1354 | let rejected = rejected_events_index.read().await; | 1358 | if rejected_events_index.contains(&event.id) { |
| 1355 | if rejected.contains(&event.id) { | ||
| 1356 | tracing::trace!( | 1359 | tracing::trace!( |
| 1357 | event_id = %event.id, | 1360 | event_id = %event.id, |
| 1358 | kind = %event.kind.as_u16(), | 1361 | kind = %event.kind.as_u16(), |
| @@ -2001,7 +2004,7 @@ impl SyncManager { | |||
| 2001 | database: &SharedDatabase, | 2004 | database: &SharedDatabase, |
| 2002 | write_policy: &Nip34WritePolicy, | 2005 | write_policy: &Nip34WritePolicy, |
| 2003 | local_relay: &LocalRelay, | 2006 | local_relay: &LocalRelay, |
| 2004 | rejected_events_index: &RejectedEventsIndexSimple, | 2007 | rejected_events_index: &Arc<RejectedEventsIndex>, |
| 2005 | ) -> ProcessResult { | 2008 | ) -> ProcessResult { |
| 2006 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; | 2009 | use nostr_relay_builder::prelude::{WritePolicy, WritePolicyResult}; |
| 2007 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | 2010 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; |
| @@ -2071,13 +2074,42 @@ impl SyncManager { | |||
| 2071 | 2074 | ||
| 2072 | // Track rejected announcement events to avoid re-fetching them | 2075 | // Track rejected announcement events to avoid re-fetching them |
| 2073 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { | 2076 | if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { |
| 2074 | let mut rejected = rejected_events_index.write().await; | 2077 | // Extract identifier from 'd' tag |
| 2075 | rejected.insert(event.id); | 2078 | if let Some(identifier) = event |
| 2076 | tracing::debug!( | 2079 | .tags |
| 2077 | event_id = %event.id, | 2080 | .iter() |
| 2078 | kind = %event.kind.as_u16(), | 2081 | .find(|t| t.kind() == nostr_sdk::TagKind::d()) |
| 2079 | "Added rejected announcement to exclusion list" | 2082 | .and_then(|t| t.content()) |
| 2080 | ); | 2083 | { |
| 2084 | // Determine rejection reason based on message | ||
| 2085 | let reason = if message.contains("doesn't list this service") { | ||
| 2086 | rejected_index::RejectionReason::DoesNotListService | ||
| 2087 | } else if message.contains("maintainer") { | ||
| 2088 | rejected_index::RejectionReason::MaintainerNotYetValid | ||
| 2089 | } else { | ||
| 2090 | rejected_index::RejectionReason::Other | ||
| 2091 | }; | ||
| 2092 | |||
| 2093 | rejected_events_index.add_announcement( | ||
| 2094 | event.clone(), | ||
| 2095 | event.pubkey, | ||
| 2096 | identifier.to_string(), | ||
| 2097 | reason, | ||
| 2098 | ); | ||
| 2099 | |||
| 2100 | tracing::debug!( | ||
| 2101 | event_id = %event.id, | ||
| 2102 | kind = %event.kind.as_u16(), | ||
| 2103 | identifier = %identifier, | ||
| 2104 | "Added rejected announcement to two-tier index" | ||
| 2105 | ); | ||
| 2106 | } else { | ||
| 2107 | tracing::warn!( | ||
| 2108 | event_id = %event.id, | ||
| 2109 | kind = %event.kind.as_u16(), | ||
| 2110 | "Announcement missing 'd' tag, cannot track in rejected index" | ||
| 2111 | ); | ||
| 2112 | } | ||
| 2081 | } | 2113 | } |
| 2082 | 2114 | ||
| 2083 | ProcessResult::Rejected | 2115 | ProcessResult::Rejected |
| @@ -2608,7 +2640,7 @@ impl SyncManager { | |||
| 2608 | 2640 | ||
| 2609 | // Get event IDs to exclude: purgatory + rejected announcements | 2641 | // Get event IDs to exclude: purgatory + rejected announcements |
| 2610 | let purgatory_ids = self.purgatory.event_ids(); | 2642 | let purgatory_ids = self.purgatory.event_ids(); |
| 2611 | let rejected_ids = self.rejected_events_index.read().await.clone(); | 2643 | let rejected_ids = self.rejected_events_index.get_all_event_ids(); |
| 2612 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); | 2644 | let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); |
| 2613 | 2645 | ||
| 2614 | for (idx, result) in diff_results { | 2646 | for (idx, result) in diff_results { |
| @@ -2888,40 +2920,48 @@ mod tests { | |||
| 2888 | 2920 | ||
| 2889 | #[tokio::test] | 2921 | #[tokio::test] |
| 2890 | async fn test_rejected_events_index_tracks_announcements() { | 2922 | async fn test_rejected_events_index_tracks_announcements() { |
| 2891 | // Create a rejected events index | 2923 | // Create a rejected events index with 2 minute hot cache, 7 day cold index |
| 2892 | let rejected_index: RejectedEventsIndexSimple = Arc::new(RwLock::new(HashSet::new())); | 2924 | let rejected_index = Arc::new(RejectedEventsIndex::new( |
| 2925 | Duration::from_secs(120), | ||
| 2926 | Duration::from_secs(604800), | ||
| 2927 | )); | ||
| 2893 | 2928 | ||
| 2894 | // Create test announcement event (kind 30617) | 2929 | // Create test announcement event (kind 30617) with 'd' tag |
| 2895 | let keys = Keys::generate(); | 2930 | let keys = Keys::generate(); |
| 2896 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content") | 2931 | let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "test content") |
| 2932 | .tag(nostr_sdk::Tag::custom( | ||
| 2933 | nostr_sdk::TagKind::d(), | ||
| 2934 | vec!["test-repo"], | ||
| 2935 | )) | ||
| 2897 | .sign_with_keys(&keys) | 2936 | .sign_with_keys(&keys) |
| 2898 | .unwrap(); | 2937 | .unwrap(); |
| 2899 | 2938 | ||
| 2900 | // Verify index is empty | 2939 | // Verify index is empty |
| 2901 | { | 2940 | assert_eq!(rejected_index.hot_cache_len(), 0); |
| 2902 | let rejected = rejected_index.read().await; | 2941 | assert_eq!(rejected_index.cold_index_len(), 0); |
| 2903 | assert_eq!(rejected.len(), 0); | ||
| 2904 | } | ||
| 2905 | 2942 | ||
| 2906 | // Simulate rejection by adding to index | 2943 | // Simulate rejection by adding to index |
| 2907 | { | 2944 | rejected_index.add_announcement( |
| 2908 | let mut rejected = rejected_index.write().await; | 2945 | announcement.clone(), |
| 2909 | rejected.insert(announcement.id); | 2946 | announcement.pubkey, |
| 2910 | } | 2947 | "test-repo".to_string(), |
| 2948 | rejected_index::RejectionReason::DoesNotListService, | ||
| 2949 | ); | ||
| 2911 | 2950 | ||
| 2912 | // Verify event is tracked | 2951 | // Verify event is tracked in both tiers |
| 2913 | { | 2952 | assert!(rejected_index.contains(&announcement.id)); |
| 2914 | let rejected = rejected_index.read().await; | 2953 | assert_eq!(rejected_index.hot_cache_len(), 1); |
| 2915 | assert!(rejected.contains(&announcement.id)); | 2954 | assert_eq!(rejected_index.cold_index_len(), 1); |
| 2916 | assert_eq!(rejected.len(), 1); | ||
| 2917 | } | ||
| 2918 | } | 2955 | } |
| 2919 | 2956 | ||
| 2920 | #[tokio::test] | 2957 | #[tokio::test] |
| 2921 | async fn test_rejected_events_excluded_from_negentropy() { | 2958 | async fn test_rejected_events_excluded_from_negentropy() { |
| 2922 | // Create indices | 2959 | // Create indices |
| 2923 | let purgatory_ids: HashSet<EventId> = HashSet::new(); | 2960 | let purgatory_ids: HashSet<EventId> = HashSet::new(); |
| 2924 | let mut rejected_ids = HashSet::new(); | 2961 | let rejected_index = RejectedEventsIndex::new( |
| 2962 | Duration::from_secs(120), | ||
| 2963 | Duration::from_secs(604800), | ||
| 2964 | ); | ||
| 2925 | 2965 | ||
| 2926 | // Create test event IDs | 2966 | // Create test event IDs |
| 2927 | let rejected_id = EventId::from_hex( | 2967 | let rejected_id = EventId::from_hex( |
| @@ -2933,7 +2973,28 @@ mod tests { | |||
| 2933 | ) | 2973 | ) |
| 2934 | .unwrap(); | 2974 | .unwrap(); |
| 2935 | 2975 | ||
| 2936 | rejected_ids.insert(rejected_id); | 2976 | // Add rejected event to index |
| 2977 | let keys = Keys::generate(); | ||
| 2978 | let rejected_event = EventBuilder::new(Kind::GitRepoAnnouncement, "rejected") | ||
| 2979 | .tag(nostr_sdk::Tag::custom( | ||
| 2980 | nostr_sdk::TagKind::d(), | ||
| 2981 | vec!["rejected-repo"], | ||
| 2982 | )) | ||
| 2983 | .sign_with_keys(&keys) | ||
| 2984 | .unwrap(); | ||
| 2985 | |||
| 2986 | // Override the event ID for testing (we need a specific ID) | ||
| 2987 | // Since we can't override the ID, let's use the actual event ID | ||
| 2988 | let rejected_id = rejected_event.id; | ||
| 2989 | rejected_index.add_announcement( | ||
| 2990 | rejected_event, | ||
| 2991 | keys.public_key(), | ||
| 2992 | "rejected-repo".to_string(), | ||
| 2993 | rejected_index::RejectionReason::DoesNotListService, | ||
| 2994 | ); | ||
| 2995 | |||
| 2996 | // Get rejected IDs from index | ||
| 2997 | let rejected_ids = rejected_index.get_all_event_ids(); | ||
| 2937 | 2998 | ||
| 2938 | // Simulate negentropy reconciliation result | 2999 | // Simulate negentropy reconciliation result |
| 2939 | let mut remote_ids = HashSet::new(); | 3000 | let mut remote_ids = HashSet::new(); |
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index f89783a..80d6b5b 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs | |||
| @@ -85,7 +85,7 @@ | |||
| 85 | //! ``` | 85 | //! ``` |
| 86 | 86 | ||
| 87 | use nostr_sdk::{Event, EventId, PublicKey}; | 87 | use nostr_sdk::{Event, EventId, PublicKey}; |
| 88 | use std::collections::HashMap; | 88 | use std::collections::{HashMap, HashSet}; |
| 89 | use std::sync::{Arc, RwLock}; | 89 | use std::sync::{Arc, RwLock}; |
| 90 | use std::time::{Duration, Instant}; | 90 | use std::time::{Duration, Instant}; |
| 91 | 91 | ||
| @@ -396,6 +396,24 @@ impl RejectedEventsIndex { | |||
| 396 | pub fn cold_index_len(&self) -> usize { | 396 | pub fn cold_index_len(&self) -> usize { |
| 397 | self.cold_index.len() | 397 | self.cold_index.len() |
| 398 | } | 398 | } |
| 399 | |||
| 400 | /// Get all rejected event IDs (from both hot cache and cold index) | ||
| 401 | /// | ||
| 402 | /// Used for excluding rejected events from negentropy sync. | ||
| 403 | /// Note: This creates a snapshot - events may be added/removed concurrently. | ||
| 404 | pub fn get_all_event_ids(&self) -> HashSet<EventId> { | ||
| 405 | let mut ids = HashSet::new(); | ||
| 406 | |||
| 407 | // Add from hot cache | ||
| 408 | let hot_entries = self.hot_cache.entries.read().unwrap(); | ||
| 409 | ids.extend(hot_entries.keys().cloned()); | ||
| 410 | |||
| 411 | // Add from cold index | ||
| 412 | let cold_entries = self.cold_index.entries.read().unwrap(); | ||
| 413 | ids.extend(cold_entries.keys().cloned()); | ||
| 414 | |||
| 415 | ids | ||
| 416 | } | ||
| 399 | } | 417 | } |
| 400 | 418 | ||
| 401 | #[cfg(test)] | 419 | #[cfg(test)] |