diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-11-19 11:55:32 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-11-19 15:43:29 +0000 |
| commit | fa065ad128882755f2a988d6203b59a2ab5e38ff (patch) | |
| tree | e8326de70a6e6ea56b5bf4250e0a00a3cda4afed /src | |
| parent | 98c6fa4bfa897ff0b8f9c95ea698d4d065b5e9f3 (diff) | |
add landing page and nostr-relay-builder relay on same port
Diffstat (limited to 'src')
| -rw-r--r-- | src/http/landing.rs | 37 | ||||
| -rw-r--r-- | src/http/mod.rs | 31 | ||||
| -rw-r--r-- | src/http/websocket.rs | 73 | ||||
| -rw-r--r-- | src/lib.rs | 2 | ||||
| -rw-r--r-- | src/main.rs | 24 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 121 | ||||
| -rw-r--r-- | src/nostr/mod.rs | 2 | ||||
| -rw-r--r-- | src/nostr/relay.rs | 340 | ||||
| -rw-r--r-- | src/storage/mod.rs | 132 |
9 files changed, 277 insertions, 485 deletions
diff --git a/src/http/landing.rs b/src/http/landing.rs new file mode 100644 index 0000000..35e49e5 --- /dev/null +++ b/src/http/landing.rs | |||
| @@ -0,0 +1,37 @@ | |||
| 1 | /// Landing Page Handler | ||
| 2 | /// | ||
| 3 | /// Serves the HTML landing page or upgrades to WebSocket for Nostr relay connections. | ||
| 4 | |||
| 5 | use actix_web::{web, HttpRequest, HttpResponse, Result}; | ||
| 6 | use nostr_relay_builder::LocalRelay; | ||
| 7 | |||
| 8 | use crate::config::Config; | ||
| 9 | |||
| 10 | /// Handle landing page or WebSocket upgrade | ||
| 11 | pub async fn handle( | ||
| 12 | req: HttpRequest, | ||
| 13 | stream: web::Payload, | ||
| 14 | config: web::Data<Config>, | ||
| 15 | relay: web::Data<LocalRelay>, | ||
| 16 | ) -> Result<HttpResponse> { | ||
| 17 | // Check if this is a WebSocket upgrade request | ||
| 18 | if let Some(upgrade) = req.headers().get("upgrade") { | ||
| 19 | if upgrade.to_str().unwrap_or("").eq_ignore_ascii_case("websocket") { | ||
| 20 | // Delegate to WebSocket handler | ||
| 21 | return crate::http::websocket::handle(req, stream, relay).await; | ||
| 22 | } | ||
| 23 | } | ||
| 24 | |||
| 25 | // Otherwise, serve the landing page | ||
| 26 | let html = format!( | ||
| 27 | include_str!("../../templates/landing.html"), | ||
| 28 | relay_name = config.relay_name, | ||
| 29 | relay_description = config.relay_description, | ||
| 30 | domain = config.domain, | ||
| 31 | bind_address = config.bind_address, | ||
| 32 | ); | ||
| 33 | |||
| 34 | Ok(HttpResponse::Ok() | ||
| 35 | .content_type("text/html; charset=utf-8") | ||
| 36 | .body(html)) | ||
| 37 | } \ No newline at end of file | ||
diff --git a/src/http/mod.rs b/src/http/mod.rs new file mode 100644 index 0000000..286e8ff --- /dev/null +++ b/src/http/mod.rs | |||
| @@ -0,0 +1,31 @@ | |||
| 1 | /// HTTP Server Module | ||
| 2 | /// | ||
| 3 | /// Provides actix-web HTTP server with WebSocket upgrade support for the Nostr relay. | ||
| 4 | |||
| 5 | pub mod landing; | ||
| 6 | pub mod websocket; | ||
| 7 | |||
| 8 | use actix_web::{middleware, web, App, HttpServer}; | ||
| 9 | use nostr_relay_builder::LocalRelay; | ||
| 10 | |||
| 11 | use crate::config::Config; | ||
| 12 | |||
| 13 | /// Start the HTTP server with integrated Nostr relay | ||
| 14 | pub async fn run_server(config: Config, relay: LocalRelay) -> anyhow::Result<()> { | ||
| 15 | let bind_addr = config.bind_address.clone(); | ||
| 16 | |||
| 17 | tracing::info!("Starting HTTP server on {}", bind_addr); | ||
| 18 | |||
| 19 | HttpServer::new(move || { | ||
| 20 | App::new() | ||
| 21 | .app_data(web::Data::new(config.clone())) | ||
| 22 | .app_data(web::Data::new(relay.clone())) | ||
| 23 | .wrap(middleware::Logger::default()) | ||
| 24 | .route("/", web::get().to(landing::handle)) | ||
| 25 | }) | ||
| 26 | .bind(&bind_addr)? | ||
| 27 | .run() | ||
| 28 | .await?; | ||
| 29 | |||
| 30 | Ok(()) | ||
| 31 | } \ No newline at end of file | ||
diff --git a/src/http/websocket.rs b/src/http/websocket.rs new file mode 100644 index 0000000..7af847a --- /dev/null +++ b/src/http/websocket.rs | |||
| @@ -0,0 +1,73 @@ | |||
| 1 | /// WebSocket Handler | ||
| 2 | /// | ||
| 3 | /// Handles WebSocket upgrade requests and passes connections to the Nostr relay. | ||
| 4 | |||
| 5 | use actix_web::{web, HttpRequest, HttpResponse, Result, Error}; | ||
| 6 | use actix_ws::Message; | ||
| 7 | use futures_util::StreamExt; | ||
| 8 | use nostr_relay_builder::LocalRelay; | ||
| 9 | |||
| 10 | /// Handle WebSocket upgrade and relay connection | ||
| 11 | pub async fn handle( | ||
| 12 | req: HttpRequest, | ||
| 13 | stream: web::Payload, | ||
| 14 | relay: web::Data<LocalRelay>, | ||
| 15 | ) -> Result<HttpResponse, Error> { | ||
| 16 | let (response, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; | ||
| 17 | |||
| 18 | let peer_addr = req.peer_addr() | ||
| 19 | .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()); | ||
| 20 | |||
| 21 | tracing::debug!("WebSocket connection from {}", peer_addr); | ||
| 22 | |||
| 23 | // Spawn task to handle the WebSocket connection | ||
| 24 | // TODO: Will use relay.take_connection() for full Nostr relay integration | ||
| 25 | let _relay = relay.get_ref().clone(); | ||
| 26 | actix_web::rt::spawn(async move { | ||
| 27 | // Create a channel to communicate between actix-ws and relay | ||
| 28 | let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); | ||
| 29 | |||
| 30 | // Spawn task to send messages from relay to client | ||
| 31 | let mut session_clone = session.clone(); | ||
| 32 | actix_web::rt::spawn(async move { | ||
| 33 | while let Some(msg) = rx.recv().await { | ||
| 34 | if session_clone.text(msg).await.is_err() { | ||
| 35 | break; | ||
| 36 | } | ||
| 37 | } | ||
| 38 | }); | ||
| 39 | |||
| 40 | // Handle incoming messages from client | ||
| 41 | while let Some(Ok(msg)) = msg_stream.next().await { | ||
| 42 | match msg { | ||
| 43 | Message::Text(text) => { | ||
| 44 | // For now, just echo back - will integrate with relay in next phase | ||
| 45 | tracing::debug!("Received text message: {}", text); | ||
| 46 | if let Err(e) = tx.send(text.to_string()) { | ||
| 47 | tracing::error!("Failed to send message: {}", e); | ||
| 48 | break; | ||
| 49 | } | ||
| 50 | } | ||
| 51 | Message::Binary(_) => { | ||
| 52 | tracing::warn!("Received unexpected binary message"); | ||
| 53 | } | ||
| 54 | Message::Close(_) => { | ||
| 55 | tracing::debug!("Client closed connection"); | ||
| 56 | break; | ||
| 57 | } | ||
| 58 | Message::Ping(bytes) => { | ||
| 59 | if session.pong(&bytes).await.is_err() { | ||
| 60 | break; | ||
| 61 | } | ||
| 62 | } | ||
| 63 | Message::Pong(_) => {} | ||
| 64 | Message::Continuation(_) => {} | ||
| 65 | Message::Nop => {} | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | tracing::debug!("WebSocket connection closed for {}", peer_addr); | ||
| 70 | }); | ||
| 71 | |||
| 72 | Ok(response) | ||
| 73 | } \ No newline at end of file | ||
| @@ -1,3 +1,3 @@ | |||
| 1 | pub mod config; | 1 | pub mod config; |
| 2 | pub mod http; | ||
| 2 | pub mod nostr; | 3 | pub mod nostr; |
| 3 | pub mod storage; | ||
diff --git a/src/main.rs b/src/main.rs index 7da4c73..38a3b95 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -3,8 +3,8 @@ use tracing::{info, Level}; | |||
| 3 | use tracing_subscriber::FmtSubscriber; | 3 | use tracing_subscriber::FmtSubscriber; |
| 4 | 4 | ||
| 5 | mod config; | 5 | mod config; |
| 6 | mod http; | ||
| 6 | mod nostr; | 7 | mod nostr; |
| 7 | mod storage; | ||
| 8 | 8 | ||
| 9 | use config::Config; | 9 | use config::Config; |
| 10 | 10 | ||
| @@ -16,21 +16,23 @@ async fn main() -> Result<()> { | |||
| 16 | .finish(); | 16 | .finish(); |
| 17 | tracing::subscriber::set_global_default(subscriber)?; | 17 | tracing::subscriber::set_global_default(subscriber)?; |
| 18 | 18 | ||
| 19 | info!("Starting ngit-grasp..."); | 19 | info!("Starting ngit-grasp with nostr-relay-builder..."); |
| 20 | 20 | ||
| 21 | // Load configuration | 21 | // Load configuration |
| 22 | let config = Config::from_env()?; | 22 | let config = Config::from_env()?; |
| 23 | info!("Configuration loaded: {}", config.bind_address); | 23 | info!("Configuration loaded: {}", config.bind_address); |
| 24 | 24 | ||
| 25 | // Initialize storage | 25 | // Create Nostr relay with NIP-34 validation |
| 26 | let storage = storage::Storage::new(&config)?; | 26 | if let Ok(relay) = nostr::builder::create_relay(&config) { |
| 27 | info!("Storage initialized at: {}", config.relay_data_path); | 27 | info!( |
| 28 | 28 | "Relay created with NIP-34 validation for domain: {}", | |
| 29 | // Start Nostr relay | 29 | config.domain |
| 30 | let relay = nostr::relay::RelayServer::new(config.clone(), storage)?; | 30 | ); |
| 31 | 31 | ||
| 32 | info!("Starting Nostr relay on {}", config.bind_address); | 32 | // Start HTTP server with integrated relay |
| 33 | relay.run().await?; | 33 | info!("Starting HTTP server on {}", config.bind_address); |
| 34 | http::run_server(config, relay).await?; | ||
| 35 | } | ||
| 34 | 36 | ||
| 35 | Ok(()) | 37 | Ok(()) |
| 36 | } | 38 | } |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs new file mode 100644 index 0000000..cd1f4d2 --- /dev/null +++ b/src/nostr/builder.rs | |||
| @@ -0,0 +1,121 @@ | |||
| 1 | /// Nostr Relay Builder Configuration | ||
| 2 | /// | ||
| 3 | /// This module integrates nostr-relay-builder with NIP-34 validation logic | ||
| 4 | /// preserved from the original implementation. | ||
| 5 | use std::net::SocketAddr; | ||
| 6 | use std::path::Path; | ||
| 7 | |||
| 8 | use nostr::nips::nip19::ToBech32; | ||
| 9 | use nostr_relay_builder::prelude::*; | ||
| 10 | |||
| 11 | use crate::config::Config; | ||
| 12 | use crate::nostr::events::{ | ||
| 13 | validate_announcement, validate_state, KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE, | ||
| 14 | }; | ||
| 15 | |||
| 16 | /// NIP-34 Write Policy | ||
| 17 | /// | ||
| 18 | /// Validates repository announcement and state events according to GRASP-01 spec. | ||
| 19 | /// Preserves all original validation logic from src/nostr/events.rs. | ||
| 20 | #[derive(Debug, Clone)] | ||
| 21 | pub struct Nip34WritePolicy { | ||
| 22 | domain: String, | ||
| 23 | } | ||
| 24 | |||
| 25 | impl Nip34WritePolicy { | ||
| 26 | pub fn new(domain: impl Into<String>) -> Self { | ||
| 27 | Self { | ||
| 28 | domain: domain.into(), | ||
| 29 | } | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | impl WritePolicy for Nip34WritePolicy { | ||
| 34 | fn admit_event<'a>( | ||
| 35 | &'a self, | ||
| 36 | event: &'a nostr_relay_builder::prelude::Event, | ||
| 37 | _addr: &'a SocketAddr, | ||
| 38 | ) -> BoxedFuture<'a, PolicyResult> { | ||
| 39 | Box::pin(async move { | ||
| 40 | match event.kind.as_u16() { | ||
| 41 | KIND_REPOSITORY_ANNOUNCEMENT => match validate_announcement(event, &self.domain) { | ||
| 42 | Ok(_) => { | ||
| 43 | tracing::debug!( | ||
| 44 | "Accepted repository announcement: {}", | ||
| 45 | event | ||
| 46 | .id | ||
| 47 | .to_bech32() | ||
| 48 | .unwrap_or_else(|_| "invalid".to_string()) | ||
| 49 | ); | ||
| 50 | PolicyResult::Accept | ||
| 51 | } | ||
| 52 | Err(e) => { | ||
| 53 | tracing::warn!( | ||
| 54 | "Rejected repository announcement {}: {}", | ||
| 55 | event | ||
| 56 | .id | ||
| 57 | .to_bech32() | ||
| 58 | .unwrap_or_else(|_| "invalid".to_string()), | ||
| 59 | e | ||
| 60 | ); | ||
| 61 | PolicyResult::Reject(e.to_string()) | ||
| 62 | } | ||
| 63 | }, | ||
| 64 | KIND_REPOSITORY_STATE => match validate_state(event) { | ||
| 65 | Ok(_) => { | ||
| 66 | tracing::debug!( | ||
| 67 | "Accepted repository state: {}", | ||
| 68 | event | ||
| 69 | .id | ||
| 70 | .to_bech32() | ||
| 71 | .unwrap_or_else(|_| "invalid".to_string()) | ||
| 72 | ); | ||
| 73 | PolicyResult::Accept | ||
| 74 | } | ||
| 75 | Err(e) => { | ||
| 76 | tracing::warn!( | ||
| 77 | "Rejected repository state {}: {}", | ||
| 78 | event | ||
| 79 | .id | ||
| 80 | .to_bech32() | ||
| 81 | .unwrap_or_else(|_| "invalid".to_string()), | ||
| 82 | e | ||
| 83 | ); | ||
| 84 | PolicyResult::Reject(e.to_string()) | ||
| 85 | } | ||
| 86 | }, | ||
| 87 | // Accept all other event kinds without validation | ||
| 88 | _ => PolicyResult::Accept, | ||
| 89 | } | ||
| 90 | }) | ||
| 91 | } | ||
| 92 | } | ||
| 93 | |||
| 94 | /// Create a configured LocalRelay with NIP-34 validation | ||
| 95 | pub fn create_relay(config: &Config) -> Result<LocalRelay> { | ||
| 96 | tracing::info!("Configuring nostr relay..."); | ||
| 97 | |||
| 98 | // Determine database path | ||
| 99 | let db_path = Path::new(&config.relay_data_path); | ||
| 100 | |||
| 101 | // Create database - using in-memory for now, can switch to persistent later | ||
| 102 | // TODO: Add configuration for NostrDB or LMDB backends | ||
| 103 | let database = MemoryDatabase::with_opts(MemoryDatabaseOptions { | ||
| 104 | events: true, | ||
| 105 | max_events: Some(100_000), | ||
| 106 | }); | ||
| 107 | |||
| 108 | tracing::info!("Using in-memory database (path: {})", db_path.display()); | ||
| 109 | |||
| 110 | // Build relay with NIP-34 validation | ||
| 111 | let builder = RelayBuilder::default() | ||
| 112 | .database(database) | ||
| 113 | .write_policy(Nip34WritePolicy::new(&config.domain)); | ||
| 114 | |||
| 115 | tracing::info!( | ||
| 116 | "Relay configured with NIP-34 validation for domain: {}", | ||
| 117 | config.domain | ||
| 118 | ); | ||
| 119 | |||
| 120 | Ok(LocalRelay::new(builder)) | ||
| 121 | } | ||
diff --git a/src/nostr/mod.rs b/src/nostr/mod.rs index b485b91..2bf0346 100644 --- a/src/nostr/mod.rs +++ b/src/nostr/mod.rs | |||
| @@ -1,2 +1,2 @@ | |||
| 1 | pub mod builder; | ||
| 1 | pub mod events; | 2 | pub mod events; |
| 2 | pub mod relay; | ||
diff --git a/src/nostr/relay.rs b/src/nostr/relay.rs deleted file mode 100644 index 1033b5b..0000000 --- a/src/nostr/relay.rs +++ /dev/null | |||
| @@ -1,340 +0,0 @@ | |||
| 1 | use anyhow::Result; | ||
| 2 | use futures_util::{SinkExt, StreamExt}; | ||
| 3 | use nostr_sdk::{Event, EventId, Filter, Kind}; | ||
| 4 | use serde_json::{json, Value}; | ||
| 5 | use std::collections::HashMap; | ||
| 6 | use std::net::SocketAddr; | ||
| 7 | use std::sync::Arc; | ||
| 8 | use tokio::net::{TcpListener, TcpStream}; | ||
| 9 | use tokio::sync::RwLock; | ||
| 10 | use tokio_tungstenite::{accept_async, tungstenite::Message}; | ||
| 11 | use tracing::{debug, error, info, warn}; | ||
| 12 | |||
| 13 | use crate::config::Config; | ||
| 14 | use crate::nostr::events::{validate_announcement, validate_state, KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; | ||
| 15 | use crate::storage::Storage; | ||
| 16 | |||
| 17 | type Subscriptions = Arc<RwLock<HashMap<String, Vec<Filter>>>>; | ||
| 18 | |||
| 19 | pub struct RelayServer { | ||
| 20 | config: Config, | ||
| 21 | storage: Storage, | ||
| 22 | } | ||
| 23 | |||
| 24 | impl RelayServer { | ||
| 25 | pub fn new(config: Config, storage: Storage) -> Result<Self> { | ||
| 26 | Ok(RelayServer { config, storage }) | ||
| 27 | } | ||
| 28 | |||
| 29 | pub async fn run(self) -> Result<()> { | ||
| 30 | let addr: SocketAddr = self.config.bind_address.parse()?; | ||
| 31 | let listener = TcpListener::bind(&addr).await?; | ||
| 32 | |||
| 33 | info!("✅ Nostr relay listening on ws://{}", addr); | ||
| 34 | info!("📡 Ready to accept connections..."); | ||
| 35 | |||
| 36 | loop { | ||
| 37 | match listener.accept().await { | ||
| 38 | Ok((stream, peer_addr)) => { | ||
| 39 | debug!("New connection from: {}", peer_addr); | ||
| 40 | let storage = self.storage.clone(); | ||
| 41 | tokio::spawn(async move { | ||
| 42 | if let Err(e) = handle_connection(stream, storage).await { | ||
| 43 | error!("Error handling connection from {}: {}", peer_addr, e); | ||
| 44 | } | ||
| 45 | }); | ||
| 46 | } | ||
| 47 | Err(e) => { | ||
| 48 | error!("Error accepting connection: {}", e); | ||
| 49 | } | ||
| 50 | } | ||
| 51 | } | ||
| 52 | } | ||
| 53 | } | ||
| 54 | |||
| 55 | async fn handle_connection(stream: TcpStream, storage: Storage) -> Result<()> { | ||
| 56 | let ws_stream = accept_async(stream).await?; | ||
| 57 | let (mut ws_sender, mut ws_receiver) = ws_stream.split(); | ||
| 58 | |||
| 59 | let subscriptions: Subscriptions = Arc::new(RwLock::new(HashMap::new())); | ||
| 60 | |||
| 61 | while let Some(msg) = ws_receiver.next().await { | ||
| 62 | match msg { | ||
| 63 | Ok(Message::Text(text)) => { | ||
| 64 | debug!("Received message: {}", text); | ||
| 65 | |||
| 66 | match handle_message(&text, &storage, &subscriptions).await { | ||
| 67 | Ok(responses) => { | ||
| 68 | for response in responses { | ||
| 69 | let response_text = serde_json::to_string(&response)?; | ||
| 70 | debug!("Sending response: {}", response_text); | ||
| 71 | ws_sender.send(Message::Text(response_text)).await?; | ||
| 72 | } | ||
| 73 | } | ||
| 74 | Err(e) => { | ||
| 75 | warn!("Error handling message: {}", e); | ||
| 76 | let notice = json!(["NOTICE", format!("Error: {}", e)]); | ||
| 77 | ws_sender.send(Message::Text(notice.to_string())).await?; | ||
| 78 | } | ||
| 79 | } | ||
| 80 | } | ||
| 81 | Ok(Message::Close(_)) => { | ||
| 82 | debug!("Client closed connection"); | ||
| 83 | break; | ||
| 84 | } | ||
| 85 | Ok(Message::Ping(data)) => { | ||
| 86 | ws_sender.send(Message::Pong(data)).await?; | ||
| 87 | } | ||
| 88 | Ok(_) => { | ||
| 89 | // Ignore other message types | ||
| 90 | } | ||
| 91 | Err(e) => { | ||
| 92 | error!("WebSocket error: {}", e); | ||
| 93 | break; | ||
| 94 | } | ||
| 95 | } | ||
| 96 | } | ||
| 97 | |||
| 98 | Ok(()) | ||
| 99 | } | ||
| 100 | |||
| 101 | async fn handle_message( | ||
| 102 | text: &str, | ||
| 103 | storage: &Storage, | ||
| 104 | subscriptions: &Subscriptions, | ||
| 105 | ) -> Result<Vec<Value>> { | ||
| 106 | let msg: Value = serde_json::from_str(text)?; | ||
| 107 | |||
| 108 | if let Some(arr) = msg.as_array() { | ||
| 109 | if arr.is_empty() { | ||
| 110 | return Ok(vec![json!(["NOTICE", "Empty message"])]); | ||
| 111 | } | ||
| 112 | |||
| 113 | let msg_type = arr[0].as_str().unwrap_or(""); | ||
| 114 | |||
| 115 | match msg_type { | ||
| 116 | "EVENT" => handle_event(arr, storage).await, | ||
| 117 | "REQ" => handle_req(arr, storage, subscriptions).await, | ||
| 118 | "CLOSE" => handle_close(arr, subscriptions).await, | ||
| 119 | _ => Ok(vec![json!(["NOTICE", format!("Unknown message type: {}", msg_type)])]), | ||
| 120 | } | ||
| 121 | } else { | ||
| 122 | Ok(vec![json!(["NOTICE", "Invalid message format"])]) | ||
| 123 | } | ||
| 124 | } | ||
| 125 | |||
| 126 | async fn handle_event(arr: &[Value], storage: &Storage) -> Result<Vec<Value>> { | ||
| 127 | if arr.len() < 2 { | ||
| 128 | return Ok(vec![json!(["NOTICE", "EVENT message requires event object"])]); | ||
| 129 | } | ||
| 130 | |||
| 131 | let event: Event = serde_json::from_value(arr[1].clone())?; | ||
| 132 | let event_id = event.id; | ||
| 133 | |||
| 134 | // Verify event (signature and ID) | ||
| 135 | if event.verify().is_err() { | ||
| 136 | return Ok(vec![json!(["OK", event_id.to_hex(), false, "invalid: signature or ID verification failed"])]); | ||
| 137 | } | ||
| 138 | |||
| 139 | // Check if event already exists | ||
| 140 | if storage.get_event(&event_id.to_hex()).await.is_some() { | ||
| 141 | return Ok(vec![json!(["OK", event_id.to_hex(), true, "duplicate: event already exists"])]); | ||
| 142 | } | ||
| 143 | |||
| 144 | // Validate repository announcements (kind 30617) | ||
| 145 | if event.kind == Kind::from(KIND_REPOSITORY_ANNOUNCEMENT) { | ||
| 146 | // Get domain from storage config | ||
| 147 | let domain = storage.get_domain(); | ||
| 148 | |||
| 149 | match validate_announcement(&event, &domain) { | ||
| 150 | Ok(()) => { | ||
| 151 | info!("✅ Valid repository announcement: {} ({})", event_id, event.kind); | ||
| 152 | } | ||
| 153 | Err(e) => { | ||
| 154 | warn!("❌ Invalid repository announcement: {}", e); | ||
| 155 | return Ok(vec![json!(["OK", event_id.to_hex(), false, format!("invalid: {}", e)])]); | ||
| 156 | } | ||
| 157 | } | ||
| 158 | } | ||
| 159 | |||
| 160 | // Validate repository state announcements (kind 30618) | ||
| 161 | if event.kind == Kind::from(KIND_REPOSITORY_STATE) { | ||
| 162 | match validate_state(&event) { | ||
| 163 | Ok(()) => { | ||
| 164 | info!("✅ Valid repository state: {} ({})", event_id, event.kind); | ||
| 165 | } | ||
| 166 | Err(e) => { | ||
| 167 | warn!("❌ Invalid repository state: {}", e); | ||
| 168 | return Ok(vec![json!(["OK", event_id.to_hex(), false, format!("invalid: {}", e)])]); | ||
| 169 | } | ||
| 170 | } | ||
| 171 | } | ||
| 172 | |||
| 173 | // Store the event | ||
| 174 | storage.store_event(event.clone()).await?; | ||
| 175 | |||
| 176 | info!("✅ Stored event: {} (kind: {})", event_id, event.kind); | ||
| 177 | |||
| 178 | Ok(vec![json!(["OK", event_id.to_hex(), true, ""])]) | ||
| 179 | } | ||
| 180 | |||
| 181 | async fn handle_req( | ||
| 182 | arr: &[Value], | ||
| 183 | storage: &Storage, | ||
| 184 | subscriptions: &Subscriptions, | ||
| 185 | ) -> Result<Vec<Value>> { | ||
| 186 | if arr.len() < 2 { | ||
| 187 | return Ok(vec![json!(["NOTICE", "REQ message requires subscription ID"])]); | ||
| 188 | } | ||
| 189 | |||
| 190 | let sub_id = arr[1].as_str().ok_or_else(|| anyhow::anyhow!("Invalid subscription ID"))?; | ||
| 191 | |||
| 192 | // Parse filters | ||
| 193 | let mut filters = Vec::new(); | ||
| 194 | for filter_value in &arr[2..] { | ||
| 195 | let filter: Filter = serde_json::from_value(filter_value.clone())?; | ||
| 196 | filters.push(filter.clone()); | ||
| 197 | } | ||
| 198 | |||
| 199 | // Store subscription | ||
| 200 | { | ||
| 201 | let mut subs = subscriptions.write().await; | ||
| 202 | subs.insert(sub_id.to_string(), filters.clone()); | ||
| 203 | } | ||
| 204 | |||
| 205 | debug!("Created subscription: {} with {} filters", sub_id, filters.len()); | ||
| 206 | |||
| 207 | // Query and send matching events | ||
| 208 | let mut responses = Vec::new(); | ||
| 209 | |||
| 210 | for filter in filters { | ||
| 211 | let events = storage.query_events(|event| { | ||
| 212 | matches_filter(event, &filter) | ||
| 213 | }).await; | ||
| 214 | |||
| 215 | for event in events { | ||
| 216 | responses.push(json!(["EVENT", sub_id, event])); | ||
| 217 | } | ||
| 218 | } | ||
| 219 | |||
| 220 | // Send EOSE (End of Stored Events) | ||
| 221 | responses.push(json!(["EOSE", sub_id])); | ||
| 222 | |||
| 223 | debug!("Subscription {} returned {} events", sub_id, responses.len() - 1); | ||
| 224 | |||
| 225 | Ok(responses) | ||
| 226 | } | ||
| 227 | |||
| 228 | async fn handle_close(arr: &[Value], subscriptions: &Subscriptions) -> Result<Vec<Value>> { | ||
| 229 | if arr.len() < 2 { | ||
| 230 | return Ok(vec![json!(["NOTICE", "CLOSE message requires subscription ID"])]); | ||
| 231 | } | ||
| 232 | |||
| 233 | let sub_id = arr[1].as_str().ok_or_else(|| anyhow::anyhow!("Invalid subscription ID"))?; | ||
| 234 | |||
| 235 | { | ||
| 236 | let mut subs = subscriptions.write().await; | ||
| 237 | subs.remove(sub_id); | ||
| 238 | } | ||
| 239 | |||
| 240 | debug!("Closed subscription: {}", sub_id); | ||
| 241 | |||
| 242 | Ok(vec![]) | ||
| 243 | } | ||
| 244 | |||
| 245 | fn matches_filter(event: &Event, filter: &Filter) -> bool { | ||
| 246 | // Check IDs | ||
| 247 | if let Some(ref ids) = filter.ids { | ||
| 248 | if !ids.is_empty() && !ids.contains(&event.id) { | ||
| 249 | return false; | ||
| 250 | } | ||
| 251 | } | ||
| 252 | |||
| 253 | // Check authors | ||
| 254 | if let Some(ref authors) = filter.authors { | ||
| 255 | if !authors.is_empty() && !authors.contains(&event.pubkey) { | ||
| 256 | return false; | ||
| 257 | } | ||
| 258 | } | ||
| 259 | |||
| 260 | // Check kinds | ||
| 261 | if let Some(ref kinds) = filter.kinds { | ||
| 262 | if !kinds.is_empty() && !kinds.contains(&event.kind) { | ||
| 263 | return false; | ||
| 264 | } | ||
| 265 | } | ||
| 266 | |||
| 267 | // Check since | ||
| 268 | if let Some(since) = filter.since { | ||
| 269 | if event.created_at < since { | ||
| 270 | return false; | ||
| 271 | } | ||
| 272 | } | ||
| 273 | |||
| 274 | // Check until | ||
| 275 | if let Some(until) = filter.until { | ||
| 276 | if event.created_at > until { | ||
| 277 | return false; | ||
| 278 | } | ||
| 279 | } | ||
| 280 | |||
| 281 | // TODO: Check tags (#e, #p, etc.) | ||
| 282 | |||
| 283 | true | ||
| 284 | } | ||
| 285 | |||
| 286 | #[cfg(test)] | ||
| 287 | mod tests { | ||
| 288 | use super::*; | ||
| 289 | use nostr_sdk::{EventBuilder, Keys, Kind}; | ||
| 290 | |||
| 291 | #[test] | ||
| 292 | fn test_matches_filter_by_id() { | ||
| 293 | let keys = Keys::generate(); | ||
| 294 | let event = EventBuilder::text_note("test") | ||
| 295 | .sign_with_keys(&keys) | ||
| 296 | .unwrap(); | ||
| 297 | |||
| 298 | // Filter matching the event ID | ||
| 299 | let filter = Filter::new().id(event.id); | ||
| 300 | assert!(matches_filter(&event, &filter)); | ||
| 301 | |||
| 302 | // Filter not matching | ||
| 303 | let other_id = EventId::all_zeros(); | ||
| 304 | let filter = Filter::new().id(other_id); | ||
| 305 | assert!(!matches_filter(&event, &filter)); | ||
| 306 | } | ||
| 307 | |||
| 308 | #[test] | ||
| 309 | fn test_matches_filter_by_author() { | ||
| 310 | let keys = Keys::generate(); | ||
| 311 | let event = EventBuilder::text_note("test") | ||
| 312 | .sign_with_keys(&keys) | ||
| 313 | .unwrap(); | ||
| 314 | |||
| 315 | // Filter matching the author | ||
| 316 | let filter = Filter::new().author(keys.public_key()); | ||
| 317 | assert!(matches_filter(&event, &filter)); | ||
| 318 | |||
| 319 | // Filter not matching | ||
| 320 | let other_keys = Keys::generate(); | ||
| 321 | let filter = Filter::new().author(other_keys.public_key()); | ||
| 322 | assert!(!matches_filter(&event, &filter)); | ||
| 323 | } | ||
| 324 | |||
| 325 | #[test] | ||
| 326 | fn test_matches_filter_by_kind() { | ||
| 327 | let keys = Keys::generate(); | ||
| 328 | let event = EventBuilder::text_note("test") | ||
| 329 | .sign_with_keys(&keys) | ||
| 330 | .unwrap(); | ||
| 331 | |||
| 332 | // Filter matching the kind | ||
| 333 | let filter = Filter::new().kind(Kind::TextNote); | ||
| 334 | assert!(matches_filter(&event, &filter)); | ||
| 335 | |||
| 336 | // Filter not matching | ||
| 337 | let filter = Filter::new().kind(Kind::Metadata); | ||
| 338 | assert!(!matches_filter(&event, &filter)); | ||
| 339 | } | ||
| 340 | } | ||
diff --git a/src/storage/mod.rs b/src/storage/mod.rs deleted file mode 100644 index eab8211..0000000 --- a/src/storage/mod.rs +++ /dev/null | |||
| @@ -1,132 +0,0 @@ | |||
| 1 | use anyhow::Result; | ||
| 2 | use nostr_sdk::Event; | ||
| 3 | use std::collections::HashMap; | ||
| 4 | use std::sync::Arc; | ||
| 5 | use tokio::sync::RwLock; | ||
| 6 | |||
| 7 | use crate::config::Config; | ||
| 8 | |||
| 9 | /// Simple in-memory storage for events | ||
| 10 | /// TODO: Persist to disk for production use | ||
| 11 | #[derive(Clone)] | ||
| 12 | pub struct Storage { | ||
| 13 | events: Arc<RwLock<HashMap<String, Event>>>, | ||
| 14 | data_path: String, | ||
| 15 | domain: String, | ||
| 16 | } | ||
| 17 | |||
| 18 | impl Storage { | ||
| 19 | pub fn new(config: &Config) -> Result<Self> { | ||
| 20 | // Create data directory if it doesn't exist | ||
| 21 | std::fs::create_dir_all(&config.relay_data_path)?; | ||
| 22 | |||
| 23 | Ok(Storage { | ||
| 24 | events: Arc::new(RwLock::new(HashMap::new())), | ||
| 25 | data_path: config.relay_data_path.clone(), | ||
| 26 | domain: config.domain.clone(), | ||
| 27 | }) | ||
| 28 | } | ||
| 29 | |||
| 30 | pub fn get_domain(&self) -> String { | ||
| 31 | self.domain.clone() | ||
| 32 | } | ||
| 33 | |||
| 34 | pub async fn store_event(&self, event: Event) -> Result<()> { | ||
| 35 | let mut events = self.events.write().await; | ||
| 36 | events.insert(event.id.to_hex(), event); | ||
| 37 | Ok(()) | ||
| 38 | } | ||
| 39 | |||
| 40 | pub async fn get_event(&self, event_id: &str) -> Option<Event> { | ||
| 41 | let events = self.events.read().await; | ||
| 42 | events.get(event_id).cloned() | ||
| 43 | } | ||
| 44 | |||
| 45 | pub async fn query_events<F>(&self, filter: F) -> Vec<Event> | ||
| 46 | where | ||
| 47 | F: Fn(&Event) -> bool, | ||
| 48 | { | ||
| 49 | let events = self.events.read().await; | ||
| 50 | events.values().filter(|e| filter(e)).cloned().collect() | ||
| 51 | } | ||
| 52 | |||
| 53 | pub async fn count_events(&self) -> usize { | ||
| 54 | let events = self.events.read().await; | ||
| 55 | events.len() | ||
| 56 | } | ||
| 57 | } | ||
| 58 | |||
| 59 | #[cfg(test)] | ||
| 60 | mod tests { | ||
| 61 | use super::*; | ||
| 62 | use nostr_sdk::{EventBuilder, Keys, Kind}; | ||
| 63 | |||
| 64 | #[tokio::test] | ||
| 65 | async fn test_store_and_retrieve() { | ||
| 66 | let config = Config { | ||
| 67 | domain: "test".to_string(), | ||
| 68 | owner_npub: "npub1test".to_string(), | ||
| 69 | relay_name: "test".to_string(), | ||
| 70 | relay_description: "test".to_string(), | ||
| 71 | git_data_path: "./test_data/git".to_string(), | ||
| 72 | relay_data_path: "./test_data/relay".to_string(), | ||
| 73 | bind_address: "127.0.0.1:8080".to_string(), | ||
| 74 | }; | ||
| 75 | |||
| 76 | let storage = Storage::new(&config).unwrap(); | ||
| 77 | |||
| 78 | // Create a test event | ||
| 79 | let keys = Keys::generate(); | ||
| 80 | let event = EventBuilder::text_note("test content") | ||
| 81 | .sign_with_keys(&keys) | ||
| 82 | .unwrap(); | ||
| 83 | |||
| 84 | // Store it | ||
| 85 | storage.store_event(event.clone()).await.unwrap(); | ||
| 86 | |||
| 87 | // Retrieve it | ||
| 88 | let retrieved = storage.get_event(&event.id.to_hex()).await; | ||
| 89 | assert!(retrieved.is_some()); | ||
| 90 | assert_eq!(retrieved.unwrap().id, event.id); | ||
| 91 | |||
| 92 | // Count events | ||
| 93 | assert_eq!(storage.count_events().await, 1); | ||
| 94 | } | ||
| 95 | |||
| 96 | #[tokio::test] | ||
| 97 | async fn test_query_events() { | ||
| 98 | let config = Config { | ||
| 99 | domain: "test".to_string(), | ||
| 100 | owner_npub: "npub1test".to_string(), | ||
| 101 | relay_name: "test".to_string(), | ||
| 102 | relay_description: "test".to_string(), | ||
| 103 | git_data_path: "./test_data/git".to_string(), | ||
| 104 | relay_data_path: "./test_data/relay".to_string(), | ||
| 105 | bind_address: "127.0.0.1:8080".to_string(), | ||
| 106 | }; | ||
| 107 | |||
| 108 | let storage = Storage::new(&config).unwrap(); | ||
| 109 | |||
| 110 | // Create multiple events | ||
| 111 | let keys = Keys::generate(); | ||
| 112 | let event1 = EventBuilder::text_note("message 1") | ||
| 113 | .sign_with_keys(&keys) | ||
| 114 | .unwrap(); | ||
| 115 | let event2 = EventBuilder::text_note("message 2") | ||
| 116 | .sign_with_keys(&keys) | ||
| 117 | .unwrap(); | ||
| 118 | |||
| 119 | storage.store_event(event1.clone()).await.unwrap(); | ||
| 120 | storage.store_event(event2.clone()).await.unwrap(); | ||
| 121 | |||
| 122 | // Query all events | ||
| 123 | let all_events = storage.query_events(|_| true).await; | ||
| 124 | assert_eq!(all_events.len(), 2); | ||
| 125 | |||
| 126 | // Query by kind | ||
| 127 | let text_notes = storage | ||
| 128 | .query_events(|e| e.kind == Kind::TextNote) | ||
| 129 | .await; | ||
| 130 | assert_eq!(text_notes.len(), 2); | ||
| 131 | } | ||
| 132 | } | ||