diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/http/landing.rs | 35 | ||||
| -rw-r--r-- | src/http/mod.rs | 142 | ||||
| -rw-r--r-- | src/http/websocket.rs | 73 |
3 files changed, 131 insertions, 119 deletions
diff --git a/src/http/landing.rs b/src/http/landing.rs index 976ec50..55ffb26 100644 --- a/src/http/landing.rs +++ b/src/http/landing.rs | |||
| @@ -1,40 +1,15 @@ | |||
| 1 | /// Landing Page Handler | 1 | /// Landing Page Handler |
| 2 | /// | 2 | /// |
| 3 | /// Serves the HTML landing page or upgrades to WebSocket for Nostr relay connections. | 3 | /// Generates HTML landing page for the Nostr relay. |
| 4 | use actix_web::{web, HttpRequest, HttpResponse, Result}; | ||
| 5 | use nostr_relay_builder::LocalRelay; | ||
| 6 | |||
| 7 | use crate::config::Config; | 4 | use crate::config::Config; |
| 8 | 5 | ||
| 9 | /// Handle landing page or WebSocket upgrade | 6 | /// Generate the HTML landing page |
| 10 | pub async fn handle( | 7 | pub fn get_html(config: &Config) -> String { |
| 11 | req: HttpRequest, | 8 | format!( |
| 12 | stream: web::Payload, | ||
| 13 | config: web::Data<Config>, | ||
| 14 | relay: web::Data<LocalRelay>, | ||
| 15 | ) -> Result<HttpResponse> { | ||
| 16 | // Check if this is a WebSocket upgrade request | ||
| 17 | if let Some(upgrade) = req.headers().get("upgrade") { | ||
| 18 | if upgrade | ||
| 19 | .to_str() | ||
| 20 | .unwrap_or("") | ||
| 21 | .eq_ignore_ascii_case("websocket") | ||
| 22 | { | ||
| 23 | // Delegate to WebSocket handler | ||
| 24 | return crate::http::websocket::handle(req, stream, relay).await; | ||
| 25 | } | ||
| 26 | } | ||
| 27 | |||
| 28 | // Otherwise, serve the landing page | ||
| 29 | let html = format!( | ||
| 30 | include_str!("../../templates/landing.html"), | 9 | include_str!("../../templates/landing.html"), |
| 31 | relay_name = config.relay_name, | 10 | relay_name = config.relay_name, |
| 32 | relay_description = config.relay_description, | 11 | relay_description = config.relay_description, |
| 33 | domain = config.domain, | 12 | domain = config.domain, |
| 34 | bind_address = config.bind_address, | 13 | bind_address = config.bind_address, |
| 35 | ); | 14 | ) |
| 36 | |||
| 37 | Ok(HttpResponse::Ok() | ||
| 38 | .content_type("text/html; charset=utf-8") | ||
| 39 | .body(html)) | ||
| 40 | } | 15 | } |
diff --git a/src/http/mod.rs b/src/http/mod.rs index b434c69..4690790 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs | |||
| @@ -1,30 +1,140 @@ | |||
| 1 | /// HTTP Server Module | 1 | /// HTTP Server Module |
| 2 | /// | 2 | /// |
| 3 | /// Provides actix-web HTTP server with WebSocket upgrade support for the Nostr relay. | 3 | /// Provides hyper HTTP server with WebSocket upgrade support for the Nostr relay. |
| 4 | pub mod landing; | 4 | pub mod landing; |
| 5 | pub mod websocket; | ||
| 6 | 5 | ||
| 7 | use actix_web::{middleware, web, App, HttpServer}; | 6 | use std::future::Future; |
| 7 | use std::net::SocketAddr; | ||
| 8 | use std::pin::Pin; | ||
| 9 | |||
| 10 | use hyper::body::Incoming; | ||
| 11 | use hyper::header::{CONNECTION, SEC_WEBSOCKET_ACCEPT, UPGRADE}; | ||
| 12 | use hyper::server::conn::http1; | ||
| 13 | use hyper::service::Service; | ||
| 14 | use hyper::{Request, Response}; | ||
| 15 | use hyper_util::rt::TokioIo; | ||
| 16 | use nostr_sdk::hashes::sha1::Hash as Sha1Hash; | ||
| 17 | use nostr_sdk::hashes::{Hash, HashEngine}; | ||
| 8 | use nostr_relay_builder::LocalRelay; | 18 | use nostr_relay_builder::LocalRelay; |
| 19 | use tokio::net::TcpListener; | ||
| 20 | use base64::Engine; | ||
| 9 | 21 | ||
| 10 | use crate::config::Config; | 22 | use crate::config::Config; |
| 11 | 23 | ||
| 24 | /// HTTP Service that serves both WebSocket (relay) and HTML landing page | ||
| 25 | struct HttpService { | ||
| 26 | relay: LocalRelay, | ||
| 27 | config: Config, | ||
| 28 | remote: SocketAddr, | ||
| 29 | } | ||
| 30 | |||
| 31 | impl HttpService { | ||
| 32 | fn new(relay: LocalRelay, config: Config, remote: SocketAddr) -> Self { | ||
| 33 | Self { | ||
| 34 | relay, | ||
| 35 | config, | ||
| 36 | remote, | ||
| 37 | } | ||
| 38 | } | ||
| 39 | } | ||
| 40 | |||
| 41 | impl Service<Request<Incoming>> for HttpService { | ||
| 42 | type Response = Response<String>; | ||
| 43 | type Error = String; | ||
| 44 | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; | ||
| 45 | |||
| 46 | fn call(&self, req: Request<Incoming>) -> Self::Future { | ||
| 47 | let base = Response::builder().header("server", "ngit-grasp"); | ||
| 48 | |||
| 49 | // Check if this is a WebSocket upgrade request | ||
| 50 | if let (Some(c), Some(w)) = ( | ||
| 51 | req.headers().get("connection"), | ||
| 52 | req.headers().get("upgrade"), | ||
| 53 | ) { | ||
| 54 | if c.to_str() | ||
| 55 | .map(|s| s.to_lowercase() == "upgrade") | ||
| 56 | .unwrap_or(false) | ||
| 57 | && w.to_str() | ||
| 58 | .map(|s| s.to_lowercase() == "websocket") | ||
| 59 | .unwrap_or(false) | ||
| 60 | { | ||
| 61 | let key = req.headers().get("sec-websocket-key"); | ||
| 62 | let derived = key.map(|k| derive_accept_key(k.as_bytes())); | ||
| 63 | |||
| 64 | let addr = self.remote; | ||
| 65 | let relay = self.relay.clone(); | ||
| 66 | |||
| 67 | tokio::spawn(async move { | ||
| 68 | match hyper::upgrade::on(req).await { | ||
| 69 | Ok(upgraded) => { | ||
| 70 | tracing::info!("WebSocket connection established from {}", addr); | ||
| 71 | if let Err(e) = relay.take_connection(TokioIo::new(upgraded), addr).await | ||
| 72 | { | ||
| 73 | tracing::error!("Relay error for {}: {}", addr, e); | ||
| 74 | } | ||
| 75 | tracing::info!("WebSocket connection closed for {}", addr); | ||
| 76 | } | ||
| 77 | Err(e) => tracing::error!("Upgrade error: {}", e), | ||
| 78 | } | ||
| 79 | }); | ||
| 80 | |||
| 81 | return Box::pin(async move { | ||
| 82 | Ok(base | ||
| 83 | .status(101) | ||
| 84 | .header(CONNECTION, "upgrade") | ||
| 85 | .header(UPGRADE, "websocket") | ||
| 86 | .header(SEC_WEBSOCKET_ACCEPT, derived.unwrap()) | ||
| 87 | .body("".to_string()) | ||
| 88 | .unwrap()) | ||
| 89 | }); | ||
| 90 | } | ||
| 91 | } | ||
| 92 | |||
| 93 | // Serve landing page for HTTP requests | ||
| 94 | let html = landing::get_html(&self.config); | ||
| 95 | Box::pin(async move { | ||
| 96 | Ok(base | ||
| 97 | .status(200) | ||
| 98 | .header("content-type", "text/html; charset=utf-8") | ||
| 99 | .body(html) | ||
| 100 | .unwrap()) | ||
| 101 | }) | ||
| 102 | } | ||
| 103 | } | ||
| 104 | |||
| 105 | /// Derive the `Sec-WebSocket-Accept` response header from a `Sec-WebSocket-Key` request header | ||
| 106 | fn derive_accept_key(request_key: &[u8]) -> String { | ||
| 107 | const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | ||
| 108 | let mut engine = Sha1Hash::engine(); | ||
| 109 | engine.input(request_key); | ||
| 110 | engine.input(WS_GUID); | ||
| 111 | let hash: Sha1Hash = Sha1Hash::from_engine(engine); | ||
| 112 | base64::prelude::BASE64_STANDARD.encode(hash) | ||
| 113 | } | ||
| 114 | |||
| 12 | /// Start the HTTP server with integrated Nostr relay | 115 | /// Start the HTTP server with integrated Nostr relay |
| 13 | pub async fn run_server(config: Config, relay: LocalRelay) -> anyhow::Result<()> { | 116 | pub async fn run_server(config: Config, relay: LocalRelay) -> anyhow::Result<()> { |
| 14 | let bind_addr = config.bind_address.clone(); | 117 | let bind_addr: SocketAddr = config.bind_address.parse()?; |
| 15 | 118 | ||
| 16 | tracing::info!("Starting HTTP server on {}", bind_addr); | 119 | tracing::info!("Starting HTTP server on {}", bind_addr); |
| 120 | tracing::info!("Relay name: {}", config.relay_name); | ||
| 121 | tracing::info!("Domain: {}", config.domain); | ||
| 17 | 122 | ||
| 18 | HttpServer::new(move || { | 123 | let listener = TcpListener::bind(&bind_addr).await?; |
| 19 | App::new() | 124 | |
| 20 | .app_data(web::Data::new(config.clone())) | 125 | loop { |
| 21 | .app_data(web::Data::new(relay.clone())) | 126 | let (socket, addr) = listener.accept().await?; |
| 22 | .wrap(middleware::Logger::default()) | 127 | let io = TokioIo::new(socket); |
| 23 | .route("/", web::get().to(landing::handle)) | 128 | let service = HttpService::new(relay.clone(), config.clone(), addr); |
| 24 | }) | 129 | |
| 25 | .bind(&bind_addr)? | 130 | tokio::spawn(async move { |
| 26 | .run() | 131 | if let Err(e) = http1::Builder::new() |
| 27 | .await?; | 132 | .serve_connection(io, service) |
| 28 | 133 | .with_upgrades() | |
| 29 | Ok(()) | 134 | .await |
| 135 | { | ||
| 136 | tracing::error!("Failed to handle request from {}: {}", addr, e); | ||
| 137 | } | ||
| 138 | }); | ||
| 139 | } | ||
| 30 | } | 140 | } |
diff --git a/src/http/websocket.rs b/src/http/websocket.rs deleted file mode 100644 index 0171013..0000000 --- a/src/http/websocket.rs +++ /dev/null | |||
| @@ -1,73 +0,0 @@ | |||
| 1 | /// WebSocket Handler | ||
| 2 | /// | ||
| 3 | /// Handles WebSocket upgrade requests and passes connections to the Nostr relay. | ||
| 4 | use actix_web::{web, Error, HttpRequest, HttpResponse, Result}; | ||
| 5 | use actix_ws::Message; | ||
| 6 | use futures_util::StreamExt; | ||
| 7 | use nostr_relay_builder::LocalRelay; | ||
| 8 | |||
| 9 | /// Handle WebSocket upgrade and relay connection | ||
| 10 | pub async fn handle( | ||
| 11 | req: HttpRequest, | ||
| 12 | stream: web::Payload, | ||
| 13 | relay: web::Data<LocalRelay>, | ||
| 14 | ) -> Result<HttpResponse, Error> { | ||
| 15 | let (response, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; | ||
| 16 | |||
| 17 | let peer_addr = req | ||
| 18 | .peer_addr() | ||
| 19 | .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()); | ||
| 20 | |||
| 21 | tracing::debug!("WebSocket connection from {}", peer_addr); | ||
| 22 | |||
| 23 | // Spawn task to handle the WebSocket connection | ||
| 24 | // TODO: Will use relay.take_connection() for full Nostr relay integration | ||
| 25 | let _relay = relay.get_ref().clone(); | ||
| 26 | actix_web::rt::spawn(async move { | ||
| 27 | // Create a channel to communicate between actix-ws and relay | ||
| 28 | let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); | ||
| 29 | |||
| 30 | // Spawn task to send messages from relay to client | ||
| 31 | let mut session_clone = session.clone(); | ||
| 32 | actix_web::rt::spawn(async move { | ||
| 33 | while let Some(msg) = rx.recv().await { | ||
| 34 | if session_clone.text(msg).await.is_err() { | ||
| 35 | break; | ||
| 36 | } | ||
| 37 | } | ||
| 38 | }); | ||
| 39 | |||
| 40 | // Handle incoming messages from client | ||
| 41 | while let Some(Ok(msg)) = msg_stream.next().await { | ||
| 42 | match msg { | ||
| 43 | Message::Text(text) => { | ||
| 44 | // For now, just echo back - will integrate with relay in next phase | ||
| 45 | tracing::debug!("Received text message: {}", text); | ||
| 46 | if let Err(e) = tx.send(text.to_string()) { | ||
| 47 | tracing::error!("Failed to send message: {}", e); | ||
| 48 | break; | ||
| 49 | } | ||
| 50 | } | ||
| 51 | Message::Binary(_) => { | ||
| 52 | tracing::warn!("Received unexpected binary message"); | ||
| 53 | } | ||
| 54 | Message::Close(_) => { | ||
| 55 | tracing::debug!("Client closed connection"); | ||
| 56 | break; | ||
| 57 | } | ||
| 58 | Message::Ping(bytes) => { | ||
| 59 | if session.pong(&bytes).await.is_err() { | ||
| 60 | break; | ||
| 61 | } | ||
| 62 | } | ||
| 63 | Message::Pong(_) => {} | ||
| 64 | Message::Continuation(_) => {} | ||
| 65 | Message::Nop => {} | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | tracing::debug!("WebSocket connection closed for {}", peer_addr); | ||
| 70 | }); | ||
| 71 | |||
| 72 | Ok(response) | ||
| 73 | } | ||