upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md4
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--src/lib/client.rs5
-rw-r--r--src/lib/mod.rs3
-rw-r--r--src/lib/transport.rs358
6 files changed, 374 insertions, 0 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ddfd827..73e4cf3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7 7
8## [Unreleased] 8## [Unreleased]
9 9
10### Fixed
11
12- IPv6 connection failures with Happy Eyeballs (RFC 8305)
13
10## [2.2.0] 14## [2.2.0]
11 15
12### Changed 16### Changed
diff --git a/Cargo.lock b/Cargo.lock
index 8d63861..97aa90a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1538,6 +1538,7 @@ dependencies = [
1538 "anyhow", 1538 "anyhow",
1539 "assert_cmd", 1539 "assert_cmd",
1540 "async-trait", 1540 "async-trait",
1541 "async-wsocket",
1541 "auth-git2", 1542 "auth-git2",
1542 "chacha20poly1305", 1543 "chacha20poly1305",
1543 "chrono", 1544 "chrono",
@@ -1568,6 +1569,7 @@ dependencies = [
1568 "serial_test", 1569 "serial_test",
1569 "test_utils", 1570 "test_utils",
1570 "tokio", 1571 "tokio",
1572 "tokio-tungstenite 0.26.2",
1571 "urlencoding", 1573 "urlencoding",
1572 "zeroize", 1574 "zeroize",
1573] 1575]
diff --git a/Cargo.toml b/Cargo.toml
index 7adf6ff..721893e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ categories = ["command-line-utilities","development-tools"]
14[dependencies] 14[dependencies]
15anyhow = "1.0.98" 15anyhow = "1.0.98"
16async-trait = "0.1.88" 16async-trait = "0.1.88"
17async-wsocket = "0.13.1" # TEMPORARY: remove when transport.rs is deleted. See transport.rs header.
17auth-git2 = "0.5.8" 18auth-git2 = "0.5.8"
18chacha20poly1305 = "0.10.1" 19chacha20poly1305 = "0.10.1"
19chrono = "0.4" 20chrono = "0.4"
@@ -39,6 +40,7 @@ serde = { version = "1.0.219", features = ["derive"] }
39serde_json = "1.0.140" 40serde_json = "1.0.140"
40serde_yaml = "0.9.34" 41serde_yaml = "0.9.34"
41tokio = { version = "1.46.1", features = ["full"] } 42tokio = { version = "1.46.1", features = ["full"] }
43tokio-tungstenite = "0.26.2" # TEMPORARY: remove when transport.rs is deleted. See transport.rs header.
42urlencoding = "2.1.3" 44urlencoding = "2.1.3"
43zeroize = "1.8.1" 45zeroize = "1.8.1"
44 46
diff --git a/src/lib/client.rs b/src/lib/client.rs
index 32c2d37..2a4e081 100644
--- a/src/lib/client.rs
+++ b/src/lib/client.rs
@@ -63,6 +63,9 @@ use crate::{
63 login::{get_likely_logged_in_user, user::get_user_ref_from_cache}, 63 login::{get_likely_logged_in_user, user::get_user_ref_from_cache},
64 repo_ref::{RepoRef, normalize_grasp_server_url}, 64 repo_ref::{RepoRef, normalize_grasp_server_url},
65 repo_state::RepoState, 65 repo_state::RepoState,
66 // TEMPORARY: Remove when async-wsocket includes Happy Eyeballs support.
67 // See src/lib/transport.rs header for full removal instructions.
68 transport::HappyEyeballsTransport,
66}; 69};
67 70
68pub fn is_verbose() -> bool { 71pub fn is_verbose() -> bool {
@@ -206,6 +209,7 @@ impl Connect for Client {
206 .verify_subscriptions(true), 209 .verify_subscriptions(true),
207 ) 210 )
208 .signer(keys) 211 .signer(keys)
212 .websocket_transport(HappyEyeballsTransport) // TEMPORARY: see transport.rs
209 .build() 213 .build()
210 } else { 214 } else {
211 nostr_sdk::ClientBuilder::new() 215 nostr_sdk::ClientBuilder::new()
@@ -214,6 +218,7 @@ impl Connect for Client {
214 .relay_limits(RelayLimits::disable()) 218 .relay_limits(RelayLimits::disable())
215 .verify_subscriptions(true), 219 .verify_subscriptions(true),
216 ) 220 )
221 .websocket_transport(HappyEyeballsTransport) // TEMPORARY: see transport.rs
217 .build() 222 .build()
218 }, 223 },
219 relay_default_set: opts.relay_default_set, 224 relay_default_set: opts.relay_default_set,
diff --git a/src/lib/mod.rs b/src/lib/mod.rs
index 1229e8c..f839e7f 100644
--- a/src/lib/mod.rs
+++ b/src/lib/mod.rs
@@ -10,6 +10,9 @@ pub mod mbox_parser;
10pub mod push; 10pub mod push;
11pub mod repo_ref; 11pub mod repo_ref;
12pub mod repo_state; 12pub mod repo_state;
13// TEMPORARY: Remove when async-wsocket includes Happy Eyeballs support.
14// See src/lib/transport.rs header for full removal instructions.
15pub mod transport;
13pub mod utils; 16pub mod utils;
14 17
15use anyhow::{Result, anyhow}; 18use anyhow::{Result, anyhow};
diff --git a/src/lib/transport.rs b/src/lib/transport.rs
new file mode 100644
index 0000000..779d58d
--- /dev/null
+++ b/src/lib/transport.rs
@@ -0,0 +1,358 @@
1// TEMPORARY: Delete this entire file when async-wsocket merges Happy Eyeballs
2// support and the rust-nostr version we use includes it.
3// Upstream PR: https://github.com/shadowylab/async-wsocket/pull/42
4//
5// When removing this file, also:
6// 1. Remove `pub mod transport;` from src/lib/mod.rs
7// 2. Remove `.websocket_transport(HappyEyeballsTransport)` calls from
8// src/lib/client.rs
9// 3. Remove the `use crate::transport::HappyEyeballsTransport;` import from
10// src/lib/client.rs
11// 4. Remove `async-wsocket` and `tokio-tungstenite` from Cargo.toml direct
12// dependencies
13//
14// Interim Happy Eyeballs (RFC 8305) WebSocket transport for nostr-sdk.
15//
16// The default async-wsocket transport uses
17// `tokio::net::TcpStream::connect(host:port)`, which resolves DNS and tries
18// addresses sequentially. If IPv6 is returned first but is broken (hangs), the
19// entire connection timeout is consumed before IPv4 is attempted.
20//
21// This module implements a custom `WebSocketTransport` that, for direct
22// connections, resolves DNS and races IPv6/IPv4 with a 250ms head start for
23// IPv6 (per RFC 8305).
24
25use std::{
26 fmt,
27 net::SocketAddr,
28 pin::Pin,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use async_wsocket::{ConnectionMode, Message, WebSocket};
34use futures::{Sink, SinkExt, TryStreamExt, stream::StreamExt};
35use nostr::{Url, util::BoxedFuture};
36use nostr_relay_pool::transport::{
37 error::TransportError,
38 websocket::{WebSocketSink, WebSocketStream, WebSocketTransport},
39};
40use tokio::net::TcpStream;
41use tokio_tungstenite::tungstenite::protocol::Message as TungsteniteMessage;
42
43/// Delay before starting IPv4 attempts when IPv6 addresses are available (RFC
44/// 8305).
45const HAPPY_EYEBALLS_DELAY: Duration = Duration::from_millis(250);
46
47/// Custom WebSocket transport implementing Happy Eyeballs (RFC 8305) for
48/// IPv6/IPv4 fallback.
49#[derive(Debug, Clone, Copy, Default)]
50pub struct HappyEyeballsTransport;
51
52impl WebSocketTransport for HappyEyeballsTransport {
53 fn support_ping(&self) -> bool {
54 true
55 }
56
57 fn connect<'a>(
58 &'a self,
59 url: &'a Url,
60 mode: &'a ConnectionMode,
61 timeout: Duration,
62 ) -> BoxedFuture<'a, Result<(WebSocketSink, WebSocketStream), TransportError>> {
63 Box::pin(async move {
64 match mode {
65 ConnectionMode::Direct => connect_happy_eyeballs(url, timeout).await,
66 // For proxy/tor modes, delegate to the default implementation.
67 _ => connect_default(url, mode, timeout).await,
68 }
69 })
70 }
71}
72
73/// Fallback to the default async-wsocket connection for non-direct modes.
74async fn connect_default(
75 url: &Url,
76 mode: &ConnectionMode,
77 timeout: Duration,
78) -> Result<(WebSocketSink, WebSocketStream), TransportError> {
79 let socket: WebSocket = WebSocket::connect(url, mode, timeout)
80 .await
81 .map_err(TransportError::backend)?;
82 let (tx, rx) = socket.split();
83 let sink: WebSocketSink = Box::new(DefaultSinkAdapter(tx)) as WebSocketSink;
84 let stream: WebSocketStream = Box::pin(rx.map_err(TransportError::backend)) as WebSocketStream;
85 Ok((sink, stream))
86}
87
88/// Connect using Happy Eyeballs: resolve DNS, race IPv6 (with 250ms head start)
89/// against IPv4, then perform TLS + WebSocket handshake on the winning TCP
90/// connection.
91async fn connect_happy_eyeballs(
92 url: &Url,
93 timeout: Duration,
94) -> Result<(WebSocketSink, WebSocketStream), TransportError> {
95 tokio::time::timeout(timeout, connect_happy_eyeballs_inner(url))
96 .await
97 .map_err(|_| {
98 TransportError::backend(std::io::Error::new(
99 std::io::ErrorKind::TimedOut,
100 "connection timed out",
101 ))
102 })?
103}
104
105async fn connect_happy_eyeballs_inner(
106 url: &Url,
107) -> Result<(WebSocketSink, WebSocketStream), TransportError> {
108 let host = url
109 .host_str()
110 .ok_or_else(|| TransportError::backend(IoError("missing host in URL")))?;
111
112 let default_port = match url.scheme() {
113 "wss" => 443,
114 "ws" => 80,
115 _ => 80,
116 };
117 let port = url.port().unwrap_or(default_port);
118
119 // Resolve DNS
120 let addr_str = format!("{host}:{port}");
121 let addrs: Vec<SocketAddr> = tokio::net::lookup_host(&addr_str)
122 .await
123 .map_err(TransportError::backend)?
124 .collect();
125
126 if addrs.is_empty() {
127 return Err(TransportError::backend(IoError(
128 "DNS resolution returned no addresses",
129 )));
130 }
131
132 // Separate into IPv6 and IPv4
133 let mut ipv6_addrs: Vec<SocketAddr> = Vec::new();
134 let mut ipv4_addrs: Vec<SocketAddr> = Vec::new();
135 for addr in addrs {
136 if addr.is_ipv6() {
137 ipv6_addrs.push(addr);
138 } else {
139 ipv4_addrs.push(addr);
140 }
141 }
142
143 // Happy Eyeballs: try to connect via TCP
144 let tcp_stream = happy_eyeballs_tcp(&ipv6_addrs, &ipv4_addrs).await?;
145
146 // Build the WebSocket request URI from the URL
147 let request_uri = url.as_str().to_string();
148
149 // Perform TLS (if wss) + WebSocket handshake
150 let (ws_stream, _response) = tokio_tungstenite::client_async_tls(&request_uri, tcp_stream)
151 .await
152 .map_err(TransportError::backend)?;
153
154 // Split into sink + stream
155 let (native_sink, native_stream) = ws_stream.split();
156
157 // Wrap the sink: async_wsocket::Message -> tungstenite::Message (From impl
158 // exists)
159 let sink: WebSocketSink = Box::new(NativeSinkAdapter(native_sink)) as WebSocketSink;
160
161 // Wrap the stream: tungstenite::Message -> async_wsocket::Message (manual
162 // conversion)
163 let stream: WebSocketStream = Box::pin(native_stream.map(|result| {
164 result
165 .map(tungstenite_to_async_wsocket)
166 .map_err(TransportError::backend)
167 })) as WebSocketStream;
168
169 Ok((sink, stream))
170}
171
172/// Happy Eyeballs TCP connection algorithm.
173///
174/// If both IPv6 and IPv4 addresses are available, starts IPv6 first. If IPv6
175/// doesn't connect within 250ms, starts IPv4 in parallel. Returns whichever
176/// connects first.
177async fn happy_eyeballs_tcp(
178 ipv6_addrs: &[SocketAddr],
179 ipv4_addrs: &[SocketAddr],
180) -> Result<TcpStream, TransportError> {
181 match (ipv6_addrs.is_empty(), ipv4_addrs.is_empty()) {
182 // Only IPv4
183 (true, false) => try_connect_addrs(ipv4_addrs).await,
184 // Only IPv6
185 (false, true) => try_connect_addrs(ipv6_addrs).await,
186 // Both available: race with head start for IPv6
187 (false, false) => {
188 // Start IPv6 attempt
189 let ipv6_fut = try_connect_addrs(ipv6_addrs);
190 tokio::pin!(ipv6_fut);
191
192 // Give IPv6 a 250ms head start
193 let delay = tokio::time::sleep(HAPPY_EYEBALLS_DELAY);
194 tokio::pin!(delay);
195
196 // Wait for either IPv6 to succeed or the delay to expire
197 tokio::select! {
198 biased;
199 result = &mut ipv6_fut => {
200 if let Ok(stream) = result {
201 return Ok(stream);
202 }
203 // IPv6 failed entirely, try IPv4
204 try_connect_addrs(ipv4_addrs).await
205 }
206 _ = &mut delay => {
207 // Delay expired, start IPv4 and race both
208 let ipv4_fut = try_connect_addrs(ipv4_addrs);
209 tokio::pin!(ipv4_fut);
210
211 tokio::select! {
212 biased;
213 result = &mut ipv6_fut => {
214 match result {
215 Ok(stream) => Ok(stream),
216 Err(_) => ipv4_fut.await,
217 }
218 }
219 result = &mut ipv4_fut => {
220 match result {
221 Ok(stream) => Ok(stream),
222 Err(_) => ipv6_fut.await,
223 }
224 }
225 }
226 }
227 }
228 }
229 // No addresses at all
230 (true, true) => Err(TransportError::backend(IoError(
231 "no addresses to connect to",
232 ))),
233 }
234}
235
236/// Try connecting to each address in sequence, returning the first successful
237/// connection.
238async fn try_connect_addrs(addrs: &[SocketAddr]) -> Result<TcpStream, TransportError> {
239 let mut last_err = None;
240 for addr in addrs {
241 match TcpStream::connect(addr).await {
242 Ok(stream) => return Ok(stream),
243 Err(e) => last_err = Some(e),
244 }
245 }
246 Err(TransportError::backend(last_err.unwrap_or_else(|| {
247 std::io::Error::new(
248 std::io::ErrorKind::AddrNotAvailable,
249 "no addresses to connect to",
250 )
251 })))
252}
253
254/// Convert a tungstenite::Message to an async_wsocket::Message.
255///
256/// Since `Message::from_native` is `pub(crate)` in async-wsocket, we replicate
257/// the conversion here.
258fn tungstenite_to_async_wsocket(msg: TungsteniteMessage) -> Message {
259 match msg {
260 TungsteniteMessage::Text(text) => Message::Text(text.to_string()),
261 TungsteniteMessage::Binary(data) => Message::Binary(data.to_vec()),
262 TungsteniteMessage::Ping(data) => Message::Ping(data.to_vec()),
263 TungsteniteMessage::Pong(data) => Message::Pong(data.to_vec()),
264 TungsteniteMessage::Close(frame) => {
265 Message::Close(frame.map(|f| async_wsocket::message::CloseFrame {
266 code: u16::from(f.code),
267 reason: f.reason.to_string(),
268 }))
269 }
270 // From tungstenite docs: "you're not going to get this value while reading"
271 TungsteniteMessage::Frame(_) => unreachable!(),
272 }
273}
274
275/// Sink adapter for the native tokio-tungstenite WebSocketStream.
276///
277/// Converts `async_wsocket::Message` to `tungstenite::Message` using the
278/// existing `From` implementation, and maps errors to `TransportError`.
279struct NativeSinkAdapter<S>(S);
280
281impl<S> Sink<Message> for NativeSinkAdapter<S>
282where
283 S: Sink<TungsteniteMessage, Error = tokio_tungstenite::tungstenite::Error> + Unpin,
284{
285 type Error = TransportError;
286
287 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
288 Pin::new(&mut self.0)
289 .poll_ready(cx)
290 .map_err(TransportError::backend)
291 }
292
293 fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
294 // From<async_wsocket::Message> for tungstenite::Message exists
295 let native_msg: TungsteniteMessage = item.into();
296 Pin::new(&mut self.0)
297 .start_send(native_msg)
298 .map_err(TransportError::backend)
299 }
300
301 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
302 Pin::new(&mut self.0)
303 .poll_flush(cx)
304 .map_err(TransportError::backend)
305 }
306
307 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
308 Pin::new(&mut self.0)
309 .poll_close(cx)
310 .map_err(TransportError::backend)
311 }
312}
313
314/// Sink adapter for the default async-wsocket path (proxy/tor modes).
315///
316/// Wraps the `SplitSink<WebSocket, Message>` and maps errors to
317/// `TransportError`.
318struct DefaultSinkAdapter(futures::stream::SplitSink<WebSocket, Message>);
319
320impl Sink<Message> for DefaultSinkAdapter {
321 type Error = TransportError;
322
323 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
324 Pin::new(&mut self.0)
325 .poll_ready_unpin(cx)
326 .map_err(TransportError::backend)
327 }
328
329 fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
330 Pin::new(&mut self.0)
331 .start_send_unpin(item)
332 .map_err(TransportError::backend)
333 }
334
335 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
336 Pin::new(&mut self.0)
337 .poll_flush_unpin(cx)
338 .map_err(TransportError::backend)
339 }
340
341 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
342 Pin::new(&mut self.0)
343 .poll_close_unpin(cx)
344 .map_err(TransportError::backend)
345 }
346}
347
348/// Simple error type for inline error messages.
349#[derive(Debug)]
350struct IoError(&'static str);
351
352impl fmt::Display for IoError {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.write_str(self.0)
355 }
356}
357
358impl std::error::Error for IoError {}