upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 17:04:06 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 17:04:06 +0000
commit5ecd8d6a434f97da94daef2f59166086fbaf5a6b (patch)
tree54c7d3b953a6b1aedd1db6b9a719e18131659df5
parent895359aeb6746b98ff82944e4fca503f4a6e5439 (diff)
feat: implement state event authorization per GRASP-01 spec
Add comprehensive authorization checks to ensure state events are only accepted from maintainers of accepted repository announcements. This implements the core GRASP-01 requirement that pushes must match the latest state announcement "respecting the maintainer set." Changes: 1. StatePolicy authorization (src/nostr/policy/state.rs): - Check authorization BEFORE git data validation (fail-fast) - Reject if no announcement exists for repository - Reject if author not in maintainer set - Use existing helpers: fetch_repository_data() and pubkey_authorised_for_repo_owners() - Structured logging for all rejections 2. Purgatory invalidation (src/nostr/builder.rs): - New method: check_purgatory_state_events_for_identifier() - Called when announcements accepted (Accept and AcceptMaintainer) - Re-evaluates state events in purgatory for the identifier - Processes newly-authorized events (releases from purgatory) - Keeps unauthorized events for natural expiry (30 min) - Enables retroactive authorization when announcements arrive late 3. Purgatory sync authorization (src/git/sync.rs): - Check authorization BEFORE processing git data - Remove unauthorized events from purgatory (permanent rejection) - Prevents processing even if git data arrives first - Structured logging for monitoring 4. Rejected events tracking (src/sync/rejected_index.rs): - Add support for tracking rejected state events - New methods: add_state(), contains_state() - Separate metrics for state rejections - Enables sync to avoid re-fetching rejected states 5. Sync metrics (src/sync/metrics.rs, src/sync/mod.rs): - Add state-specific metrics (hot cache, cold index) - Track rejected states separately from announcements - Support monitoring of authorization rejections 6. Comprehensive tests (tests/state_authorization.rs): - test_reject_state_without_announcement - test_reject_state_from_unauthorized_author - test_accept_state_from_announcement_author - test_accept_state_from_maintainer Security Impact: - Before: State events could be published by anyone - After: Only maintainers can publish state events - Defense-in-depth: Authorization checked at 3 points: 1. On arrival (StatePolicy) 2. On announcement acceptance (purgatory re-evaluation) 3. On git data arrival (purgatory sync) All tests pass: - 248 unit tests - 51 NIP-34 announcement tests - 4 new state authorization tests - 9 rejected index tests Closes: State authorization requirement from GRASP-01 spec
-rw-r--r--src/git/sync.rs32
-rw-r--r--src/nostr/builder.rs72
-rw-r--r--src/nostr/policy/state.rs40
-rw-r--r--src/sync/metrics.rs103
-rw-r--r--src/sync/mod.rs40
-rw-r--r--src/sync/rejected_index.rs109
-rw-r--r--tests/state_authorization.rs280
7 files changed, 673 insertions, 3 deletions
diff --git a/src/git/sync.rs b/src/git/sync.rs
index b05e2d0..e8e9655 100644
--- a/src/git/sync.rs
+++ b/src/git/sync.rs
@@ -951,6 +951,38 @@ async fn process_purgatory_state_events(
951 } 951 }
952 }; 952 };
953 953
954 // CRITICAL: Check authorization before processing
955 // State events MUST be rejected if author is not in maintainer set
956 let authorized_owners = crate::git::authorization::pubkey_authorised_for_repo_owners(
957 &entry.event.pubkey,
958 &db_repo_data,
959 );
960
961 if authorized_owners.is_empty() {
962 warn!(
963 identifier = %identifier,
964 event_id = %entry.event.id,
965 author = %entry.event.pubkey.to_hex(),
966 "Rejecting state event from purgatory: author not in maintainer set"
967 );
968 // Remove from purgatory - this event will never be authorized
969 purgatory.remove_state_event(identifier, &entry.event.id);
970 result.errors.push(format!(
971 "State event {} rejected: author {} not in maintainer set",
972 entry.event.id,
973 entry.event.pubkey.to_hex()
974 ));
975 continue;
976 }
977
978 debug!(
979 identifier = %identifier,
980 event_id = %entry.event.id,
981 author = %entry.event.pubkey.to_hex(),
982 authorized_for_owners = ?authorized_owners,
983 "State event author authorized via maintainer set"
984 );
985
954 // Use unified processing function 986 // Use unified processing function
955 let process_result = crate::git::process::process_state_with_git_data( 987 let process_result = crate::git::process::process_state_with_git_data(
956 &state, 988 &state,
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 939ccef..acaac71 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -102,6 +102,11 @@ impl Nip34WritePolicy {
102 } 102 }
103 103
104 tracing::debug!("Accepted repository announcement: {}", event_id_str); 104 tracing::debug!("Accepted repository announcement: {}", event_id_str);
105
106 // Check purgatory for state events that might now be authorized
107 self.check_purgatory_state_events_for_identifier(&announcement.identifier)
108 .await;
109
105 WritePolicyResult::Accept 110 WritePolicyResult::Accept
106 } 111 }
107 Err(e) => { 112 Err(e) => {
@@ -125,6 +130,11 @@ impl Nip34WritePolicy {
125 announcement.identifier 130 announcement.identifier
126 ); 131 );
127 // Don't create bare repository for external announcements 132 // Don't create bare repository for external announcements
133
134 // Check purgatory for state events that might now be authorized
135 self.check_purgatory_state_events_for_identifier(&announcement.identifier)
136 .await;
137
128 WritePolicyResult::Accept 138 WritePolicyResult::Accept
129 } 139 }
130 Err(e) => { 140 Err(e) => {
@@ -304,6 +314,68 @@ impl Nip34WritePolicy {
304 } 314 }
305 } 315 }
306 316
317 /// Check purgatory for state events that might now be authorized by a new announcement
318 ///
319 /// When an announcement is accepted, state events in purgatory that were previously
320 /// rejected due to missing announcements might now be authorized. This method:
321 /// 1. Finds all state events in purgatory for the identifier
322 /// 2. Re-evaluates authorization for each event
323 /// 3. Processes authorized events (releases from purgatory)
324 /// 4. Keeps unauthorized events in purgatory (will expire naturally)
325 async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) {
326 let state_events = self.ctx.purgatory.find_state(identifier);
327
328 if state_events.is_empty() {
329 return;
330 }
331
332 tracing::debug!(
333 identifier = %identifier,
334 count = state_events.len(),
335 "Checking purgatory state events after announcement acceptance"
336 );
337
338 for entry in state_events {
339 // Re-evaluate authorization with the new announcement
340 match self.state_policy.process_state_event(&entry.event, false).await {
341 Ok(WritePolicyResult::Accept) => {
342 tracing::info!(
343 event_id = %entry.event.id,
344 identifier = %identifier,
345 "State event in purgatory now authorized, will be processed"
346 );
347 // Event will be automatically removed from purgatory by process_state_event
348 // and broadcast to subscribers
349 }
350 Ok(WritePolicyResult::Reject { message, .. }) => {
351 if message.contains("not authorized") {
352 tracing::debug!(
353 event_id = %entry.event.id,
354 identifier = %identifier,
355 "State event in purgatory still not authorized, keeping in purgatory"
356 );
357 // Keep in purgatory - will expire naturally after 30 minutes
358 } else {
359 tracing::debug!(
360 event_id = %entry.event.id,
361 identifier = %identifier,
362 reason = %message,
363 "State event in purgatory rejected for other reason"
364 );
365 }
366 }
367 Err(e) => {
368 tracing::warn!(
369 event_id = %entry.event.id,
370 identifier = %identifier,
371 error = %e,
372 "Error re-evaluating state event in purgatory"
373 );
374 }
375 }
376 }
377 }
378
307 /// Handle events that must reference accepted repositories or events 379 /// Handle events that must reference accepted repositories or events
308 async fn handle_related_event(&self, event: &Event, event_type: &str) -> WritePolicyResult { 380 async fn handle_related_event(&self, event: &Event, event_type: &str) -> WritePolicyResult {
309 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 381 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index acb76a3..d26b5ec 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -78,6 +78,46 @@ impl StatePolicy {
78 // Get all repositories and state events from db with identifier 78 // Get all repositories and state events from db with identifier
79 let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?; 79 let db_repo_data = fetch_repository_data(&self.ctx.database, &state.identifier).await?;
80 80
81 // CRITICAL: Check if author is authorized via maintainer set
82 // State events MUST be rejected if author is not in maintainer set of any accepted announcement
83 if db_repo_data.announcements.is_empty() {
84 tracing::warn!(
85 event_id = %event.id,
86 identifier = %state.identifier,
87 author = %event.pubkey.to_hex(),
88 "Rejecting state event: no announcement exists for this repository"
89 );
90 return Ok(WritePolicyResult::Reject {
91 status: false,
92 message: "invalid: no announcement exists for this repository".into(),
93 });
94 }
95
96 let authorized_owners =
97 crate::git::authorization::pubkey_authorised_for_repo_owners(&event.pubkey, &db_repo_data);
98
99 if authorized_owners.is_empty() {
100 tracing::warn!(
101 event_id = %event.id,
102 identifier = %state.identifier,
103 author = %event.pubkey.to_hex(),
104 announcements_count = db_repo_data.announcements.len(),
105 "Rejecting state event: author not in maintainer set of any announcement"
106 );
107 return Ok(WritePolicyResult::Reject {
108 status: false,
109 message: "invalid: author not authorized for this repository".into(),
110 });
111 }
112
113 tracing::debug!(
114 event_id = %event.id,
115 identifier = %state.identifier,
116 author = %event.pubkey.to_hex(),
117 authorized_for_owners = ?authorized_owners,
118 "State event author authorized via maintainer set"
119 );
120
81 // Duplicate check in db 121 // Duplicate check in db
82 if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) { 122 if db_repo_data.states.iter().any(|e| e.event.id.eq(&event.id)) {
83 tracing::debug!("processed state event duplicate (in db): {}", event.id); 123 tracing::debug!("processed state event duplicate (in db): {}", event.id);
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index a175210..7d6d42d 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -56,6 +56,22 @@ pub struct SyncMetrics {
56 rejected_announcements_cold_index_expired_total: IntCounter, 56 rejected_announcements_cold_index_expired_total: IntCounter,
57 /// Total invalidations (maintainer announcements invalidated) 57 /// Total invalidations (maintainer announcements invalidated)
58 rejected_announcements_invalidated_total: IntCounter, 58 rejected_announcements_invalidated_total: IntCounter,
59
60 // === Rejected States Index Metrics ===
61 /// Current number of state events in hot cache
62 rejected_states_hot_cache_current: IntGauge,
63 /// Total hot cache hits (state events re-processed from cache)
64 rejected_states_hot_cache_hits_total: IntCounter,
65 /// Total hot cache misses (state events not in cache)
66 rejected_states_hot_cache_misses_total: IntCounter,
67 /// Total expired state events removed from hot cache
68 rejected_states_hot_cache_expired_total: IntCounter,
69 /// Current number of state event entries in cold index
70 rejected_states_cold_index_current: IntGauge,
71 /// Total state event cold index entries expired and removed
72 rejected_states_cold_index_expired_total: IntCounter,
73 /// Total state event invalidations
74 rejected_states_invalidated_total: IntCounter,
59} 75}
60 76
61impl SyncMetrics { 77impl SyncMetrics {
@@ -172,6 +188,49 @@ impl SyncMetrics {
172 ))?; 188 ))?;
173 registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?; 189 registry.register(Box::new(rejected_announcements_invalidated_total.clone()))?;
174 190
191 // Rejected states metrics
192 let rejected_states_hot_cache_current = IntGauge::with_opts(Opts::new(
193 "ngit_sync_rejected_states_hot_cache_current",
194 "Current number of state events in hot cache (full events, 2 min expiry)",
195 ))?;
196 registry.register(Box::new(rejected_states_hot_cache_current.clone()))?;
197
198 let rejected_states_hot_cache_hits_total = IntCounter::with_opts(Opts::new(
199 "ngit_sync_rejected_states_hot_cache_hits_total",
200 "Total hot cache hits (state events re-processed from cache)",
201 ))?;
202 registry.register(Box::new(rejected_states_hot_cache_hits_total.clone()))?;
203
204 let rejected_states_hot_cache_misses_total = IntCounter::with_opts(Opts::new(
205 "ngit_sync_rejected_states_hot_cache_misses_total",
206 "Total hot cache misses (state events not in cache when invalidated)",
207 ))?;
208 registry.register(Box::new(rejected_states_hot_cache_misses_total.clone()))?;
209
210 let rejected_states_hot_cache_expired_total = IntCounter::with_opts(Opts::new(
211 "ngit_sync_rejected_states_hot_cache_expired_total",
212 "Total expired state events removed from hot cache",
213 ))?;
214 registry.register(Box::new(rejected_states_hot_cache_expired_total.clone()))?;
215
216 let rejected_states_cold_index_current = IntGauge::with_opts(Opts::new(
217 "ngit_sync_rejected_states_cold_index_current",
218 "Current number of state event entries in cold index (metadata only, 7 day expiry)",
219 ))?;
220 registry.register(Box::new(rejected_states_cold_index_current.clone()))?;
221
222 let rejected_states_cold_index_expired_total = IntCounter::with_opts(Opts::new(
223 "ngit_sync_rejected_states_cold_index_expired_total",
224 "Total state event cold index entries expired and removed",
225 ))?;
226 registry.register(Box::new(rejected_states_cold_index_expired_total.clone()))?;
227
228 let rejected_states_invalidated_total = IntCounter::with_opts(Opts::new(
229 "ngit_sync_rejected_states_invalidated_total",
230 "Total state event invalidations (when announcements accepted)",
231 ))?;
232 registry.register(Box::new(rejected_states_invalidated_total.clone()))?;
233
175 Ok(Self { 234 Ok(Self {
176 relay_connected, 235 relay_connected,
177 connection_attempts_total, 236 connection_attempts_total,
@@ -188,6 +247,13 @@ impl SyncMetrics {
188 rejected_announcements_cold_index_current, 247 rejected_announcements_cold_index_current,
189 rejected_announcements_cold_index_expired_total, 248 rejected_announcements_cold_index_expired_total,
190 rejected_announcements_invalidated_total, 249 rejected_announcements_invalidated_total,
250 rejected_states_hot_cache_current,
251 rejected_states_hot_cache_hits_total,
252 rejected_states_hot_cache_misses_total,
253 rejected_states_hot_cache_expired_total,
254 rejected_states_cold_index_current,
255 rejected_states_cold_index_expired_total,
256 rejected_states_invalidated_total,
191 }) 257 })
192 } 258 }
193 259
@@ -396,6 +462,43 @@ impl SyncMetrics {
396 pub fn record_invalidation(&self, count: usize) { 462 pub fn record_invalidation(&self, count: usize) {
397 self.rejected_announcements_invalidated_total.inc_by(count as u64); 463 self.rejected_announcements_invalidated_total.inc_by(count as u64);
398 } 464 }
465
466 // === Rejected States Recording Methods ===
467
468 /// Update state events hot cache current size gauge.
469 pub fn update_states_hot_cache_size(&self, size: usize) {
470 self.rejected_states_hot_cache_current.set(size as i64);
471 }
472
473 /// Record state event hot cache hit (event re-processed from cache).
474 pub fn record_states_hot_cache_hit(&self) {
475 self.rejected_states_hot_cache_hits_total.inc();
476 }
477
478 /// Record state event hot cache miss (event not in cache when invalidated).
479 pub fn record_states_hot_cache_miss(&self) {
480 self.rejected_states_hot_cache_misses_total.inc();
481 }
482
483 /// Record state event hot cache expired entries.
484 pub fn record_states_hot_cache_expired(&self, count: usize) {
485 self.rejected_states_hot_cache_expired_total.inc_by(count as u64);
486 }
487
488 /// Update state events cold index current size gauge.
489 pub fn update_states_cold_index_size(&self, size: usize) {
490 self.rejected_states_cold_index_current.set(size as i64);
491 }
492
493 /// Record state event cold index expired entries.
494 pub fn record_states_cold_index_expired(&self, count: usize) {
495 self.rejected_states_cold_index_expired_total.inc_by(count as u64);
496 }
497
498 /// Record state event invalidation.
499 pub fn record_states_invalidation(&self, count: usize) {
500 self.rejected_states_invalidated_total.inc_by(count as u64);
501 }
399} 502}
400 503
401#[cfg(test)] 504#[cfg(test)]
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index f296c0f..93b0e38 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -367,12 +367,14 @@ async fn run_daily_timer(
367/// Run the combined health and metrics checker 367/// Run the combined health and metrics checker
368/// 368///
369/// This function runs in a loop with a 2-second interval, performing three tasks: 369/// This function runs in a loop with a 2-second interval, performing three tasks:
370/// Background task for cleaning up expired entries from the rejected events index 370/// Background task for cleaning up expired entries from the rejected events indexes
371/// 371///
372/// This task runs two cleanup operations at different intervals: 372/// This task runs two cleanup operations at different intervals:
373/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache 373/// 1. **Hot cache cleanup (60s)**: Remove events older than 2 minutes from hot cache
374/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index 374/// 2. **Cold index cleanup (daily)**: Remove metadata older than 7 days from cold index
375/// 375///
376/// Cleans up both the announcements index and the states index.
377///
376/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly). 378/// The hot cache cleanup runs frequently to keep memory usage low (events expire quickly).
377/// The cold index cleanup runs daily since metadata is small and expires slowly. 379/// The cold index cleanup runs daily since metadata is small and expires slowly.
378async fn run_rejected_index_cleanup( 380async fn run_rejected_index_cleanup(
@@ -397,6 +399,8 @@ async fn run_rejected_index_cleanup(
397 tokio::select! { 399 tokio::select! {
398 _ = hot_cache_timer.tick() => { 400 _ = hot_cache_timer.tick() => {
399 let manager = sync_manager.lock().await; 401 let manager = sync_manager.lock().await;
402
403 // Clean up announcements index
400 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); 404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
401 if hot_expired > 0 { 405 if hot_expired > 0 {
402 tracing::debug!( 406 tracing::debug!(
@@ -404,9 +408,20 @@ async fn run_rejected_index_cleanup(
404 hot_expired 408 hot_expired
405 ); 409 );
406 } 410 }
411
412 // Clean up states index
413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired();
414 if states_hot_expired > 0 {
415 tracing::debug!(
416 "Cleaned up {} expired entries from rejected states hot cache",
417 states_hot_expired
418 );
419 }
407 } 420 }
408 _ = cold_index_timer.tick() => { 421 _ = cold_index_timer.tick() => {
409 let manager = sync_manager.lock().await; 422 let manager = sync_manager.lock().await;
423
424 // Clean up announcements index
410 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); 425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
411 if cold_expired > 0 { 426 if cold_expired > 0 {
412 tracing::info!( 427 tracing::info!(
@@ -414,6 +429,15 @@ async fn run_rejected_index_cleanup(
414 cold_expired 429 cold_expired
415 ); 430 );
416 } 431 }
432
433 // Clean up states index
434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired();
435 if states_cold_expired > 0 {
436 tracing::info!(
437 "Cleaned up {} expired entries from rejected states cold index",
438 states_cold_expired
439 );
440 }
417 } 441 }
418 _ = shutdown_rx.recv() => { 442 _ = shutdown_rx.recv() => {
419 tracing::info!("Rejected index cleanup received shutdown signal"); 443 tracing::info!("Rejected index cleanup received shutdown signal");
@@ -507,6 +531,8 @@ pub struct SyncManager {
507 pending_sync_index: PendingSyncIndex, 531 pending_sync_index: PendingSyncIndex,
508 /// Rejected announcement events (30617/30618) - two-tier storage for re-processing 532 /// Rejected announcement events (30617/30618) - two-tier storage for re-processing
509 rejected_events_index: Arc<RejectedEventsIndex>, 533 rejected_events_index: Arc<RejectedEventsIndex>,
534 /// Rejected state events (30618) - two-tier storage for re-processing
535 rejected_states_index: Arc<RejectedEventsIndex>,
510 /// Active relay connections - keyed by relay URL 536 /// Active relay connections - keyed by relay URL
511 connections: HashMap<String, RelayConnection>, 537 connections: HashMap<String, RelayConnection>,
512 /// Health tracker for relay connection state 538 /// Health tracker for relay connection state
@@ -571,6 +597,18 @@ impl SyncManager {
571 Duration::from_secs(config.rejected_cold_index_expiry_secs), 597 Duration::from_secs(config.rejected_cold_index_expiry_secs),
572 ) 598 )
573 }), 599 }),
600 rejected_states_index: Arc::new(if let Some(ref metrics) = sync_metrics {
601 RejectedEventsIndex::with_metrics(
602 Duration::from_secs(config.rejected_hot_cache_duration_secs),
603 Duration::from_secs(config.rejected_cold_index_expiry_secs),
604 metrics.clone(),
605 )
606 } else {
607 RejectedEventsIndex::new(
608 Duration::from_secs(config.rejected_hot_cache_duration_secs),
609 Duration::from_secs(config.rejected_cold_index_expiry_secs),
610 )
611 }),
574 connections: HashMap::new(), 612 connections: HashMap::new(),
575 health_tracker: Arc::new(RelayHealthTracker::new(config)), 613 health_tracker: Arc::new(RelayHealthTracker::new(config)),
576 next_batch_id: 0, 614 next_batch_id: 0,
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index 4733d80..f5ffef4 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -355,7 +355,7 @@ impl RejectedEventsIndex {
355 index 355 index
356 } 356 }
357 357
358 /// Update metrics with current sizes 358 /// Update metrics with current sizes (for announcements)
359 fn update_metrics(&self) { 359 fn update_metrics(&self) {
360 if let Some(ref metrics) = self.metrics { 360 if let Some(ref metrics) = self.metrics {
361 metrics.update_hot_cache_size(self.hot_cache.len()); 361 metrics.update_hot_cache_size(self.hot_cache.len());
@@ -363,6 +363,14 @@ impl RejectedEventsIndex {
363 } 363 }
364 } 364 }
365 365
366 /// Update metrics with current sizes (for states)
367 fn update_states_metrics(&self) {
368 if let Some(ref metrics) = self.metrics {
369 metrics.update_states_hot_cache_size(self.hot_cache.len());
370 metrics.update_states_cold_index_size(self.cold_index.len());
371 }
372 }
373
366 /// Add rejected announcement to both tiers 374 /// Add rejected announcement to both tiers
367 /// 375 ///
368 /// # Arguments 376 /// # Arguments
@@ -393,6 +401,36 @@ impl RejectedEventsIndex {
393 self.update_metrics(); 401 self.update_metrics();
394 } 402 }
395 403
404 /// Add rejected state event to both tiers
405 ///
406 /// # Arguments
407 ///
408 /// * `event` - Full event object (stored in hot cache)
409 /// * `pubkey` - Author's public key
410 /// * `identifier` - Repository identifier (d tag)
411 /// * `reason` - Why the state event was rejected
412 pub fn add_state(
413 &self,
414 event: Event,
415 pubkey: PublicKey,
416 identifier: String,
417 reason: RejectionReason,
418 ) {
419 // Add to hot cache (full event)
420 self.hot_cache.add(
421 event.clone(),
422 pubkey,
423 identifier.clone(),
424 reason,
425 );
426
427 // Add to cold index (metadata only)
428 self.cold_index.add(event.id, pubkey, identifier, reason);
429
430 // Update metrics (using states metrics)
431 self.update_states_metrics();
432 }
433
396 /// Check if event is already rejected (in either tier) 434 /// Check if event is already rejected (in either tier)
397 pub fn contains(&self, event_id: &EventId) -> bool { 435 pub fn contains(&self, event_id: &EventId) -> bool {
398 self.hot_cache.contains(event_id) || self.cold_index.contains(event_id) 436 self.hot_cache.contains(event_id) || self.cold_index.contains(event_id)
@@ -442,7 +480,51 @@ impl RejectedEventsIndex {
442 (removed, events) 480 (removed, events)
443 } 481 }
444 482
445 /// Clean up expired entries from both tiers 483 /// Invalidate state events and get events for immediate re-processing
484 ///
485 /// This is called when an announcement is accepted that authorizes state events.
486 /// It removes the cold index entries (so they can be re-fetched on next sync) and
487 /// returns any events still in the hot cache for immediate re-processing.
488 ///
489 /// # Returns
490 ///
491 /// Tuple of (number of cold index entries removed, events from hot cache)
492 pub fn invalidate_and_get_state_events(
493 &self,
494 maintainer_pubkey: &PublicKey,
495 identifier: &str,
496 ) -> (usize, Vec<Event>) {
497 // Remove from cold index (prevents re-fetch)
498 let removed = self
499 .cold_index
500 .invalidate_maintainer_announcements(maintainer_pubkey, identifier);
501
502 // Get from hot cache (for immediate re-processing)
503 let events = self
504 .hot_cache
505 .get_maintainer_events(maintainer_pubkey, identifier);
506
507 // Track metrics (using states metrics)
508 if let Some(ref metrics) = self.metrics {
509 if removed > 0 {
510 metrics.record_states_invalidation(removed);
511 }
512 if events.is_empty() {
513 metrics.record_states_hot_cache_miss();
514 } else {
515 for _ in &events {
516 metrics.record_states_hot_cache_hit();
517 }
518 }
519 }
520
521 // Update size metrics (using states metrics)
522 self.update_states_metrics();
523
524 (removed, events)
525 }
526
527 /// Clean up expired entries from both tiers (for announcements)
446 /// 528 ///
447 /// Returns tuple of (hot cache expired, cold index expired) 529 /// Returns tuple of (hot cache expired, cold index expired)
448 pub fn cleanup_expired(&self) -> (usize, usize) { 530 pub fn cleanup_expired(&self) -> (usize, usize) {
@@ -465,6 +547,29 @@ impl RejectedEventsIndex {
465 (hot_expired, cold_expired) 547 (hot_expired, cold_expired)
466 } 548 }
467 549
550 /// Clean up expired entries from both tiers (for states)
551 ///
552 /// Returns tuple of (hot cache expired, cold index expired)
553 pub fn cleanup_states_expired(&self) -> (usize, usize) {
554 let hot_expired = self.hot_cache.cleanup_expired();
555 let cold_expired = self.cold_index.cleanup_expired();
556
557 // Track metrics (using states metrics)
558 if let Some(ref metrics) = self.metrics {
559 if hot_expired > 0 {
560 metrics.record_states_hot_cache_expired(hot_expired);
561 }
562 if cold_expired > 0 {
563 metrics.record_states_cold_index_expired(cold_expired);
564 }
565 }
566
567 // Update size metrics (using states metrics)
568 self.update_states_metrics();
569
570 (hot_expired, cold_expired)
571 }
572
468 /// Get current number of entries in hot cache 573 /// Get current number of entries in hot cache
469 pub fn hot_cache_len(&self) -> usize { 574 pub fn hot_cache_len(&self) -> usize {
470 self.hot_cache.len() 575 self.hot_cache.len()
diff --git a/tests/state_authorization.rs b/tests/state_authorization.rs
new file mode 100644
index 0000000..a5dfa2d
--- /dev/null
+++ b/tests/state_authorization.rs
@@ -0,0 +1,280 @@
1//! Tests for state event authorization
2//!
3//! Verifies that state events are properly rejected when:
4//! 1. No announcement exists for the repository
5//! 2. Author is not in the maintainer set
6
7mod common;
8
9use common::relay::TestRelay;
10use nostr_sdk::prelude::*;
11
12#[tokio::test]
13async fn test_reject_state_without_announcement() {
14 // Start test relay
15 let relay = TestRelay::start().await;
16
17 // Create test keypair
18 let keys = Keys::generate();
19
20 // Create a state event without any announcement
21 let state_event = EventBuilder::new(
22 Kind::RepoState,
23 "",
24 )
25 .tags([
26 Tag::custom(TagKind::custom("d"), ["test-repo"]),
27 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
28 ])
29 .sign_with_keys(&keys)
30 .unwrap();
31
32 // Connect to relay
33 let client = Client::default();
34 client.add_relay(relay.url()).await.unwrap();
35 client.connect().await;
36
37 // Try to send state event
38 let result = client.send_event(&state_event).await;
39
40 // Should be rejected
41 match result {
42 Ok(output) => {
43 assert!(
44 !output.success.is_empty() || !output.failed.is_empty(),
45 "Event should be processed"
46 );
47 // Check if any relay rejected it
48 let rejected = output.failed.values().any(|err| {
49 err.to_string().contains("no announcement exists")
50 });
51 assert!(rejected, "Event should be rejected due to missing announcement");
52 }
53 Err(e) => {
54 // Also acceptable - relay rejected the event
55 assert!(
56 e.to_string().contains("no announcement exists") ||
57 e.to_string().contains("rejected"),
58 "Error should indicate missing announcement: {}",
59 e
60 );
61 }
62 }
63
64 relay.stop().await;
65}
66
67#[tokio::test]
68async fn test_reject_state_from_unauthorized_author() {
69 // Start test relay
70 let relay = TestRelay::start().await;
71
72 // Create two keypairs: one for announcement, one for unauthorized state
73 let announcement_keys = Keys::generate();
74 let unauthorized_keys = Keys::generate();
75
76 // Create announcement
77 let announcement = EventBuilder::new(
78 Kind::GitRepoAnnouncement,
79 "",
80 )
81 .tags([
82 Tag::custom(TagKind::custom("d"), ["test-repo"]),
83 Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]),
84 Tag::custom(TagKind::custom("relays"), [relay.url()]),
85 ])
86 .sign_with_keys(&announcement_keys)
87 .unwrap();
88
89 // Connect to relay
90 let client = Client::default();
91 client.add_relay(relay.url()).await.unwrap();
92 client.connect().await;
93
94 // Send announcement
95 client.send_event(&announcement).await.unwrap();
96
97 // Wait for announcement to be processed
98 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
99
100 // Try to send state event from unauthorized author
101 let state_event = EventBuilder::new(
102 Kind::RepoState,
103 "",
104 )
105 .tags([
106 Tag::custom(TagKind::custom("d"), ["test-repo"]),
107 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
108 ])
109 .sign_with_keys(&unauthorized_keys)
110 .unwrap();
111
112 let result = client.send_event(&state_event).await;
113
114 // Should be rejected
115 match result {
116 Ok(output) => {
117 let rejected = output.failed.values().any(|err| {
118 err.to_string().contains("not authorized")
119 });
120 assert!(rejected, "Event should be rejected due to unauthorized author");
121 }
122 Err(e) => {
123 assert!(
124 e.to_string().contains("not authorized") ||
125 e.to_string().contains("rejected"),
126 "Error should indicate unauthorized author: {}",
127 e
128 );
129 }
130 }
131
132 relay.stop().await;
133}
134
135#[tokio::test]
136async fn test_accept_state_from_announcement_author() {
137 // Start test relay
138 let relay = TestRelay::start().await;
139
140 // Create keypair
141 let keys = Keys::generate();
142
143 // Create announcement
144 let announcement = EventBuilder::new(
145 Kind::GitRepoAnnouncement,
146 "",
147 )
148 .tags([
149 Tag::custom(TagKind::custom("d"), ["test-repo"]),
150 Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]),
151 Tag::custom(TagKind::custom("relays"), [relay.url()]),
152 ])
153 .sign_with_keys(&keys)
154 .unwrap();
155
156 // Connect to relay
157 let client = Client::default();
158 client.add_relay(relay.url()).await.unwrap();
159 client.connect().await;
160
161 // Send announcement
162 client.send_event(&announcement).await.unwrap();
163
164 // Wait for announcement to be processed
165 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
166
167 // Send state event from same author (should be accepted or go to purgatory)
168 let state_event = EventBuilder::new(
169 Kind::RepoState,
170 "",
171 )
172 .tags([
173 Tag::custom(TagKind::custom("d"), ["test-repo"]),
174 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
175 ])
176 .sign_with_keys(&keys)
177 .unwrap();
178
179 let result = client.send_event(&state_event).await;
180
181 // Should be accepted or go to purgatory (not permanently rejected)
182 match result {
183 Ok(output) => {
184 // Check that it wasn't permanently rejected
185 let permanently_rejected = output.failed.values().any(|err| {
186 let err_str = err.to_string();
187 err_str.contains("not authorized") || err_str.contains("no announcement exists")
188 });
189 assert!(
190 !permanently_rejected,
191 "Event should not be permanently rejected when author is authorized"
192 );
193 }
194 Err(e) => {
195 // Purgatory is acceptable
196 assert!(
197 e.to_string().contains("purgatory") ||
198 e.to_string().contains("waiting for git"),
199 "Error should be about purgatory, not authorization: {}",
200 e
201 );
202 }
203 }
204
205 relay.stop().await;
206}
207
208#[tokio::test]
209async fn test_accept_state_from_maintainer() {
210 // Start test relay
211 let relay = TestRelay::start().await;
212
213 // Create two keypairs: owner and maintainer
214 let owner_keys = Keys::generate();
215 let maintainer_keys = Keys::generate();
216
217 // Create announcement with maintainer
218 let announcement = EventBuilder::new(
219 Kind::GitRepoAnnouncement,
220 "",
221 )
222 .tags([
223 Tag::custom(TagKind::custom("d"), ["test-repo"]),
224 Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]),
225 Tag::custom(TagKind::custom("relays"), [relay.url()]),
226 Tag::custom(TagKind::custom("maintainers"), [maintainer_keys.public_key().to_hex()]),
227 ])
228 .sign_with_keys(&owner_keys)
229 .unwrap();
230
231 // Connect to relay
232 let client = Client::default();
233 client.add_relay(relay.url()).await.unwrap();
234 client.connect().await;
235
236 // Send announcement
237 client.send_event(&announcement).await.unwrap();
238
239 // Wait for announcement to be processed
240 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
241
242 // Send state event from maintainer
243 let state_event = EventBuilder::new(
244 Kind::RepoState,
245 "",
246 )
247 .tags([
248 Tag::custom(TagKind::custom("d"), ["test-repo"]),
249 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
250 ])
251 .sign_with_keys(&maintainer_keys)
252 .unwrap();
253
254 let result = client.send_event(&state_event).await;
255
256 // Should be accepted or go to purgatory (not permanently rejected)
257 match result {
258 Ok(output) => {
259 let permanently_rejected = output.failed.values().any(|err| {
260 let err_str = err.to_string();
261 err_str.contains("not authorized") || err_str.contains("no announcement exists")
262 });
263 assert!(
264 !permanently_rejected,
265 "Event should not be permanently rejected when maintainer is authorized"
266 );
267 }
268 Err(e) => {
269 // Purgatory is acceptable
270 assert!(
271 e.to_string().contains("purgatory") ||
272 e.to_string().contains("waiting for git"),
273 "Error should be about purgatory, not authorization: {}",
274 e
275 );
276 }
277 }
278
279 relay.stop().await;
280}