upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 16:17:25 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-01-09 16:17:25 +0000
commit83d29a446d96f87e5c947faf49fb33f18db4fc17 (patch)
treece5268bdc30e568b14421d26cf78937b8a06a25e
parent02e957ec97c9a9e6e37eca9c9d4aa6aef4bcd363 (diff)
feat(sync): invalidation + immediate re-processing of maintainer announcements
- Add two-tier rejected events index (hot cache + cold index) - Hot cache: 2-minute in-memory storage of full rejected events - Cold index: 7-day metadata storage for deduplication - Immediate re-processing when owner announcements list maintainers - Fix rejection reason detection to match actual error messages - Rewrite integration tests to use two-relay sync pattern - All tests passing (3 passed, 1 ignored slow test)
-rw-r--r--src/sync/mod.rs116
-rw-r--r--tests/sync.rs1
-rw-r--r--tests/sync/maintainer_reprocessing.rs453
-rw-r--r--tests/sync/mod.rs1
4 files changed, 570 insertions, 1 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index fe336d1..35a8afb 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -2050,6 +2050,119 @@ impl SyncManager {
2050 broadcast = broadcast_success, 2050 broadcast = broadcast_success,
2051 "Synced event saved and broadcast" 2051 "Synced event saved and broadcast"
2052 ); 2052 );
2053
2054 // GRASP-02 PR3: Invalidate and re-process maintainer announcements
2055 // If this is a repository announcement that lists maintainers, check if any
2056 // of those maintainer announcements were previously rejected and are still
2057 // in the hot cache. If so, re-process them immediately (they should now pass
2058 // validation since the owner announcement has been accepted).
2059 if event.kind == Kind::GitRepoAnnouncement {
2060 use crate::nostr::events::RepositoryAnnouncement;
2061
2062 match RepositoryAnnouncement::from_event(event.clone()) {
2063 Ok(announcement) => {
2064 if !announcement.maintainers.is_empty() {
2065 tracing::debug!(
2066 event_id = %event.id,
2067 identifier = %announcement.identifier,
2068 maintainer_count = announcement.maintainers.len(),
2069 "Owner announcement accepted, checking for rejected maintainer announcements"
2070 );
2071
2072 // For each maintainer, invalidate and get their events
2073 for maintainer_hex in &announcement.maintainers {
2074 // Parse maintainer public key
2075 match PublicKey::from_hex(maintainer_hex) {
2076 Ok(maintainer_pubkey) => {
2077 let (removed, hot_events) = rejected_events_index
2078 .invalidate_and_get_events(
2079 &maintainer_pubkey,
2080 &announcement.identifier,
2081 );
2082
2083 if removed > 0 {
2084 tracing::info!(
2085 maintainer = %maintainer_hex,
2086 identifier = %announcement.identifier,
2087 removed_from_cold_index = removed,
2088 hot_cache_events = hot_events.len(),
2089 "Invalidated rejected maintainer announcements"
2090 );
2091 }
2092
2093 // Re-process events from hot cache immediately
2094 for maintainer_event in hot_events {
2095 tracing::info!(
2096 event_id = %maintainer_event.id,
2097 maintainer = %maintainer_hex,
2098 identifier = %announcement.identifier,
2099 "Re-processing maintainer announcement from hot cache"
2100 );
2101
2102 // Recursive call to process_event_static
2103 // This is safe because:
2104 // 1. Event was removed from hot cache before this call
2105 // 2. Second attempt uses maintainer exception (different code path)
2106 // 3. If second attempt fails, stays in cold index only (no third attempt)
2107 // Use Box::pin to avoid infinitely sized future
2108 let reprocess_result = Box::pin(Self::process_event_static(
2109 &maintainer_event,
2110 relay_url,
2111 database,
2112 write_policy,
2113 local_relay,
2114 rejected_events_index,
2115 ))
2116 .await;
2117
2118 match reprocess_result {
2119 ProcessResult::Saved => {
2120 tracing::info!(
2121 event_id = %maintainer_event.id,
2122 maintainer = %maintainer_hex,
2123 identifier = %announcement.identifier,
2124 "Maintainer announcement accepted on re-processing"
2125 );
2126 }
2127 ProcessResult::Duplicate => {
2128 tracing::debug!(
2129 event_id = %maintainer_event.id,
2130 "Maintainer announcement already exists (duplicate)"
2131 );
2132 }
2133 other => {
2134 tracing::warn!(
2135 event_id = %maintainer_event.id,
2136 maintainer = %maintainer_hex,
2137 identifier = %announcement.identifier,
2138 result = ?other,
2139 "Maintainer announcement still rejected on re-processing"
2140 );
2141 }
2142 }
2143 }
2144 }
2145 Err(e) => {
2146 tracing::warn!(
2147 maintainer_hex = %maintainer_hex,
2148 error = %e,
2149 "Invalid maintainer public key in announcement"
2150 );
2151 }
2152 }
2153 }
2154 }
2155 }
2156 Err(e) => {
2157 tracing::warn!(
2158 event_id = %event.id,
2159 error = %e,
2160 "Failed to parse repository announcement for maintainer invalidation"
2161 );
2162 }
2163 }
2164 }
2165
2053 ProcessResult::Saved 2166 ProcessResult::Saved
2054 } 2167 }
2055 WritePolicyResult::Reject { message, status } => { 2168 WritePolicyResult::Reject { message, status } => {
@@ -2082,7 +2195,8 @@ impl SyncManager {
2082 .and_then(|t| t.content()) 2195 .and_then(|t| t.content())
2083 { 2196 {
2084 // Determine rejection reason based on message 2197 // Determine rejection reason based on message
2085 let reason = if message.contains("doesn't list this service") { 2198 let reason = if message.contains("doesn't list this service")
2199 || message.contains("Announcement must list service") {
2086 rejected_index::RejectionReason::DoesNotListService 2200 rejected_index::RejectionReason::DoesNotListService
2087 } else if message.contains("maintainer") { 2201 } else if message.contains("maintainer") {
2088 rejected_index::RejectionReason::MaintainerNotYetValid 2202 rejected_index::RejectionReason::MaintainerNotYetValid
diff --git a/tests/sync.rs b/tests/sync.rs
index ad5ca96..104e815 100644
--- a/tests/sync.rs
+++ b/tests/sync.rs
@@ -35,6 +35,7 @@ mod sync {
35 pub mod discovery; 35 pub mod discovery;
36 pub mod historic_sync; 36 pub mod historic_sync;
37 pub mod live_sync; 37 pub mod live_sync;
38 pub mod maintainer_reprocessing;
38 pub mod metrics; 39 pub mod metrics;
39 pub mod tag_variations; 40 pub mod tag_variations;
40} 41}
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs
new file mode 100644
index 0000000..2b7fb0f
--- /dev/null
+++ b/tests/sync/maintainer_reprocessing.rs
@@ -0,0 +1,453 @@
1//! Integration tests for GRASP-02 PR3: Maintainer Announcement Re-Processing
2//!
3//! Tests the two-tier rejected events index and immediate re-processing of
4//! maintainer announcements when owner announcements are accepted.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9
10use crate::common::{sync_helpers::*, TestRelay};
11
12/// Test that maintainer announcements are re-processed immediately when owner announcement accepted
13///
14/// Flow:
15/// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b)
16/// 2. relay_b: Owner sends announcement (lists relay_a + maintainer)
17/// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index
18/// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement
19/// 5. Both announcements should be in relay_b's database
20///
21/// Expected time: <5 seconds (vs 24 hours without hot cache)
22#[tokio::test]
23async fn test_maintainer_announcement_reprocessed_immediately() {
24 // Start relay_a (where maintainer announcement will be sent)
25 let relay_a = TestRelay::start().await;
26 println!("relay_a started at {}", relay_a.url());
27
28 // Start relay_b with sync enabled (will sync from relay_a)
29 let relay_b = TestRelay::start_with_sync(None).await;
30 println!("relay_b started at {}", relay_b.url());
31
32 // Create keys
33 let owner_keys = Keys::generate();
34 let maintainer_keys = Keys::generate();
35
36 let identifier = "test-repo";
37
38 let start = std::time::Instant::now();
39
40 // Step 1: Send maintainer announcement to relay_a (will be rejected - doesn't list relay_b)
41 let client_a = TestClient::new(relay_a.url(), maintainer_keys.clone())
42 .await
43 .expect("Failed to connect to relay_a");
44
45 let maintainer_announcement = EventBuilder::new(
46 Kind::GitRepoAnnouncement,
47 "Maintainer's repository",
48 )
49 .tags(vec![
50 Tag::identifier(identifier),
51 Tag::custom(
52 TagKind::custom("clone"),
53 vec![format!("https://{}/{}.git", relay_a.domain(), identifier)],
54 ),
55 Tag::custom(
56 TagKind::custom("relays"),
57 vec![relay_a.url().to_string()],
58 ),
59 ])
60 .sign_with_keys(&maintainer_keys)
61 .unwrap();
62
63 client_a.send_event(&maintainer_announcement).await.unwrap();
64 println!("✓ Maintainer announcement sent to relay_a");
65
66 // Step 2: Send owner announcement to relay_b (lists relay_a + maintainer)
67 let client_b = TestClient::new(relay_b.url(), owner_keys.clone())
68 .await
69 .expect("Failed to connect to relay_b");
70
71 let owner_announcement = EventBuilder::new(
72 Kind::GitRepoAnnouncement,
73 "Owner's repository",
74 )
75 .tags(vec![
76 Tag::identifier(identifier),
77 Tag::custom(
78 TagKind::custom("clone"),
79 vec![format!("https://{}/{}.git", relay_b.domain(), identifier)],
80 ),
81 Tag::custom(
82 TagKind::custom("relays"),
83 vec![relay_a.url().to_string(), relay_b.url().to_string()],
84 ),
85 Tag::custom(
86 TagKind::custom("maintainers"),
87 vec![maintainer_keys.public_key().to_hex()],
88 ),
89 ])
90 .sign_with_keys(&owner_keys)
91 .unwrap();
92
93 client_b.send_event(&owner_announcement).await.unwrap();
94 println!("✓ Owner announcement sent to relay_b");
95
96 // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes)
97 tokio::time::sleep(Duration::from_secs(3)).await;
98
99 let elapsed = start.elapsed();
100
101 // Step 4: Verify both announcements are in relay_b's database
102 let owner_filter = Filter::new()
103 .kind(Kind::GitRepoAnnouncement)
104 .author(owner_keys.public_key())
105 .identifier(identifier);
106
107 let owner_found = wait_for_event_on_relay(relay_b.url(), owner_filter, Duration::from_secs(2)).await;
108 assert!(owner_found, "Owner announcement should be in relay_b");
109
110 let maintainer_filter = Filter::new()
111 .kind(Kind::GitRepoAnnouncement)
112 .author(maintainer_keys.public_key())
113 .identifier(identifier);
114
115 let maintainer_found = wait_for_event_on_relay(relay_b.url(), maintainer_filter, Duration::from_secs(2)).await;
116 assert!(maintainer_found, "Maintainer announcement should be re-processed and accepted in relay_b");
117
118 // Step 5: Verify it happened quickly (not 24 hours!)
119 assert!(
120 elapsed.as_secs() < 10,
121 "Re-processing should happen in <10 seconds, took {:?}",
122 elapsed
123 );
124
125 println!("✅ Maintainer announcement re-processed in {:?}", elapsed);
126
127 client_a.disconnect().await;
128 client_b.disconnect().await;
129 relay_a.stop().await;
130 relay_b.stop().await;
131}
132
133/// Test that maintainer announcements NOT in hot cache are still prevented from re-fetching
134///
135/// Flow:
136/// 1. Maintainer announcement arrives → Rejected (added to hot cache + cold index)
137/// 2. Wait for hot cache to expire (2+ minutes)
138/// 3. Owner announcement arrives → Invalidates cold index
139/// 4. Maintainer announcement should NOT be re-fetched (cold index prevents)
140/// 5. Only owner announcement should be in database
141///
142/// This test verifies the cold index prevents repeated downloads after hot cache expiry.
143/// Note: This test is slow (2+ minutes) so we'll skip it in normal test runs.
144#[tokio::test]
145#[ignore] // Skip by default due to 2+ minute duration
146async fn test_maintainer_announcement_cold_index_prevents_refetch() {
147 let relay = TestRelay::start().await;
148
149 // Create keys
150 let owner_keys = Keys::generate();
151 let maintainer_keys = Keys::generate();
152
153 let identifier = "test-repo-cold";
154
155 // Create client using TestClient helper
156 let client = TestClient::new(relay.url(), maintainer_keys.clone())
157 .await
158 .expect("Failed to connect to relay");
159
160 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay)
161 let maintainer_announcement = EventBuilder::new(
162 Kind::GitRepoAnnouncement,
163 "Maintainer's repository",
164 )
165 .tags(vec![
166 Tag::identifier(identifier),
167 Tag::custom(
168 TagKind::custom("clone"),
169 vec![format!("https://example.com/{}.git", identifier)],
170 ),
171 Tag::custom(
172 TagKind::custom("relays"),
173 vec!["wss://example.com".to_string()],
174 ),
175 ])
176 .sign_with_keys(&maintainer_keys)
177 .unwrap();
178
179 // Send maintainer announcement - expect it to be rejected
180 let _ = client.send_event(&maintainer_announcement).await;
181 tokio::time::sleep(Duration::from_millis(200)).await;
182
183 // Step 2: Wait for hot cache to expire (default: 120 seconds)
184 println!("⏳ Waiting for hot cache to expire (120 seconds)...");
185 tokio::time::sleep(Duration::from_secs(125)).await;
186
187 // Step 3: Send owner announcement (lists maintainer)
188 let owner_announcement = EventBuilder::new(
189 Kind::GitRepoAnnouncement,
190 "Owner's repository",
191 )
192 .tags(vec![
193 Tag::identifier(identifier),
194 Tag::custom(
195 TagKind::custom("clone"),
196 vec![format!("https://{}/{}.git", relay.domain(), identifier)],
197 ),
198 Tag::custom(
199 TagKind::custom("relays"),
200 vec![relay.url().to_string()],
201 ),
202 Tag::custom(
203 TagKind::custom("maintainers"),
204 vec![maintainer_keys.public_key().to_hex()],
205 ),
206 ])
207 .sign_with_keys(&owner_keys)
208 .unwrap();
209
210 client.send_event(&owner_announcement).await.unwrap();
211 tokio::time::sleep(Duration::from_millis(500)).await;
212
213 // Step 4: Verify only owner announcement is in database
214 let owner_filter = Filter::new()
215 .kind(Kind::GitRepoAnnouncement)
216 .author(owner_keys.public_key())
217 .identifier(identifier);
218
219 let owner_found = wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await;
220 assert!(owner_found, "Owner announcement should be accepted");
221
222 let maintainer_filter = Filter::new()
223 .kind(Kind::GitRepoAnnouncement)
224 .author(maintainer_keys.public_key())
225 .identifier(identifier);
226
227 let maintainer_found = wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await;
228 assert!(
229 !maintainer_found,
230 "Maintainer announcement should NOT be re-processed (hot cache expired)"
231 );
232
233 println!("✅ Cold index prevented re-fetch after hot cache expiry");
234
235 client.disconnect().await;
236 relay.stop().await;
237}
238
239/// Test multiple maintainers are all re-processed when owner announcement accepted
240///
241/// Flow:
242/// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b)
243/// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers)
244/// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index
245/// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements
246/// 5. All four announcements should be in relay_b's database
247#[tokio::test]
248async fn test_multiple_maintainers_all_reprocessed() {
249 // Start relay_a (where maintainer announcements will be sent)
250 let relay_a = TestRelay::start().await;
251 println!("relay_a started at {}", relay_a.url());
252
253 // Start relay_b with sync enabled (will sync from relay_a)
254 let relay_b = TestRelay::start_with_sync(None).await;
255 println!("relay_b started at {}", relay_b.url());
256
257 // Create keys
258 let owner_keys = Keys::generate();
259 let maintainer1_keys = Keys::generate();
260 let maintainer2_keys = Keys::generate();
261 let maintainer3_keys = Keys::generate();
262
263 let identifier = "multi-maintainer-repo";
264
265 // Step 1: Send three maintainer announcements to relay_a
266 let client_a = TestClient::new(relay_a.url(), maintainer1_keys.clone())
267 .await
268 .expect("Failed to connect to relay_a");
269
270 for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys].iter().enumerate() {
271 let announcement = EventBuilder::new(
272 Kind::GitRepoAnnouncement,
273 format!("Maintainer {} repository", idx + 1),
274 )
275 .tags(vec![
276 Tag::identifier(identifier),
277 Tag::custom(
278 TagKind::custom("clone"),
279 vec![format!("https://{}/{}.git", relay_a.domain(), identifier)],
280 ),
281 Tag::custom(
282 TagKind::custom("relays"),
283 vec![relay_a.url().to_string()],
284 ),
285 ])
286 .sign_with_keys(maintainer_keys)
287 .unwrap();
288
289 client_a.send_event(&announcement).await.unwrap();
290 }
291 println!("✓ Three maintainer announcements sent to relay_a");
292
293 // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers)
294 let client_b = TestClient::new(relay_b.url(), owner_keys.clone())
295 .await
296 .expect("Failed to connect to relay_b");
297
298 let owner_announcement = EventBuilder::new(
299 Kind::GitRepoAnnouncement,
300 "Owner's repository",
301 )
302 .tags(vec![
303 Tag::identifier(identifier),
304 Tag::custom(
305 TagKind::custom("clone"),
306 vec![format!("https://{}/{}.git", relay_b.domain(), identifier)],
307 ),
308 Tag::custom(
309 TagKind::custom("relays"),
310 vec![relay_a.url().to_string(), relay_b.url().to_string()],
311 ),
312 Tag::custom(
313 TagKind::custom("maintainers"),
314 vec![
315 maintainer1_keys.public_key().to_hex(),
316 maintainer2_keys.public_key().to_hex(),
317 maintainer3_keys.public_key().to_hex(),
318 ],
319 ),
320 ])
321 .sign_with_keys(&owner_keys)
322 .unwrap();
323
324 client_b.send_event(&owner_announcement).await.unwrap();
325 println!("✓ Owner announcement sent to relay_b");
326
327 // Step 3: Wait for sync and re-processing
328 tokio::time::sleep(Duration::from_secs(3)).await;
329
330 // Step 4: Verify all four announcements are in relay_b's database
331 for (name, keys) in [
332 ("owner", &owner_keys),
333 ("maintainer1", &maintainer1_keys),
334 ("maintainer2", &maintainer2_keys),
335 ("maintainer3", &maintainer3_keys),
336 ] {
337 let filter = Filter::new()
338 .kind(Kind::GitRepoAnnouncement)
339 .author(keys.public_key())
340 .identifier(identifier);
341
342 let found = wait_for_event_on_relay(relay_b.url(), filter, Duration::from_secs(2)).await;
343 assert!(
344 found,
345 "{} announcement should be in relay_b",
346 name
347 );
348 }
349
350 println!("✅ All three maintainer announcements re-processed successfully");
351
352 client_a.disconnect().await;
353 client_b.disconnect().await;
354 relay_a.stop().await;
355 relay_b.stop().await;
356}
357
358/// Test that invalid maintainer public keys don't cause panics
359///
360/// Flow:
361/// 1. Maintainer announcement arrives → Rejected
362/// 2. Owner announcement arrives with INVALID maintainer hex → Should handle gracefully
363/// 3. Owner announcement should still be accepted
364/// 4. Maintainer announcement should NOT be re-processed (invalid pubkey)
365#[tokio::test]
366async fn test_invalid_maintainer_pubkey_handled_gracefully() {
367 let relay = TestRelay::start().await;
368
369 // Create keys
370 let owner_keys = Keys::generate();
371 let maintainer_keys = Keys::generate();
372
373 let identifier = "invalid-maintainer-repo";
374
375 // Create client using TestClient helper
376 let client = TestClient::new(relay.url(), owner_keys.clone())
377 .await
378 .expect("Failed to connect to relay");
379
380 // Step 1: Send maintainer announcement (will be rejected - doesn't list our relay)
381 let maintainer_announcement = EventBuilder::new(
382 Kind::GitRepoAnnouncement,
383 "Maintainer's repository",
384 )
385 .tags(vec![
386 Tag::identifier(identifier),
387 Tag::custom(
388 TagKind::custom("clone"),
389 vec![format!("https://example.com/{}.git", identifier)],
390 ),
391 Tag::custom(
392 TagKind::custom("relays"),
393 vec!["wss://example.com".to_string()],
394 ),
395 ])
396 .sign_with_keys(&maintainer_keys)
397 .unwrap();
398
399 // Send maintainer announcement - expect it to be rejected
400 let _ = client.send_event(&maintainer_announcement).await;
401 tokio::time::sleep(Duration::from_millis(200)).await;
402
403 // Step 2: Send owner announcement with INVALID maintainer hex
404 let owner_announcement = EventBuilder::new(
405 Kind::GitRepoAnnouncement,
406 "Owner's repository",
407 )
408 .tags(vec![
409 Tag::identifier(identifier),
410 Tag::custom(
411 TagKind::custom("clone"),
412 vec![format!("https://{}/{}.git", relay.domain(), identifier)],
413 ),
414 Tag::custom(
415 TagKind::custom("relays"),
416 vec![relay.url().to_string()],
417 ),
418 Tag::custom(
419 TagKind::custom("maintainers"),
420 vec!["invalid-hex-not-a-pubkey".to_string()],
421 ),
422 ])
423 .sign_with_keys(&owner_keys)
424 .unwrap();
425
426 client.send_event(&owner_announcement).await.unwrap();
427 tokio::time::sleep(Duration::from_millis(500)).await;
428
429 // Step 3: Verify owner announcement accepted, maintainer not re-processed
430 let owner_filter = Filter::new()
431 .kind(Kind::GitRepoAnnouncement)
432 .author(owner_keys.public_key())
433 .identifier(identifier);
434
435 let owner_found = wait_for_event_on_relay(relay.url(), owner_filter, Duration::from_secs(2)).await;
436 assert!(owner_found, "Owner announcement should be accepted despite invalid maintainer");
437
438 let maintainer_filter = Filter::new()
439 .kind(Kind::GitRepoAnnouncement)
440 .author(maintainer_keys.public_key())
441 .identifier(identifier);
442
443 let maintainer_found = wait_for_event_on_relay(relay.url(), maintainer_filter, Duration::from_millis(500)).await;
444 assert!(
445 !maintainer_found,
446 "Maintainer announcement should NOT be re-processed (invalid pubkey)"
447 );
448
449 println!("✅ Invalid maintainer pubkey handled gracefully without panic");
450
451 client.disconnect().await;
452 relay.stop().await;
453}
diff --git a/tests/sync/mod.rs b/tests/sync/mod.rs
index 58b7354..400341f 100644
--- a/tests/sync/mod.rs
+++ b/tests/sync/mod.rs
@@ -134,5 +134,6 @@ pub mod historic_sync;
134pub mod catchup; 134pub mod catchup;
135pub mod discovery; 135pub mod discovery;
136pub mod live_sync; 136pub mod live_sync;
137pub mod maintainer_reprocessing;
137pub mod metrics; 138pub mod metrics;
138pub mod tag_variations; \ No newline at end of file 139pub mod tag_variations; \ No newline at end of file