upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/config.rs5
-rw-r--r--src/http/nip11.rs2
-rw-r--r--src/lib.rs1
-rw-r--r--src/main.rs16
-rw-r--r--src/nostr/builder.rs13
-rw-r--r--src/sync/connection.rs143
-rw-r--r--src/sync/manager.rs101
-rw-r--r--src/sync/mod.rs22
-rw-r--r--tests/common/relay.rs42
-rw-r--r--tests/proactive_sync_basic.rs262
10 files changed, 595 insertions, 12 deletions
diff --git a/src/config.rs b/src/config.rs
index 025e020..a2a27be 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -83,6 +83,10 @@ pub struct Config {
83 /// Number of top bandwidth repos to track in metrics 83 /// Number of top bandwidth repos to track in metrics
84 #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)] 84 #[arg(long = "metrics-top-n-repos", env = "NGIT_METRICS_TOP_N_REPOS", default_value_t = 10)]
85 pub metrics_top_n_repos: usize, 85 pub metrics_top_n_repos: usize,
86
87 /// URL of relay to sync kind 30617 events from (optional, enables proactive sync)
88 #[arg(long, env = "NGIT_SYNC_RELAY_URL")]
89 pub sync_relay_url: Option<String>,
86} 90}
87 91
88impl Config { 92impl Config {
@@ -138,6 +142,7 @@ impl Config {
138 metrics_enabled: true, 142 metrics_enabled: true,
139 metrics_connection_per_ip_abuse_threshold: 10, 143 metrics_connection_per_ip_abuse_threshold: 10,
140 metrics_top_n_repos: 10, 144 metrics_top_n_repos: 10,
145 sync_relay_url: None,
141 } 146 }
142 } 147 }
143} 148}
diff --git a/src/http/nip11.rs b/src/http/nip11.rs
index 1723601..e9e1c25 100644
--- a/src/http/nip11.rs
+++ b/src/http/nip11.rs
@@ -105,6 +105,7 @@ mod tests {
105 metrics_enabled: true, 105 metrics_enabled: true,
106 metrics_connection_per_ip_abuse_threshold: 10, 106 metrics_connection_per_ip_abuse_threshold: 10,
107 metrics_top_n_repos: 10, 107 metrics_top_n_repos: 10,
108 sync_relay_url: None,
108 }; 109 };
109 110
110 let doc = RelayInformationDocument::from_config(&config); 111 let doc = RelayInformationDocument::from_config(&config);
@@ -139,6 +140,7 @@ mod tests {
139 metrics_enabled: true, 140 metrics_enabled: true,
140 metrics_connection_per_ip_abuse_threshold: 10, 141 metrics_connection_per_ip_abuse_threshold: 10,
141 metrics_top_n_repos: 10, 142 metrics_top_n_repos: 10,
143 sync_relay_url: None,
142 }; 144 };
143 145
144 let doc = RelayInformationDocument::from_config(&config); 146 let doc = RelayInformationDocument::from_config(&config);
diff --git a/src/lib.rs b/src/lib.rs
index 4d5aab0..a1306c4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,3 +3,4 @@ pub mod git;
3pub mod http; 3pub mod http;
4pub mod metrics; 4pub mod metrics;
5pub mod nostr; 5pub mod nostr;
6pub mod sync;
diff --git a/src/main.rs b/src/main.rs
index 9200cc2..21b351f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,6 +9,7 @@ use ngit_grasp::{
9 http, 9 http,
10 metrics::Metrics, 10 metrics::Metrics,
11 nostr, 11 nostr,
12 sync::SyncManager,
12}; 13};
13 14
14#[tokio::main] 15#[tokio::main]
@@ -50,6 +51,21 @@ async fn main() -> Result<()> {
50 config.domain 51 config.domain
51 ); 52 );
52 53
54 // Start SyncManager if sync_relay_url is configured
55 if let Some(ref sync_url) = config.sync_relay_url {
56 info!("Starting proactive sync from: {}", sync_url);
57 let sync_manager = SyncManager::new(
58 sync_url.clone(),
59 relay_with_db.database.clone(),
60 relay_with_db.write_policy.clone(),
61 );
62 tokio::spawn(async move {
63 sync_manager.run().await;
64 });
65 } else {
66 info!("Proactive sync disabled (no NGIT_SYNC_RELAY_URL configured)");
67 }
68
53 // Start HTTP server with integrated relay and database 69 // Start HTTP server with integrated relay and database
54 info!("Starting HTTP server on {}", config.bind_address); 70 info!("Starting HTTP server on {}", config.bind_address);
55 http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?; 71 http::run_server(config, relay_with_db.relay, relay_with_db.database, metrics).await?;
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index 15ff083..2284c18 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -269,12 +269,14 @@ impl WritePolicy for Nip34WritePolicy {
269 } 269 }
270} 270}
271 271
272/// Result of creating a relay - includes both the relay and database 272/// Result of creating a relay - includes relay, database, and write policy
273pub struct RelayWithDatabase { 273pub struct RelayWithDatabase {
274 /// The local relay instance 274 /// The local relay instance
275 pub relay: LocalRelay, 275 pub relay: LocalRelay,
276 /// The database Arc that can be used for direct queries 276 /// The database Arc that can be used for direct queries
277 pub database: SharedDatabase, 277 pub database: SharedDatabase,
278 /// The write policy used for event validation
279 pub write_policy: Nip34WritePolicy,
278} 280}
279 281
280/// Create a configured LocalRelay with full GRASP-01 validation 282/// Create a configured LocalRelay with full GRASP-01 validation
@@ -330,13 +332,11 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> {
330 // Build relay with GRASP-01 validation 332 // Build relay with GRASP-01 validation
331 // Clone Arc for the write policy so both relay and policy can access the database 333 // Clone Arc for the write policy so both relay and policy can access the database
332 let git_data_path = config.effective_git_data_path(); 334 let git_data_path = config.effective_git_data_path();
335 let write_policy = Nip34WritePolicy::new(&config.domain, database.clone(), &git_data_path);
336
333 let builder = RelayBuilder::default() 337 let builder = RelayBuilder::default()
334 .database(database.clone()) 338 .database(database.clone())
335 .write_policy(Nip34WritePolicy::new( 339 .write_policy(write_policy.clone());
336 &config.domain,
337 database.clone(),
338 &git_data_path,
339 ));
340 340
341 tracing::info!( 341 tracing::info!(
342 "Relay configured with GRASP-01 validation for domain: {}", 342 "Relay configured with GRASP-01 validation for domain: {}",
@@ -346,5 +346,6 @@ pub fn create_relay(config: &Config) -> Result<RelayWithDatabase> {
346 Ok(RelayWithDatabase { 346 Ok(RelayWithDatabase {
347 relay: LocalRelay::new(builder), 347 relay: LocalRelay::new(builder),
348 database, 348 database,
349 write_policy,
349 }) 350 })
350} \ No newline at end of file 351} \ No newline at end of file
diff --git a/src/sync/connection.rs b/src/sync/connection.rs
new file mode 100644
index 0000000..4a79128
--- /dev/null
+++ b/src/sync/connection.rs
@@ -0,0 +1,143 @@
1//! WebSocket connection handling for sync
2//!
3//! Manages the connection to a source relay, subscribes to kind 30617 events,
4//! and passes them through validation.
5
6use std::time::Duration;
7
8use nostr_sdk::prelude::*;
9use tokio::sync::mpsc;
10
11use super::KIND_REPOSITORY_STATE;
12
13/// Event received from the sync connection
14#[derive(Debug, Clone)]
15pub struct SyncedEvent {
16 pub event: Event,
17 pub source_url: String,
18}
19
20/// Manages a WebSocket connection to a single relay for syncing
21pub struct SyncConnection {
22 url: String,
23 client: Client,
24}
25
26impl SyncConnection {
27 /// Create a new sync connection to the given relay URL
28 pub async fn new(url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
29 let client = Client::default();
30
31 // Add the relay
32 client.add_relay(url).await?;
33
34 // Connect to the relay
35 client.connect().await;
36
37 tracing::info!("Sync connection established to {}", url);
38
39 Ok(Self {
40 url: url.to_string(),
41 client,
42 })
43 }
44
45 /// Start receiving events and send them through the channel
46 ///
47 /// This method runs indefinitely, reconnecting as needed.
48 pub async fn run(self, tx: mpsc::Sender<SyncedEvent>) {
49 // Create filter for kind 30617 (repository state) events
50 let filter = Filter::new().kind(Kind::Custom(KIND_REPOSITORY_STATE));
51
52 // Subscribe to events
53 match self.client.subscribe(filter, None).await {
54 Ok(output) => {
55 tracing::info!(
56 "Subscribed to kind {} events on {} (subscription: {})",
57 KIND_REPOSITORY_STATE,
58 self.url,
59 output.id()
60 );
61 }
62 Err(e) => {
63 tracing::error!("Failed to subscribe on {}: {}", self.url, e);
64 return;
65 }
66 };
67
68 // Handle incoming notifications
69 let url = self.url.clone();
70 self.client
71 .handle_notifications(|notification| {
72 let tx = tx.clone();
73 let url = url.clone();
74 async move {
75 match notification {
76 RelayPoolNotification::Event { event, .. } => {
77 tracing::debug!(
78 "Received event {} from {} (kind {})",
79 event.id,
80 url,
81 event.kind.as_u16()
82 );
83
84 // Send the event to the manager for processing
85 let synced = SyncedEvent {
86 event: (*event).clone(),
87 source_url: url.clone(),
88 };
89
90 if let Err(e) = tx.send(synced).await {
91 tracing::warn!("Failed to send synced event: {}", e);
92 return Ok(true); // Stop if channel is closed
93 }
94 }
95 RelayPoolNotification::Shutdown => {
96 tracing::warn!("Relay connection shutdown for {}", url);
97 return Ok(true); // Stop on shutdown
98 }
99 RelayPoolNotification::Message { message, .. } => {
100 tracing::trace!("Received message from {}: {:?}", url, message);
101 }
102 }
103 Ok(false) // Continue processing
104 }
105 })
106 .await
107 .ok();
108 }
109
110}
111
112/// Reconnect loop with exponential backoff
113pub async fn connect_with_retry(
114 url: &str,
115 tx: mpsc::Sender<SyncedEvent>,
116) {
117 let mut backoff = Duration::from_secs(1);
118 let max_backoff = Duration::from_secs(60);
119
120 loop {
121 match SyncConnection::new(url).await {
122 Ok(conn) => {
123 backoff = Duration::from_secs(1); // Reset backoff on successful connection
124 conn.run(tx.clone()).await;
125 tracing::warn!("Sync connection to {} ended, will reconnect", url);
126 }
127 Err(e) => {
128 tracing::error!(
129 "Failed to connect to sync relay {}: {} (retrying in {:?})",
130 url,
131 e,
132 backoff
133 );
134 }
135 }
136
137 // Wait before reconnecting
138 tokio::time::sleep(backoff).await;
139
140 // Exponential backoff
141 backoff = std::cmp::min(backoff * 2, max_backoff);
142 }
143} \ No newline at end of file
diff --git a/src/sync/manager.rs b/src/sync/manager.rs
new file mode 100644
index 0000000..8c883f5
--- /dev/null
+++ b/src/sync/manager.rs
@@ -0,0 +1,101 @@
1//! SyncManager - Coordinates proactive sync operations
2//!
3//! The SyncManager spawns connections to configured relays, receives events,
4//! validates them through the write policy, and stores accepted events.
5
6use nostr_relay_builder::prelude::*;
7use tokio::sync::mpsc;
8
9use super::connection::{connect_with_retry, SyncedEvent};
10use super::SYNC_SOURCE_ADDR;
11use crate::nostr::builder::{Nip34WritePolicy, SharedDatabase};
12
13/// Coordinates proactive sync from configured relays
14pub struct SyncManager {
15 /// URL of the relay to sync from
16 sync_relay_url: String,
17 /// Database for storing accepted events
18 database: SharedDatabase,
19 /// Write policy for validating events
20 write_policy: Nip34WritePolicy,
21}
22
23impl SyncManager {
24 /// Create a new SyncManager
25 pub fn new(
26 sync_relay_url: String,
27 database: SharedDatabase,
28 write_policy: Nip34WritePolicy,
29 ) -> Self {
30 Self {
31 sync_relay_url,
32 database,
33 write_policy,
34 }
35 }
36
37 /// Run the sync manager
38 ///
39 /// This spawns a connection task and processes incoming events.
40 /// Runs indefinitely until the task is cancelled.
41 pub async fn run(self) {
42 tracing::info!("Starting SyncManager for relay: {}", self.sync_relay_url);
43
44 // Create channel for receiving events from connection
45 let (tx, mut rx) = mpsc::channel::<SyncedEvent>(100);
46
47 // Spawn connection task with auto-reconnect
48 let url = self.sync_relay_url.clone();
49 tokio::spawn(async move {
50 connect_with_retry(&url, tx).await;
51 });
52
53 // Process incoming events
54 while let Some(synced_event) = rx.recv().await {
55 self.process_event(synced_event).await;
56 }
57
58 tracing::warn!("SyncManager event channel closed, shutting down");
59 }
60
61 /// Process a single synced event
62 async fn process_event(&self, synced_event: SyncedEvent) {
63 let event = &synced_event.event;
64 let event_id = event.id.to_hex();
65
66 tracing::debug!(
67 "Processing synced event {} (kind {}) from {}",
68 event_id,
69 event.kind.as_u16(),
70 synced_event.source_url
71 );
72
73 // Validate through write policy using SYNC_SOURCE_ADDR
74 let result = self.write_policy.admit_event(event, &SYNC_SOURCE_ADDR).await;
75
76 match result {
77 PolicyResult::Accept => {
78 tracing::info!(
79 "Synced event {} (kind {}) accepted, storing",
80 event_id,
81 event.kind.as_u16()
82 );
83
84 // Store the event in the database
85 if let Err(e) = self.database.save_event(event).await {
86 tracing::error!("Failed to store synced event {}: {}", event_id, e);
87 } else {
88 tracing::debug!("Synced event {} stored successfully", event_id);
89 }
90 }
91 PolicyResult::Reject(reason) => {
92 tracing::info!(
93 "Synced event {} (kind {}) rejected: {}",
94 event_id,
95 event.kind.as_u16(),
96 reason
97 );
98 }
99 }
100 }
101} \ No newline at end of file
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
new file mode 100644
index 0000000..279471b
--- /dev/null
+++ b/src/sync/mod.rs
@@ -0,0 +1,22 @@
1//! Proactive Sync Module for GRASP-02
2//!
3//! This module implements proactive synchronization of kind 30617 (repository state)
4//! events from configured relay(s). Events are validated through the same write policy
5//! as directly-submitted events.
6
7mod connection;
8mod manager;
9
10pub use manager::SyncManager;
11
12use std::net::SocketAddr;
13
14/// Synthetic source address used for synced events
15///
16/// This distinguishes synced events from directly-submitted events in logs and metrics.
17/// Uses 127.0.0.2:0 as a recognizable "synced event" marker.
18pub const SYNC_SOURCE_ADDR: SocketAddr =
19 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 2)), 0);
20
21/// Kind for repository state events (NIP-34)
22pub const KIND_REPOSITORY_STATE: u16 = 30617; \ No newline at end of file
diff --git a/tests/common/relay.rs b/tests/common/relay.rs
index 449b4cb..9fb7b1d 100644
--- a/tests/common/relay.rs
+++ b/tests/common/relay.rs
@@ -33,11 +33,36 @@ impl TestRelay {
33 /// } 33 /// }
34 /// ``` 34 /// ```
35 pub async fn start() -> Self { 35 pub async fn start() -> Self {
36 Self::start_with_port(Self::find_free_port()).await 36 Self::start_with_options(Self::find_free_port(), None).await
37 } 37 }
38 38
39 /// Start relay on a specific port 39 /// Start relay on a specific port
40 pub async fn start_with_port(port: u16) -> Self { 40 pub async fn start_with_port(port: u16) -> Self {
41 Self::start_with_options(port, None).await
42 }
43
44 /// Start relay with sync from another relay
45 ///
46 /// # Example
47 ///
48 /// ```no_run
49 /// use common::TestRelay;
50 ///
51 /// #[tokio::test]
52 /// async fn test_sync() {
53 /// let source = TestRelay::start().await;
54 /// let syncing = TestRelay::start_with_sync(source.url()).await;
55 /// // ... test sync behavior ...
56 /// syncing.stop().await;
57 /// source.stop().await;
58 /// }
59 /// ```
60 pub async fn start_with_sync(sync_relay_url: &str) -> Self {
61 Self::start_with_options(Self::find_free_port(), Some(sync_relay_url.to_string())).await
62 }
63
64 /// Start relay with options
65 async fn start_with_options(port: u16, sync_relay_url: Option<String>) -> Self {
41 let bind_address = format!("127.0.0.1:{}", port); 66 let bind_address = format!("127.0.0.1:{}", port);
42 let url = format!("ws://127.0.0.1:{}", port); 67 let url = format!("ws://127.0.0.1:{}", port);
43 68
@@ -62,16 +87,21 @@ impl TestRelay {
62 .expect("Failed to generate test npub"); 87 .expect("Failed to generate test npub");
63 88
64 // Start the relay process 89 // Start the relay process
65 let process = Command::new(&binary_path) 90 let mut cmd = Command::new(&binary_path);
66 .env("NGIT_BIND_ADDRESS", &bind_address) 91 cmd.env("NGIT_BIND_ADDRESS", &bind_address)
67 .env("NGIT_DOMAIN", &bind_address) // Set domain to match bind address 92 .env("NGIT_DOMAIN", &bind_address) // Set domain to match bind address
68 .env("NGIT_GIT_DATA_PATH", git_data_dir.path()) 93 .env("NGIT_GIT_DATA_PATH", git_data_dir.path())
69 .env("NGIT_OWNER_NPUB", &test_npub) 94 .env("NGIT_OWNER_NPUB", &test_npub)
70 .env("RUST_LOG", "warn") // Less logging during tests 95 .env("RUST_LOG", "warn") // Less logging during tests
71 .stdout(Stdio::null()) 96 .stdout(Stdio::null())
72 .stderr(Stdio::null()) 97 .stderr(Stdio::null());
73 .spawn() 98
74 .expect("Failed to start relay process"); 99 // Add sync relay URL if provided
100 if let Some(ref sync_url) = sync_relay_url {
101 cmd.env("NGIT_SYNC_RELAY_URL", sync_url);
102 }
103
104 let process = cmd.spawn().expect("Failed to start relay process");
75 105
76 let relay = Self { process, url, port }; 106 let relay = Self { process, url, port };
77 107
diff --git a/tests/proactive_sync_basic.rs b/tests/proactive_sync_basic.rs
new file mode 100644
index 0000000..b0b2cbf
--- /dev/null
+++ b/tests/proactive_sync_basic.rs
@@ -0,0 +1,262 @@
1//! GRASP-02 Phase 1: Proactive Sync Basic Integration Tests
2//!
3//! Tests the basic proactive sync functionality using two TestRelay instances:
4//! - relay_a: Source relay with events
5//! - relay_b: Sync relay configured to sync from relay_a
6//!
7//! # Running Tests
8//!
9//! ```bash
10//! cargo test --test proactive_sync_basic
11//! cargo test --test proactive_sync_basic -- --nocapture
12//! ```
13
14mod common;
15
16use std::time::Duration;
17
18use common::TestRelay;
19use nostr_sdk::prelude::*;
20
21/// Kind 30617 - Repository State (NIP-34)
22const KIND_REPOSITORY_STATE: u16 = 30617;
23
24/// Create a valid repository announcement event for testing
25///
26/// This creates a kind 30617 event with required clone and relays tags
27fn create_valid_repo_announcement(
28 keys: &Keys,
29 domain: &str,
30 identifier: &str,
31) -> Event {
32 // Build tags for repository announcement
33 let tags = vec![
34 Tag::identifier(identifier),
35 Tag::custom(
36 TagKind::custom("clone"),
37 vec![format!("http://{}/{}", domain, identifier)],
38 ),
39 Tag::custom(
40 TagKind::custom("relays"),
41 vec![format!("ws://{}", domain)],
42 ),
43 ];
44
45 EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Repository state")
46 .tags(tags)
47 .sign_with_keys(keys)
48 .expect("Failed to sign event")
49}
50
51/// Test that syncing relay connects to source relay
52#[tokio::test]
53async fn test_sync_relay_connects_to_source() {
54 // Start source relay (relay_a)
55 let relay_a = TestRelay::start().await;
56
57 // Start syncing relay (relay_b) configured to sync from relay_a
58 let relay_b = TestRelay::start_with_sync(relay_a.url()).await;
59
60 // Give some time for connection to establish
61 tokio::time::sleep(Duration::from_millis(500)).await;
62
63 // If we got here without panicking, the relays started successfully
64 // The sync connection happens in the background
65
66 relay_b.stop().await;
67 relay_a.stop().await;
68}
69
70/// Test that valid events sync from source to syncing relay
71#[tokio::test]
72async fn test_valid_event_syncs_to_relay() {
73 // Start source relay (relay_a)
74 let relay_a = TestRelay::start().await;
75
76 // Give relay_a time to start
77 tokio::time::sleep(Duration::from_millis(200)).await;
78
79 // Start syncing relay (relay_b) configured to sync from relay_a
80 let relay_b = TestRelay::start_with_sync(relay_a.url()).await;
81
82 // Create test keys
83 let keys = Keys::generate();
84
85 // Create and submit a valid repository announcement to relay_a
86 let event = create_valid_repo_announcement(&keys, &relay_a.domain(), "test-repo");
87 let event_id = event.id;
88
89 // Submit event to relay_a
90 let client_a = Client::default();
91 client_a.add_relay(relay_a.url()).await.expect("Failed to add relay_a");
92 client_a.connect().await;
93
94 let send_result = client_a.send_event(&event).await;
95 assert!(send_result.is_ok(), "Failed to send event to relay_a: {:?}", send_result.err());
96
97 // Wait for sync to occur
98 tokio::time::sleep(Duration::from_secs(2)).await;
99
100 // Query relay_b to verify the event was synced
101 let client_b = Client::default();
102 client_b.add_relay(relay_b.url()).await.expect("Failed to add relay_b");
103 client_b.connect().await;
104
105 // Create filter to find our event
106 let filter = Filter::new()
107 .kind(Kind::Custom(KIND_REPOSITORY_STATE))
108 .author(keys.public_key());
109
110 let events = client_b
111 .fetch_events(filter, Duration::from_secs(5))
112 .await
113 .expect("Failed to fetch events from relay_b");
114
115 // Check if our event was synced
116 let found = events.iter().any(|e| e.id == event_id);
117
118 // Clean up
119 client_a.disconnect().await;
120 client_b.disconnect().await;
121 relay_b.stop().await;
122 relay_a.stop().await;
123
124 assert!(
125 found,
126 "Event {} was not synced to relay_b. Found {} events",
127 event_id,
128 events.len()
129 );
130}
131
132/// Test that invalid events are rejected by syncing relay validation
133#[tokio::test]
134async fn test_invalid_event_rejected_by_sync_validation() {
135 // Start source relay (relay_a) - this is a simple relay without GRASP validation
136 // For this test, we'll use a second ngit-grasp relay, but the key insight is that
137 // the syncing relay should reject events that don't pass its own validation
138
139 let relay_a = TestRelay::start().await;
140 let relay_b = TestRelay::start_with_sync(relay_a.url()).await;
141
142 // Give time for connection
143 tokio::time::sleep(Duration::from_millis(500)).await;
144
145 // Create test keys
146 let keys = Keys::generate();
147
148 // Create an INVALID repository announcement (missing clone tag)
149 let tags = vec![
150 Tag::identifier("test-invalid-repo"),
151 // Missing required "clone" tag!
152 Tag::custom(
153 TagKind::custom("relays"),
154 vec![format!("ws://{}", relay_a.domain())],
155 ),
156 ];
157
158 let invalid_event = EventBuilder::new(Kind::Custom(KIND_REPOSITORY_STATE), "Invalid repo")
159 .tags(tags)
160 .sign_with_keys(&keys)
161 .expect("Failed to sign event");
162
163 let invalid_event_id = invalid_event.id;
164
165 // Submit invalid event to relay_a
166 // Note: relay_a will also reject it due to GRASP validation
167 let client_a = Client::default();
168 client_a.add_relay(relay_a.url()).await.expect("Failed to add relay_a");
169 client_a.connect().await;
170
171 // This will likely fail since relay_a also validates, but let's try
172 let _ = client_a.send_event(&invalid_event).await;
173
174 // Wait for potential sync
175 tokio::time::sleep(Duration::from_secs(1)).await;
176
177 // Query relay_b - the event should NOT be present
178 let client_b = Client::default();
179 client_b.add_relay(relay_b.url()).await.expect("Failed to add relay_b");
180 client_b.connect().await;
181
182 let filter = Filter::new()
183 .kind(Kind::Custom(KIND_REPOSITORY_STATE))
184 .author(keys.public_key());
185
186 let events = client_b
187 .fetch_events(filter, Duration::from_secs(3))
188 .await
189 .expect("Failed to fetch events from relay_b");
190
191 let found = events.iter().any(|e| e.id == invalid_event_id);
192
193 // Clean up
194 client_a.disconnect().await;
195 client_b.disconnect().await;
196 relay_b.stop().await;
197 relay_a.stop().await;
198
199 assert!(
200 !found,
201 "Invalid event {} should NOT have been synced to relay_b",
202 invalid_event_id
203 );
204}
205
206/// Test that syncing relay maintains its own validation policy
207#[tokio::test]
208async fn test_sync_respects_local_validation() {
209 // This test verifies that synced events go through the local Nip34WritePolicy
210 // by testing that orphan events (events referencing non-existent repos) are rejected
211
212 let relay_a = TestRelay::start().await;
213 let relay_b = TestRelay::start_with_sync(relay_a.url()).await;
214
215 tokio::time::sleep(Duration::from_millis(500)).await;
216
217 let keys = Keys::generate();
218
219 // First, create a VALID repository announcement and submit it
220 let valid_event = create_valid_repo_announcement(&keys, &relay_a.domain(), "valid-repo");
221 let valid_event_id = valid_event.id;
222
223 let client_a = Client::default();
224 client_a.add_relay(relay_a.url()).await.expect("Failed to add relay_a");
225 client_a.connect().await;
226
227 client_a
228 .send_event(&valid_event)
229 .await
230 .expect("Failed to send valid event");
231
232 // Wait for sync
233 tokio::time::sleep(Duration::from_secs(2)).await;
234
235 // Query relay_b to verify the valid event was synced
236 let client_b = Client::default();
237 client_b.add_relay(relay_b.url()).await.expect("Failed to add relay_b");
238 client_b.connect().await;
239
240 let filter = Filter::new()
241 .kind(Kind::Custom(KIND_REPOSITORY_STATE))
242 .author(keys.public_key());
243
244 let events = client_b
245 .fetch_events(filter, Duration::from_secs(5))
246 .await
247 .expect("Failed to fetch events from relay_b");
248
249 let found = events.iter().any(|e| e.id == valid_event_id);
250
251 // Clean up
252 client_a.disconnect().await;
253 client_b.disconnect().await;
254 relay_b.stop().await;
255 relay_a.stop().await;
256
257 assert!(
258 found,
259 "Valid event {} should have been synced to relay_b",
260 valid_event_id
261 );
262} \ No newline at end of file