upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-05 11:04:00 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-05 11:04:00 +0000
commitef7ba7c59b8e0b6369f63b8a46e202693963d92b (patch)
tree9a3abe34e41995b02717292050cfb09d4b7d0de1
parent83ede29fb2ce563fe53ee4dc62334c03c67026cb (diff)
fix basic sync tests
-rw-r--r--src/config.rs5
-rw-r--r--src/sync/manager.rs31
-rw-r--r--tests/common/relay.rs1
-rw-r--r--tests/proactive_sync_basic.rs233
4 files changed, 217 insertions, 53 deletions
diff --git a/src/config.rs b/src/config.rs
index 0ca534c..07e67c8 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -103,6 +103,11 @@ pub struct Config {
103 /// Number of days to look back for reconnect catchup (default: 3) 103 /// Number of days to look back for reconnect catchup (default: 3)
104 #[arg(long, env = "NGIT_SYNC_RECONNECT_LOOKBACK_DAYS", default_value_t = 3)] 104 #[arg(long, env = "NGIT_SYNC_RECONNECT_LOOKBACK_DAYS", default_value_t = 3)]
105 pub sync_reconnect_lookback_days: u64, 105 pub sync_reconnect_lookback_days: u64,
106
107 /// Maximum startup jitter in milliseconds for sync connections (default: 10000 = 10 seconds)
108 /// Set to 0 to disable jitter (useful for testing)
109 #[arg(long, env = "NGIT_SYNC_STARTUP_JITTER_MS", default_value_t = 10_000)]
110 pub sync_startup_jitter_ms: u64,
106} 111}
107 112
108impl Config { 113impl Config {
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
index 97ea81a..6fcfcd7 100644
--- a/src/sync/manager.rs
+++ b/src/sync/manager.rs
@@ -40,8 +40,6 @@ use super::metrics::SyncMetrics;
40use crate::config::Config; 40use crate::config::Config;
41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase}; 41use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
42 42
43/// Maximum startup jitter in milliseconds (10 seconds)
44const MAX_STARTUP_JITTER_MS: u64 = 10_000;
45 43
46/// Default fallback address for sync source when bind_address cannot be parsed 44/// Default fallback address for sync source when bind_address cannot be parsed
47/// 45///
@@ -76,6 +74,8 @@ pub struct SyncManager {
76 metrics: Option<SyncMetrics>, 74 metrics: Option<SyncMetrics>,
77 /// Source address for synced events (derived from config.bind_address) 75 /// Source address for synced events (derived from config.bind_address)
78 sync_source_addr: SocketAddr, 76 sync_source_addr: SocketAddr,
77 /// Maximum startup jitter in milliseconds (from config)
78 startup_jitter_ms: u64,
79} 79}
80 80
81impl SyncManager { 81impl SyncManager {
@@ -102,6 +102,7 @@ impl SyncManager {
102 health_tracker: Arc::new(RelayHealthTracker::new(config)), 102 health_tracker: Arc::new(RelayHealthTracker::new(config)),
103 metrics: None, 103 metrics: None,
104 sync_source_addr: get_sync_source_addr(&config.bind_address), 104 sync_source_addr: get_sync_source_addr(&config.bind_address),
105 startup_jitter_ms: config.sync_startup_jitter_ms,
105 } 106 }
106 } 107 }
107 108
@@ -130,6 +131,7 @@ impl SyncManager {
130 health_tracker: Arc::new(RelayHealthTracker::new(config)), 131 health_tracker: Arc::new(RelayHealthTracker::new(config)),
131 metrics: Some(metrics), 132 metrics: Some(metrics),
132 sync_source_addr: get_sync_source_addr(&config.bind_address), 133 sync_source_addr: get_sync_source_addr(&config.bind_address),
134 startup_jitter_ms: config.sync_startup_jitter_ms,
133 } 135 }
134 } 136 }
135 137
@@ -149,6 +151,7 @@ impl SyncManager {
149 health_tracker: Arc::new(RelayHealthTracker::with_defaults()), 151 health_tracker: Arc::new(RelayHealthTracker::with_defaults()),
150 metrics: None, 152 metrics: None,
151 sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR, 153 sync_source_addr: DEFAULT_SYNC_SOURCE_ADDR,
154 startup_jitter_ms: 10_000, // Default 10 seconds
152 } 155 }
153 } 156 }
154 157
@@ -265,8 +268,9 @@ impl SyncManager {
265 268
266 /// Spawn a connection task for a relay with startup jitter 269 /// Spawn a connection task for a relay with startup jitter
267 /// 270 ///
268 /// Adds a random delay (0-10s) before connecting to prevent thundering herd 271 /// Adds a random delay (0 to startup_jitter_ms) before connecting to prevent
269 /// on startup when multiple relays are configured. 272 /// thundering herd on startup when multiple relays are configured.
273 /// Set startup_jitter_ms to 0 to disable jitter (useful for testing).
270 fn spawn_connection_with_jitter( 274 fn spawn_connection_with_jitter(
271 &self, 275 &self,
272 url: String, 276 url: String,
@@ -276,16 +280,19 @@ impl SyncManager {
276 let domain = self.relay_domain.clone(); 280 let domain = self.relay_domain.clone();
277 let health_tracker = self.health_tracker.clone(); 281 let health_tracker = self.health_tracker.clone();
278 let metrics = self.metrics.clone(); 282 let metrics = self.metrics.clone();
283 let max_jitter = self.startup_jitter_ms;
279 284
280 tokio::spawn(async move { 285 tokio::spawn(async move {
281 // Apply startup jitter 286 // Apply startup jitter (if configured)
282 let jitter_ms = rand::thread_rng().gen_range(0..MAX_STARTUP_JITTER_MS); 287 if max_jitter > 0 {
283 tracing::debug!( 288 let jitter_ms = rand::thread_rng().gen_range(0..max_jitter);
284 "Applying {}ms startup jitter before connecting to {}", 289 tracing::debug!(
285 jitter_ms, 290 "Applying {}ms startup jitter before connecting to {}",
286 url 291 jitter_ms,
287 ); 292 url
288 tokio::time::sleep(Duration::from_millis(jitter_ms)).await; 293 );
294 tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
295 }
289 296
290 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await; 297 connect_with_retry(&url, tx, filter_service, &domain, health_tracker, metrics).await;
291 }); 298 });
diff --git a/tests/common/relay.rs b/tests/common/relay.rs
index 9fb7b1d..21d6deb 100644
--- a/tests/common/relay.rs
+++ b/tests/common/relay.rs
@@ -92,6 +92,7 @@ impl TestRelay {
92 .env("NGIT_DOMAIN", &bind_address) // Set domain to match bind address 92 .env("NGIT_DOMAIN", &bind_address) // Set domain to match bind address
93 .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) 93 .env("NGIT_GIT_DATA_PATH", git_data_dir.path())
94 .env("NGIT_OWNER_NPUB", &test_npub) 94 .env("NGIT_OWNER_NPUB", &test_npub)
95 .env("NGIT_SYNC_STARTUP_JITTER_MS", "0") // Disable jitter for tests
95 .env("RUST_LOG", "warn") // Less logging during tests 96 .env("RUST_LOG", "warn") // Less logging during tests
96 .stdout(Stdio::null()) 97 .stdout(Stdio::null())
97 .stderr(Stdio::null()); 98 .stderr(Stdio::null());
diff --git a/tests/proactive_sync_basic.rs b/tests/proactive_sync_basic.rs
index b0b2cbf..9af5a2d 100644
--- a/tests/proactive_sync_basic.rs
+++ b/tests/proactive_sync_basic.rs
@@ -21,25 +21,96 @@ use nostr_sdk::prelude::*;
21/// Kind 30617 - Repository State (NIP-34) 21/// Kind 30617 - Repository State (NIP-34)
22const KIND_REPOSITORY_STATE: u16 = 30617; 22const KIND_REPOSITORY_STATE: u16 = 30617;
23 23
24/// Create a client with keys, connect to relay, and wait for connection
25async fn create_connected_client(relay_url: &str, keys: Keys) -> Result<Client, String> {
26 let client = Client::new(keys);
27
28 client
29 .add_relay(relay_url)
30 .await
31 .map_err(|e| e.to_string())?;
32 client.connect().await;
33
34 // Wait for connection to establish (with retries, matching grasp-audit pattern)
35 for _ in 0..30 {
36 tokio::time::sleep(Duration::from_millis(100)).await;
37 let relays = client.relays().await;
38 if relays.values().any(|r| r.is_connected()) {
39 return Ok(client);
40 }
41 }
42
43 Err("Failed to connect to relay after 3 seconds".to_string())
44}
45
46/// Send an event and wait for successful delivery
47async fn send_event_reliably(client: &Client, event: &Event) -> Result<EventId, String> {
48 // Try sending the event with retries
49 for attempt in 1..=5 {
50 let result = client.send_event(event).await;
51 match result {
52 Ok(output) => {
53 if !output.success.is_empty() {
54 return Ok(output.val);
55 }
56 // Check what went wrong
57 if !output.failed.is_empty() {
58 println!(" Attempt {} - failures: {:?}", attempt, output.failed);
59 // If relay not connected, try reconnecting
60 client.connect().await;
61 }
62 }
63 Err(e) => {
64 println!(" Attempt {} - error: {}", attempt, e);
65 }
66 }
67 tokio::time::sleep(Duration::from_millis(500)).await;
68 }
69 Err("Failed to send event after 5 attempts".to_string())
70}
71
24/// Create a valid repository announcement event for testing 72/// Create a valid repository announcement event for testing
25/// 73///
26/// This creates a kind 30617 event with required clone and relays tags 74/// This creates a kind 30617 event with required clone and relays tags.
27fn create_valid_repo_announcement( 75/// Uses TagKind::custom("clone") and TagKind::custom("relays") to match grasp-audit patterns.
28 keys: &Keys, 76#[allow(dead_code)]
29 domain: &str, 77fn create_valid_repo_announcement(keys: &Keys, domain: &str, identifier: &str) -> Event {
30 identifier: &str, 78 // Build tags for repository announcement using custom tag kinds (as grasp-audit does)
31) -> Event {
32 // Build tags for repository announcement
33 let tags = vec![ 79 let tags = vec![
34 Tag::identifier(identifier), 80 Tag::identifier(identifier),
35 Tag::custom( 81 Tag::custom(
36 TagKind::custom("clone"), 82 TagKind::custom("clone"),
37 vec![format!("http://{}/{}", domain, identifier)], 83 vec![format!("http://{}/{}.git", domain, identifier)],
38 ),
39 Tag::custom(
40 TagKind::custom("relays"),
41 vec![format!("ws://{}", domain)],
42 ), 84 ),
85 Tag::custom(TagKind::custom("relays"), vec![format!("ws://{}", domain)]),
86 ];
87
88 EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Repository state")
89 .tags(tags)
90 .sign_with_keys(keys)
91 .expect("Failed to sign event")
92}
93
94/// Create a valid repository announcement event listing multiple relays
95///
96/// This creates a kind 30617 event with clone/relays tags referencing multiple domains,
97/// which is necessary for sync tests where the event needs to be accepted by both relays.
98/// Uses TagKind::custom("clone") and TagKind::custom("relays") to match grasp-audit patterns.
99fn create_shared_repo_announcement(keys: &Keys, domains: &[&str], identifier: &str) -> Event {
100 // Build clone URLs for all domains (with .git suffix)
101 let clone_urls: Vec<String> = domains
102 .iter()
103 .map(|d| format!("http://{}/{}.git", d, identifier))
104 .collect();
105
106 // Build relay URLs for all domains
107 let relay_urls: Vec<String> = domains.iter().map(|d| format!("ws://{}", d)).collect();
108
109 // Build tags for repository announcement using custom tag kinds (as grasp-audit does)
110 let tags = vec![
111 Tag::identifier(identifier),
112 Tag::custom(TagKind::custom("clone"), clone_urls),
113 Tag::custom(TagKind::custom("relays"), relay_urls),
43 ]; 114 ];
44 115
45 EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Repository state") 116 EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Repository state")
@@ -72,48 +143,116 @@ async fn test_sync_relay_connects_to_source() {
72async fn test_valid_event_syncs_to_relay() { 143async fn test_valid_event_syncs_to_relay() {
73 // Start source relay (relay_a) 144 // Start source relay (relay_a)
74 let relay_a = TestRelay::start().await; 145 let relay_a = TestRelay::start().await;
75 146 println!(
76 // Give relay_a time to start 147 "relay_a started at {} (domain: {})",
77 tokio::time::sleep(Duration::from_millis(200)).await; 148 relay_a.url(),
149 relay_a.domain()
150 );
78 151
79 // Start syncing relay (relay_b) configured to sync from relay_a 152 // Start syncing relay (relay_b) configured to sync from relay_a
80 let relay_b = TestRelay::start_with_sync(relay_a.url()).await; 153 let relay_b = TestRelay::start_with_sync(relay_a.url()).await;
154 println!(
155 "relay_b started at {} (domain: {})",
156 relay_b.url(),
157 relay_b.domain()
158 );
81 159
82 // Create test keys 160 // Create test keys that will be used for both client and event signing
83 let keys = Keys::generate(); 161 let keys = Keys::generate();
84 162
85 // Create and submit a valid repository announcement to relay_a 163 // Wait for relay_b's sync connection to establish
86 let event = create_valid_repo_announcement(&keys, &relay_a.domain(), "test-repo"); 164 // With NGIT_SYNC_STARTUP_JITTER_MS=0 (set by TestRelay), sync connects immediately.
165 // A brief wait allows the WebSocket connection and Layer 1 subscription to be set up.
166 println!("Waiting 1s for relay_b sync connection to establish...");
167 tokio::time::sleep(Duration::from_secs(1)).await;
168
169 // Create a client with our keys and connect to relay_a
170 let client_a = create_connected_client(relay_a.url(), keys.clone())
171 .await
172 .expect("Failed to connect to relay_a");
173 println!("client_a connected to relay_a");
174
175 // Create a repository announcement that lists BOTH relays
176 // This is required for sync - the event must reference both the source relay
177 // and the syncing relay for the write policy to accept it on both sides
178 let event = create_shared_repo_announcement(
179 &keys,
180 &[&relay_a.domain(), &relay_b.domain()],
181 "test-repo",
182 );
87 let event_id = event.id; 183 let event_id = event.id;
88 184
89 // Submit event to relay_a 185 // Print event details for debugging
90 let client_a = Client::default(); 186 println!("Created event {} (kind {})", event_id, event.kind.as_u16());
91 client_a.add_relay(relay_a.url()).await.expect("Failed to add relay_a"); 187 for tag in event.tags.iter() {
92 client_a.connect().await; 188 println!(" Tag: {:?}", tag.as_slice());
189 }
93 190
94 let send_result = client_a.send_event(&event).await; 191 // Submit event to relay_a AFTER relay_b's subscription is established
95 assert!(send_result.is_ok(), "Failed to send event to relay_a: {:?}", send_result.err()); 192 // This ensures the event is received via the live subscription
193 println!("Sending event to relay_a...");
194 send_event_reliably(&client_a, &event)
195 .await
196 .expect("Failed to send event to relay_a");
197 println!("Event sent successfully");
96 198
97 // Wait for sync to occur 199 // Verify event is stored on relay_a first
98 tokio::time::sleep(Duration::from_secs(2)).await; 200 let filter_a = Filter::new()
201 .kind(Kind::Custom(KIND_REPOSITORY_STATE))
202 .author(keys.public_key());
203
204 let events_on_a = client_a
205 .fetch_events(filter_a.clone(), Duration::from_secs(5))
206 .await
207 .expect("Failed to fetch events from relay_a");
208
209 println!(
210 "Events on relay_a: {} (looking for {})",
211 events_on_a.len(),
212 event_id
213 );
214 for e in events_on_a.iter() {
215 println!(" Found event: {} (kind {})", e.id, e.kind.as_u16());
216 }
217
218 let found_on_a = events_on_a.iter().any(|e| e.id == event_id);
219 assert!(
220 found_on_a,
221 "Event {} was not stored on relay_a! This is a prerequisite for sync.",
222 event_id
223 );
224 println!("✓ Event confirmed on relay_a");
225
226 // Wait for sync to occur (event processing and storage)
227 println!("Waiting 1s for sync to occur...");
228 tokio::time::sleep(Duration::from_secs(1)).await;
99 229
100 // Query relay_b to verify the event was synced 230 // Query relay_b to verify the event was synced
101 let client_b = Client::default(); 231 let client_b = create_connected_client(relay_b.url(), Keys::generate())
102 client_b.add_relay(relay_b.url()).await.expect("Failed to add relay_b"); 232 .await
103 client_b.connect().await; 233 .expect("Failed to connect to relay_b");
104 234
105 // Create filter to find our event 235 // Create filter to find our event
106 let filter = Filter::new() 236 let filter_b = Filter::new()
107 .kind(Kind::Custom(KIND_REPOSITORY_STATE)) 237 .kind(Kind::Custom(KIND_REPOSITORY_STATE))
108 .author(keys.public_key()); 238 .author(keys.public_key());
109 239
110 let events = client_b 240 let events_on_b = client_b
111 .fetch_events(filter, Duration::from_secs(5)) 241 .fetch_events(filter_b, Duration::from_secs(5))
112 .await 242 .await
113 .expect("Failed to fetch events from relay_b"); 243 .expect("Failed to fetch events from relay_b");
114 244
245 println!(
246 "Events on relay_b: {} (looking for {})",
247 events_on_b.len(),
248 event_id
249 );
250 for e in events_on_b.iter() {
251 println!(" Found event: {} (kind {})", e.id, e.kind.as_u16());
252 }
253
115 // Check if our event was synced 254 // Check if our event was synced
116 let found = events.iter().any(|e| e.id == event_id); 255 let found_on_b = events_on_b.iter().any(|e| e.id == event_id);
117 256
118 // Clean up 257 // Clean up
119 client_a.disconnect().await; 258 client_a.disconnect().await;
@@ -122,10 +261,10 @@ async fn test_valid_event_syncs_to_relay() {
122 relay_a.stop().await; 261 relay_a.stop().await;
123 262
124 assert!( 263 assert!(
125 found, 264 found_on_b,
126 "Event {} was not synced to relay_b. Found {} events", 265 "Event {} was not synced to relay_b. Found {} events on relay_b",
127 event_id, 266 event_id,
128 events.len() 267 events_on_b.len()
129 ); 268 );
130} 269}
131 270
@@ -165,7 +304,10 @@ async fn test_invalid_event_rejected_by_sync_validation() {
165 // Submit invalid event to relay_a 304 // Submit invalid event to relay_a
166 // Note: relay_a will also reject it due to GRASP validation 305 // Note: relay_a will also reject it due to GRASP validation
167 let client_a = Client::default(); 306 let client_a = Client::default();
168 client_a.add_relay(relay_a.url()).await.expect("Failed to add relay_a"); 307 client_a
308 .add_relay(relay_a.url())
309 .await
310 .expect("Failed to add relay_a");
169 client_a.connect().await; 311 client_a.connect().await;
170 312
171 // This will likely fail since relay_a also validates, but let's try 313 // This will likely fail since relay_a also validates, but let's try
@@ -176,7 +318,10 @@ async fn test_invalid_event_rejected_by_sync_validation() {
176 318
177 // Query relay_b - the event should NOT be present 319 // Query relay_b - the event should NOT be present
178 let client_b = Client::default(); 320 let client_b = Client::default();
179 client_b.add_relay(relay_b.url()).await.expect("Failed to add relay_b"); 321 client_b
322 .add_relay(relay_b.url())
323 .await
324 .expect("Failed to add relay_b");
180 client_b.connect().await; 325 client_b.connect().await;
181 326
182 let filter = Filter::new() 327 let filter = Filter::new()
@@ -221,7 +366,10 @@ async fn test_sync_respects_local_validation() {
221 let valid_event_id = valid_event.id; 366 let valid_event_id = valid_event.id;
222 367
223 let client_a = Client::default(); 368 let client_a = Client::default();
224 client_a.add_relay(relay_a.url()).await.expect("Failed to add relay_a"); 369 client_a
370 .add_relay(relay_a.url())
371 .await
372 .expect("Failed to add relay_a");
225 client_a.connect().await; 373 client_a.connect().await;
226 374
227 client_a 375 client_a
@@ -234,7 +382,10 @@ async fn test_sync_respects_local_validation() {
234 382
235 // Query relay_b to verify the valid event was synced 383 // Query relay_b to verify the valid event was synced
236 let client_b = Client::default(); 384 let client_b = Client::default();
237 client_b.add_relay(relay_b.url()).await.expect("Failed to add relay_b"); 385 client_b
386 .add_relay(relay_b.url())
387 .await
388 .expect("Failed to add relay_b");
238 client_b.connect().await; 389 client_b.connect().await;
239 390
240 let filter = Filter::new() 391 let filter = Filter::new()
@@ -259,4 +410,4 @@ async fn test_sync_respects_local_validation() {
259 "Valid event {} should have been synced to relay_b", 410 "Valid event {} should have been synced to relay_b",
260 valid_event_id 411 valid_event_id
261 ); 412 );
262} \ No newline at end of file 413}