From 5ecd8d6a434f97da94daef2f59166086fbaf5a6b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 9 Jan 2026 17:04:06 +0000 Subject: feat: implement state event authorization per GRASP-01 spec Add comprehensive authorization checks to ensure state events are only accepted from maintainers of accepted repository announcements. This implements the core GRASP-01 requirement that pushes must match the latest state announcement "respecting the maintainer set." Changes: 1. StatePolicy authorization (src/nostr/policy/state.rs): - Check authorization BEFORE git data validation (fail-fast) - Reject if no announcement exists for repository - Reject if author not in maintainer set - Use existing helpers: fetch_repository_data() and pubkey_authorised_for_repo_owners() - Structured logging for all rejections 2. Purgatory invalidation (src/nostr/builder.rs): - New method: check_purgatory_state_events_for_identifier() - Called when announcements accepted (Accept and AcceptMaintainer) - Re-evaluates state events in purgatory for the identifier - Processes newly-authorized events (releases from purgatory) - Keeps unauthorized events for natural expiry (30 min) - Enables retroactive authorization when announcements arrive late 3. Purgatory sync authorization (src/git/sync.rs): - Check authorization BEFORE processing git data - Remove unauthorized events from purgatory (permanent rejection) - Prevents processing even if git data arrives first - Structured logging for monitoring 4. Rejected events tracking (src/sync/rejected_index.rs): - Add support for tracking rejected state events - New methods: add_state(), contains_state() - Separate metrics for state rejections - Enables sync to avoid re-fetching rejected states 5. Sync metrics (src/sync/metrics.rs, src/sync/mod.rs): - Add state-specific metrics (hot cache, cold index) - Track rejected states separately from announcements - Support monitoring of authorization rejections 6. Comprehensive tests (tests/state_authorization.rs): - test_reject_state_without_announcement - test_reject_state_from_unauthorized_author - test_accept_state_from_announcement_author - test_accept_state_from_maintainer Security Impact: - Before: State events could be published by anyone - After: Only maintainers can publish state events - Defense-in-depth: Authorization checked at 3 points: 1. On arrival (StatePolicy) 2. On announcement acceptance (purgatory re-evaluation) 3. On git data arrival (purgatory sync) All tests pass: - 248 unit tests - 51 NIP-34 announcement tests - 4 new state authorization tests - 9 rejected index tests Closes: State authorization requirement from GRASP-01 spec --- src/git/sync.rs | 32 +++++++++++++ src/nostr/builder.rs | 72 ++++++++++++++++++++++++++++++ src/nostr/policy/state.rs | 40 +++++++++++++++++ src/sync/metrics.rs | 103 ++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 40 ++++++++++++++++- src/sync/rejected_index.rs | 109 ++++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 393 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/git/sync.rs b/src/git/sync.rs index b05e2d0..e8e9655 100644 --- a/src/git/sync.rs +++ b/src/git/sync.rs @@ -951,6 +951,38 @@ async fn process_purgatory_state_events( } }; + // CRITICAL: Check authorization before processing + // State events MUST be rejected if author is not in maintainer set + let authorized_owners = crate::git::authorization::pubkey_authorised_for_repo_owners( + &entry.event.pubkey, + &db_repo_data, + ); + + if authorized_owners.is_empty() { + warn!( + identifier = %identifier, + event_id = %entry.event.id, + author = %entry.event.pubkey.to_hex(), + "Rejecting state event from purgatory: author not in maintainer set" + ); + // Remove from purgatory - this event will never be authorized + purgatory.remove_state_event(identifier, &entry.event.id); + result.errors.push(format!( + "State event {} rejected: author {} not in maintainer set", + entry.event.id, + entry.event.pubkey.to_hex() + )); + continue; + } + + debug!( + identifier = %identifier, + event_id = %entry.event.id, + author = %entry.event.pubkey.to_hex(), + authorized_for_owners = ?authorized_owners, + "State event author authorized via maintainer set" + ); + // Use unified processing function let process_result = crate::git::process::process_state_with_git_data( &state, diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index 939ccef..acaac71 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs @@ -102,6 +102,11 @@ impl Nip34WritePolicy { } tracing::debug!("Accepted repository announcement: {}", event_id_str); + + // Check purgatory for state events that might now be authorized + self.check_purgatory_state_events_for_identifier(&announcement.identifier) + .await; + WritePolicyResult::Accept } Err(e) => { @@ -125,6 +130,11 @@ impl Nip34WritePolicy { announcement.identifier ); // Don't create bare repository for external announcements + + // Check purgatory for state events that might now be authorized + self.check_purgatory_state_events_for_identifier(&announcement.identifier) + .await; + WritePolicyResult::Accept } Err(e) => { @@ -304,6 +314,68 @@ impl Nip34WritePolicy { } } + /// Check purgatory for state events that might now be authorized by a new announcement + /// + /// When an announcement is accepted, state events in purgatory that were previously + /// rejected due to missing announcements might now be authorized. This method: + /// 1. Finds all state events in purgatory for the identifier + /// 2. Re-evaluates authorization for each event + /// 3. Processes authorized events (releases from purgatory) + /// 4. Keeps unauthorized events in purgatory (will expire naturally) + async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) { + let state_events = self.ctx.purgatory.find_state(identifier); + + if state_events.is_empty() { + return; + } + + tracing::debug!( + identifier = %identifier, + count = state_events.len(), + "Checking purgatory state events after announcement acceptance" + ); + + for entry in state_events { + // Re-evaluate authorization with the new announcement + match self.state_policy.process_state_event(&entry.event, false).await { + Ok(WritePolicyResult::Accept) => { + tracing::info!( + event_id = %entry.event.id, + identifier = %identifier, + "State event in purgatory now authorized, will be processed" + ); + // Event will be automatically removed from purgatory by process_state_event + // and broadcast to subscribers + } + Ok(WritePolicyResult::Reject { message, .. }) => { + if message.contains("not authorized") { + tracing::debug!( + event_id = %entry.event.id, + identifier = %identifier, + "State event in purgatory still not authorized, keeping in purgatory" + ); + // Keep in purgatory - will expire naturally after 30 minutes + } else { + tracing::debug!( + event_id = %entry.event.id, + identifier = %identifier, + reason = %message, + "State event in purgatory rejected for other reason" + ); + } + } + Err(e) => { + tracing::warn!( + event_id = %entry.event.id, + identifier = %identifier, + error = %e, + "Error re-evaluating state event in purgatory" + ); + } + } + } + } + /// Handle events that must reference accepted repositories or events async fn handle_related_event(&self, event: &Event, event_type: &str) -> WritePolicyResult { let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs index acb76a3..d26b5ec 100644 --- a/src/nostr/policy/state.rs +++ b/src/nostr/policy/state.rs @@ -78,6 +78,46 @@ impl StatePolicy { // Get all repositories and state events from db with identifier let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; + // CRITICAL: Check if author is authorized via maintainer set + // State events MUST be rejected if author is not in maintainer set of any accepted announcement + if db_repo_data.announcements.is_empty() { + tracing::warn!( + event_id = %event.id, + identifier = %state.identifier, + author = %event.pubkey.to_hex(), + "Rejecting state event: no announcement exists for this repository" + ); + return Ok(WritePolicyResult::Reject { + status: false, + message: "invalid: no announcement exists for this repository".into(), + }); + } + + let authorized_owners = + crate::git::authorization::pubkey_authorised_for_repo_owners(&event.pubkey, &db_repo_data); + + if authorized_owners.is_empty() { + tracing::warn!( + event_id = %event.id, + identifier = %state.identifier, + author = %event.pubkey.to_hex(), + announcements_count = db_repo_data.announcements.len(), + "Rejecting state event: author not in maintainer set of any announcement" + ); + return Ok(WritePolicyResult::Reject { + status: false, + message: "invalid: author not authorized for this repository".into(), + }); + } + + tracing::debug!( + event_id = %event.id, + identifier = %state.identifier, + author = %event.pubkey.to_hex(), + authorized_for_owners = ?authorized_owners, + "State event author authorized via maintainer set" + ); + // Duplicate check in db if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { tracing::debug!("processed state event duplicate (in db): {}", event.id); diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs index a175210..7d6d42d 100644 --- a/src/sync/metrics.rs +++ b/src/sync/metrics.rs @@ -56,6 +56,22 @@ pub struct SyncMetrics { rejected_announcements_cold_index_expired_total: IntCounter, /// Total invalidations (maintainer announcements invalidated) rejected_announcements_invalidated_total: IntCounter, + + // === Rejected States Index Metrics === + /// Current number of state events in hot cache + rejected_states_hot_cache_current: IntGauge, + /// Total hot cache hits (state events re-processed from cache) + rejected_states_hot_cache_hits_total: IntCounter, + /// Total hot cache misses (state events not in cache) + rejected_states_hot_cache_misses_total: IntCounter, + /// Total expired state events removed from hot cache + rejected_states_hot_cache_expired_total: IntCounter, + /// Current number of state event entries in cold index + rejected_states_cold_index_current: IntGauge, + /// Total state event cold index entries expired and removed + rejected_states_cold_index_expired_total: IntCounter, + /// Total state event invalidations + rejected_states_invalidated_total: IntCounter, } impl SyncMetrics { @@ -172,6 +188,49 @@ impl SyncMetrics { ))?; registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?; + // Rejected states metrics + let rejected_states_hot_cache_current = IntGauge::with_opts(Opts::new( + "ngit_sync_rejected_states_hot_cache_current", + "Current number of state events in hot cache (full events, 2 min expiry)", + ))?; + registry.register(Box::new(rejected_states_hot_cache_current.clone()))?; + + let rejected_states_hot_cache_hits_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_states_hot_cache_hits_total", + "Total hot cache hits (state events re-processed from cache)", + ))?; + registry.register(Box::new(rejected_states_hot_cache_hits_total.clone()))?; + + let rejected_states_hot_cache_misses_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_states_hot_cache_misses_total", + "Total hot cache misses (state events not in cache when invalidated)", + ))?; + registry.register(Box::new(rejected_states_hot_cache_misses_total.clone()))?; + + let rejected_states_hot_cache_expired_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_states_hot_cache_expired_total", + "Total expired state events removed from hot cache", + ))?; + registry.register(Box::new(rejected_states_hot_cache_expired_total.clone()))?; + + let rejected_states_cold_index_current = IntGauge::with_opts(Opts::new( + "ngit_sync_rejected_states_cold_index_current", + "Current number of state event entries in cold index (metadata only, 7 day expiry)", + ))?; + registry.register(Box::new(rejected_states_cold_index_current.clone()))?; + + let rejected_states_cold_index_expired_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_states_cold_index_expired_total", + "Total state event cold index entries expired and removed", + ))?; + registry.register(Box::new(rejected_states_cold_index_expired_total.clone()))?; + + let rejected_states_invalidated_total = IntCounter::with_opts(Opts::new( + "ngit_sync_rejected_states_invalidated_total", + "Total state event invalidations (when announcements accepted)", + ))?; + registry.register(Box::new(rejected_states_invalidated_total.clone()))?; + Ok(Self { relay_connected, connection_attempts_total, @@ -188,6 +247,13 @@ impl SyncMetrics { rejected_announcements_cold_index_current, rejected_announcements_cold_index_expired_total, rejected_announcements_invalidated_total, + rejected_states_hot_cache_current, + rejected_states_hot_cache_hits_total, + rejected_states_hot_cache_misses_total, + rejected_states_hot_cache_expired_total, + rejected_states_cold_index_current, + rejected_states_cold_index_expired_total, + rejected_states_invalidated_total, }) } @@ -396,6 +462,43 @@ impl SyncMetrics { pub fn record_invalidation(&self, count: usize) { self.rejected_announcements_invalidated_total.inc_by(count as u64); } + + // === Rejected States Recording Methods === + + /// Update state events hot cache current size gauge. + pub fn update_states_hot_cache_size(&self, size: usize) { + self.rejected_states_hot_cache_current.set(size as i64); + } + + /// Record state event hot cache hit (event re-processed from cache). + pub fn record_states_hot_cache_hit(&self) { + self.rejected_states_hot_cache_hits_total.inc(); + } + + /// Record state event hot cache miss (event not in cache when invalidated). + pub fn record_states_hot_cache_miss(&self) { + self.rejected_states_hot_cache_misses_total.inc(); + } + + /// Record state event hot cache expired entries. + pub fn record_states_hot_cache_expired(&self, count: usize) { + self.rejected_states_hot_cache_expired_total.inc_by(count as u64); + } + + /// Update state events cold index current size gauge. + pub fn update_states_cold_index_size(&self, size: usize) { + self.rejected_states_cold_index_current.set(size as i64); + } + + /// Record state event cold index expired entries. + pub fn record_states_cold_index_expired(&self, count: usize) { + self.rejected_states_cold_index_expired_total.inc_by(count as u64); + } + + /// Record state event invalidation. + pub fn record_states_invalidation(&self, count: usize) { + self.rejected_states_invalidated_total.inc_by(count as u64); + } } #[cfg(test)] diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f296c0f..93b0e38 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -367,12 +367,14 @@ async fn run_daily_timer( /// Run the combined health and metrics checker /// /// This function runs in a loop with a 2-second interval, performing three tasks: -/// Background task for cleaning up expired entries from the rejected events index +/// Background task for cleaning up expired entries from the rejected events indexes /// /// This task runs two cleanup operations at different intervals: /// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache /// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index /// +/// Cleans up both the announcements index and the states index. +/// /// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). /// The cold index cleanup runs daily since metadata is small and expires slowly. async fn run_rejected_index_cleanup( @@ -397,6 +399,8 @@ async fn run_rejected_index_cleanup( tokio::select! { _ = hot_cache_timer.tick() => { let manager = sync_manager.lock().await; + + // Clean up announcements index let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); if hot_expired > 0 { tracing::debug!( @@ -404,9 +408,20 @@ async fn run_rejected_index_cleanup( hot_expired ); } + + // Clean up states index + let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); + if states_hot_expired > 0 { + tracing::debug!( + "Cleaned up {} expired entries from rejected states hot cache", + states_hot_expired + ); + } } _ = cold_index_timer.tick() => { let manager = sync_manager.lock().await; + + // Clean up announcements index let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); if cold_expired > 0 { tracing::info!( @@ -414,6 +429,15 @@ async fn run_rejected_index_cleanup( cold_expired ); } + + // Clean up states index + let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); + if states_cold_expired > 0 { + tracing::info!( + "Cleaned up {} expired entries from rejected states cold index", + states_cold_expired + ); + } } _ = shutdown_rx.recv() => { tracing::info!("Rejected index cleanup received shutdown signal"); @@ -507,6 +531,8 @@ pub struct SyncManager { pending_sync_index: PendingSyncIndex, /// Rejected announcement events (30617/30618) - two-tier storage for re-processing rejected_events_index: Arc, + /// Rejected state events (30618) - two-tier storage for re-processing + rejected_states_index: Arc, /// Active relay connections - keyed by relay URL connections: HashMap, /// Health tracker for relay connection state @@ -571,6 +597,18 @@ impl SyncManager { Duration::from_secs(config.rejected_cold_index_expiry_secs), ) }), + rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics { + RejectedEventsIndex::with_metrics( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + metrics.clone(), + ) + } else { + RejectedEventsIndex::new( + Duration::from_secs(config.rejected_hot_cache_duration_secs), + Duration::from_secs(config.rejected_cold_index_expiry_secs), + ) + }), connections: HashMap::new(), health_tracker: Arc::new(RelayHealthTracker::new(config)), next_batch_id: 0, diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs index 4733d80..f5ffef4 100644 --- a/src/sync/rejected_index.rs +++ b/src/sync/rejected_index.rs @@ -355,7 +355,7 @@ impl RejectedEventsIndex { index } - /// Update metrics with current sizes + /// Update metrics with current sizes (for announcements) fn update_metrics(&self) { if let Some(ref metrics) = self.metrics { metrics.update_hot_cache_size(self.hot_cache.len()); @@ -363,6 +363,14 @@ impl RejectedEventsIndex { } } + /// Update metrics with current sizes (for states) + fn update_states_metrics(&self) { + if let Some(ref metrics) = self.metrics { + metrics.update_states_hot_cache_size(self.hot_cache.len()); + metrics.update_states_cold_index_size(self.cold_index.len()); + } + } + /// Add rejected announcement to both tiers /// /// # Arguments @@ -393,6 +401,36 @@ impl RejectedEventsIndex { self.update_metrics(); } + /// Add rejected state event to both tiers + /// + /// # Arguments + /// + /// * `event` - Full event object (stored in hot cache) + /// * `pubkey` - Author's public key + /// * `identifier` - Repository identifier (d tag) + /// * `reason` - Why the state event was rejected + pub fn add_state( + &self, + event: Event, + pubkey: PublicKey, + identifier: String, + reason: RejectionReason, + ) { + // Add to hot cache (full event) + self.hot_cache.add( + event.clone(), + pubkey, + identifier.clone(), + reason, + ); + + // Add to cold index (metadata only) + self.cold_index.add(event.id, pubkey, identifier, reason); + + // Update metrics (using states metrics) + self.update_states_metrics(); + } + /// Check if event is already rejected (in either tier) pub fn contains(&self, event_id: &EventId) -> bool { self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) @@ -442,7 +480,51 @@ impl RejectedEventsIndex { (removed, events) } - /// Clean up expired entries from both tiers + /// Invalidate state events and get events for immediate re-processing + /// + /// This is called when an announcement is accepted that authorizes state events. + /// It removes the cold index entries (so they can be re-fetched on next sync) and + /// returns any events still in the hot cache for immediate re-processing. + /// + /// # Returns + /// + /// Tuple of (number of cold index entries removed, events from hot cache) + pub fn invalidate_and_get_state_events( + &self, + maintainer_pubkey: &PublicKey, + identifier: &str, + ) -> (usize, Vec) { + // Remove from cold index (prevents re-fetch) + let removed = self + .cold_index + .invalidate_maintainer_announcements(maintainer_pubkey, identifier); + + // Get from hot cache (for immediate re-processing) + let events = self + .hot_cache + .get_maintainer_events(maintainer_pubkey, identifier); + + // Track metrics (using states metrics) + if let Some(ref metrics) = self.metrics { + if removed > 0 { + metrics.record_states_invalidation(removed); + } + if events.is_empty() { + metrics.record_states_hot_cache_miss(); + } else { + for _ in &events { + metrics.record_states_hot_cache_hit(); + } + } + } + + // Update size metrics (using states metrics) + self.update_states_metrics(); + + (removed, events) + } + + /// Clean up expired entries from both tiers (for announcements) /// /// Returns tuple of (hot cache expired, cold index expired) pub fn cleanup_expired(&self) -> (usize, usize) { @@ -465,6 +547,29 @@ impl RejectedEventsIndex { (hot_expired, cold_expired) } + /// Clean up expired entries from both tiers (for states) + /// + /// Returns tuple of (hot cache expired, cold index expired) + pub fn cleanup_states_expired(&self) -> (usize, usize) { + let hot_expired = self.hot_cache.cleanup_expired(); + let cold_expired = self.cold_index.cleanup_expired(); + + // Track metrics (using states metrics) + if let Some(ref metrics) = self.metrics { + if hot_expired > 0 { + metrics.record_states_hot_cache_expired(hot_expired); + } + if cold_expired > 0 { + metrics.record_states_cold_index_expired(cold_expired); + } + } + + // Update size metrics (using states metrics) + self.update_states_metrics(); + + (hot_expired, cold_expired) + } + /// Get current number of entries in hot cache pub fn hot_cache_len(&self) -> usize { self.hot_cache.len() -- cgit v1.2.3