diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/client.rs | 5 | ||||
| -rw-r--r-- | src/lib/mod.rs | 3 | ||||
| -rw-r--r-- | src/lib/transport.rs | 358 |
3 files changed, 366 insertions, 0 deletions
diff --git a/src/lib/client.rs b/src/lib/client.rs index 32c2d37..2a4e081 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs | |||
| @@ -63,6 +63,9 @@ use crate::{ | |||
| 63 | login::{get_likely_logged_in_user, user::get_user_ref_from_cache}, | 63 | login::{get_likely_logged_in_user, user::get_user_ref_from_cache}, |
| 64 | repo_ref::{RepoRef, normalize_grasp_server_url}, | 64 | repo_ref::{RepoRef, normalize_grasp_server_url}, |
| 65 | repo_state::RepoState, | 65 | repo_state::RepoState, |
| 66 | // TEMPORARY: Remove when async-wsocket includes Happy Eyeballs support. | ||
| 67 | // See src/lib/transport.rs header for full removal instructions. | ||
| 68 | transport::HappyEyeballsTransport, | ||
| 66 | }; | 69 | }; |
| 67 | 70 | ||
| 68 | pub fn is_verbose() -> bool { | 71 | pub fn is_verbose() -> bool { |
| @@ -206,6 +209,7 @@ impl Connect for Client { | |||
| 206 | .verify_subscriptions(true), | 209 | .verify_subscriptions(true), |
| 207 | ) | 210 | ) |
| 208 | .signer(keys) | 211 | .signer(keys) |
| 212 | .websocket_transport(HappyEyeballsTransport) // TEMPORARY: see transport.rs | ||
| 209 | .build() | 213 | .build() |
| 210 | } else { | 214 | } else { |
| 211 | nostr_sdk::ClientBuilder::new() | 215 | nostr_sdk::ClientBuilder::new() |
| @@ -214,6 +218,7 @@ impl Connect for Client { | |||
| 214 | .relay_limits(RelayLimits::disable()) | 218 | .relay_limits(RelayLimits::disable()) |
| 215 | .verify_subscriptions(true), | 219 | .verify_subscriptions(true), |
| 216 | ) | 220 | ) |
| 221 | .websocket_transport(HappyEyeballsTransport) // TEMPORARY: see transport.rs | ||
| 217 | .build() | 222 | .build() |
| 218 | }, | 223 | }, |
| 219 | relay_default_set: opts.relay_default_set, | 224 | relay_default_set: opts.relay_default_set, |
diff --git a/src/lib/mod.rs b/src/lib/mod.rs index 1229e8c..f839e7f 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs | |||
| @@ -10,6 +10,9 @@ pub mod mbox_parser; | |||
| 10 | pub mod push; | 10 | pub mod push; |
| 11 | pub mod repo_ref; | 11 | pub mod repo_ref; |
| 12 | pub mod repo_state; | 12 | pub mod repo_state; |
| 13 | // TEMPORARY: Remove when async-wsocket includes Happy Eyeballs support. | ||
| 14 | // See src/lib/transport.rs header for full removal instructions. | ||
| 15 | pub mod transport; | ||
| 13 | pub mod utils; | 16 | pub mod utils; |
| 14 | 17 | ||
| 15 | use anyhow::{Result, anyhow}; | 18 | use anyhow::{Result, anyhow}; |
diff --git a/src/lib/transport.rs b/src/lib/transport.rs new file mode 100644 index 0000000..779d58d --- /dev/null +++ b/src/lib/transport.rs | |||
| @@ -0,0 +1,358 @@ | |||
| 1 | // TEMPORARY: Delete this entire file when async-wsocket merges Happy Eyeballs | ||
| 2 | // support and the rust-nostr version we use includes it. | ||
| 3 | // Upstream PR: https://github.com/shadowylab/async-wsocket/pull/42 | ||
| 4 | // | ||
| 5 | // When removing this file, also: | ||
| 6 | // 1. Remove `pub mod transport;` from src/lib/mod.rs | ||
| 7 | // 2. Remove `.websocket_transport(HappyEyeballsTransport)` calls from | ||
| 8 | // src/lib/client.rs | ||
| 9 | // 3. Remove the `use crate::transport::HappyEyeballsTransport;` import from | ||
| 10 | // src/lib/client.rs | ||
| 11 | // 4. Remove `async-wsocket` and `tokio-tungstenite` from Cargo.toml direct | ||
| 12 | // dependencies | ||
| 13 | // | ||
| 14 | // Interim Happy Eyeballs (RFC 8305) WebSocket transport for nostr-sdk. | ||
| 15 | // | ||
| 16 | // The default async-wsocket transport uses | ||
| 17 | // `tokio::net::TcpStream::connect(host:port)`, which resolves DNS and tries | ||
| 18 | // addresses sequentially. If IPv6 is returned first but is broken (hangs), the | ||
| 19 | // entire connection timeout is consumed before IPv4 is attempted. | ||
| 20 | // | ||
| 21 | // This module implements a custom `WebSocketTransport` that, for direct | ||
| 22 | // connections, resolves DNS and races IPv6/IPv4 with a 250ms head start for | ||
| 23 | // IPv6 (per RFC 8305). | ||
| 24 | |||
| 25 | use std::{ | ||
| 26 | fmt, | ||
| 27 | net::SocketAddr, | ||
| 28 | pin::Pin, | ||
| 29 | task::{Context, Poll}, | ||
| 30 | time::Duration, | ||
| 31 | }; | ||
| 32 | |||
| 33 | use async_wsocket::{ConnectionMode, Message, WebSocket}; | ||
| 34 | use futures::{Sink, SinkExt, TryStreamExt, stream::StreamExt}; | ||
| 35 | use nostr::{Url, util::BoxedFuture}; | ||
| 36 | use nostr_relay_pool::transport::{ | ||
| 37 | error::TransportError, | ||
| 38 | websocket::{WebSocketSink, WebSocketStream, WebSocketTransport}, | ||
| 39 | }; | ||
| 40 | use tokio::net::TcpStream; | ||
| 41 | use tokio_tungstenite::tungstenite::protocol::Message as TungsteniteMessage; | ||
| 42 | |||
| 43 | /// Delay before starting IPv4 attempts when IPv6 addresses are available (RFC | ||
| 44 | /// 8305). | ||
| 45 | const HAPPY_EYEBALLS_DELAY: Duration = Duration::from_millis(250); | ||
| 46 | |||
| 47 | /// Custom WebSocket transport implementing Happy Eyeballs (RFC 8305) for | ||
| 48 | /// IPv6/IPv4 fallback. | ||
| 49 | #[derive(Debug, Clone, Copy, Default)] | ||
| 50 | pub struct HappyEyeballsTransport; | ||
| 51 | |||
| 52 | impl WebSocketTransport for HappyEyeballsTransport { | ||
| 53 | fn support_ping(&self) -> bool { | ||
| 54 | true | ||
| 55 | } | ||
| 56 | |||
| 57 | fn connect<'a>( | ||
| 58 | &'a self, | ||
| 59 | url: &'a Url, | ||
| 60 | mode: &'a ConnectionMode, | ||
| 61 | timeout: Duration, | ||
| 62 | ) -> BoxedFuture<'a, Result<(WebSocketSink, WebSocketStream), TransportError>> { | ||
| 63 | Box::pin(async move { | ||
| 64 | match mode { | ||
| 65 | ConnectionMode::Direct => connect_happy_eyeballs(url, timeout).await, | ||
| 66 | // For proxy/tor modes, delegate to the default implementation. | ||
| 67 | _ => connect_default(url, mode, timeout).await, | ||
| 68 | } | ||
| 69 | }) | ||
| 70 | } | ||
| 71 | } | ||
| 72 | |||
| 73 | /// Fallback to the default async-wsocket connection for non-direct modes. | ||
| 74 | async fn connect_default( | ||
| 75 | url: &Url, | ||
| 76 | mode: &ConnectionMode, | ||
| 77 | timeout: Duration, | ||
| 78 | ) -> Result<(WebSocketSink, WebSocketStream), TransportError> { | ||
| 79 | let socket: WebSocket = WebSocket::connect(url, mode, timeout) | ||
| 80 | .await | ||
| 81 | .map_err(TransportError::backend)?; | ||
| 82 | let (tx, rx) = socket.split(); | ||
| 83 | let sink: WebSocketSink = Box::new(DefaultSinkAdapter(tx)) as WebSocketSink; | ||
| 84 | let stream: WebSocketStream = Box::pin(rx.map_err(TransportError::backend)) as WebSocketStream; | ||
| 85 | Ok((sink, stream)) | ||
| 86 | } | ||
| 87 | |||
| 88 | /// Connect using Happy Eyeballs: resolve DNS, race IPv6 (with 250ms head start) | ||
| 89 | /// against IPv4, then perform TLS + WebSocket handshake on the winning TCP | ||
| 90 | /// connection. | ||
| 91 | async fn connect_happy_eyeballs( | ||
| 92 | url: &Url, | ||
| 93 | timeout: Duration, | ||
| 94 | ) -> Result<(WebSocketSink, WebSocketStream), TransportError> { | ||
| 95 | tokio::time::timeout(timeout, connect_happy_eyeballs_inner(url)) | ||
| 96 | .await | ||
| 97 | .map_err(|_| { | ||
| 98 | TransportError::backend(std::io::Error::new( | ||
| 99 | std::io::ErrorKind::TimedOut, | ||
| 100 | "connection timed out", | ||
| 101 | )) | ||
| 102 | })? | ||
| 103 | } | ||
| 104 | |||
| 105 | async fn connect_happy_eyeballs_inner( | ||
| 106 | url: &Url, | ||
| 107 | ) -> Result<(WebSocketSink, WebSocketStream), TransportError> { | ||
| 108 | let host = url | ||
| 109 | .host_str() | ||
| 110 | .ok_or_else(|| TransportError::backend(IoError("missing host in URL")))?; | ||
| 111 | |||
| 112 | let default_port = match url.scheme() { | ||
| 113 | "wss" => 443, | ||
| 114 | "ws" => 80, | ||
| 115 | _ => 80, | ||
| 116 | }; | ||
| 117 | let port = url.port().unwrap_or(default_port); | ||
| 118 | |||
| 119 | // Resolve DNS | ||
| 120 | let addr_str = format!("{host}:{port}"); | ||
| 121 | let addrs: Vec<SocketAddr> = tokio::net::lookup_host(&addr_str) | ||
| 122 | .await | ||
| 123 | .map_err(TransportError::backend)? | ||
| 124 | .collect(); | ||
| 125 | |||
| 126 | if addrs.is_empty() { | ||
| 127 | return Err(TransportError::backend(IoError( | ||
| 128 | "DNS resolution returned no addresses", | ||
| 129 | ))); | ||
| 130 | } | ||
| 131 | |||
| 132 | // Separate into IPv6 and IPv4 | ||
| 133 | let mut ipv6_addrs: Vec<SocketAddr> = Vec::new(); | ||
| 134 | let mut ipv4_addrs: Vec<SocketAddr> = Vec::new(); | ||
| 135 | for addr in addrs { | ||
| 136 | if addr.is_ipv6() { | ||
| 137 | ipv6_addrs.push(addr); | ||
| 138 | } else { | ||
| 139 | ipv4_addrs.push(addr); | ||
| 140 | } | ||
| 141 | } | ||
| 142 | |||
| 143 | // Happy Eyeballs: try to connect via TCP | ||
| 144 | let tcp_stream = happy_eyeballs_tcp(&ipv6_addrs, &ipv4_addrs).await?; | ||
| 145 | |||
| 146 | // Build the WebSocket request URI from the URL | ||
| 147 | let request_uri = url.as_str().to_string(); | ||
| 148 | |||
| 149 | // Perform TLS (if wss) + WebSocket handshake | ||
| 150 | let (ws_stream, _response) = tokio_tungstenite::client_async_tls(&request_uri, tcp_stream) | ||
| 151 | .await | ||
| 152 | .map_err(TransportError::backend)?; | ||
| 153 | |||
| 154 | // Split into sink + stream | ||
| 155 | let (native_sink, native_stream) = ws_stream.split(); | ||
| 156 | |||
| 157 | // Wrap the sink: async_wsocket::Message -> tungstenite::Message (From impl | ||
| 158 | // exists) | ||
| 159 | let sink: WebSocketSink = Box::new(NativeSinkAdapter(native_sink)) as WebSocketSink; | ||
| 160 | |||
| 161 | // Wrap the stream: tungstenite::Message -> async_wsocket::Message (manual | ||
| 162 | // conversion) | ||
| 163 | let stream: WebSocketStream = Box::pin(native_stream.map(|result| { | ||
| 164 | result | ||
| 165 | .map(tungstenite_to_async_wsocket) | ||
| 166 | .map_err(TransportError::backend) | ||
| 167 | })) as WebSocketStream; | ||
| 168 | |||
| 169 | Ok((sink, stream)) | ||
| 170 | } | ||
| 171 | |||
| 172 | /// Happy Eyeballs TCP connection algorithm. | ||
| 173 | /// | ||
| 174 | /// If both IPv6 and IPv4 addresses are available, starts IPv6 first. If IPv6 | ||
| 175 | /// doesn't connect within 250ms, starts IPv4 in parallel. Returns whichever | ||
| 176 | /// connects first. | ||
| 177 | async fn happy_eyeballs_tcp( | ||
| 178 | ipv6_addrs: &[SocketAddr], | ||
| 179 | ipv4_addrs: &[SocketAddr], | ||
| 180 | ) -> Result<TcpStream, TransportError> { | ||
| 181 | match (ipv6_addrs.is_empty(), ipv4_addrs.is_empty()) { | ||
| 182 | // Only IPv4 | ||
| 183 | (true, false) => try_connect_addrs(ipv4_addrs).await, | ||
| 184 | // Only IPv6 | ||
| 185 | (false, true) => try_connect_addrs(ipv6_addrs).await, | ||
| 186 | // Both available: race with head start for IPv6 | ||
| 187 | (false, false) => { | ||
| 188 | // Start IPv6 attempt | ||
| 189 | let ipv6_fut = try_connect_addrs(ipv6_addrs); | ||
| 190 | tokio::pin!(ipv6_fut); | ||
| 191 | |||
| 192 | // Give IPv6 a 250ms head start | ||
| 193 | let delay = tokio::time::sleep(HAPPY_EYEBALLS_DELAY); | ||
| 194 | tokio::pin!(delay); | ||
| 195 | |||
| 196 | // Wait for either IPv6 to succeed or the delay to expire | ||
| 197 | tokio::select! { | ||
| 198 | biased; | ||
| 199 | result = &mut ipv6_fut => { | ||
| 200 | if let Ok(stream) = result { | ||
| 201 | return Ok(stream); | ||
| 202 | } | ||
| 203 | // IPv6 failed entirely, try IPv4 | ||
| 204 | try_connect_addrs(ipv4_addrs).await | ||
| 205 | } | ||
| 206 | _ = &mut delay => { | ||
| 207 | // Delay expired, start IPv4 and race both | ||
| 208 | let ipv4_fut = try_connect_addrs(ipv4_addrs); | ||
| 209 | tokio::pin!(ipv4_fut); | ||
| 210 | |||
| 211 | tokio::select! { | ||
| 212 | biased; | ||
| 213 | result = &mut ipv6_fut => { | ||
| 214 | match result { | ||
| 215 | Ok(stream) => Ok(stream), | ||
| 216 | Err(_) => ipv4_fut.await, | ||
| 217 | } | ||
| 218 | } | ||
| 219 | result = &mut ipv4_fut => { | ||
| 220 | match result { | ||
| 221 | Ok(stream) => Ok(stream), | ||
| 222 | Err(_) => ipv6_fut.await, | ||
| 223 | } | ||
| 224 | } | ||
| 225 | } | ||
| 226 | } | ||
| 227 | } | ||
| 228 | } | ||
| 229 | // No addresses at all | ||
| 230 | (true, true) => Err(TransportError::backend(IoError( | ||
| 231 | "no addresses to connect to", | ||
| 232 | ))), | ||
| 233 | } | ||
| 234 | } | ||
| 235 | |||
| 236 | /// Try connecting to each address in sequence, returning the first successful | ||
| 237 | /// connection. | ||
| 238 | async fn try_connect_addrs(addrs: &[SocketAddr]) -> Result<TcpStream, TransportError> { | ||
| 239 | let mut last_err = None; | ||
| 240 | for addr in addrs { | ||
| 241 | match TcpStream::connect(addr).await { | ||
| 242 | Ok(stream) => return Ok(stream), | ||
| 243 | Err(e) => last_err = Some(e), | ||
| 244 | } | ||
| 245 | } | ||
| 246 | Err(TransportError::backend(last_err.unwrap_or_else(|| { | ||
| 247 | std::io::Error::new( | ||
| 248 | std::io::ErrorKind::AddrNotAvailable, | ||
| 249 | "no addresses to connect to", | ||
| 250 | ) | ||
| 251 | }))) | ||
| 252 | } | ||
| 253 | |||
| 254 | /// Convert a tungstenite::Message to an async_wsocket::Message. | ||
| 255 | /// | ||
| 256 | /// Since `Message::from_native` is `pub(crate)` in async-wsocket, we replicate | ||
| 257 | /// the conversion here. | ||
| 258 | fn tungstenite_to_async_wsocket(msg: TungsteniteMessage) -> Message { | ||
| 259 | match msg { | ||
| 260 | TungsteniteMessage::Text(text) => Message::Text(text.to_string()), | ||
| 261 | TungsteniteMessage::Binary(data) => Message::Binary(data.to_vec()), | ||
| 262 | TungsteniteMessage::Ping(data) => Message::Ping(data.to_vec()), | ||
| 263 | TungsteniteMessage::Pong(data) => Message::Pong(data.to_vec()), | ||
| 264 | TungsteniteMessage::Close(frame) => { | ||
| 265 | Message::Close(frame.map(|f| async_wsocket::message::CloseFrame { | ||
| 266 | code: u16::from(f.code), | ||
| 267 | reason: f.reason.to_string(), | ||
| 268 | })) | ||
| 269 | } | ||
| 270 | // From tungstenite docs: "you're not going to get this value while reading" | ||
| 271 | TungsteniteMessage::Frame(_) => unreachable!(), | ||
| 272 | } | ||
| 273 | } | ||
| 274 | |||
| 275 | /// Sink adapter for the native tokio-tungstenite WebSocketStream. | ||
| 276 | /// | ||
| 277 | /// Converts `async_wsocket::Message` to `tungstenite::Message` using the | ||
| 278 | /// existing `From` implementation, and maps errors to `TransportError`. | ||
| 279 | struct NativeSinkAdapter<S>(S); | ||
| 280 | |||
| 281 | impl<S> Sink<Message> for NativeSinkAdapter<S> | ||
| 282 | where | ||
| 283 | S: Sink<TungsteniteMessage, Error = tokio_tungstenite::tungstenite::Error> + Unpin, | ||
| 284 | { | ||
| 285 | type Error = TransportError; | ||
| 286 | |||
| 287 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 288 | Pin::new(&mut self.0) | ||
| 289 | .poll_ready(cx) | ||
| 290 | .map_err(TransportError::backend) | ||
| 291 | } | ||
| 292 | |||
| 293 | fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { | ||
| 294 | // From<async_wsocket::Message> for tungstenite::Message exists | ||
| 295 | let native_msg: TungsteniteMessage = item.into(); | ||
| 296 | Pin::new(&mut self.0) | ||
| 297 | .start_send(native_msg) | ||
| 298 | .map_err(TransportError::backend) | ||
| 299 | } | ||
| 300 | |||
| 301 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 302 | Pin::new(&mut self.0) | ||
| 303 | .poll_flush(cx) | ||
| 304 | .map_err(TransportError::backend) | ||
| 305 | } | ||
| 306 | |||
| 307 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 308 | Pin::new(&mut self.0) | ||
| 309 | .poll_close(cx) | ||
| 310 | .map_err(TransportError::backend) | ||
| 311 | } | ||
| 312 | } | ||
| 313 | |||
| 314 | /// Sink adapter for the default async-wsocket path (proxy/tor modes). | ||
| 315 | /// | ||
| 316 | /// Wraps the `SplitSink<WebSocket, Message>` and maps errors to | ||
| 317 | /// `TransportError`. | ||
| 318 | struct DefaultSinkAdapter(futures::stream::SplitSink<WebSocket, Message>); | ||
| 319 | |||
| 320 | impl Sink<Message> for DefaultSinkAdapter { | ||
| 321 | type Error = TransportError; | ||
| 322 | |||
| 323 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 324 | Pin::new(&mut self.0) | ||
| 325 | .poll_ready_unpin(cx) | ||
| 326 | .map_err(TransportError::backend) | ||
| 327 | } | ||
| 328 | |||
| 329 | fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { | ||
| 330 | Pin::new(&mut self.0) | ||
| 331 | .start_send_unpin(item) | ||
| 332 | .map_err(TransportError::backend) | ||
| 333 | } | ||
| 334 | |||
| 335 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 336 | Pin::new(&mut self.0) | ||
| 337 | .poll_flush_unpin(cx) | ||
| 338 | .map_err(TransportError::backend) | ||
| 339 | } | ||
| 340 | |||
| 341 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 342 | Pin::new(&mut self.0) | ||
| 343 | .poll_close_unpin(cx) | ||
| 344 | .map_err(TransportError::backend) | ||
| 345 | } | ||
| 346 | } | ||
| 347 | |||
| 348 | /// Simple error type for inline error messages. | ||
| 349 | #[derive(Debug)] | ||
| 350 | struct IoError(&'static str); | ||
| 351 | |||
| 352 | impl fmt::Display for IoError { | ||
| 353 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 354 | f.write_str(self.0) | ||
| 355 | } | ||
| 356 | } | ||
| 357 | |||
| 358 | impl std::error::Error for IoError {} | ||