From fa065ad128882755f2a988d6203b59a2ab5e38ff Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 19 Nov 2025 11:55:32 +0000 Subject: add landing page and nostr-relay-builder relay on same port --- src/http/landing.rs | 37 ++++++ src/http/mod.rs | 31 +++++ src/http/websocket.rs | 73 +++++++++++ src/lib.rs | 2 +- src/main.rs | 24 ++-- src/nostr/builder.rs | 121 ++++++++++++++++++ src/nostr/mod.rs | 2 +- src/nostr/relay.rs | 340 -------------------------------------------------- src/storage/mod.rs | 132 -------------------- 9 files changed, 277 insertions(+), 485 deletions(-) create mode 100644 src/http/landing.rs create mode 100644 src/http/mod.rs create mode 100644 src/http/websocket.rs create mode 100644 src/nostr/builder.rs delete mode 100644 src/nostr/relay.rs delete mode 100644 src/storage/mod.rs (limited to 'src') diff --git a/src/http/landing.rs b/src/http/landing.rs new file mode 100644 index 0000000..35e49e5 --- /dev/null +++ b/src/http/landing.rs @@ -0,0 +1,37 @@ +/// Landing Page Handler +/// +/// Serves the HTML landing page or upgrades to WebSocket for Nostr relay connections. + +use actix_web::{web, HttpRequest, HttpResponse, Result}; +use nostr_relay_builder::LocalRelay; + +use crate::config::Config; + +/// Handle landing page or WebSocket upgrade +pub async fn handle( + req: HttpRequest, + stream: web::Payload, + config: web::Data, + relay: web::Data, +) -> Result { + // Check if this is a WebSocket upgrade request + if let Some(upgrade) = req.headers().get("upgrade") { + if upgrade.to_str().unwrap_or("").eq_ignore_ascii_case("websocket") { + // Delegate to WebSocket handler + return crate::http::websocket::handle(req, stream, relay).await; + } + } + + // Otherwise, serve the landing page + let html = format!( + include_str!("../../templates/landing.html"), + relay_name = config.relay_name, + relay_description = config.relay_description, + domain = config.domain, + bind_address = config.bind_address, + ); + + Ok(HttpResponse::Ok() + .content_type("text/html; charset=utf-8") + .body(html)) +} \ No newline at end of file diff --git a/src/http/mod.rs b/src/http/mod.rs new file mode 100644 index 0000000..286e8ff --- /dev/null +++ b/src/http/mod.rs @@ -0,0 +1,31 @@ +/// HTTP Server Module +/// +/// Provides actix-web HTTP server with WebSocket upgrade support for the Nostr relay. + +pub mod landing; +pub mod websocket; + +use actix_web::{middleware, web, App, HttpServer}; +use nostr_relay_builder::LocalRelay; + +use crate::config::Config; + +/// Start the HTTP server with integrated Nostr relay +pub async fn run_server(config: Config, relay: LocalRelay) -> anyhow::Result<()> { + let bind_addr = config.bind_address.clone(); + + tracing::info!("Starting HTTP server on {}", bind_addr); + + HttpServer::new(move || { + App::new() + .app_data(web::Data::new(config.clone())) + .app_data(web::Data::new(relay.clone())) + .wrap(middleware::Logger::default()) + .route("/", web::get().to(landing::handle)) + }) + .bind(&bind_addr)? + .run() + .await?; + + Ok(()) +} \ No newline at end of file diff --git a/src/http/websocket.rs b/src/http/websocket.rs new file mode 100644 index 0000000..7af847a --- /dev/null +++ b/src/http/websocket.rs @@ -0,0 +1,73 @@ +/// WebSocket Handler +/// +/// Handles WebSocket upgrade requests and passes connections to the Nostr relay. + +use actix_web::{web, HttpRequest, HttpResponse, Result, Error}; +use actix_ws::Message; +use futures_util::StreamExt; +use nostr_relay_builder::LocalRelay; + +/// Handle WebSocket upgrade and relay connection +pub async fn handle( + req: HttpRequest, + stream: web::Payload, + relay: web::Data, +) -> Result { + let (response, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; + + let peer_addr = req.peer_addr() + .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()); + + tracing::debug!("WebSocket connection from {}", peer_addr); + + // Spawn task to handle the WebSocket connection + // TODO: Will use relay.take_connection() for full Nostr relay integration + let _relay = relay.get_ref().clone(); + actix_web::rt::spawn(async move { + // Create a channel to communicate between actix-ws and relay + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + // Spawn task to send messages from relay to client + let mut session_clone = session.clone(); + actix_web::rt::spawn(async move { + while let Some(msg) = rx.recv().await { + if session_clone.text(msg).await.is_err() { + break; + } + } + }); + + // Handle incoming messages from client + while let Some(Ok(msg)) = msg_stream.next().await { + match msg { + Message::Text(text) => { + // For now, just echo back - will integrate with relay in next phase + tracing::debug!("Received text message: {}", text); + if let Err(e) = tx.send(text.to_string()) { + tracing::error!("Failed to send message: {}", e); + break; + } + } + Message::Binary(_) => { + tracing::warn!("Received unexpected binary message"); + } + Message::Close(_) => { + tracing::debug!("Client closed connection"); + break; + } + Message::Ping(bytes) => { + if session.pong(&bytes).await.is_err() { + break; + } + } + Message::Pong(_) => {} + Message::Continuation(_) => {} + Message::Nop => {} + } + } + + tracing::debug!("WebSocket connection closed for {}", peer_addr); + }); + + Ok(response) +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index f4a8cbf..6460716 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,3 @@ pub mod config; +pub mod http; pub mod nostr; -pub mod storage; diff --git a/src/main.rs b/src/main.rs index 7da4c73..38a3b95 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,8 +3,8 @@ use tracing::{info, Level}; use tracing_subscriber::FmtSubscriber; mod config; +mod http; mod nostr; -mod storage; use config::Config; @@ -16,21 +16,23 @@ async fn main() -> Result<()> { .finish(); tracing::subscriber::set_global_default(subscriber)?; - info!("Starting ngit-grasp..."); + info!("Starting ngit-grasp with nostr-relay-builder..."); // Load configuration let config = Config::from_env()?; info!("Configuration loaded: {}", config.bind_address); - // Initialize storage - let storage = storage::Storage::new(&config)?; - info!("Storage initialized at: {}", config.relay_data_path); - - // Start Nostr relay - let relay = nostr::relay::RelayServer::new(config.clone(), storage)?; - - info!("Starting Nostr relay on {}", config.bind_address); - relay.run().await?; + // Create Nostr relay with NIP-34 validation + if let Ok(relay) = nostr::builder::create_relay(&config) { + info!( + "Relay created with NIP-34 validation for domain: {}", + config.domain + ); + + // Start HTTP server with integrated relay + info!("Starting HTTP server on {}", config.bind_address); + http::run_server(config, relay).await?; + } Ok(()) } diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs new file mode 100644 index 0000000..cd1f4d2 --- /dev/null +++ b/src/nostr/builder.rs @@ -0,0 +1,121 @@ +/// Nostr Relay Builder Configuration +/// +/// This module integrates nostr-relay-builder with NIP-34 validation logic +/// preserved from the original implementation. +use std::net::SocketAddr; +use std::path::Path; + +use nostr::nips::nip19::ToBech32; +use nostr_relay_builder::prelude::*; + +use crate::config::Config; +use crate::nostr::events::{ + validate_announcement, validate_state, KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE, +}; + +/// NIP-34 Write Policy +/// +/// Validates repository announcement and state events according to GRASP-01 spec. +/// Preserves all original validation logic from src/nostr/events.rs. +#[derive(Debug, Clone)] +pub struct Nip34WritePolicy { + domain: String, +} + +impl Nip34WritePolicy { + pub fn new(domain: impl Into) -> Self { + Self { + domain: domain.into(), + } + } +} + +impl WritePolicy for Nip34WritePolicy { + fn admit_event<'a>( + &'a self, + event: &'a nostr_relay_builder::prelude::Event, + _addr: &'a SocketAddr, + ) -> BoxedFuture<'a, PolicyResult> { + Box::pin(async move { + match event.kind.as_u16() { + KIND_REPOSITORY_ANNOUNCEMENT => match validate_announcement(event, &self.domain) { + Ok(_) => { + tracing::debug!( + "Accepted repository announcement: {}", + event + .id + .to_bech32() + .unwrap_or_else(|_| "invalid".to_string()) + ); + PolicyResult::Accept + } + Err(e) => { + tracing::warn!( + "Rejected repository announcement {}: {}", + event + .id + .to_bech32() + .unwrap_or_else(|_| "invalid".to_string()), + e + ); + PolicyResult::Reject(e.to_string()) + } + }, + KIND_REPOSITORY_STATE => match validate_state(event) { + Ok(_) => { + tracing::debug!( + "Accepted repository state: {}", + event + .id + .to_bech32() + .unwrap_or_else(|_| "invalid".to_string()) + ); + PolicyResult::Accept + } + Err(e) => { + tracing::warn!( + "Rejected repository state {}: {}", + event + .id + .to_bech32() + .unwrap_or_else(|_| "invalid".to_string()), + e + ); + PolicyResult::Reject(e.to_string()) + } + }, + // Accept all other event kinds without validation + _ => PolicyResult::Accept, + } + }) + } +} + +/// Create a configured LocalRelay with NIP-34 validation +pub fn create_relay(config: &Config) -> Result { + tracing::info!("Configuring nostr relay..."); + + // Determine database path + let db_path = Path::new(&config.relay_data_path); + + // Create database - using in-memory for now, can switch to persistent later + // TODO: Add configuration for NostrDB or LMDB backends + let database = MemoryDatabase::with_opts(MemoryDatabaseOptions { + events: true, + max_events: Some(100_000), + }); + + tracing::info!("Using in-memory database (path: {})", db_path.display()); + + // Build relay with NIP-34 validation + let builder = RelayBuilder::default() + .database(database) + .write_policy(Nip34WritePolicy::new(&config.domain)); + + tracing::info!( + "Relay configured with NIP-34 validation for domain: {}", + config.domain + ); + + Ok(LocalRelay::new(builder)) +} diff --git a/src/nostr/mod.rs b/src/nostr/mod.rs index b485b91..2bf0346 100644 --- a/src/nostr/mod.rs +++ b/src/nostr/mod.rs @@ -1,2 +1,2 @@ +pub mod builder; pub mod events; -pub mod relay; diff --git a/src/nostr/relay.rs b/src/nostr/relay.rs deleted file mode 100644 index 1033b5b..0000000 --- a/src/nostr/relay.rs +++ /dev/null @@ -1,340 +0,0 @@ -use anyhow::Result; -use futures_util::{SinkExt, StreamExt}; -use nostr_sdk::{Event, EventId, Filter, Kind}; -use serde_json::{json, Value}; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::RwLock; -use tokio_tungstenite::{accept_async, tungstenite::Message}; -use tracing::{debug, error, info, warn}; - -use crate::config::Config; -use crate::nostr::events::{validate_announcement, validate_state, KIND_REPOSITORY_ANNOUNCEMENT, KIND_REPOSITORY_STATE}; -use crate::storage::Storage; - -type Subscriptions = Arc>>>; - -pub struct RelayServer { - config: Config, - storage: Storage, -} - -impl RelayServer { - pub fn new(config: Config, storage: Storage) -> Result { - Ok(RelayServer { config, storage }) - } - - pub async fn run(self) -> Result<()> { - let addr: SocketAddr = self.config.bind_address.parse()?; - let listener = TcpListener::bind(&addr).await?; - - info!("✅ Nostr relay listening on ws://{}", addr); - info!("📡 Ready to accept connections..."); - - loop { - match listener.accept().await { - Ok((stream, peer_addr)) => { - debug!("New connection from: {}", peer_addr); - let storage = self.storage.clone(); - tokio::spawn(async move { - if let Err(e) = handle_connection(stream, storage).await { - error!("Error handling connection from {}: {}", peer_addr, e); - } - }); - } - Err(e) => { - error!("Error accepting connection: {}", e); - } - } - } - } -} - -async fn handle_connection(stream: TcpStream, storage: Storage) -> Result<()> { - let ws_stream = accept_async(stream).await?; - let (mut ws_sender, mut ws_receiver) = ws_stream.split(); - - let subscriptions: Subscriptions = Arc::new(RwLock::new(HashMap::new())); - - while let Some(msg) = ws_receiver.next().await { - match msg { - Ok(Message::Text(text)) => { - debug!("Received message: {}", text); - - match handle_message(&text, &storage, &subscriptions).await { - Ok(responses) => { - for response in responses { - let response_text = serde_json::to_string(&response)?; - debug!("Sending response: {}", response_text); - ws_sender.send(Message::Text(response_text)).await?; - } - } - Err(e) => { - warn!("Error handling message: {}", e); - let notice = json!(["NOTICE", format!("Error: {}", e)]); - ws_sender.send(Message::Text(notice.to_string())).await?; - } - } - } - Ok(Message::Close(_)) => { - debug!("Client closed connection"); - break; - } - Ok(Message::Ping(data)) => { - ws_sender.send(Message::Pong(data)).await?; - } - Ok(_) => { - // Ignore other message types - } - Err(e) => { - error!("WebSocket error: {}", e); - break; - } - } - } - - Ok(()) -} - -async fn handle_message( - text: &str, - storage: &Storage, - subscriptions: &Subscriptions, -) -> Result> { - let msg: Value = serde_json::from_str(text)?; - - if let Some(arr) = msg.as_array() { - if arr.is_empty() { - return Ok(vec![json!(["NOTICE", "Empty message"])]); - } - - let msg_type = arr[0].as_str().unwrap_or(""); - - match msg_type { - "EVENT" => handle_event(arr, storage).await, - "REQ" => handle_req(arr, storage, subscriptions).await, - "CLOSE" => handle_close(arr, subscriptions).await, - _ => Ok(vec![json!(["NOTICE", format!("Unknown message type: {}", msg_type)])]), - } - } else { - Ok(vec![json!(["NOTICE", "Invalid message format"])]) - } -} - -async fn handle_event(arr: &[Value], storage: &Storage) -> Result> { - if arr.len() < 2 { - return Ok(vec![json!(["NOTICE", "EVENT message requires event object"])]); - } - - let event: Event = serde_json::from_value(arr[1].clone())?; - let event_id = event.id; - - // Verify event (signature and ID) - if event.verify().is_err() { - return Ok(vec![json!(["OK", event_id.to_hex(), false, "invalid: signature or ID verification failed"])]); - } - - // Check if event already exists - if storage.get_event(&event_id.to_hex()).await.is_some() { - return Ok(vec![json!(["OK", event_id.to_hex(), true, "duplicate: event already exists"])]); - } - - // Validate repository announcements (kind 30617) - if event.kind == Kind::from(KIND_REPOSITORY_ANNOUNCEMENT) { - // Get domain from storage config - let domain = storage.get_domain(); - - match validate_announcement(&event, &domain) { - Ok(()) => { - info!("✅ Valid repository announcement: {} ({})", event_id, event.kind); - } - Err(e) => { - warn!("❌ Invalid repository announcement: {}", e); - return Ok(vec![json!(["OK", event_id.to_hex(), false, format!("invalid: {}", e)])]); - } - } - } - - // Validate repository state announcements (kind 30618) - if event.kind == Kind::from(KIND_REPOSITORY_STATE) { - match validate_state(&event) { - Ok(()) => { - info!("✅ Valid repository state: {} ({})", event_id, event.kind); - } - Err(e) => { - warn!("❌ Invalid repository state: {}", e); - return Ok(vec![json!(["OK", event_id.to_hex(), false, format!("invalid: {}", e)])]); - } - } - } - - // Store the event - storage.store_event(event.clone()).await?; - - info!("✅ Stored event: {} (kind: {})", event_id, event.kind); - - Ok(vec![json!(["OK", event_id.to_hex(), true, ""])]) -} - -async fn handle_req( - arr: &[Value], - storage: &Storage, - subscriptions: &Subscriptions, -) -> Result> { - if arr.len() < 2 { - return Ok(vec![json!(["NOTICE", "REQ message requires subscription ID"])]); - } - - let sub_id = arr[1].as_str().ok_or_else(|| anyhow::anyhow!("Invalid subscription ID"))?; - - // Parse filters - let mut filters = Vec::new(); - for filter_value in &arr[2..] { - let filter: Filter = serde_json::from_value(filter_value.clone())?; - filters.push(filter.clone()); - } - - // Store subscription - { - let mut subs = subscriptions.write().await; - subs.insert(sub_id.to_string(), filters.clone()); - } - - debug!("Created subscription: {} with {} filters", sub_id, filters.len()); - - // Query and send matching events - let mut responses = Vec::new(); - - for filter in filters { - let events = storage.query_events(|event| { - matches_filter(event, &filter) - }).await; - - for event in events { - responses.push(json!(["EVENT", sub_id, event])); - } - } - - // Send EOSE (End of Stored Events) - responses.push(json!(["EOSE", sub_id])); - - debug!("Subscription {} returned {} events", sub_id, responses.len() - 1); - - Ok(responses) -} - -async fn handle_close(arr: &[Value], subscriptions: &Subscriptions) -> Result> { - if arr.len() < 2 { - return Ok(vec![json!(["NOTICE", "CLOSE message requires subscription ID"])]); - } - - let sub_id = arr[1].as_str().ok_or_else(|| anyhow::anyhow!("Invalid subscription ID"))?; - - { - let mut subs = subscriptions.write().await; - subs.remove(sub_id); - } - - debug!("Closed subscription: {}", sub_id); - - Ok(vec![]) -} - -fn matches_filter(event: &Event, filter: &Filter) -> bool { - // Check IDs - if let Some(ref ids) = filter.ids { - if !ids.is_empty() && !ids.contains(&event.id) { - return false; - } - } - - // Check authors - if let Some(ref authors) = filter.authors { - if !authors.is_empty() && !authors.contains(&event.pubkey) { - return false; - } - } - - // Check kinds - if let Some(ref kinds) = filter.kinds { - if !kinds.is_empty() && !kinds.contains(&event.kind) { - return false; - } - } - - // Check since - if let Some(since) = filter.since { - if event.created_at < since { - return false; - } - } - - // Check until - if let Some(until) = filter.until { - if event.created_at > until { - return false; - } - } - - // TODO: Check tags (#e, #p, etc.) - - true -} - -#[cfg(test)] -mod tests { - use super::*; - use nostr_sdk::{EventBuilder, Keys, Kind}; - - #[test] - fn test_matches_filter_by_id() { - let keys = Keys::generate(); - let event = EventBuilder::text_note("test") - .sign_with_keys(&keys) - .unwrap(); - - // Filter matching the event ID - let filter = Filter::new().id(event.id); - assert!(matches_filter(&event, &filter)); - - // Filter not matching - let other_id = EventId::all_zeros(); - let filter = Filter::new().id(other_id); - assert!(!matches_filter(&event, &filter)); - } - - #[test] - fn test_matches_filter_by_author() { - let keys = Keys::generate(); - let event = EventBuilder::text_note("test") - .sign_with_keys(&keys) - .unwrap(); - - // Filter matching the author - let filter = Filter::new().author(keys.public_key()); - assert!(matches_filter(&event, &filter)); - - // Filter not matching - let other_keys = Keys::generate(); - let filter = Filter::new().author(other_keys.public_key()); - assert!(!matches_filter(&event, &filter)); - } - - #[test] - fn test_matches_filter_by_kind() { - let keys = Keys::generate(); - let event = EventBuilder::text_note("test") - .sign_with_keys(&keys) - .unwrap(); - - // Filter matching the kind - let filter = Filter::new().kind(Kind::TextNote); - assert!(matches_filter(&event, &filter)); - - // Filter not matching - let filter = Filter::new().kind(Kind::Metadata); - assert!(!matches_filter(&event, &filter)); - } -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs deleted file mode 100644 index eab8211..0000000 --- a/src/storage/mod.rs +++ /dev/null @@ -1,132 +0,0 @@ -use anyhow::Result; -use nostr_sdk::Event; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -use crate::config::Config; - -/// Simple in-memory storage for events -/// TODO: Persist to disk for production use -#[derive(Clone)] -pub struct Storage { - events: Arc>>, - data_path: String, - domain: String, -} - -impl Storage { - pub fn new(config: &Config) -> Result { - // Create data directory if it doesn't exist - std::fs::create_dir_all(&config.relay_data_path)?; - - Ok(Storage { - events: Arc::new(RwLock::new(HashMap::new())), - data_path: config.relay_data_path.clone(), - domain: config.domain.clone(), - }) - } - - pub fn get_domain(&self) -> String { - self.domain.clone() - } - - pub async fn store_event(&self, event: Event) -> Result<()> { - let mut events = self.events.write().await; - events.insert(event.id.to_hex(), event); - Ok(()) - } - - pub async fn get_event(&self, event_id: &str) -> Option { - let events = self.events.read().await; - events.get(event_id).cloned() - } - - pub async fn query_events(&self, filter: F) -> Vec - where - F: Fn(&Event) -> bool, - { - let events = self.events.read().await; - events.values().filter(|e| filter(e)).cloned().collect() - } - - pub async fn count_events(&self) -> usize { - let events = self.events.read().await; - events.len() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use nostr_sdk::{EventBuilder, Keys, Kind}; - - #[tokio::test] - async fn test_store_and_retrieve() { - let config = Config { - domain: "test".to_string(), - owner_npub: "npub1test".to_string(), - relay_name: "test".to_string(), - relay_description: "test".to_string(), - git_data_path: "./test_data/git".to_string(), - relay_data_path: "./test_data/relay".to_string(), - bind_address: "127.0.0.1:8080".to_string(), - }; - - let storage = Storage::new(&config).unwrap(); - - // Create a test event - let keys = Keys::generate(); - let event = EventBuilder::text_note("test content") - .sign_with_keys(&keys) - .unwrap(); - - // Store it - storage.store_event(event.clone()).await.unwrap(); - - // Retrieve it - let retrieved = storage.get_event(&event.id.to_hex()).await; - assert!(retrieved.is_some()); - assert_eq!(retrieved.unwrap().id, event.id); - - // Count events - assert_eq!(storage.count_events().await, 1); - } - - #[tokio::test] - async fn test_query_events() { - let config = Config { - domain: "test".to_string(), - owner_npub: "npub1test".to_string(), - relay_name: "test".to_string(), - relay_description: "test".to_string(), - git_data_path: "./test_data/git".to_string(), - relay_data_path: "./test_data/relay".to_string(), - bind_address: "127.0.0.1:8080".to_string(), - }; - - let storage = Storage::new(&config).unwrap(); - - // Create multiple events - let keys = Keys::generate(); - let event1 = EventBuilder::text_note("message 1") - .sign_with_keys(&keys) - .unwrap(); - let event2 = EventBuilder::text_note("message 2") - .sign_with_keys(&keys) - .unwrap(); - - storage.store_event(event1.clone()).await.unwrap(); - storage.store_event(event2.clone()).await.unwrap(); - - // Query all events - let all_events = storage.query_events(|_| true).await; - assert_eq!(all_events.len(), 2); - - // Query by kind - let text_notes = storage - .query_events(|e| e.kind == Kind::TextNote) - .await; - assert_eq!(text_notes.len(), 2); - } -} -- cgit v1.2.3