upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 19:58:41 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 19:58:41 +0000
commitb28a356cb41077ccee12a9c52f4ef2054e76cac6 (patch)
tree2a0867f1ab0216e86efa062aef90b2b8077e6fb9
parent6dd9fcd5392891b0ddb7894e2c5cb40450eae00e (diff)
chore: cargo fmt
-rw-r--r--src/config.rs33
-rw-r--r--src/http/nip11.rs4
-rw-r--r--src/nostr/builder.rs20
-rw-r--r--src/nostr/policy/state.rs8
-rw-r--r--src/sync/metrics.rs37
-rw-r--r--src/sync/mod.rs185
-rw-r--r--src/sync/rejected_index.rs57
-rw-r--r--src/sync/relay_connection.rs30
-rw-r--r--tests/common/sync_helpers.rs2
-rw-r--r--tests/state_authorization.rs242
-rw-r--r--tests/sync/maintainer_reprocessing.rs311
11 files changed, 462 insertions, 467 deletions
diff --git a/src/config.rs b/src/config.rs
index 2343c88..44001d8 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -141,12 +141,20 @@ pub struct Config {
141 /// Stores full event objects for immediate re-processing when dependencies resolve. 141 /// Stores full event objects for immediate re-processing when dependencies resolve.
142 /// Too short (<30s): Miss events from slow relays 142 /// Too short (<30s): Miss events from slow relays
143 /// Too long (>5min): Waste memory 143 /// Too long (>5min): Waste memory
144 #[arg(long, env = "NGIT_REJECTED_HOT_CACHE_DURATION_SECS", default_value_t = 120)] 144 #[arg(
145 long,
146 env = "NGIT_REJECTED_HOT_CACHE_DURATION_SECS",
147 default_value_t = 120
148 )]
145 pub rejected_hot_cache_duration_secs: u64, 149 pub rejected_hot_cache_duration_secs: u64,
146 150
147 /// Cold index expiry in seconds for rejected announcements (default: 604800 = 7 days) 151 /// Cold index expiry in seconds for rejected announcements (default: 604800 = 7 days)
148 /// Stores metadata only to prevent repeated downloads of rejected events. 152 /// Stores metadata only to prevent repeated downloads of rejected events.
149 #[arg(long, env = "NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS", default_value_t = 604800)] 153 #[arg(
154 long,
155 env = "NGIT_REJECTED_COLD_INDEX_EXPIRY_SECS",
156 default_value_t = 604800
157 )]
150 pub rejected_cold_index_expiry_secs: u64, 158 pub rejected_cold_index_expiry_secs: u64,
151} 159}
152 160
@@ -190,10 +198,7 @@ impl Config {
190 // Validate it's a valid nsec 198 // Validate it's a valid nsec
191 Keys::parse(&nsec).context("Invalid nsec in relay owner key file")?; 199 Keys::parse(&nsec).context("Invalid nsec in relay owner key file")?;
192 200
193 tracing::info!( 201 tracing::info!("Loaded relay owner key from {}", key_path.display());
194 "Loaded relay owner key from {}",
195 key_path.display()
196 );
197 return Ok(nsec); 202 return Ok(nsec);
198 } 203 }
199 204
@@ -202,8 +207,7 @@ impl Config {
202 let nsec = keys.secret_key().to_bech32()?; 207 let nsec = keys.secret_key().to_bech32()?;
203 208
204 // Save to file 209 // Save to file
205 fs::write(&key_path, &nsec) 210 fs::write(&key_path, &nsec).context("Failed to write relay owner key file")?;
206 .context("Failed to write relay owner key file")?;
207 211
208 tracing::info!( 212 tracing::info!(
209 "Generated new relay owner key and saved to {}", 213 "Generated new relay owner key and saved to {}",
@@ -215,7 +219,9 @@ impl Config {
215 219
216 /// Get the relay owner's Keys object 220 /// Get the relay owner's Keys object
217 pub fn relay_owner_keys(&self) -> Result<Keys> { 221 pub fn relay_owner_keys(&self) -> Result<Keys> {
218 let nsec = self.relay_owner_nsec.as_ref() 222 let nsec = self
223 .relay_owner_nsec
224 .as_ref()
219 .context("relay_owner_nsec not set (should be set by Config::load())")?; 225 .context("relay_owner_nsec not set (should be set by Config::load())")?;
220 Keys::parse(nsec).context("Invalid relay_owner_nsec") 226 Keys::parse(nsec).context("Invalid relay_owner_nsec")
221 } 227 }
@@ -251,8 +257,11 @@ impl Config {
251 pub fn for_testing() -> Self { 257 pub fn for_testing() -> Self {
252 // Generate a test key deterministically for consistent tests 258 // Generate a test key deterministically for consistent tests
253 let keys = Keys::generate(); 259 let keys = Keys::generate();
254 let nsec = keys.secret_key().to_bech32().expect("Failed to generate test nsec"); 260 let nsec = keys
255 261 .secret_key()
262 .to_bech32()
263 .expect("Failed to generate test nsec");
264
256 Self { 265 Self {
257 domain: "localhost:8080".to_string(), 266 domain: "localhost:8080".to_string(),
258 relay_owner_nsec: Some(nsec), 267 relay_owner_nsec: Some(nsec),
@@ -348,7 +357,7 @@ mod tests {
348 let config = Config::for_testing(); 357 let config = Config::for_testing();
349 let keys = config.relay_owner_keys().expect("Should have valid keys"); 358 let keys = config.relay_owner_keys().expect("Should have valid keys");
350 let npub = config.relay_owner_npub().expect("Should derive npub"); 359 let npub = config.relay_owner_npub().expect("Should derive npub");
351 360
352 // Verify the npub matches the keys 361 // Verify the npub matches the keys
353 assert_eq!(npub, keys.public_key().to_bech32().unwrap()); 362 assert_eq!(npub, keys.public_key().to_bech32().unwrap());
354 assert!(npub.starts_with("npub1")); 363 assert!(npub.starts_with("npub1"));
diff --git a/src/http/nip11.rs b/src/http/nip11.rs
index cf31cf3..de714cb 100644
--- a/src/http/nip11.rs
+++ b/src/http/nip11.rs
@@ -102,12 +102,12 @@ mod tests {
102 102
103 assert_eq!(doc.name, "Test Relay"); 103 assert_eq!(doc.name, "Test Relay");
104 assert_eq!(doc.description, "A test relay"); 104 assert_eq!(doc.description, "A test relay");
105 105
106 // Verify pubkey is present and is a valid npub 106 // Verify pubkey is present and is a valid npub
107 assert!(doc.pubkey.is_some()); 107 assert!(doc.pubkey.is_some());
108 let pubkey = doc.pubkey.unwrap(); 108 let pubkey = doc.pubkey.unwrap();
109 assert!(pubkey.starts_with("npub1")); 109 assert!(pubkey.starts_with("npub1"));
110 110
111 assert!(doc.supported_nips.contains(&1)); 111 assert!(doc.supported_nips.contains(&1));
112 assert!(doc.supported_nips.contains(&11)); 112 assert!(doc.supported_nips.contains(&11));
113 assert!(doc.supported_nips.contains(&34)); 113 assert!(doc.supported_nips.contains(&34));
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index acaac71..c010854 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -102,11 +102,11 @@ impl Nip34WritePolicy {
102 } 102 }
103 103
104 tracing::debug!("Accepted repository announcement: {}", event_id_str); 104 tracing::debug!("Accepted repository announcement: {}", event_id_str);
105 105
106 // Check purgatory for state events that might now be authorized 106 // Check purgatory for state events that might now be authorized
107 self.check_purgatory_state_events_for_identifier(&announcement.identifier) 107 self.check_purgatory_state_events_for_identifier(&announcement.identifier)
108 .await; 108 .await;
109 109
110 WritePolicyResult::Accept 110 WritePolicyResult::Accept
111 } 111 }
112 Err(e) => { 112 Err(e) => {
@@ -130,11 +130,11 @@ impl Nip34WritePolicy {
130 announcement.identifier 130 announcement.identifier
131 ); 131 );
132 // Don't create bare repository for external announcements 132 // Don't create bare repository for external announcements
133 133
134 // Check purgatory for state events that might now be authorized 134 // Check purgatory for state events that might now be authorized
135 self.check_purgatory_state_events_for_identifier(&announcement.identifier) 135 self.check_purgatory_state_events_for_identifier(&announcement.identifier)
136 .await; 136 .await;
137 137
138 WritePolicyResult::Accept 138 WritePolicyResult::Accept
139 } 139 }
140 Err(e) => { 140 Err(e) => {
@@ -324,20 +324,24 @@ impl Nip34WritePolicy {
324 /// 4. Keeps unauthorized events in purgatory (will expire naturally) 324 /// 4. Keeps unauthorized events in purgatory (will expire naturally)
325 async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) { 325 async fn check_purgatory_state_events_for_identifier(&self, identifier: &str) {
326 let state_events = self.ctx.purgatory.find_state(identifier); 326 let state_events = self.ctx.purgatory.find_state(identifier);
327 327
328 if state_events.is_empty() { 328 if state_events.is_empty() {
329 return; 329 return;
330 } 330 }
331 331
332 tracing::debug!( 332 tracing::debug!(
333 identifier = %identifier, 333 identifier = %identifier,
334 count = state_events.len(), 334 count = state_events.len(),
335 "Checking purgatory state events after announcement acceptance" 335 "Checking purgatory state events after announcement acceptance"
336 ); 336 );
337 337
338 for entry in state_events { 338 for entry in state_events {
339 // Re-evaluate authorization with the new announcement 339 // Re-evaluate authorization with the new announcement
340 match self.state_policy.process_state_event(&entry.event, false).await { 340 match self
341 .state_policy
342 .process_state_event(&entry.event, false)
343 .await
344 {
341 Ok(WritePolicyResult::Accept) => { 345 Ok(WritePolicyResult::Accept) => {
342 tracing::info!( 346 tracing::info!(
343 event_id = %entry.event.id, 347 event_id = %entry.event.id,
diff --git a/src/nostr/policy/state.rs b/src/nostr/policy/state.rs
index d26b5ec..b850e7b 100644
--- a/src/nostr/policy/state.rs
+++ b/src/nostr/policy/state.rs
@@ -93,9 +93,11 @@ impl StatePolicy {
93 }); 93 });
94 } 94 }
95 95
96 let authorized_owners = 96 let authorized_owners = crate::git::authorization::pubkey_authorised_for_repo_owners(
97 crate::git::authorization::pubkey_authorised_for_repo_owners(&event.pubkey, &db_repo_data); 97 &event.pubkey,
98 98 &db_repo_data,
99 );
100
99 if authorized_owners.is_empty() { 101 if authorized_owners.is_empty() {
100 tracing::warn!( 102 tracing::warn!(
101 event_id = %event.id, 103 event_id = %event.id,
diff --git a/src/sync/metrics.rs b/src/sync/metrics.rs
index 7d6d42d..2ed983e 100644
--- a/src/sync/metrics.rs
+++ b/src/sync/metrics.rs
@@ -156,19 +156,25 @@ impl SyncMetrics {
156 "ngit_sync_rejected_announcements_hot_cache_hits_total", 156 "ngit_sync_rejected_announcements_hot_cache_hits_total",
157 "Total hot cache hits (events re-processed from cache)", 157 "Total hot cache hits (events re-processed from cache)",
158 ))?; 158 ))?;
159 registry.register(Box::new(rejected_announcements_hot_cache_hits_total.clone()))?; 159 registry.register(Box::new(
160 rejected_announcements_hot_cache_hits_total.clone(),
161 ))?;
160 162
161 let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new( 163 let rejected_announcements_hot_cache_misses_total = IntCounter::with_opts(Opts::new(
162 "ngit_sync_rejected_announcements_hot_cache_misses_total", 164 "ngit_sync_rejected_announcements_hot_cache_misses_total",
163 "Total hot cache misses (events not in cache when invalidated)", 165 "Total hot cache misses (events not in cache when invalidated)",
164 ))?; 166 ))?;
165 registry.register(Box::new(rejected_announcements_hot_cache_misses_total.clone()))?; 167 registry.register(Box::new(
168 rejected_announcements_hot_cache_misses_total.clone(),
169 ))?;
166 170
167 let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new( 171 let rejected_announcements_hot_cache_expired_total = IntCounter::with_opts(Opts::new(
168 "ngit_sync_rejected_announcements_hot_cache_expired_total", 172 "ngit_sync_rejected_announcements_hot_cache_expired_total",
169 "Total expired entries removed from hot cache", 173 "Total expired entries removed from hot cache",
170 ))?; 174 ))?;
171 registry.register(Box::new(rejected_announcements_hot_cache_expired_total.clone()))?; 175 registry.register(Box::new(
176 rejected_announcements_hot_cache_expired_total.clone(),
177 ))?;
172 178
173 let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new( 179 let rejected_announcements_cold_index_current = IntGauge::with_opts(Opts::new(
174 "ngit_sync_rejected_announcements_cold_index_current", 180 "ngit_sync_rejected_announcements_cold_index_current",
@@ -180,7 +186,9 @@ impl SyncMetrics {
180 "ngit_sync_rejected_announcements_cold_index_expired_total", 186 "ngit_sync_rejected_announcements_cold_index_expired_total",
181 "Total expired entries removed from cold index", 187 "Total expired entries removed from cold index",
182 ))?; 188 ))?;
183 registry.register(Box::new(rejected_announcements_cold_index_expired_total.clone()))?; 189 registry.register(Box::new(
190 rejected_announcements_cold_index_expired_total.clone(),
191 ))?;
184 192
185 let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new( 193 let rejected_announcements_invalidated_total = IntCounter::with_opts(Opts::new(
186 "ngit_sync_rejected_announcements_invalidated_total", 194 "ngit_sync_rejected_announcements_invalidated_total",
@@ -430,7 +438,8 @@ impl SyncMetrics {
430 438
431 /// Update hot cache current size gauge. 439 /// Update hot cache current size gauge.
432 pub fn update_hot_cache_size(&self, size: usize) { 440 pub fn update_hot_cache_size(&self, size: usize) {
433 self.rejected_announcements_hot_cache_current.set(size as i64); 441 self.rejected_announcements_hot_cache_current
442 .set(size as i64);
434 } 443 }
435 444
436 /// Record hot cache hit (event re-processed from cache). 445 /// Record hot cache hit (event re-processed from cache).
@@ -445,22 +454,26 @@ impl SyncMetrics {
445 454
446 /// Record hot cache expired entries. 455 /// Record hot cache expired entries.
447 pub fn record_hot_cache_expired(&self, count: usize) { 456 pub fn record_hot_cache_expired(&self, count: usize) {
448 self.rejected_announcements_hot_cache_expired_total.inc_by(count as u64); 457 self.rejected_announcements_hot_cache_expired_total
458 .inc_by(count as u64);
449 } 459 }
450 460
451 /// Update cold index current size gauge. 461 /// Update cold index current size gauge.
452 pub fn update_cold_index_size(&self, size: usize) { 462 pub fn update_cold_index_size(&self, size: usize) {
453 self.rejected_announcements_cold_index_current.set(size as i64); 463 self.rejected_announcements_cold_index_current
464 .set(size as i64);
454 } 465 }
455 466
456 /// Record cold index expired entries. 467 /// Record cold index expired entries.
457 pub fn record_cold_index_expired(&self, count: usize) { 468 pub fn record_cold_index_expired(&self, count: usize) {
458 self.rejected_announcements_cold_index_expired_total.inc_by(count as u64); 469 self.rejected_announcements_cold_index_expired_total
470 .inc_by(count as u64);
459 } 471 }
460 472
461 /// Record invalidation (maintainer announcement invalidated). 473 /// Record invalidation (maintainer announcement invalidated).
462 pub fn record_invalidation(&self, count: usize) { 474 pub fn record_invalidation(&self, count: usize) {
463 self.rejected_announcements_invalidated_total.inc_by(count as u64); 475 self.rejected_announcements_invalidated_total
476 .inc_by(count as u64);
464 } 477 }
465 478
466 // === Rejected States Recording Methods === 479 // === Rejected States Recording Methods ===
@@ -482,7 +495,8 @@ impl SyncMetrics {
482 495
483 /// Record state event hot cache expired entries. 496 /// Record state event hot cache expired entries.
484 pub fn record_states_hot_cache_expired(&self, count: usize) { 497 pub fn record_states_hot_cache_expired(&self, count: usize) {
485 self.rejected_states_hot_cache_expired_total.inc_by(count as u64); 498 self.rejected_states_hot_cache_expired_total
499 .inc_by(count as u64);
486 } 500 }
487 501
488 /// Update state events cold index current size gauge. 502 /// Update state events cold index current size gauge.
@@ -492,7 +506,8 @@ impl SyncMetrics {
492 506
493 /// Record state event cold index expired entries. 507 /// Record state event cold index expired entries.
494 pub fn record_states_cold_index_expired(&self, count: usize) { 508 pub fn record_states_cold_index_expired(&self, count: usize) {
495 self.rejected_states_cold_index_expired_total.inc_by(count as u64); 509 self.rejected_states_cold_index_expired_total
510 .inc_by(count as u64);
496 } 511 }
497 512
498 /// Record state event invalidation. 513 /// Record state event invalidation.
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index ed3b78c..07527c7 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -27,7 +27,7 @@ pub use algorithms::{AddFilters, RelaySyncNeeds};
27pub use metrics::SyncMetrics; 27pub use metrics::SyncMetrics;
28 28
29// Re-export rejected index types 29// Re-export rejected index types
30pub use rejected_index::{RejectionReason}; 30pub use rejected_index::RejectionReason;
31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used 31// Note: RejectedEventsIndex struct exists in rejected_index.rs but not yet used
32// Current code still uses the simple HashSet type alias below 32// Current code still uses the simple HashSet type alias below
33 33
@@ -73,7 +73,7 @@ pub type PendingSyncIndex = Arc<RwLock<HashMap<String, Vec<PendingBatch>>>>;
73/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync. 73/// Tracks EventIds of announcement events (30617/30618) that were rejected during sync.
74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing 74/// These events are excluded from negentropy sync and skipped during REQ+EOSE processing
75/// to avoid repeatedly fetching and rejecting the same events. 75/// to avoid repeatedly fetching and rejecting the same events.
76/// 76///
77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs: 77/// Uses the two-tier RejectedEventsIndex from rejected_index.rs:
78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing) 78/// - Hot cache: Full events for 2 minutes (enables immediate re-processing)
79/// - Cold index: Metadata for 7 days (prevents repeated downloads) 79/// - Cold index: Metadata for 7 days (prevents repeated downloads)
@@ -113,7 +113,9 @@ impl ConnectionStatus {
113 pub fn is_live_sync_active(&self) -> bool { 113 pub fn is_live_sync_active(&self) -> bool {
114 matches!( 114 matches!(
115 self, 115 self,
116 ConnectionStatus::Syncing | ConnectionStatus::Connected | ConnectionStatus::ConnectedHistoricSyncFailures 116 ConnectionStatus::Syncing
117 | ConnectionStatus::Connected
118 | ConnectionStatus::ConnectedHistoricSyncFailures
117 ) 119 )
118 } 120 }
119} 121}
@@ -384,9 +386,7 @@ async fn run_rejected_index_cleanup(
384 let hot_cache_interval = Duration::from_secs(60); 386 let hot_cache_interval = Duration::from_secs(60);
385 let cold_index_interval = Duration::from_secs(86400); // 24 hours 387 let cold_index_interval = Duration::from_secs(86400); // 24 hours
386 388
387 tracing::info!( 389 tracing::info!("Rejected index cleanup started (hot cache: 60s, cold index: daily)");
388 "Rejected index cleanup started (hot cache: 60s, cold index: daily)"
389 );
390 390
391 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval); 391 let mut hot_cache_timer = tokio::time::interval(hot_cache_interval);
392 let mut cold_index_timer = tokio::time::interval(cold_index_interval); 392 let mut cold_index_timer = tokio::time::interval(cold_index_interval);
@@ -399,7 +399,7 @@ async fn run_rejected_index_cleanup(
399 tokio::select! { 399 tokio::select! {
400 _ = hot_cache_timer.tick() => { 400 _ = hot_cache_timer.tick() => {
401 let manager = sync_manager.lock().await; 401 let manager = sync_manager.lock().await;
402 402
403 // Clean up announcements index 403 // Clean up announcements index
404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired(); 404 let (hot_expired, _) = manager.rejected_events_index.cleanup_expired();
405 if hot_expired > 0 { 405 if hot_expired > 0 {
@@ -408,7 +408,7 @@ async fn run_rejected_index_cleanup(
408 hot_expired 408 hot_expired
409 ); 409 );
410 } 410 }
411 411
412 // Clean up states index 412 // Clean up states index
413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired(); 413 let (states_hot_expired, _) = manager.rejected_states_index.cleanup_states_expired();
414 if states_hot_expired > 0 { 414 if states_hot_expired > 0 {
@@ -420,7 +420,7 @@ async fn run_rejected_index_cleanup(
420 } 420 }
421 _ = cold_index_timer.tick() => { 421 _ = cold_index_timer.tick() => {
422 let manager = sync_manager.lock().await; 422 let manager = sync_manager.lock().await;
423 423
424 // Clean up announcements index 424 // Clean up announcements index
425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired(); 425 let (_, cold_expired) = manager.rejected_events_index.cleanup_expired();
426 if cold_expired > 0 { 426 if cold_expired > 0 {
@@ -429,7 +429,7 @@ async fn run_rejected_index_cleanup(
429 cold_expired 429 cold_expired
430 ); 430 );
431 } 431 }
432 432
433 // Clean up states index 433 // Clean up states index
434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired(); 434 let (_, states_cold_expired) = manager.rejected_states_index.cleanup_states_expired();
435 if states_cold_expired > 0 { 435 if states_cold_expired > 0 {
@@ -799,8 +799,7 @@ impl SyncManager {
799 if let (Some(requested), Some(received)) = 799 if let (Some(requested), Some(received)) =
800 (&batch.requested_event_ids, &batch.received_event_ids) 800 (&batch.requested_event_ids, &batch.received_event_ids)
801 { 801 {
802 let missing: Vec<EventId> = 802 let missing: Vec<EventId> = requested.difference(received).cloned().collect();
803 requested.difference(received).cloned().collect();
804 803
805 if !missing.is_empty() { 804 if !missing.is_empty() {
806 let requested_count = requested.len(); 805 let requested_count = requested.len();
@@ -884,13 +883,11 @@ impl SyncManager {
884 // Re-acquire lock and update batch with new subscriptions 883 // Re-acquire lock and update batch with new subscriptions
885 let mut pending = self.pending_sync_index.write().await; 884 let mut pending = self.pending_sync_index.write().await;
886 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 885 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
887 if let Some(batch) = 886 if let Some(batch) = batches.iter_mut().find(|b| b.batch_id == batch_id)
888 batches.iter_mut().find(|b| b.batch_id == batch_id)
889 { 887 {
890 batch.outstanding_subs.extend(new_sub_ids.clone()); 888 batch.outstanding_subs.extend(new_sub_ids.clone());
891 // Update requested_event_ids to only include missing ones 889 // Update requested_event_ids to only include missing ones
892 batch.requested_event_ids = 890 batch.requested_event_ids = Some(missing.iter().cloned().collect());
893 Some(missing.iter().cloned().collect());
894 // Clear received_event_ids for fresh tracking 891 // Clear received_event_ids for fresh tracking
895 batch.received_event_ids = Some(HashSet::new()); 892 batch.received_event_ids = Some(HashSet::new());
896 // Increment retry counter 893 // Increment retry counter
@@ -921,14 +918,14 @@ impl SyncManager {
921 // Re-acquire lock to extract the batch 918 // Re-acquire lock to extract the batch
922 let mut pending = self.pending_sync_index.write().await; 919 let mut pending = self.pending_sync_index.write().await;
923 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 920 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
924 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) 921 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
925 {
926 let completed_batch = batches.remove(idx); 922 let completed_batch = batches.remove(idx);
927 if batches.is_empty() { 923 if batches.is_empty() {
928 pending.remove(&relay_url_for_retry); 924 pending.remove(&relay_url_for_retry);
929 } 925 }
930 drop(pending); 926 drop(pending);
931 self.confirm_batch(&relay_url_for_retry, completed_batch).await; 927 self.confirm_batch(&relay_url_for_retry, completed_batch)
928 .await;
932 } 929 }
933 } 930 }
934 return; 931 return;
@@ -1023,17 +1020,17 @@ impl SyncManager {
1023 "Batch completed but no RelayState found for relay" 1020 "Batch completed but no RelayState found for relay"
1024 ); 1021 );
1025 } 1022 }
1026 1023
1027 // Release lock before checking if historic sync is complete 1024 // Release lock before checking if historic sync is complete
1028 drop(relay_index); 1025 drop(relay_index);
1029 1026
1030 // Spawn background task to check if historic sync is complete 1027 // Spawn background task to check if historic sync is complete
1031 // This avoids blocking the confirm_batch flow for 6 seconds 1028 // This avoids blocking the confirm_batch flow for 6 seconds
1032 let relay_url = relay_url.to_string(); 1029 let relay_url = relay_url.to_string();
1033 let pending_index = self.pending_sync_index.clone(); 1030 let pending_index = self.pending_sync_index.clone();
1034 let relay_index = self.relay_sync_index.clone(); 1031 let relay_index = self.relay_sync_index.clone();
1035 let metrics = self.metrics.clone(); 1032 let metrics = self.metrics.clone();
1036 1033
1037 tokio::spawn(async move { 1034 tokio::spawn(async move {
1038 Self::check_and_complete_historic_sync_impl( 1035 Self::check_and_complete_historic_sync_impl(
1039 &relay_url, 1036 &relay_url,
@@ -1073,29 +1070,33 @@ impl SyncManager {
1073 // First check: Are there any pending batches? 1070 // First check: Are there any pending batches?
1074 let has_pending = { 1071 let has_pending = {
1075 let pending = pending_index.read().await; 1072 let pending = pending_index.read().await;
1076 pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) 1073 pending
1074 .get(relay_url)
1075 .is_some_and(|batches| !batches.is_empty())
1077 }; 1076 };
1078 1077
1079 if has_pending { 1078 if has_pending {
1080 // Still syncing, don't transition yet 1079 // Still syncing, don't transition yet
1081 return; 1080 return;
1082 } 1081 }
1083 1082
1084 // Wait for self-subscriber batch window + buffer to catch any in-flight events 1083 // Wait for self-subscriber batch window + buffer to catch any in-flight events
1085 // that might create new Layer 2/3 filters 1084 // that might create new Layer 2/3 filters
1086 tokio::time::sleep(Duration::from_millis(6000)).await; 1085 tokio::time::sleep(Duration::from_millis(6000)).await;
1087 1086
1088 // Second check: Are there still no pending batches? 1087 // Second check: Are there still no pending batches?
1089 let has_pending = { 1088 let has_pending = {
1090 let pending = pending_index.read().await; 1089 let pending = pending_index.read().await;
1091 pending.get(relay_url).is_some_and(|batches| !batches.is_empty()) 1090 pending
1091 .get(relay_url)
1092 .is_some_and(|batches| !batches.is_empty())
1092 }; 1093 };
1093 1094
1094 if has_pending { 1095 if has_pending {
1095 // New batches appeared during the wait - still syncing 1096 // New batches appeared during the wait - still syncing
1096 return; 1097 return;
1097 } 1098 }
1098 1099
1099 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded 1100 // No pending batches after waiting - safe to transition to Connected or ConnectedDegraded
1100 let mut relay_index_guard = relay_index.write().await; 1101 let mut relay_index_guard = relay_index.write().await;
1101 if let Some(state) = relay_index_guard.get_mut(relay_url) { 1102 if let Some(state) = relay_index_guard.get_mut(relay_url) {
@@ -1106,11 +1107,11 @@ impl SyncManager {
1106 } else { 1107 } else {
1107 ConnectionStatus::Connected 1108 ConnectionStatus::Connected
1108 }; 1109 };
1109 1110
1110 state.connection_status = new_status; 1111 state.connection_status = new_status;
1111 state.historic_sync_completed = true; 1112 state.historic_sync_completed = true;
1112 state.historic_sync_completed_at = Some(Timestamp::now()); 1113 state.historic_sync_completed_at = Some(Timestamp::now());
1113 1114
1114 tracing::info!( 1115 tracing::info!(
1115 relay = %relay_url, 1116 relay = %relay_url,
1116 repos_synced = state.repos.len(), 1117 repos_synced = state.repos.len(),
@@ -1120,7 +1121,7 @@ impl SyncManager {
1120 "Historic sync complete - transitioned to {} status", 1121 "Historic sync complete - transitioned to {} status",
1121 if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" } 1122 if state.historic_sync_had_failures { "ConnectedHistoricSyncFailures" } else { "Connected" }
1122 ); 1123 );
1123 1124
1124 // Update metrics 1125 // Update metrics
1125 if let Some(ref metrics) = metrics { 1126 if let Some(ref metrics) = metrics {
1126 metrics.record_connection_status(relay_url, new_status); 1127 metrics.record_connection_status(relay_url, new_status);
@@ -1362,8 +1363,8 @@ impl SyncManager {
1362 ); 1363 );
1363 return; 1364 return;
1364 } 1365 }
1365 Some(ConnectionStatus::Syncing) 1366 Some(ConnectionStatus::Syncing)
1366 | Some(ConnectionStatus::Connected) 1367 | Some(ConnectionStatus::Connected)
1367 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => { 1368 | Some(ConnectionStatus::ConnectedHistoricSyncFailures) => {
1368 // Continue to subscribe - live sync is active, can accept new filters 1369 // Continue to subscribe - live sync is active, can accept new filters
1369 } 1370 }
@@ -1468,7 +1469,8 @@ impl SyncManager {
1468 match relay_event { 1469 match relay_event {
1469 RelayEvent::Event(event, subscription_id) => { 1470 RelayEvent::Event(event, subscription_id) => {
1470 // Skip events we've already rejected (announcements only) 1471 // Skip events we've already rejected (announcements only)
1471 if (event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState) 1472 if (event.kind == Kind::GitRepoAnnouncement
1473 || event.kind == Kind::RepoState)
1472 && rejected_events_index.contains(&event.id) 1474 && rejected_events_index.contains(&event.id)
1473 { 1475 {
1474 tracing::trace!( 1476 tracing::trace!(
@@ -1479,7 +1481,7 @@ impl SyncManager {
1479 ); 1481 );
1480 continue; 1482 continue;
1481 } 1483 }
1482 1484
1483 let result = Self::process_event_static( 1485 let result = Self::process_event_static(
1484 &event, 1486 &event,
1485 &relay_url_clone, 1487 &relay_url_clone,
@@ -1863,11 +1865,16 @@ impl SyncManager {
1863 // Create RelayConnection if not exists 1865 // Create RelayConnection if not exists
1864 if !self.connections.contains_key(&relay_url) { 1866 if !self.connections.contains_key(&relay_url) {
1865 // Get relay owner keys for NIP-42 authentication 1867 // Get relay owner keys for NIP-42 authentication
1866 let keys = self.config.relay_owner_keys() 1868 let keys = self
1869 .config
1870 .relay_owner_keys()
1867 .expect("relay_owner_keys should be available"); 1871 .expect("relay_owner_keys should be available");
1868 1872
1869 let connection = 1873 let connection = RelayConnection::new_with_database(
1870 RelayConnection::new_with_database(relay_url.clone(), Arc::clone(&self.database), keys); 1874 relay_url.clone(),
1875 Arc::clone(&self.database),
1876 keys,
1877 );
1871 self.connections.insert(relay_url.clone(), connection); 1878 self.connections.insert(relay_url.clone(), connection);
1872 tracing::debug!(relay = %relay_url, "Registered new relay connection"); 1879 tracing::debug!(relay = %relay_url, "Registered new relay connection");
1873 } 1880 }
@@ -1919,7 +1926,7 @@ impl SyncManager {
1919 state.connection_status = ConnectionStatus::Connecting; 1926 state.connection_status = ConnectionStatus::Connecting;
1920 } 1927 }
1921 } 1928 }
1922 1929
1923 // Update metrics to show connecting status 1930 // Update metrics to show connecting status
1924 if let Some(ref metrics) = self.metrics { 1931 if let Some(ref metrics) = self.metrics {
1925 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting); 1932 metrics.record_connection_status(relay_url, ConnectionStatus::Connecting);
@@ -1974,7 +1981,8 @@ impl SyncManager {
1974 if let Some(ref metrics) = self.metrics { 1981 if let Some(ref metrics) = self.metrics {
1975 metrics.record_connection_attempt(relay_url, false); 1982 metrics.record_connection_attempt(relay_url, false);
1976 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected); 1983 metrics.record_connection_status(relay_url, ConnectionStatus::Disconnected);
1977 metrics.record_health_state(relay_url, self.health_tracker.get_state(relay_url)); 1984 metrics
1985 .record_health_state(relay_url, self.health_tracker.get_state(relay_url));
1978 } 1986 }
1979 } 1987 }
1980 } 1988 }
@@ -2172,7 +2180,7 @@ impl SyncManager {
2172 // immediately and should now pass validation. 2180 // immediately and should now pass validation.
2173 if event.kind == Kind::GitRepoAnnouncement { 2181 if event.kind == Kind::GitRepoAnnouncement {
2174 use crate::nostr::events::RepositoryAnnouncement; 2182 use crate::nostr::events::RepositoryAnnouncement;
2175 2183
2176 match RepositoryAnnouncement::from_event(event.clone()) { 2184 match RepositoryAnnouncement::from_event(event.clone()) {
2177 Ok(announcement) => { 2185 Ok(announcement) => {
2178 if !announcement.maintainers.is_empty() { 2186 if !announcement.maintainers.is_empty() {
@@ -2219,15 +2227,16 @@ impl SyncManager {
2219 // 2. Second attempt uses maintainer exception (different code path) 2227 // 2. Second attempt uses maintainer exception (different code path)
2220 // 3. If second attempt fails, stays in cold index only (no third attempt) 2228 // 3. If second attempt fails, stays in cold index only (no third attempt)
2221 // Use Box::pin to avoid infinitely sized future 2229 // Use Box::pin to avoid infinitely sized future
2222 let reprocess_result = Box::pin(Self::process_event_static( 2230 let reprocess_result =
2223 &maintainer_event, 2231 Box::pin(Self::process_event_static(
2224 relay_url, 2232 &maintainer_event,
2225 database, 2233 relay_url,
2226 write_policy, 2234 database,
2227 local_relay, 2235 write_policy,
2228 rejected_events_index, 2236 local_relay,
2229 )) 2237 rejected_events_index,
2230 .await; 2238 ))
2239 .await;
2231 2240
2232 match reprocess_result { 2241 match reprocess_result {
2233 ProcessResult::Saved => { 2242 ProcessResult::Saved => {
@@ -2275,7 +2284,7 @@ impl SyncManager {
2275 ); 2284 );
2276 } 2285 }
2277 } 2286 }
2278 2287
2279 // When a repository announcement is accepted, re-process any state events 2288 // When a repository announcement is accepted, re-process any state events
2280 // that were previously rejected because no announcement existed. 2289 // that were previously rejected because no announcement existed.
2281 // This handles the race condition where state events arrive before their 2290 // This handles the race condition where state events arrive before their
@@ -2284,7 +2293,10 @@ impl SyncManager {
2284 Ok(announcement) => { 2293 Ok(announcement) => {
2285 // Get the announcement author's state events that were rejected 2294 // Get the announcement author's state events that were rejected
2286 let (removed, hot_events) = rejected_events_index 2295 let (removed, hot_events) = rejected_events_index
2287 .invalidate_and_get_state_events(&event.pubkey, &announcement.identifier); 2296 .invalidate_and_get_state_events(
2297 &event.pubkey,
2298 &announcement.identifier,
2299 );
2288 2300
2289 if removed > 0 { 2301 if removed > 0 {
2290 tracing::info!( 2302 tracing::info!(
@@ -2357,7 +2369,7 @@ impl SyncManager {
2357 } 2369 }
2358 } 2370 }
2359 } 2371 }
2360 2372
2361 // When a state event is accepted (git data arrived), re-process any other 2373 // When a state event is accepted (git data arrived), re-process any other
2362 // rejected state events for the same repository. This handles the case where 2374 // rejected state events for the same repository. This handles the case where
2363 // multiple state events arrive but only one has git data initially. 2375 // multiple state events arrive but only one has git data initially.
@@ -2454,7 +2466,7 @@ impl SyncManager {
2454 reason = %message, 2466 reason = %message,
2455 "Event rejected by write policy" 2467 "Event rejected by write policy"
2456 ); 2468 );
2457 2469
2458 // Track rejected announcement and state events to avoid re-fetching them 2470 // Track rejected announcement and state events to avoid re-fetching them
2459 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState { 2471 if event.kind == Kind::GitRepoAnnouncement || event.kind == Kind::RepoState {
2460 // Extract identifier from 'd' tag 2472 // Extract identifier from 'd' tag
@@ -2465,12 +2477,14 @@ impl SyncManager {
2465 .and_then(|t| t.content()) 2477 .and_then(|t| t.content())
2466 { 2478 {
2467 // Determine rejection reason based on message 2479 // Determine rejection reason based on message
2468 let reason = if message.contains("doesn't list this service") 2480 let reason = if message.contains("doesn't list this service")
2469 || message.contains("Announcement must list service") { 2481 || message.contains("Announcement must list service")
2482 {
2470 rejected_index::RejectionReason::DoesNotListService 2483 rejected_index::RejectionReason::DoesNotListService
2471 } else if message.contains("maintainer") 2484 } else if message.contains("maintainer")
2472 || message.contains("no announcement exists") 2485 || message.contains("no announcement exists")
2473 || message.contains("not authorized") { 2486 || message.contains("not authorized")
2487 {
2474 rejected_index::RejectionReason::MaintainerNotYetValid 2488 rejected_index::RejectionReason::MaintainerNotYetValid
2475 } else { 2489 } else {
2476 rejected_index::RejectionReason::Other 2490 rejected_index::RejectionReason::Other
@@ -2512,7 +2526,7 @@ impl SyncManager {
2512 ); 2526 );
2513 } 2527 }
2514 } 2528 }
2515 2529
2516 ProcessResult::Rejected 2530 ProcessResult::Rejected
2517 } 2531 }
2518 } 2532 }
@@ -3042,7 +3056,8 @@ impl SyncManager {
3042 // Get event IDs to exclude: purgatory + rejected announcements 3056 // Get event IDs to exclude: purgatory + rejected announcements
3043 let purgatory_ids = self.purgatory.event_ids(); 3057 let purgatory_ids = self.purgatory.event_ids();
3044 let rejected_ids = self.rejected_events_index.get_all_event_ids(); 3058 let rejected_ids = self.rejected_events_index.get_all_event_ids();
3045 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect(); 3059 let excluded_ids: HashSet<EventId> =
3060 purgatory_ids.union(&rejected_ids).cloned().collect();
3046 3061
3047 for (idx, result) in diff_results { 3062 for (idx, result) in diff_results {
3048 match result { 3063 match result {
@@ -3166,8 +3181,7 @@ impl SyncManager {
3166 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) { 3181 if let Some(batch) = relay_batches.iter_mut().find(|b| b.batch_id == batch_id) {
3167 batch.outstanding_subs.extend(subscription_ids.clone()); 3182 batch.outstanding_subs.extend(subscription_ids.clone());
3168 // Store requested event IDs for validation after EOSE 3183 // Store requested event IDs for validation after EOSE
3169 batch.requested_event_ids = 3184 batch.requested_event_ids = Some(all_remote_ids.iter().cloned().collect());
3170 Some(all_remote_ids.iter().cloned().collect());
3171 batch.received_event_ids = Some(HashSet::new()); 3185 batch.received_event_ids = Some(HashSet::new());
3172 } 3186 }
3173 } 3187 }
@@ -3359,20 +3373,16 @@ mod tests {
3359 async fn test_rejected_events_excluded_from_negentropy() { 3373 async fn test_rejected_events_excluded_from_negentropy() {
3360 // Create indices 3374 // Create indices
3361 let purgatory_ids: HashSet<EventId> = HashSet::new(); 3375 let purgatory_ids: HashSet<EventId> = HashSet::new();
3362 let rejected_index = RejectedEventsIndex::new( 3376 let rejected_index =
3363 Duration::from_secs(120), 3377 RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
3364 Duration::from_secs(604800),
3365 );
3366 3378
3367 // Create test event IDs 3379 // Create test event IDs
3368 let _rejected_id = EventId::from_hex( 3380 let _rejected_id =
3369 "0000000000000000000000000000000000000000000000000000000000000001", 3381 EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000001")
3370 ) 3382 .unwrap();
3371 .unwrap(); 3383 let valid_id =
3372 let valid_id = EventId::from_hex( 3384 EventId::from_hex("0000000000000000000000000000000000000000000000000000000000000002")
3373 "0000000000000000000000000000000000000000000000000000000000000002", 3385 .unwrap();
3374 )
3375 .unwrap();
3376 3386
3377 // Add rejected event to index 3387 // Add rejected event to index
3378 let keys = Keys::generate(); 3388 let keys = Keys::generate();
@@ -3383,7 +3393,7 @@ mod tests {
3383 )) 3393 ))
3384 .sign_with_keys(&keys) 3394 .sign_with_keys(&keys)
3385 .unwrap(); 3395 .unwrap();
3386 3396
3387 // Override the event ID for testing (we need a specific ID) 3397 // Override the event ID for testing (we need a specific ID)
3388 // Since we can't override the ID, let's use the actual event ID 3398 // Since we can't override the ID, let's use the actual event ID
3389 let rejected_id = rejected_event.id; 3399 let rejected_id = rejected_event.id;
@@ -3403,8 +3413,7 @@ mod tests {
3403 remote_ids.insert(valid_id); 3413 remote_ids.insert(valid_id);
3404 3414
3405 // Exclude rejected and purgatory events 3415 // Exclude rejected and purgatory events
3406 let excluded_ids: HashSet<EventId> = 3416 let excluded_ids: HashSet<EventId> = purgatory_ids.union(&rejected_ids).cloned().collect();
3407 purgatory_ids.union(&rejected_ids).cloned().collect();
3408 let filtered_ids: HashSet<EventId> = 3417 let filtered_ids: HashSet<EventId> =
3409 remote_ids.difference(&excluded_ids).cloned().collect(); 3418 remote_ids.difference(&excluded_ids).cloned().collect();
3410 3419
@@ -3422,22 +3431,14 @@ mod tests {
3422 // Requested 5 events from negentropy diff 3431 // Requested 5 events from negentropy diff
3423 let mut requested: HashSet<EventId> = HashSet::new(); 3432 let mut requested: HashSet<EventId> = HashSet::new();
3424 for i in 1u8..=5 { 3433 for i in 1u8..=5 {
3425 let id = EventId::from_hex(&format!( 3434 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3426 "{:0>64}",
3427 format!("{:x}", i)
3428 ))
3429 .unwrap();
3430 requested.insert(id); 3435 requested.insert(id);
3431 } 3436 }
3432 3437
3433 // Only received 3 events (simulating relay limit) 3438 // Only received 3 events (simulating relay limit)
3434 let mut received: HashSet<EventId> = HashSet::new(); 3439 let mut received: HashSet<EventId> = HashSet::new();
3435 for i in 1u8..=3 { 3440 for i in 1u8..=3 {
3436 let id = EventId::from_hex(&format!( 3441 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3437 "{:0>64}",
3438 format!("{:x}", i)
3439 ))
3440 .unwrap();
3441 received.insert(id); 3442 received.insert(id);
3442 } 3443 }
3443 3444
@@ -3461,11 +3462,7 @@ mod tests {
3461 // Simulate scenario where all requested events are received 3462 // Simulate scenario where all requested events are received
3462 let mut requested: HashSet<EventId> = HashSet::new(); 3463 let mut requested: HashSet<EventId> = HashSet::new();
3463 for i in 1u8..=3 { 3464 for i in 1u8..=3 {
3464 let id = EventId::from_hex(&format!( 3465 let id = EventId::from_hex(&format!("{:0>64}", format!("{:x}", i))).unwrap();
3465 "{:0>64}",
3466 format!("{:x}", i)
3467 ))
3468 .unwrap();
3469 requested.insert(id); 3466 requested.insert(id);
3470 } 3467 }
3471 3468
diff --git a/src/sync/rejected_index.rs b/src/sync/rejected_index.rs
index a9d7a4d..403792a 100644
--- a/src/sync/rejected_index.rs
+++ b/src/sync/rejected_index.rs
@@ -190,9 +190,7 @@ impl HotCache {
190 let now = Instant::now(); 190 let now = Instant::now();
191 let initial_count = entries.len(); 191 let initial_count = entries.len();
192 192
193 entries.retain(|_, entry| { 193 entries.retain(|_, entry| now.duration_since(entry.cached_at) < self.expiry_duration);
194 now.duration_since(entry.cached_at) < self.expiry_duration
195 });
196 194
197 initial_count - entries.len() 195 initial_count - entries.len()
198 } 196 }
@@ -284,9 +282,7 @@ impl ColdIndex {
284 let now = Instant::now(); 282 let now = Instant::now();
285 let initial_count = entries.len(); 283 let initial_count = entries.len();
286 284
287 entries.retain(|_, entry| { 285 entries.retain(|_, entry| now.duration_since(entry.rejected_at) < self.expiry_duration);
288 now.duration_since(entry.rejected_at) < self.expiry_duration
289 });
290 286
291 initial_count - entries.len() 287 initial_count - entries.len()
292 } 288 }
@@ -389,12 +385,8 @@ impl RejectedEventsIndex {
389 reason: RejectionReason, 385 reason: RejectionReason,
390 ) { 386 ) {
391 // Add to hot cache (full event) 387 // Add to hot cache (full event)
392 self.hot_cache.add( 388 self.hot_cache
393 event.clone(), 389 .add(event.clone(), pubkey, identifier.clone(), reason);
394 pubkey,
395 identifier.clone(),
396 reason,
397 );
398 390
399 // Add to cold index (metadata only) 391 // Add to cold index (metadata only)
400 self.cold_index.add(event.id, pubkey, identifier, reason); 392 self.cold_index.add(event.id, pubkey, identifier, reason);
@@ -419,12 +411,8 @@ impl RejectedEventsIndex {
419 reason: RejectionReason, 411 reason: RejectionReason,
420 ) { 412 ) {
421 // Add to hot cache (full event) 413 // Add to hot cache (full event)
422 self.hot_cache.add( 414 self.hot_cache
423 event.clone(), 415 .add(event.clone(), pubkey, identifier.clone(), reason);
424 pubkey,
425 identifier.clone(),
426 reason,
427 );
428 416
429 // Add to cold index (metadata only) 417 // Add to cold index (metadata only)
430 self.cold_index.add(event.id, pubkey, identifier, reason); 418 self.cold_index.add(event.id, pubkey, identifier, reason);
@@ -608,8 +596,7 @@ mod tests {
608 596
609 async fn create_test_event() -> Event { 597 async fn create_test_event() -> Event {
610 let keys = Keys::generate(); 598 let keys = Keys::generate();
611 let unsigned = nostr_sdk::EventBuilder::text_note("test") 599 let unsigned = nostr_sdk::EventBuilder::text_note("test").build(keys.public_key());
612 .build(keys.public_key());
613 keys.sign_event(unsigned).await.unwrap() 600 keys.sign_event(unsigned).await.unwrap()
614 } 601 }
615 602
@@ -695,10 +682,7 @@ mod tests {
695 682
696 #[tokio::test] 683 #[tokio::test]
697 async fn test_two_tier_index_add_and_contains() { 684 async fn test_two_tier_index_add_and_contains() {
698 let index = RejectedEventsIndex::new( 685 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
699 Duration::from_secs(120),
700 Duration::from_secs(604800),
701 );
702 let event = create_test_event().await; 686 let event = create_test_event().await;
703 687
704 index.add_announcement( 688 index.add_announcement(
@@ -715,10 +699,7 @@ mod tests {
715 699
716 #[tokio::test] 700 #[tokio::test]
717 async fn test_invalidate_and_get_events() { 701 async fn test_invalidate_and_get_events() {
718 let index = RejectedEventsIndex::new( 702 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
719 Duration::from_secs(120),
720 Duration::from_secs(604800),
721 );
722 let event = create_test_event().await; 703 let event = create_test_event().await;
723 let pubkey = event.pubkey; 704 let pubkey = event.pubkey;
724 let identifier = "test-repo".to_string(); 705 let identifier = "test-repo".to_string();
@@ -773,10 +754,8 @@ mod tests {
773 754
774 #[tokio::test] 755 #[tokio::test]
775 async fn test_hot_cache_miss_after_expiry() { 756 async fn test_hot_cache_miss_after_expiry() {
776 let index = RejectedEventsIndex::new( 757 let index =
777 Duration::from_millis(50), 758 RejectedEventsIndex::new(Duration::from_millis(50), Duration::from_secs(604800));
778 Duration::from_secs(604800),
779 );
780 let event = create_test_event().await; 759 let event = create_test_event().await;
781 let pubkey = event.pubkey; 760 let pubkey = event.pubkey;
782 let identifier = "test-repo".to_string(); 761 let identifier = "test-repo".to_string();
@@ -801,20 +780,15 @@ mod tests {
801 780
802 #[tokio::test] 781 #[tokio::test]
803 async fn test_multiple_maintainer_repos() { 782 async fn test_multiple_maintainer_repos() {
804 let index = RejectedEventsIndex::new( 783 let index = RejectedEventsIndex::new(Duration::from_secs(120), Duration::from_secs(604800));
805 Duration::from_secs(120),
806 Duration::from_secs(604800),
807 );
808 784
809 let keys1 = Keys::generate(); 785 let keys1 = Keys::generate();
810 let keys2 = Keys::generate(); 786 let keys2 = Keys::generate();
811 787
812 let unsigned1 = nostr_sdk::EventBuilder::text_note("test1") 788 let unsigned1 = nostr_sdk::EventBuilder::text_note("test1").build(keys1.public_key());
813 .build(keys1.public_key());
814 let event1 = keys1.sign_event(unsigned1).await.unwrap(); 789 let event1 = keys1.sign_event(unsigned1).await.unwrap();
815 790
816 let unsigned2 = nostr_sdk::EventBuilder::text_note("test2") 791 let unsigned2 = nostr_sdk::EventBuilder::text_note("test2").build(keys2.public_key());
817 .build(keys2.public_key());
818 let event2 = keys2.sign_event(unsigned2).await.unwrap(); 792 let event2 = keys2.sign_event(unsigned2).await.unwrap();
819 793
820 // Add two different maintainer repos 794 // Add two different maintainer repos
@@ -836,8 +810,7 @@ mod tests {
836 assert_eq!(index.cold_index_len(), 2); 810 assert_eq!(index.cold_index_len(), 2);
837 811
838 // Invalidate only first maintainer 812 // Invalidate only first maintainer
839 let (removed, hot_events) = 813 let (removed, hot_events) = index.invalidate_and_get_events(&event1.pubkey, "repo1");
840 index.invalidate_and_get_events(&event1.pubkey, "repo1");
841 814
842 assert_eq!(removed, 1); 815 assert_eq!(removed, 1);
843 assert_eq!(hot_events.len(), 1); 816 assert_eq!(hot_events.len(), 1);
diff --git a/src/sync/relay_connection.rs b/src/sync/relay_connection.rs
index d0090c8..b86d298 100644
--- a/src/sync/relay_connection.rs
+++ b/src/sync/relay_connection.rs
@@ -526,31 +526,46 @@ mod tests {
526 #[test] 526 #[test]
527 fn test_normalize_url_with_wss_scheme() { 527 fn test_normalize_url_with_wss_scheme() {
528 let url = "wss://relay.example.com"; 528 let url = "wss://relay.example.com";
529 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); 529 assert_eq!(
530 RelayConnection::normalize_url(url),
531 "wss://relay.example.com"
532 );
530 } 533 }
531 534
532 #[test] 535 #[test]
533 fn test_normalize_url_with_ws_scheme() { 536 fn test_normalize_url_with_ws_scheme() {
534 let url = "ws://relay.example.com"; 537 let url = "ws://relay.example.com";
535 assert_eq!(RelayConnection::normalize_url(url), "ws://relay.example.com"); 538 assert_eq!(
539 RelayConnection::normalize_url(url),
540 "ws://relay.example.com"
541 );
536 } 542 }
537 543
538 #[test] 544 #[test]
539 fn test_normalize_url_without_scheme() { 545 fn test_normalize_url_without_scheme() {
540 let url = "relay.example.com"; 546 let url = "relay.example.com";
541 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com"); 547 assert_eq!(
548 RelayConnection::normalize_url(url),
549 "wss://relay.example.com"
550 );
542 } 551 }
543 552
544 #[test] 553 #[test]
545 fn test_normalize_url_without_scheme_with_port() { 554 fn test_normalize_url_without_scheme_with_port() {
546 let url = "relay.example.com:8080"; 555 let url = "relay.example.com:8080";
547 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com:8080"); 556 assert_eq!(
557 RelayConnection::normalize_url(url),
558 "wss://relay.example.com:8080"
559 );
548 } 560 }
549 561
550 #[test] 562 #[test]
551 fn test_normalize_url_with_path() { 563 fn test_normalize_url_with_path() {
552 let url = "relay.example.com/nostr"; 564 let url = "relay.example.com/nostr";
553 assert_eq!(RelayConnection::normalize_url(url), "wss://relay.example.com/nostr"); 565 assert_eq!(
566 RelayConnection::normalize_url(url),
567 "wss://relay.example.com/nostr"
568 );
554 } 569 }
555 570
556 #[test] 571 #[test]
@@ -587,6 +602,9 @@ mod tests {
587 fn test_normalize_url_real_world_example() { 602 fn test_normalize_url_real_world_example() {
588 // Test the exact case from the bug report 603 // Test the exact case from the bug report
589 let url = "git.shakespeare.diy"; 604 let url = "git.shakespeare.diy";
590 assert_eq!(RelayConnection::normalize_url(url), "wss://git.shakespeare.diy"); 605 assert_eq!(
606 RelayConnection::normalize_url(url),
607 "wss://git.shakespeare.diy"
608 );
591 } 609 }
592} 610}
diff --git a/tests/common/sync_helpers.rs b/tests/common/sync_helpers.rs
index d6a6ee4..5fc2ad7 100644
--- a/tests/common/sync_helpers.rs
+++ b/tests/common/sync_helpers.rs
@@ -708,7 +708,7 @@ impl ParsedMetrics {
708 /// Check if a specific relay is connected 708 /// Check if a specific relay is connected
709 pub fn relay_connected(&self, relay: &str) -> Option<bool> { 709 pub fn relay_connected(&self, relay: &str) -> Option<bool> {
710 self.gauge("ngit_sync_relay_connected", &[("relay", relay)]) 710 self.gauge("ngit_sync_relay_connected", &[("relay", relay)])
711 .map(|v| v >= 2) // Syncing (2), Connected (3), or ConnectedHistoricSyncFailures (4) 711 .map(|v| v >= 2) // Syncing (2), Connected (3), or ConnectedHistoricSyncFailures (4)
712 } 712 }
713 713
714 /// Get total number of connected relays 714 /// Get total number of connected relays
diff --git a/tests/state_authorization.rs b/tests/state_authorization.rs
index a5dfa2d..d443005 100644
--- a/tests/state_authorization.rs
+++ b/tests/state_authorization.rs
@@ -13,30 +13,27 @@ use nostr_sdk::prelude::*;
13async fn test_reject_state_without_announcement() { 13async fn test_reject_state_without_announcement() {
14 // Start test relay 14 // Start test relay
15 let relay = TestRelay::start().await; 15 let relay = TestRelay::start().await;
16 16
17 // Create test keypair 17 // Create test keypair
18 let keys = Keys::generate(); 18 let keys = Keys::generate();
19 19
20 // Create a state event without any announcement 20 // Create a state event without any announcement
21 let state_event = EventBuilder::new( 21 let state_event = EventBuilder::new(Kind::RepoState, "")
22 Kind::RepoState, 22 .tags([
23 "", 23 Tag::custom(TagKind::custom("d"), ["test-repo"]),
24 ) 24 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
25 .tags([ 25 ])
26 Tag::custom(TagKind::custom("d"), ["test-repo"]), 26 .sign_with_keys(&keys)
27 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), 27 .unwrap();
28 ]) 28
29 .sign_with_keys(&keys)
30 .unwrap();
31
32 // Connect to relay 29 // Connect to relay
33 let client = Client::default(); 30 let client = Client::default();
34 client.add_relay(relay.url()).await.unwrap(); 31 client.add_relay(relay.url()).await.unwrap();
35 client.connect().await; 32 client.connect().await;
36 33
37 // Try to send state event 34 // Try to send state event
38 let result = client.send_event(&state_event).await; 35 let result = client.send_event(&state_event).await;
39 36
40 // Should be rejected 37 // Should be rejected
41 match result { 38 match result {
42 Ok(output) => { 39 Ok(output) => {
@@ -45,22 +42,26 @@ async fn test_reject_state_without_announcement() {
45 "Event should be processed" 42 "Event should be processed"
46 ); 43 );
47 // Check if any relay rejected it 44 // Check if any relay rejected it
48 let rejected = output.failed.values().any(|err| { 45 let rejected = output
49 err.to_string().contains("no announcement exists") 46 .failed
50 }); 47 .values()
51 assert!(rejected, "Event should be rejected due to missing announcement"); 48 .any(|err| err.to_string().contains("no announcement exists"));
49 assert!(
50 rejected,
51 "Event should be rejected due to missing announcement"
52 );
52 } 53 }
53 Err(e) => { 54 Err(e) => {
54 // Also acceptable - relay rejected the event 55 // Also acceptable - relay rejected the event
55 assert!( 56 assert!(
56 e.to_string().contains("no announcement exists") || 57 e.to_string().contains("no announcement exists")
57 e.to_string().contains("rejected"), 58 || e.to_string().contains("rejected"),
58 "Error should indicate missing announcement: {}", 59 "Error should indicate missing announcement: {}",
59 e 60 e
60 ); 61 );
61 } 62 }
62 } 63 }
63 64
64 relay.stop().await; 65 relay.stop().await;
65} 66}
66 67
@@ -68,67 +69,67 @@ async fn test_reject_state_without_announcement() {
68async fn test_reject_state_from_unauthorized_author() { 69async fn test_reject_state_from_unauthorized_author() {
69 // Start test relay 70 // Start test relay
70 let relay = TestRelay::start().await; 71 let relay = TestRelay::start().await;
71 72
72 // Create two keypairs: one for announcement, one for unauthorized state 73 // Create two keypairs: one for announcement, one for unauthorized state
73 let announcement_keys = Keys::generate(); 74 let announcement_keys = Keys::generate();
74 let unauthorized_keys = Keys::generate(); 75 let unauthorized_keys = Keys::generate();
75 76
76 // Create announcement 77 // Create announcement
77 let announcement = EventBuilder::new( 78 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "")
78 Kind::GitRepoAnnouncement, 79 .tags([
79 "", 80 Tag::custom(TagKind::custom("d"), ["test-repo"]),
80 ) 81 Tag::custom(
81 .tags([ 82 TagKind::custom("clone"),
82 Tag::custom(TagKind::custom("d"), ["test-repo"]), 83 [format!("https://{}/test.git", relay.domain())],
83 Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]), 84 ),
84 Tag::custom(TagKind::custom("relays"), [relay.url()]), 85 Tag::custom(TagKind::custom("relays"), [relay.url()]),
85 ]) 86 ])
86 .sign_with_keys(&announcement_keys) 87 .sign_with_keys(&announcement_keys)
87 .unwrap(); 88 .unwrap();
88 89
89 // Connect to relay 90 // Connect to relay
90 let client = Client::default(); 91 let client = Client::default();
91 client.add_relay(relay.url()).await.unwrap(); 92 client.add_relay(relay.url()).await.unwrap();
92 client.connect().await; 93 client.connect().await;
93 94
94 // Send announcement 95 // Send announcement
95 client.send_event(&announcement).await.unwrap(); 96 client.send_event(&announcement).await.unwrap();
96 97
97 // Wait for announcement to be processed 98 // Wait for announcement to be processed
98 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 99 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
99 100
100 // Try to send state event from unauthorized author 101 // Try to send state event from unauthorized author
101 let state_event = EventBuilder::new( 102 let state_event = EventBuilder::new(Kind::RepoState, "")
102 Kind::RepoState, 103 .tags([
103 "", 104 Tag::custom(TagKind::custom("d"), ["test-repo"]),
104 ) 105 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
105 .tags([ 106 ])
106 Tag::custom(TagKind::custom("d"), ["test-repo"]), 107 .sign_with_keys(&unauthorized_keys)
107 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), 108 .unwrap();
108 ]) 109
109 .sign_with_keys(&unauthorized_keys)
110 .unwrap();
111
112 let result = client.send_event(&state_event).await; 110 let result = client.send_event(&state_event).await;
113 111
114 // Should be rejected 112 // Should be rejected
115 match result { 113 match result {
116 Ok(output) => { 114 Ok(output) => {
117 let rejected = output.failed.values().any(|err| { 115 let rejected = output
118 err.to_string().contains("not authorized") 116 .failed
119 }); 117 .values()
120 assert!(rejected, "Event should be rejected due to unauthorized author"); 118 .any(|err| err.to_string().contains("not authorized"));
119 assert!(
120 rejected,
121 "Event should be rejected due to unauthorized author"
122 );
121 } 123 }
122 Err(e) => { 124 Err(e) => {
123 assert!( 125 assert!(
124 e.to_string().contains("not authorized") || 126 e.to_string().contains("not authorized") || e.to_string().contains("rejected"),
125 e.to_string().contains("rejected"),
126 "Error should indicate unauthorized author: {}", 127 "Error should indicate unauthorized author: {}",
127 e 128 e
128 ); 129 );
129 } 130 }
130 } 131 }
131 132
132 relay.stop().await; 133 relay.stop().await;
133} 134}
134 135
@@ -136,48 +137,45 @@ async fn test_reject_state_from_unauthorized_author() {
136async fn test_accept_state_from_announcement_author() { 137async fn test_accept_state_from_announcement_author() {
137 // Start test relay 138 // Start test relay
138 let relay = TestRelay::start().await; 139 let relay = TestRelay::start().await;
139 140
140 // Create keypair 141 // Create keypair
141 let keys = Keys::generate(); 142 let keys = Keys::generate();
142 143
143 // Create announcement 144 // Create announcement
144 let announcement = EventBuilder::new( 145 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "")
145 Kind::GitRepoAnnouncement, 146 .tags([
146 "", 147 Tag::custom(TagKind::custom("d"), ["test-repo"]),
147 ) 148 Tag::custom(
148 .tags([ 149 TagKind::custom("clone"),
149 Tag::custom(TagKind::custom("d"), ["test-repo"]), 150 [format!("https://{}/test.git", relay.domain())],
150 Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]), 151 ),
151 Tag::custom(TagKind::custom("relays"), [relay.url()]), 152 Tag::custom(TagKind::custom("relays"), [relay.url()]),
152 ]) 153 ])
153 .sign_with_keys(&keys) 154 .sign_with_keys(&keys)
154 .unwrap(); 155 .unwrap();
155 156
156 // Connect to relay 157 // Connect to relay
157 let client = Client::default(); 158 let client = Client::default();
158 client.add_relay(relay.url()).await.unwrap(); 159 client.add_relay(relay.url()).await.unwrap();
159 client.connect().await; 160 client.connect().await;
160 161
161 // Send announcement 162 // Send announcement
162 client.send_event(&announcement).await.unwrap(); 163 client.send_event(&announcement).await.unwrap();
163 164
164 // Wait for announcement to be processed 165 // Wait for announcement to be processed
165 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 166 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
166 167
167 // Send state event from same author (should be accepted or go to purgatory) 168 // Send state event from same author (should be accepted or go to purgatory)
168 let state_event = EventBuilder::new( 169 let state_event = EventBuilder::new(Kind::RepoState, "")
169 Kind::RepoState, 170 .tags([
170 "", 171 Tag::custom(TagKind::custom("d"), ["test-repo"]),
171 ) 172 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
172 .tags([ 173 ])
173 Tag::custom(TagKind::custom("d"), ["test-repo"]), 174 .sign_with_keys(&keys)
174 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), 175 .unwrap();
175 ]) 176
176 .sign_with_keys(&keys)
177 .unwrap();
178
179 let result = client.send_event(&state_event).await; 177 let result = client.send_event(&state_event).await;
180 178
181 // Should be accepted or go to purgatory (not permanently rejected) 179 // Should be accepted or go to purgatory (not permanently rejected)
182 match result { 180 match result {
183 Ok(output) => { 181 Ok(output) => {
@@ -194,14 +192,13 @@ async fn test_accept_state_from_announcement_author() {
194 Err(e) => { 192 Err(e) => {
195 // Purgatory is acceptable 193 // Purgatory is acceptable
196 assert!( 194 assert!(
197 e.to_string().contains("purgatory") || 195 e.to_string().contains("purgatory") || e.to_string().contains("waiting for git"),
198 e.to_string().contains("waiting for git"),
199 "Error should be about purgatory, not authorization: {}", 196 "Error should be about purgatory, not authorization: {}",
200 e 197 e
201 ); 198 );
202 } 199 }
203 } 200 }
204 201
205 relay.stop().await; 202 relay.stop().await;
206} 203}
207 204
@@ -209,50 +206,50 @@ async fn test_accept_state_from_announcement_author() {
209async fn test_accept_state_from_maintainer() { 206async fn test_accept_state_from_maintainer() {
210 // Start test relay 207 // Start test relay
211 let relay = TestRelay::start().await; 208 let relay = TestRelay::start().await;
212 209
213 // Create two keypairs: owner and maintainer 210 // Create two keypairs: owner and maintainer
214 let owner_keys = Keys::generate(); 211 let owner_keys = Keys::generate();
215 let maintainer_keys = Keys::generate(); 212 let maintainer_keys = Keys::generate();
216 213
217 // Create announcement with maintainer 214 // Create announcement with maintainer
218 let announcement = EventBuilder::new( 215 let announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "")
219 Kind::GitRepoAnnouncement, 216 .tags([
220 "", 217 Tag::custom(TagKind::custom("d"), ["test-repo"]),
221 ) 218 Tag::custom(
222 .tags([ 219 TagKind::custom("clone"),
223 Tag::custom(TagKind::custom("d"), ["test-repo"]), 220 [format!("https://{}/test.git", relay.domain())],
224 Tag::custom(TagKind::custom("clone"), [format!("https://{}/test.git", relay.domain())]), 221 ),
225 Tag::custom(TagKind::custom("relays"), [relay.url()]), 222 Tag::custom(TagKind::custom("relays"), [relay.url()]),
226 Tag::custom(TagKind::custom("maintainers"), [maintainer_keys.public_key().to_hex()]), 223 Tag::custom(
227 ]) 224 TagKind::custom("maintainers"),
228 .sign_with_keys(&owner_keys) 225 [maintainer_keys.public_key().to_hex()],
229 .unwrap(); 226 ),
230 227 ])
228 .sign_with_keys(&owner_keys)
229 .unwrap();
230
231 // Connect to relay 231 // Connect to relay
232 let client = Client::default(); 232 let client = Client::default();
233 client.add_relay(relay.url()).await.unwrap(); 233 client.add_relay(relay.url()).await.unwrap();
234 client.connect().await; 234 client.connect().await;
235 235
236 // Send announcement 236 // Send announcement
237 client.send_event(&announcement).await.unwrap(); 237 client.send_event(&announcement).await.unwrap();
238 238
239 // Wait for announcement to be processed 239 // Wait for announcement to be processed
240 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 240 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
241 241
242 // Send state event from maintainer 242 // Send state event from maintainer
243 let state_event = EventBuilder::new( 243 let state_event = EventBuilder::new(Kind::RepoState, "")
244 Kind::RepoState, 244 .tags([
245 "", 245 Tag::custom(TagKind::custom("d"), ["test-repo"]),
246 ) 246 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]),
247 .tags([ 247 ])
248 Tag::custom(TagKind::custom("d"), ["test-repo"]), 248 .sign_with_keys(&maintainer_keys)
249 Tag::custom(TagKind::custom("refs/heads/main"), ["abc123"]), 249 .unwrap();
250 ]) 250
251 .sign_with_keys(&maintainer_keys)
252 .unwrap();
253
254 let result = client.send_event(&state_event).await; 251 let result = client.send_event(&state_event).await;
255 252
256 // Should be accepted or go to purgatory (not permanently rejected) 253 // Should be accepted or go to purgatory (not permanently rejected)
257 match result { 254 match result {
258 Ok(output) => { 255 Ok(output) => {
@@ -268,13 +265,12 @@ async fn test_accept_state_from_maintainer() {
268 Err(e) => { 265 Err(e) => {
269 // Purgatory is acceptable 266 // Purgatory is acceptable
270 assert!( 267 assert!(
271 e.to_string().contains("purgatory") || 268 e.to_string().contains("purgatory") || e.to_string().contains("waiting for git"),
272 e.to_string().contains("waiting for git"),
273 "Error should be about purgatory, not authorization: {}", 269 "Error should be about purgatory, not authorization: {}",
274 e 270 e
275 ); 271 );
276 } 272 }
277 } 273 }
278 274
279 relay.stop().await; 275 relay.stop().await;
280} 276}
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs
index 2b7fb0f..df1bf78 100644
--- a/tests/sync/maintainer_reprocessing.rs
+++ b/tests/sync/maintainer_reprocessing.rs
@@ -42,23 +42,18 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
42 .await 42 .await
43 .expect("Failed to connect to relay_a"); 43 .expect("Failed to connect to relay_a");
44 44
45 let maintainer_announcement = EventBuilder::new( 45 let maintainer_announcement =
46 Kind::GitRepoAnnouncement, 46 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository")
47 "Maintainer's repository", 47 .tags(vec![
48 ) 48 Tag::identifier(identifier),
49 .tags(vec![ 49 Tag::custom(
50 Tag::identifier(identifier), 50 TagKind::custom("clone"),
51 Tag::custom( 51 vec![format!("https://{}/{}.git", relay_a.domain(), identifier)],
52 TagKind::custom("clone"), 52 ),
53 vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], 53 Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]),
54 ), 54 ])
55 Tag::custom( 55 .sign_with_keys(&maintainer_keys)
56 TagKind::custom("relays"), 56 .unwrap();
57 vec![relay_a.url().to_string()],
58 ),
59 ])
60 .sign_with_keys(&maintainer_keys)
61 .unwrap();
62 57
63 client_a.send_event(&maintainer_announcement).await.unwrap(); 58 client_a.send_event(&maintainer_announcement).await.unwrap();
64 println!("✓ Maintainer announcement sent to relay_a"); 59 println!("✓ Maintainer announcement sent to relay_a");
@@ -68,27 +63,24 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
68 .await 63 .await
69 .expect("Failed to connect to relay_b"); 64 .expect("Failed to connect to relay_b");
70 65
71 let owner_announcement = EventBuilder::new( 66 let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository")
72 Kind::GitRepoAnnouncement, 67 .tags(vec![
73 "Owner's repository", 68 Tag::identifier(identifier),
74 ) 69 Tag::custom(
75 .tags(vec![ 70 TagKind::custom("clone"),
76 Tag::identifier(identifier), 71 vec![format!("https://{}/{}.git", relay_b.domain(), identifier)],
77 Tag::custom( 72 ),
78 TagKind::custom("clone"), 73 Tag::custom(
79 vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], 74 TagKind::custom("relays"),
80 ), 75 vec![relay_a.url().to_string(), relay_b.url().to_string()],
81 Tag::custom( 76 ),
82 TagKind::custom("relays"), 77 Tag::custom(
83 vec![relay_a.url().to_string(), relay_b.url().to_string()], 78 TagKind::custom("maintainers"),
84 ), 79 vec![maintainer_keys.public_key().to_hex()],
85 Tag::custom( 80 ),
86 TagKind::custom("maintainers"), 81 ])
87 vec![maintainer_keys.public_key().to_hex()], 82 .sign_with_keys(&owner_keys)
88 ), 83 .unwrap();
89 ])
90 .sign_with_keys(&owner_keys)
91 .unwrap();
92 84
93 client_b.send_event(&owner_announcement).await.unwrap(); 85 client_b.send_event(&owner_announcement).await.unwrap();
94 println!("✓ Owner announcement sent to relay_b"); 86 println!("✓ Owner announcement sent to relay_b");
@@ -104,7 +96,8 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
104 .author(owner_keys.public_key()) 96 .author(owner_keys.public_key())
105 .identifier(identifier); 97 .identifier(identifier);
106 98
107 let owner_found = wait_for_event_on_relay(relay_b.url(), owner_filter, Duration::from_secs(2)).await; 99 let owner_found =
100 wait_for_event_on_relay(relay_b.url(), owner_filter, Duration::from_secs(2)).await;
108 assert!(owner_found, "Owner announcement should be in relay_b"); 101 assert!(owner_found, "Owner announcement should be in relay_b");
109 102
110 let maintainer_filter = Filter::new() 103 let maintainer_filter = Filter::new()
@@ -112,8 +105,12 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
112 .author(maintainer_keys.public_key()) 105 .author(maintainer_keys.public_key())
113 .identifier(identifier); 106 .identifier(identifier);
114 107
115 let maintainer_found = wait_for_event_on_relay(relay_b.url(), maintainer_filter, Duration::from_secs(2)).await; 108 let maintainer_found =
116 assert!(maintainer_found, "Maintainer announcement should be re-processed and accepted in relay_b"); 109 wait_for_event_on_relay(relay_b.url(), maintainer_filter, Duration::from_secs(2)).await;
110 assert!(
111 maintainer_found,
112 "Maintainer announcement should be re-processed and accepted in relay_b"
113 );
117 114
118 // Step 5: Verify it happened quickly (not 24 hours!) 115 // Step 5: Verify it happened quickly (not 24 hours!)
119 assert!( 116 assert!(
@@ -145,36 +142,34 @@ async fn test_maintainer_announcement_reprocessed_immediately() {
145#[ignore] // Skip by default due to 2+ minute duration 142#[ignore] // Skip by default due to 2+ minute duration
146async fn test_maintainer_announcement_cold_index_prevents_refetch() { 143async fn test_maintainer_announcement_cold_index_prevents_refetch() {
147 let relay = TestRelay::start().await; 144 let relay = TestRelay::start().await;
148 145
149 // Create keys 146 // Create keys
150 let owner_keys = Keys::generate(); 147 let owner_keys = Keys::generate();
151 let maintainer_keys = Keys::generate(); 148 let maintainer_keys = Keys::generate();
152 149
153 let identifier = "test-repo-cold"; 150 let identifier = "test-repo-cold";
154 151
155 // Create client using TestClient helper 152 // Create client using TestClient helper
156 let client = TestClient::new(relay.url(), maintainer_keys.clone()) 153 let client = TestClient::new(relay.url(), maintainer_keys.clone())
157 .await 154 .await
158 .expect("Failed to connect to relay"); 155 .expect("Failed to connect to relay");
159 156
160 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) 157 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay)
161 let maintainer_announcement = EventBuilder::new( 158 let maintainer_announcement =
162 Kind::GitRepoAnnouncement, 159 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository")
163 "Maintainer's repository", 160 .tags(vec![
164 ) 161 Tag::identifier(identifier),
165 .tags(vec![ 162 Tag::custom(
166 Tag::identifier(identifier), 163 TagKind::custom("clone"),
167 Tag::custom( 164 vec![format!("https://example.com/{}.git", identifier)],
168 TagKind::custom("clone"), 165 ),
169 vec![format!("https://example.com/{}.git", identifier)], 166 Tag::custom(
170 ), 167 TagKind::custom("relays"),
171 Tag::custom( 168 vec!["wss://example.com".to_string()],
172 TagKind::custom("relays"), 169 ),
173 vec!["wss://example.com".to_string()], 170 ])
174 ), 171 .sign_with_keys(&maintainer_keys)
175 ]) 172 .unwrap();
176 .sign_with_keys(&maintainer_keys)
177 .unwrap();
178 173
179 // Send maintainer announcement - expect it to be rejected 174 // Send maintainer announcement - expect it to be rejected
180 let _ = client.send_event(&maintainer_announcement).await; 175 let _ = client.send_event(&maintainer_announcement).await;
@@ -185,27 +180,21 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() {
185 tokio::time::sleep(Duration::from_secs(125)).await; 180 tokio::time::sleep(Duration::from_secs(125)).await;
186 181
187 // Step 3: Send owner announcement (lists maintainer) 182 // Step 3: Send owner announcement (lists maintainer)
188 let owner_announcement = EventBuilder::new( 183 let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository")
189 Kind::GitRepoAnnouncement, 184 .tags(vec![
190 "Owner's repository", 185 Tag::identifier(identifier),
191 ) 186 Tag::custom(
192 .tags(vec![ 187 TagKind::custom("clone"),
193 Tag::identifier(identifier), 188 vec![format!("https://{}/{}.git", relay.domain(), identifier)],
194 Tag::custom( 189 ),
195 TagKind::custom("clone"), 190 Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]),
196 vec![format!("https://{}/{}.git", relay.domain(), identifier)], 191 Tag::custom(
197 ), 192 TagKind::custom("maintainers"),
198 Tag::custom( 193 vec![maintainer_keys.public_key().to_hex()],
199 TagKind::custom("relays"), 194 ),
200 vec![relay.url().to_string()], 195 ])
201 ), 196 .sign_with_keys(&owner_keys)
202 Tag::custom( 197 .unwrap();
203 TagKind::custom("maintainers"),
204 vec![maintainer_keys.public_key().to_hex()],
205 ),
206 ])
207 .sign_with_keys(&owner_keys)
208 .unwrap();
209 198
210 client.send_event(&owner_announcement).await.unwrap(); 199 client.send_event(&owner_announcement).await.unwrap();
211 tokio::time::sleep(Duration::from_millis(500)).await; 200 tokio::time::sleep(Duration::from_millis(500)).await;
@@ -215,16 +204,18 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() {
215 .kind(Kind::GitRepoAnnouncement) 204 .kind(Kind::GitRepoAnnouncement)
216 .author(owner_keys.public_key()) 205 .author(owner_keys.public_key())
217 .identifier(identifier); 206 .identifier(identifier);
218 207
219 let owner_found = wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await; 208 let owner_found =
209 wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await;
220 assert!(owner_found, "Owner announcement should be accepted"); 210 assert!(owner_found, "Owner announcement should be accepted");
221 211
222 let maintainer_filter = Filter::new() 212 let maintainer_filter = Filter::new()
223 .kind(Kind::GitRepoAnnouncement) 213 .kind(Kind::GitRepoAnnouncement)
224 .author(maintainer_keys.public_key()) 214 .author(maintainer_keys.public_key())
225 .identifier(identifier); 215 .identifier(identifier);
226 216
227 let maintainer_found = wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await; 217 let maintainer_found =
218 wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await;
228 assert!( 219 assert!(
229 !maintainer_found, 220 !maintainer_found,
230 "Maintainer announcement should NOT be re-processed (hot cache expired)" 221 "Maintainer announcement should NOT be re-processed (hot cache expired)"
@@ -267,7 +258,10 @@ async fn test_multiple_maintainers_all_reprocessed() {
267 .await 258 .await
268 .expect("Failed to connect to relay_a"); 259 .expect("Failed to connect to relay_a");
269 260
270 for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys].iter().enumerate() { 261 for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys]
262 .iter()
263 .enumerate()
264 {
271 let announcement = EventBuilder::new( 265 let announcement = EventBuilder::new(
272 Kind::GitRepoAnnouncement, 266 Kind::GitRepoAnnouncement,
273 format!("Maintainer {} repository", idx + 1), 267 format!("Maintainer {} repository", idx + 1),
@@ -278,10 +272,7 @@ async fn test_multiple_maintainers_all_reprocessed() {
278 TagKind::custom("clone"), 272 TagKind::custom("clone"),
279 vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], 273 vec![format!("https://{}/{}.git", relay_a.domain(), identifier)],
280 ), 274 ),
281 Tag::custom( 275 Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]),
282 TagKind::custom("relays"),
283 vec![relay_a.url().to_string()],
284 ),
285 ]) 276 ])
286 .sign_with_keys(maintainer_keys) 277 .sign_with_keys(maintainer_keys)
287 .unwrap(); 278 .unwrap();
@@ -295,31 +286,28 @@ async fn test_multiple_maintainers_all_reprocessed() {
295 .await 286 .await
296 .expect("Failed to connect to relay_b"); 287 .expect("Failed to connect to relay_b");
297 288
298 let owner_announcement = EventBuilder::new( 289 let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository")
299 Kind::GitRepoAnnouncement, 290 .tags(vec![
300 "Owner's repository", 291 Tag::identifier(identifier),
301 ) 292 Tag::custom(
302 .tags(vec![ 293 TagKind::custom("clone"),
303 Tag::identifier(identifier), 294 vec![format!("https://{}/{}.git", relay_b.domain(), identifier)],
304 Tag::custom( 295 ),
305 TagKind::custom("clone"), 296 Tag::custom(
306 vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], 297 TagKind::custom("relays"),
307 ), 298 vec![relay_a.url().to_string(), relay_b.url().to_string()],
308 Tag::custom( 299 ),
309 TagKind::custom("relays"), 300 Tag::custom(
310 vec![relay_a.url().to_string(), relay_b.url().to_string()], 301 TagKind::custom("maintainers"),
311 ), 302 vec![
312 Tag::custom( 303 maintainer1_keys.public_key().to_hex(),
313 TagKind::custom("maintainers"), 304 maintainer2_keys.public_key().to_hex(),
314 vec![ 305 maintainer3_keys.public_key().to_hex(),
315 maintainer1_keys.public_key().to_hex(), 306 ],
316 maintainer2_keys.public_key().to_hex(), 307 ),
317 maintainer3_keys.public_key().to_hex(), 308 ])
318 ], 309 .sign_with_keys(&owner_keys)
319 ), 310 .unwrap();
320 ])
321 .sign_with_keys(&owner_keys)
322 .unwrap();
323 311
324 client_b.send_event(&owner_announcement).await.unwrap(); 312 client_b.send_event(&owner_announcement).await.unwrap();
325 println!("✓ Owner announcement sent to relay_b"); 313 println!("✓ Owner announcement sent to relay_b");
@@ -340,11 +328,7 @@ async fn test_multiple_maintainers_all_reprocessed() {
340 .identifier(identifier); 328 .identifier(identifier);
341 329
342 let found = wait_for_event_on_relay(relay_b.url(), filter, Duration::from_secs(2)).await; 330 let found = wait_for_event_on_relay(relay_b.url(), filter, Duration::from_secs(2)).await;
343 assert!( 331 assert!(found, "{} announcement should be in relay_b", name);
344 found,
345 "{} announcement should be in relay_b",
346 name
347 );
348 } 332 }
349 333
350 println!("✅ All three maintainer announcements re-processed successfully"); 334 println!("✅ All three maintainer announcements re-processed successfully");
@@ -365,63 +349,55 @@ async fn test_multiple_maintainers_all_reprocessed() {
365#[tokio::test] 349#[tokio::test]
366async fn test_invalid_maintainer_pubkey_handled_gracefully() { 350async fn test_invalid_maintainer_pubkey_handled_gracefully() {
367 let relay = TestRelay::start().await; 351 let relay = TestRelay::start().await;
368 352
369 // Create keys 353 // Create keys
370 let owner_keys = Keys::generate(); 354 let owner_keys = Keys::generate();
371 let maintainer_keys = Keys::generate(); 355 let maintainer_keys = Keys::generate();
372 356
373 let identifier = "invalid-maintainer-repo"; 357 let identifier = "invalid-maintainer-repo";
374 358
375 // Create client using TestClient helper 359 // Create client using TestClient helper
376 let client = TestClient::new(relay.url(), owner_keys.clone()) 360 let client = TestClient::new(relay.url(), owner_keys.clone())
377 .await 361 .await
378 .expect("Failed to connect to relay"); 362 .expect("Failed to connect to relay");
379 363
380 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay) 364 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay)
381 let maintainer_announcement = EventBuilder::new( 365 let maintainer_announcement =
382 Kind::GitRepoAnnouncement, 366 EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository")
383 "Maintainer's repository", 367 .tags(vec![
384 ) 368 Tag::identifier(identifier),
385 .tags(vec![ 369 Tag::custom(
386 Tag::identifier(identifier), 370 TagKind::custom("clone"),
387 Tag::custom( 371 vec![format!("https://example.com/{}.git", identifier)],
388 TagKind::custom("clone"), 372 ),
389 vec![format!("https://example.com/{}.git", identifier)], 373 Tag::custom(
390 ), 374 TagKind::custom("relays"),
391 Tag::custom( 375 vec!["wss://example.com".to_string()],
392 TagKind::custom("relays"), 376 ),
393 vec!["wss://example.com".to_string()], 377 ])
394 ), 378 .sign_with_keys(&maintainer_keys)
395 ]) 379 .unwrap();
396 .sign_with_keys(&maintainer_keys)
397 .unwrap();
398 380
399 // Send maintainer announcement - expect it to be rejected 381 // Send maintainer announcement - expect it to be rejected
400 let _ = client.send_event(&maintainer_announcement).await; 382 let _ = client.send_event(&maintainer_announcement).await;
401 tokio::time::sleep(Duration::from_millis(200)).await; 383 tokio::time::sleep(Duration::from_millis(200)).await;
402 384
403 // Step 2: Send owner announcement with INVALID maintainer hex 385 // Step 2: Send owner announcement with INVALID maintainer hex
404 let owner_announcement = EventBuilder::new( 386 let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository")
405 Kind::GitRepoAnnouncement, 387 .tags(vec![
406 "Owner's repository", 388 Tag::identifier(identifier),
407 ) 389 Tag::custom(
408 .tags(vec![ 390 TagKind::custom("clone"),
409 Tag::identifier(identifier), 391 vec![format!("https://{}/{}.git", relay.domain(), identifier)],
410 Tag::custom( 392 ),
411 TagKind::custom("clone"), 393 Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]),
412 vec![format!("https://{}/{}.git", relay.domain(), identifier)], 394 Tag::custom(
413 ), 395 TagKind::custom("maintainers"),
414 Tag::custom( 396 vec!["invalid-hex-not-a-pubkey".to_string()],
415 TagKind::custom("relays"), 397 ),
416 vec![relay.url().to_string()], 398 ])
417 ), 399 .sign_with_keys(&owner_keys)
418 Tag::custom( 400 .unwrap();
419 TagKind::custom("maintainers"),
420 vec!["invalid-hex-not-a-pubkey".to_string()],
421 ),
422 ])
423 .sign_with_keys(&owner_keys)
424 .unwrap();
425 401
426 client.send_event(&owner_announcement).await.unwrap(); 402 client.send_event(&owner_announcement).await.unwrap();
427 tokio::time::sleep(Duration::from_millis(500)).await; 403 tokio::time::sleep(Duration::from_millis(500)).await;
@@ -431,16 +407,21 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() {
431 .kind(Kind::GitRepoAnnouncement) 407 .kind(Kind::GitRepoAnnouncement)
432 .author(owner_keys.public_key()) 408 .author(owner_keys.public_key())
433 .identifier(identifier); 409 .identifier(identifier);
434 410
435 let owner_found = wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await; 411 let owner_found =
436 assert!(owner_found, "Owner announcement should be accepted despite invalid maintainer"); 412 wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await;
413 assert!(
414 owner_found,
415 "Owner announcement should be accepted despite invalid maintainer"
416 );
437 417
438 let maintainer_filter = Filter::new() 418 let maintainer_filter = Filter::new()
439 .kind(Kind::GitRepoAnnouncement) 419 .kind(Kind::GitRepoAnnouncement)
440 .author(maintainer_keys.public_key()) 420 .author(maintainer_keys.public_key())
441 .identifier(identifier); 421 .identifier(identifier);
442 422
443 let maintainer_found = wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await; 423 let maintainer_found =
424 wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await;
444 assert!( 425 assert!(
445 !maintainer_found, 426 !maintainer_found,
446 "Maintainer announcement should NOT be re-processed (invalid pubkey)" 427 "Maintainer announcement should NOT be re-processed (invalid pubkey)"