1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
//! HTTP server module.
//!
//! Provides a hyper HTTP server with WebSocket upgrade support for the Nostr relay.
pub mod landing;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use hyper::body::Incoming;
use hyper::header::{CONNECTION, SEC_WEBSOCKET_ACCEPT, UPGRADE};
use hyper::server::conn::http1;
use hyper::service::Service;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use nostr_sdk::hashes::sha1::Hash as Sha1Hash;
use nostr_sdk::hashes::{Hash, HashEngine};
use nostr_relay_builder::LocalRelay;
use tokio::net::TcpListener;
use base64::Engine;
use crate::config::Config;
/// HTTP service that serves both the WebSocket relay endpoint and the HTML landing page.
///
/// Each instance carries the address of one remote peer, so it is meant to be
/// created per accepted connection.
struct HttpService {
/// Relay that upgraded WebSocket connections are handed to.
relay: LocalRelay,
/// Configuration passed to the landing page renderer.
config: Config,
/// Address of the remote peer; used in log messages and passed to the relay.
remote: SocketAddr,
}
impl HttpService {
fn new(relay: LocalRelay, config: Config, remote: SocketAddr) -> Self {
Self {
relay,
config,
remote,
}
}
}
impl Service<Request<Incoming>> for HttpService {
type Response = Response<String>;
type Error = String;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
let base = Response::builder().header("server", "ngit-grasp");
// Check if this is a WebSocket upgrade request
if let (Some(c), Some(w)) = (
req.headers().get("connection"),
req.headers().get("upgrade"),
) {
if c.to_str()
.map(|s| s.to_lowercase() == "upgrade")
.unwrap_or(false)
&& w.to_str()
.map(|s| s.to_lowercase() == "websocket")
.unwrap_or(false)
{
let key = req.headers().get("sec-websocket-key");
let derived = key.map(|k| derive_accept_key(k.as_bytes()));
let addr = self.remote;
let relay = self.relay.clone();
tokio::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
tracing::info!("WebSocket connection established from {}", addr);
if let Err(e) = relay.take_connection(TokioIo::new(upgraded), addr).await
{
tracing::error!("Relay error for {}: {}", addr, e);
}
tracing::info!("WebSocket connection closed for {}", addr);
}
Err(e) => tracing::error!("Upgrade error: {}", e),
}
});
return Box::pin(async move {
Ok(base
.status(101)
.header(CONNECTION, "upgrade")
.header(UPGRADE, "websocket")
.header(SEC_WEBSOCKET_ACCEPT, derived.unwrap())
.body("".to_string())
.unwrap())
});
}
}
// Serve landing page for HTTP requests
let html = landing::get_html(&self.config);
Box::pin(async move {
Ok(base
.status(200)
.header("content-type", "text/html; charset=utf-8")
.body(html)
.unwrap())
})
}
}
/// Derive the `Sec-WebSocket-Accept` response header from a `Sec-WebSocket-Key` request header
fn derive_accept_key(request_key: &[u8]) -> String {
const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
let mut engine = Sha1Hash::engine();
engine.input(request_key);
engine.input(WS_GUID);
let hash: Sha1Hash = Sha1Hash::from_engine(engine);
base64::prelude::BASE64_STANDARD.encode(hash)
}
/// Start the HTTP server with integrated Nostr relay
pub async fn run_server(config: Config, relay: LocalRelay) -> anyhow::Result<()> {
let bind_addr: SocketAddr = config.bind_address.parse()?;
tracing::info!("Starting HTTP server on {}", bind_addr);
tracing::info!("Relay name: {}", config.relay_name);
tracing::info!("Domain: {}", config.domain);
let listener = TcpListener::bind(&bind_addr).await?;
loop {
let (socket, addr) = listener.accept().await?;
let io = TokioIo::new(socket);
let service = HttpService::new(relay.clone(), config.clone(), addr);
tokio::spawn(async move {
if let Err(e) = http1::Builder::new()
.serve_connection(io, service)
.with_upgrades()
.await
{
tracing::error!("Failed to handle request from {}: {}", addr, e);
}
});
}
}
|