upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/lib/transport.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2026-02-25 11:21:45 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2026-02-25 11:48:13 +0000
commit319bb7fa2c984da45422fa81d12b4a5226beb493 (patch)
treebb05a39180cf0513434a0cdd63fd25b4788066c5 /src/lib/transport.rs
parent3a17877bcdabc9d6721e3054c2bb07a892f32147 (diff)
fix IPv6 connection failures with Happy Eyeballs (RFC 8305)
Implement a custom WebSocketTransport that races IPv6 and IPv4 connections with a 250ms head start for IPv6, matching browser behavior. This prevents broken IPv6 from blocking all relay connections indefinitely. This is a temporary fix until the upstream async-wsocket PR is merged: https://github.com/shadowylab/async-wsocket/pull/42
Diffstat (limited to 'src/lib/transport.rs')
-rw-r--r--src/lib/transport.rs358
1 files changed, 358 insertions, 0 deletions
diff --git a/src/lib/transport.rs b/src/lib/transport.rs
new file mode 100644
index 0000000..779d58d
--- /dev/null
+++ b/src/lib/transport.rs
@@ -0,0 +1,358 @@
1// TEMPORARY: Delete this entire file when async-wsocket merges Happy Eyeballs
2// support and the rust-nostr version we use includes it.
3// Upstream PR: https://github.com/shadowylab/async-wsocket/pull/42
4//
5// When removing this file, also:
6// 1. Remove `pub mod transport;` from src/lib/mod.rs
7// 2. Remove `.websocket_transport(HappyEyeballsTransport)` calls from
8// src/lib/client.rs
9// 3. Remove the `use crate::transport::HappyEyeballsTransport;` import from
10// src/lib/client.rs
11// 4. Remove `async-wsocket` and `tokio-tungstenite` from Cargo.toml direct
12// dependencies
13//
14// Interim Happy Eyeballs (RFC 8305) WebSocket transport for nostr-sdk.
15//
16// The default async-wsocket transport uses
17// `tokio::net::TcpStream::connect(host:port)`, which resolves DNS and tries
18// addresses sequentially. If IPv6 is returned first but is broken (hangs), the
19// entire connection timeout is consumed before IPv4 is attempted.
20//
21// This module implements a custom `WebSocketTransport` that, for direct
22// connections, resolves DNS and races IPv6/IPv4 with a 250ms head start for
23// IPv6 (per RFC 8305).
24
25use std::{
26 fmt,
27 net::SocketAddr,
28 pin::Pin,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use async_wsocket::{ConnectionMode, Message, WebSocket};
34use futures::{Sink, SinkExt, TryStreamExt, stream::StreamExt};
35use nostr::{Url, util::BoxedFuture};
36use nostr_relay_pool::transport::{
37 error::TransportError,
38 websocket::{WebSocketSink, WebSocketStream, WebSocketTransport},
39};
40use tokio::net::TcpStream;
41use tokio_tungstenite::tungstenite::protocol::Message as TungsteniteMessage;
42
43/// Delay before starting IPv4 attempts when IPv6 addresses are available (RFC
44/// 8305).
45const HAPPY_EYEBALLS_DELAY: Duration = Duration::from_millis(250);
46
47/// Custom WebSocket transport implementing Happy Eyeballs (RFC 8305) for
48/// IPv6/IPv4 fallback.
49#[derive(Debug, Clone, Copy, Default)]
50pub struct HappyEyeballsTransport;
51
52impl WebSocketTransport for HappyEyeballsTransport {
53 fn support_ping(&self) -> bool {
54 true
55 }
56
57 fn connect<'a>(
58 &'a self,
59 url: &'a Url,
60 mode: &'a ConnectionMode,
61 timeout: Duration,
62 ) -> BoxedFuture<'a, Result<(WebSocketSink, WebSocketStream), TransportError>> {
63 Box::pin(async move {
64 match mode {
65 ConnectionMode::Direct => connect_happy_eyeballs(url, timeout).await,
66 // For proxy/tor modes, delegate to the default implementation.
67 _ => connect_default(url, mode, timeout).await,
68 }
69 })
70 }
71}
72
73/// Fallback to the default async-wsocket connection for non-direct modes.
74async fn connect_default(
75 url: &Url,
76 mode: &ConnectionMode,
77 timeout: Duration,
78) -> Result<(WebSocketSink, WebSocketStream), TransportError> {
79 let socket: WebSocket = WebSocket::connect(url, mode, timeout)
80 .await
81 .map_err(TransportError::backend)?;
82 let (tx, rx) = socket.split();
83 let sink: WebSocketSink = Box::new(DefaultSinkAdapter(tx)) as WebSocketSink;
84 let stream: WebSocketStream = Box::pin(rx.map_err(TransportError::backend)) as WebSocketStream;
85 Ok((sink, stream))
86}
87
88/// Connect using Happy Eyeballs: resolve DNS, race IPv6 (with 250ms head start)
89/// against IPv4, then perform TLS + WebSocket handshake on the winning TCP
90/// connection.
91async fn connect_happy_eyeballs(
92 url: &Url,
93 timeout: Duration,
94) -> Result<(WebSocketSink, WebSocketStream), TransportError> {
95 tokio::time::timeout(timeout, connect_happy_eyeballs_inner(url))
96 .await
97 .map_err(|_| {
98 TransportError::backend(std::io::Error::new(
99 std::io::ErrorKind::TimedOut,
100 "connection timed out",
101 ))
102 })?
103}
104
105async fn connect_happy_eyeballs_inner(
106 url: &Url,
107) -> Result<(WebSocketSink, WebSocketStream), TransportError> {
108 let host = url
109 .host_str()
110 .ok_or_else(|| TransportError::backend(IoError("missing host in URL")))?;
111
112 let default_port = match url.scheme() {
113 "wss" => 443,
114 "ws" => 80,
115 _ => 80,
116 };
117 let port = url.port().unwrap_or(default_port);
118
119 // Resolve DNS
120 let addr_str = format!("{host}:{port}");
121 let addrs: Vec<SocketAddr> = tokio::net::lookup_host(&addr_str)
122 .await
123 .map_err(TransportError::backend)?
124 .collect();
125
126 if addrs.is_empty() {
127 return Err(TransportError::backend(IoError(
128 "DNS resolution returned no addresses",
129 )));
130 }
131
132 // Separate into IPv6 and IPv4
133 let mut ipv6_addrs: Vec<SocketAddr> = Vec::new();
134 let mut ipv4_addrs: Vec<SocketAddr> = Vec::new();
135 for addr in addrs {
136 if addr.is_ipv6() {
137 ipv6_addrs.push(addr);
138 } else {
139 ipv4_addrs.push(addr);
140 }
141 }
142
143 // Happy Eyeballs: try to connect via TCP
144 let tcp_stream = happy_eyeballs_tcp(&ipv6_addrs, &ipv4_addrs).await?;
145
146 // Build the WebSocket request URI from the URL
147 let request_uri = url.as_str().to_string();
148
149 // Perform TLS (if wss) + WebSocket handshake
150 let (ws_stream, _response) = tokio_tungstenite::client_async_tls(&request_uri, tcp_stream)
151 .await
152 .map_err(TransportError::backend)?;
153
154 // Split into sink + stream
155 let (native_sink, native_stream) = ws_stream.split();
156
157 // Wrap the sink: async_wsocket::Message -> tungstenite::Message (From impl
158 // exists)
159 let sink: WebSocketSink = Box::new(NativeSinkAdapter(native_sink)) as WebSocketSink;
160
161 // Wrap the stream: tungstenite::Message -> async_wsocket::Message (manual
162 // conversion)
163 let stream: WebSocketStream = Box::pin(native_stream.map(|result| {
164 result
165 .map(tungstenite_to_async_wsocket)
166 .map_err(TransportError::backend)
167 })) as WebSocketStream;
168
169 Ok((sink, stream))
170}
171
172/// Happy Eyeballs TCP connection algorithm.
173///
174/// If both IPv6 and IPv4 addresses are available, starts IPv6 first. If IPv6
175/// doesn't connect within 250ms, starts IPv4 in parallel. Returns whichever
176/// connects first.
177async fn happy_eyeballs_tcp(
178 ipv6_addrs: &[SocketAddr],
179 ipv4_addrs: &[SocketAddr],
180) -> Result<TcpStream, TransportError> {
181 match (ipv6_addrs.is_empty(), ipv4_addrs.is_empty()) {
182 // Only IPv4
183 (true, false) => try_connect_addrs(ipv4_addrs).await,
184 // Only IPv6
185 (false, true) => try_connect_addrs(ipv6_addrs).await,
186 // Both available: race with head start for IPv6
187 (false, false) => {
188 // Start IPv6 attempt
189 let ipv6_fut = try_connect_addrs(ipv6_addrs);
190 tokio::pin!(ipv6_fut);
191
192 // Give IPv6 a 250ms head start
193 let delay = tokio::time::sleep(HAPPY_EYEBALLS_DELAY);
194 tokio::pin!(delay);
195
196 // Wait for either IPv6 to succeed or the delay to expire
197 tokio::select! {
198 biased;
199 result = &mut ipv6_fut => {
200 if let Ok(stream) = result {
201 return Ok(stream);
202 }
203 // IPv6 failed entirely, try IPv4
204 try_connect_addrs(ipv4_addrs).await
205 }
206 _ = &mut delay => {
207 // Delay expired, start IPv4 and race both
208 let ipv4_fut = try_connect_addrs(ipv4_addrs);
209 tokio::pin!(ipv4_fut);
210
211 tokio::select! {
212 biased;
213 result = &mut ipv6_fut => {
214 match result {
215 Ok(stream) => Ok(stream),
216 Err(_) => ipv4_fut.await,
217 }
218 }
219 result = &mut ipv4_fut => {
220 match result {
221 Ok(stream) => Ok(stream),
222 Err(_) => ipv6_fut.await,
223 }
224 }
225 }
226 }
227 }
228 }
229 // No addresses at all
230 (true, true) => Err(TransportError::backend(IoError(
231 "no addresses to connect to",
232 ))),
233 }
234}
235
236/// Try connecting to each address in sequence, returning the first successful
237/// connection.
238async fn try_connect_addrs(addrs: &[SocketAddr]) -> Result<TcpStream, TransportError> {
239 let mut last_err = None;
240 for addr in addrs {
241 match TcpStream::connect(addr).await {
242 Ok(stream) => return Ok(stream),
243 Err(e) => last_err = Some(e),
244 }
245 }
246 Err(TransportError::backend(last_err.unwrap_or_else(|| {
247 std::io::Error::new(
248 std::io::ErrorKind::AddrNotAvailable,
249 "no addresses to connect to",
250 )
251 })))
252}
253
254/// Convert a tungstenite::Message to an async_wsocket::Message.
255///
256/// Since `Message::from_native` is `pub(crate)` in async-wsocket, we replicate
257/// the conversion here.
258fn tungstenite_to_async_wsocket(msg: TungsteniteMessage) -> Message {
259 match msg {
260 TungsteniteMessage::Text(text) => Message::Text(text.to_string()),
261 TungsteniteMessage::Binary(data) => Message::Binary(data.to_vec()),
262 TungsteniteMessage::Ping(data) => Message::Ping(data.to_vec()),
263 TungsteniteMessage::Pong(data) => Message::Pong(data.to_vec()),
264 TungsteniteMessage::Close(frame) => {
265 Message::Close(frame.map(|f| async_wsocket::message::CloseFrame {
266 code: u16::from(f.code),
267 reason: f.reason.to_string(),
268 }))
269 }
270 // From tungstenite docs: "you're not going to get this value while reading"
271 TungsteniteMessage::Frame(_) => unreachable!(),
272 }
273}
274
275/// Sink adapter for the native tokio-tungstenite WebSocketStream.
276///
277/// Converts `async_wsocket::Message` to `tungstenite::Message` using the
278/// existing `From` implementation, and maps errors to `TransportError`.
279struct NativeSinkAdapter<S>(S);
280
281impl<S> Sink<Message> for NativeSinkAdapter<S>
282where
283 S: Sink<TungsteniteMessage, Error = tokio_tungstenite::tungstenite::Error> + Unpin,
284{
285 type Error = TransportError;
286
287 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
288 Pin::new(&mut self.0)
289 .poll_ready(cx)
290 .map_err(TransportError::backend)
291 }
292
293 fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
294 // From<async_wsocket::Message> for tungstenite::Message exists
295 let native_msg: TungsteniteMessage = item.into();
296 Pin::new(&mut self.0)
297 .start_send(native_msg)
298 .map_err(TransportError::backend)
299 }
300
301 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
302 Pin::new(&mut self.0)
303 .poll_flush(cx)
304 .map_err(TransportError::backend)
305 }
306
307 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
308 Pin::new(&mut self.0)
309 .poll_close(cx)
310 .map_err(TransportError::backend)
311 }
312}
313
314/// Sink adapter for the default async-wsocket path (proxy/tor modes).
315///
316/// Wraps the `SplitSink<WebSocket, Message>` and maps errors to
317/// `TransportError`.
318struct DefaultSinkAdapter(futures::stream::SplitSink<WebSocket, Message>);
319
320impl Sink<Message> for DefaultSinkAdapter {
321 type Error = TransportError;
322
323 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
324 Pin::new(&mut self.0)
325 .poll_ready_unpin(cx)
326 .map_err(TransportError::backend)
327 }
328
329 fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
330 Pin::new(&mut self.0)
331 .start_send_unpin(item)
332 .map_err(TransportError::backend)
333 }
334
335 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
336 Pin::new(&mut self.0)
337 .poll_flush_unpin(cx)
338 .map_err(TransportError::backend)
339 }
340
341 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
342 Pin::new(&mut self.0)
343 .poll_close_unpin(cx)
344 .map_err(TransportError::backend)
345 }
346}
347
348/// Simple error type for inline error messages.
349#[derive(Debug)]
350struct IoError(&'static str);
351
352impl fmt::Display for IoError {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.write_str(self.0)
355 }
356}
357
358impl std::error::Error for IoError {}