diff --git a/NEWS.md b/NEWS.md index ce19f2ea..4daf130e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,25 @@ # NEWS +4.1.0 - unreleased +------------------ + +### Added + +- WebTransport client API (`hackney:wt_connect/1,2`, `wt_send/2`, + `wt_recv/1,2`, `wt_setopts/2`, `wt_close/1,2`). It mirrors the WebSocket + API so code can switch by swapping the `ws_` prefix for `wt_`. Runs over + HTTP/3 (QUIC) by default, HTTP/2 optional. One session multiplexes many + streams (`wt_open_stream/2`, `wt_stream_send/3,4`, `wt_stream_recv/2,3`, + `wt_close_stream/2`, `wt_reset_stream/3`, `wt_stop_sending/3`) plus + unreliable datagrams (`wt_send_datagram/2`) and `wt_session_info/1`. + Backed by the `webtransport` library. Caller-supplied request headers and + path are checked for CR/LF/NUL, and a buffer cap bounds unread data. + See the WebTransport Guide. + +### Dependencies + +- Add `webtransport` 0.2.6. + 4.0.3 - 2026-05-28 ------------------ diff --git a/README.md b/README.md index b60cfad4..5a2ee9db 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ An HTTP client for Erlang. Simple, reliable, fast. - **Streaming** - Stream request bodies, response bodies, or both. Handle large files without loading them in memory. - **Async responses** - Get response chunks as messages. Process other work while waiting. - **WebSocket support** - Full WebSocket client with the same process-per-connection model. +- **WebTransport support** - WebTransport client over HTTP/3 (or HTTP/2) with a WebSocket-shaped API; switch by swapping the `ws_` prefix for `wt_`. - **IPv6 first** - Happy Eyeballs algorithm tries IPv6 before IPv4 for faster connections on modern networks. - **SSL by default** - Secure connections with certificate verification using Mozilla's CA bundle. - **Automatic decompression** - Transparently decompress gzip/deflate responses with `{auto_decompress, true}`. @@ -56,6 +57,7 @@ Payload = <<"{\"key\": \"value\"}">>, | [HTTP/2 Guide](guides/http2_guide.md) | HTTP/2 protocol, ALPN, multiplexing, flow control | | [HTTP/3 Guide](guides/http3_guide.md) | HTTP/3 over QUIC, opt-in configuration, Alt-Svc | | [WebSocket Guide](guides/websocket_guide.md) | Connect, send, receive, active mode | +| [WebTransport Guide](guides/webtransport_guide.md) | Streams, datagrams, multiplexing, server handlers | | [Design Guide](guides/design.md) | Architecture, pooling, load regulation internals | | [Migration Guide](guides/MIGRATION.md) | Upgrading from hackney 1.x | | [API Reference](https://hexdocs.pm/hackney) | Full module documentation | @@ -140,6 +142,17 @@ ok = hackney:ws_send(Conn, {text, <<"hello">>}), hackney:ws_close(Conn). ``` +### WebTransport + +Same shape as WebSocket, over HTTP/3 (QUIC). Swap `ws_` for `wt_`: + +```erlang +{ok, Conn} = hackney:wt_connect(<<"https://example.com/wt">>), +ok = hackney:wt_send(Conn, {binary, <<"hello">>}), +{ok, {binary, <<"hello">>}} = hackney:wt_recv(Conn), +hackney:wt_close(Conn). +``` + ### HTTP/2 HTTP/2 is used automatically when the server supports it: diff --git a/guides/webtransport_guide.md b/guides/webtransport_guide.md new file mode 100644 index 00000000..bfcd6de8 --- /dev/null +++ b/guides/webtransport_guide.md @@ -0,0 +1,271 @@ +# WebTransport Guide + +hackney provides a WebTransport client that mirrors the WebSocket API, so +existing code can move from WebSocket to WebTransport by swapping the `ws_` +prefix for `wt_`. It runs over HTTP/3 (QUIC) by default, with HTTP/2 as an +option, and uses the same process-per-connection model. + +WebTransport is the analog of an HTTP/2 connection: one connection carries +many multiplexed streams plus unreliable datagrams. + +## Quick Start + +```erlang +{ok, Conn} = hackney:wt_connect(<<"https://example.com/wt">>), +ok = hackney:wt_send(Conn, {binary, <<"Hello!">>}), +{ok, {binary, <<"Hello!">>}} = hackney:wt_recv(Conn), +hackney:wt_close(Conn). +``` + +Compare with WebSocket; the shape is identical: + +```erlang +{ok, Conn} = hackney:ws_connect(<<"wss://example.com/socket">>), +ok = hackney:ws_send(Conn, {binary, <<"Hello!">>}), +{ok, {binary, <<"Hello!">>}} = hackney:ws_recv(Conn), +hackney:ws_close(Conn). +``` + +## Connecting + +WebTransport always runs over TLS. Use the `https://` scheme; `wss://` is +accepted as an alias so a URL can carry over unchanged. + +```erlang +{ok, Conn} = hackney:wt_connect(<<"https://example.com/wt">>). +``` + +### Connection with Options + +```erlang +{ok, Conn} = hackney:wt_connect(<<"https://example.com/wt">>, [ + {transport, h3}, + {connect_timeout, 5000}, + {recv_timeout, 30000}, + {headers, [{<<"authorization">>, <<"Bearer token">>}]} +]). +``` + +### Available Options + +| Option | Default | Description | +|--------|---------|-------------| +| `transport` | `h3` | `h3` (QUIC) or `h2` | +| `connect_timeout` | 8000 | Session handshake timeout (ms) | +| `recv_timeout` | infinity | Default receive timeout (ms) | +| `active` | `false` | Active mode: false, true, once | +| `headers` | `[]` | Extra headers for the CONNECT request | +| `ssl_options` | `[]` | TLS options: `verify`, `cacerts`/`cacertfile`, `cert`/`certfile`, `key`/`keyfile` | +| `verify` | `verify_peer` | `verify_peer` or `verify_none` | +| `compat_mode` | `latest` | `latest` or `legacy_browser_compat` | +| `max_recv_buffer` | 67108864 | Cap (bytes) on buffered, unread data | + +When verifying with no CA configured, hackney uses the bundled `certifi` +trust store, the same as for HTTPS requests. + +## The Default Message Channel + +`wt_send`/`wt_recv` operate on a single persistent bidirectional stream +opened at connect time. This is the drop-in replacement for the WebSocket +message channel. + +```erlang +ok = hackney:wt_send(Conn, {binary, <<1, 2, 3>>}). +ok = hackney:wt_send(Conn, {text, <<"text is sent as bytes">>}). +{ok, {binary, Data}} = hackney:wt_recv(Conn). +{ok, {binary, Data}} = hackney:wt_recv(Conn, 5000). %% With timeout +``` + +WebTransport has no message framing of its own. To stay interoperable with +any server, hackney does not add a wire format: bytes are written to the +stream as-is, and a received chunk is returned as `{binary, Data}`. Chunks +are reliable and ordered, but are **not** guaranteed to line up with your +send boundaries. If you need message boundaries, delimit them yourself, or +use one stream per message (see below). + +## Datagrams + +Datagrams are unreliable, unordered, and size-limited, like UDP. + +```erlang +ok = hackney:wt_send_datagram(Conn, <<"ping">>). +%% Inbound datagrams arrive on the default channel: +{ok, {datagram, Data}} = hackney:wt_recv(Conn). +``` + +## Multiplexed Streams + +Open as many streams as you want over one session, just like HTTP/2 +multiplexes requests over one connection. Each stream has its own send and +receive channel keyed by id. + +```erlang +{ok, StreamId} = hackney:wt_open_stream(Conn, bidi), %% or uni +ok = hackney:wt_stream_send(Conn, StreamId, <<"request">>), +ok = hackney:wt_stream_send(Conn, StreamId, <<"!">>, fin), %% close write side +{ok, Data} = hackney:wt_stream_recv(Conn, StreamId), +{ok, {fin, Last}} = hackney:wt_stream_recv(Conn, StreamId). %% peer ended stream +``` + +Stream lifecycle: + +```erlang +hackney:wt_close_stream(Conn, StreamId). %% graceful FIN +hackney:wt_reset_stream(Conn, StreamId, ErrorCode). %% abort +hackney:wt_stop_sending(Conn, StreamId, ErrorCode). %% ask peer to stop +``` + +Data on streams the server opens (rather than ones you opened) is surfaced +on the default channel as `{stream, Id, Data}` / `{stream_fin, Id, Data}`. + +## Active Mode + +In active mode every event is forwarded to the owner process, uniformly +tagged with its stream id. + +```erlang +{ok, Conn} = hackney:wt_connect(URL, [{active, true}]), +receive + {hackney_wt, Conn, {binary, Data}} -> handle(Data); + {hackney_wt, Conn, {datagram, Data}} -> handle_dgram(Data); + {hackney_wt, Conn, {stream, Id, Data}} -> handle_stream(Id, Data); + {hackney_wt, Conn, {stream_fin, Id, Data}} -> handle_fin(Id, Data); + {hackney_wt, Conn, closed} -> done; + {hackney_wt_error, Conn, Reason} -> error +end. +``` + +`active, once` delivers a single message and reverts to passive: + +```erlang +{ok, Conn} = hackney:wt_connect(URL, [{active, once}]), +receive {hackney_wt, Conn, Msg} -> ok end, +hackney:wt_setopts(Conn, [{active, once}]). %% arm for the next one +``` + +## Closing Connections + +```erlang +hackney:wt_close(Conn). +hackney:wt_close(Conn, {0, <<"bye">>}). %% {ErrorCode, Reason} +``` + +## Server Side + +hackney is the WebTransport **client**. The peer is any WebTransport +server. To run one in Erlang, use the `webtransport` library (a hackney +dependency, from the [erlang-webtransport](https://github.com/benoitc/erlang-webtransport) +project): start a listener and implement the `webtransport_handler` +behaviour. The handler callbacks are where you receive what the hackney +client sends, and the actions you return are what the client receives back. + +### Handler + +```erlang +-module(my_wt_handler). +-behaviour(webtransport_handler). + +-export([init/3, handle_stream/4, handle_stream_fin/4, + handle_datagram/2, handle_stream_closed/3, terminate/2]). + +init(_Session, _Req, _Opts) -> + {ok, #{}}. + +%% A chunk arrived on a stream (no FIN yet). Reply by returning a `send' +%% action on the SAME stream id; that is what the client reads back. +handle_stream(StreamId, bidi, Data, State) -> + {ok, State, [{send, StreamId, Data}]}; %% echo +handle_stream(_StreamId, uni, _Data, State) -> + {ok, State}. + +%% The peer closed its write side (FIN). For a bidi stream you can answer +%% with a final chunk and FIN to close yours. +handle_stream_fin(StreamId, bidi, Data, State) -> + {ok, State, [{send, StreamId, Data, fin}]}; +handle_stream_fin(_StreamId, uni, _Data, State) -> + {ok, State}. + +handle_datagram(Data, State) -> + {ok, State, [{send_datagram, Data}]}. %% echo a datagram + +handle_stream_closed(_StreamId, _Reason, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. +``` + +Available actions a callback may return in its `{ok, State, Actions}` +result: `{send, StreamId, Data}`, `{send, StreamId, Data, fin}`, +`{send_datagram, Data}`, `{open_stream, bidi | uni}`, +`{close_stream, StreamId}`, `{reset_stream, StreamId, Code}`, +`{stop_sending, StreamId, Code}`, `drain_session`, +`{close_session, Code, Reason}`. + +### Listener + +```erlang +{ok, _} = application:ensure_all_started(webtransport), +{ok, _Pid} = webtransport:start_listener(my_listener, #{ + transport => h3, %% or h2 + port => 4433, + certfile => "cert.pem", + keyfile => "key.pem", + handler => my_wt_handler +}). +%% ... later +ok = webtransport:stop_listener(my_listener). +``` + +### How client calls map to server callbacks + +| hackney client | server callback | reply to client | +|----------------|-----------------|-----------------| +| `wt_send(C, {binary, D})` (default stream) | `handle_stream(Id, bidi, D, S)` | `{send, Id, D2}` → client `wt_recv` returns `{binary, D2}` | +| `wt_send_datagram(C, D)` | `handle_datagram(D, S)` | `{send_datagram, D2}` → client `wt_recv` returns `{datagram, D2}` | +| `wt_open_stream(C, bidi)` + `wt_stream_send(C, Id, D)` | `handle_stream(Id, bidi, D, S)` | `{send, Id, D2}` → client `wt_stream_recv(C, Id)` returns `{ok, D2}` | +| `wt_stream_send(C, Id, D, fin)` | `handle_stream_fin(Id, bidi, D, S)` | `{send, Id, D2, fin}` → client gets `{ok, {fin, D2}}` | + +Reply on the **same stream id** you were called with to answer on that +stream. To push data the client did not ask for, return `{open_stream, ...}` +and send on the new id; the client surfaces it on the default channel as +`{stream, Id, Data}` (or `{hackney_wt, C, {stream, Id, Data}}` in active +mode). + +Browsers and other WebTransport clients work against the same listener; +nothing about the server is hackney-specific. + +## Differences from WebSocket + +| WebSocket | WebTransport | +|-----------|--------------| +| Single ordered message channel | Many multiplexed streams + datagrams | +| Reliable, framed messages | Reliable streams (no framing) + unreliable datagrams | +| `ping`/`pong` frames | Not used (QUIC keepalive); `wt_send` returns `{error, {unsupported_frame, ping}}` | +| `ws://` / `wss://` | `https://` (TLS only) | +| HTTP/1.1 Upgrade, proxies | HTTP/3 (QUIC) or HTTP/2, no proxy | + +`wt_recv`/`wt_stream_recv` return `{error, {active_mode, Mode}}` in active +mode, `{error, timeout}` on timeout, and `{error, closed}` once the session +ends and buffered data is drained. + +## Example: Echo Client + +```erlang +-module(wt_echo). +-export([run/1]). + +run(URL) -> + {ok, Conn} = hackney:wt_connect(URL), + ok = hackney:wt_send(Conn, {binary, <<"hello">>}), + {ok, {binary, Reply}} = hackney:wt_recv(Conn, 5000), + io:format("echo: ~s~n", [Reply]), + hackney:wt_close(Conn). +``` + +## Next Steps + +- [WebSocket Guide](websocket_guide.md) +- [HTTP/3 Guide](http3_guide.md) +- [Getting Started](../GETTING_STARTED.md) +``` diff --git a/rebar.config b/rebar.config index d5ef01bc..eb2c110c 100644 --- a/rebar.config +++ b/rebar.config @@ -52,6 +52,8 @@ {quic, "1.4.5"}, %% Pure Erlang HTTP/2 stack {h2, "0.6.1"}, + %% WebTransport client (HTTP/3 and HTTP/2) - powers the wt_* API + {webtransport, "0.2.6"}, {idna, "~>7.1.0"}, {mimerl, "~>1.4"}, {certifi, "~>2.16.0"}, @@ -80,6 +82,7 @@ {"guides/http2_guide.md", #{title => "HTTP/2 Guide"}}, {"guides/http3_guide.md", #{title => "HTTP/3 Guide"}}, {"guides/websocket_guide.md", #{title => "WebSocket Guide"}}, + {"guides/webtransport_guide.md", #{title => "WebTransport Guide"}}, {"guides/middleware.md", #{title => "Middleware Guide"}}, {"guides/design.md", #{title => "Design Guide"}}, {"guides/MIGRATION.md", #{title => "Migration Guide"}}, @@ -108,7 +111,7 @@ ]}, {base_plt_apps, [erts, stdlib, kernel, crypto, runtime_tools]}, {plt_apps, top_level_deps}, - {plt_extra_apps, [quic, h2]}, + {plt_extra_apps, [quic, h2, webtransport]}, {plt_location, local}, {plt_prefix, "hackney"}, {base_plt_location, "."}, diff --git a/src/hackney.app.src b/src/hackney.app.src index 03bf8e92..eac6d527 100644 --- a/src/hackney.app.src +++ b/src/hackney.app.src @@ -4,7 +4,7 @@ {application, hackney, [ {description, "Simple HTTP client with HTTP/1.1, HTTP/2, and HTTP/3 support"}, - {vsn, "4.0.3"}, + {vsn, "4.1.0"}, {registered, [hackney_pool]}, {applications, [kernel, stdlib, @@ -17,7 +17,8 @@ certifi, ssl_verify_fun, h2, - quic]}, + quic, + webtransport]}, {included_applications, []}, {mod, { hackney_app, []}}, {env, [{timeout, 150000}, diff --git a/src/hackney.erl b/src/hackney.erl index b920e528..68fae8dd 100644 --- a/src/hackney.erl +++ b/src/hackney.erl @@ -27,6 +27,21 @@ ws_setopts/2, ws_close/1, ws_close/2]). +%% WebTransport API +-export([wt_connect/1, wt_connect/2, + wt_send/2, + wt_recv/1, wt_recv/2, + wt_setopts/2, + wt_close/1, wt_close/2, + wt_open_stream/2, + wt_stream_send/3, wt_stream_send/4, + wt_stream_recv/2, wt_stream_recv/3, + wt_close_stream/2, + wt_reset_stream/3, + wt_stop_sending/3, + wt_send_datagram/2, + wt_session_info/1]). + -export([redirect_location/1, location/1]). -export([get_version/0]). @@ -746,6 +761,240 @@ ws_close(WsPid) when is_pid(WsPid) -> ws_close(WsPid, {Code, Reason}) when is_pid(WsPid) -> hackney_ws:close(WsPid, {Code, Reason}). +%%==================================================================== +%% WebTransport API +%%==================================================================== + +%% @doc Connect to a WebTransport server. +%% +%% The URL must use the https:// scheme (wss:// is accepted as an alias so +%% existing ws_* code can switch over by changing only the function name). +%% WebTransport always runs over TLS. +%% +%% Options: +%% +%% +%% Returns `{ok, WtPid}' on success, where WtPid is the hackney_wt process. +-spec wt_connect(binary() | string()) -> {ok, pid()} | {error, term()}. +wt_connect(URL) -> + wt_connect(URL, []). + +-spec wt_connect(binary() | string(), list()) -> {ok, pid()} | {error, term()}. +wt_connect(URL, Options) when is_binary(URL) orelse is_list(URL) -> + #hackney_url{ + scheme = Scheme, + host = Host, + port = Port, + path = Path0, + qs = Query + } = hackney_url:parse_url(URL), + + %% WebTransport runs over HTTP/3 or HTTP/2, both TLS-only. + case Scheme of + https -> ok; + wss -> ok; + _ -> error({invalid_webtransport_scheme, Scheme}) + end, + + Path = case Query of + <<>> -> Path0; + _ -> <> + end, + + Headers = normalize_ws_headers(proplists:get_value(headers, Options, [])), + + %% Mirror the WebSocket GHSA-f9vr guard: reject CR/LF/NUL in the request + %% path or in any caller-supplied header before it reaches the wire. + case valid_wt_fields(Host, Path, Headers) of + ok -> + Transport = proplists:get_value(transport, Options, h3), + ConnectTimeout = proplists:get_value(connect_timeout, Options, 8000), + WtOpts = #{ + host => Host, + port => Port, + path => Path, + transport => Transport, + connect_opts => build_wt_connect_opts(Options, Headers), + connect_timeout => ConnectTimeout, + recv_timeout => proplists:get_value(recv_timeout, Options, infinity), + active => proplists:get_value(active, Options, false), + max_recv_buffer => proplists:get_value(max_recv_buffer, Options, 16#4000000) + }, + case hackney_wt:start_link(WtOpts) of + {ok, WtPid} -> + try hackney_wt:connect(WtPid, ConnectTimeout) of + ok -> + {ok, WtPid}; + {error, Reason} -> + shutdown_wt(WtPid), + {error, Reason} + catch + exit:{noproc, _} -> + {error, {wt_process_died, noproc}} + end; + {error, Reason} -> + {error, Reason} + end; + {error, _} = Err -> + Err + end. + +%% @doc Send on a WebTransport connection. +%% Frame forms: `{text, Data}', `{binary, Data}', `Data' (write to the +%% default stream); `{datagram, Data}'; `{stream, StreamId, Data}' or +%% `{stream, StreamId, Data, fin|nofin}'. +-spec wt_send(pid(), hackney_wt:wt_frame()) -> ok | {error, term()}. +wt_send(WtPid, Frame) when is_pid(WtPid) -> + hackney_wt:send(WtPid, Frame). + +%% @doc Receive the next message on the default channel (passive mode). +-spec wt_recv(pid()) -> {ok, hackney_wt:wt_msg()} | {error, term()}. +wt_recv(WtPid) when is_pid(WtPid) -> + hackney_wt:recv(WtPid). + +-spec wt_recv(pid(), timeout()) -> {ok, hackney_wt:wt_msg()} | {error, term()}. +wt_recv(WtPid, Timeout) when is_pid(WtPid) -> + hackney_wt:recv(WtPid, Timeout). + +%% @doc Set WebTransport options. Supported: [{active, true | false | once}] +-spec wt_setopts(pid(), list()) -> ok | {error, term()}. +wt_setopts(WtPid, Opts) when is_pid(WtPid) -> + hackney_wt:setopts(WtPid, Opts). + +%% @doc Close a WebTransport session gracefully. +-spec wt_close(pid()) -> ok. +wt_close(WtPid) when is_pid(WtPid) -> + hackney_wt:close(WtPid). + +-spec wt_close(pid(), {non_neg_integer(), binary()}) -> ok. +wt_close(WtPid, {Code, Reason}) when is_pid(WtPid) -> + hackney_wt:close(WtPid, {Code, Reason}). + +%% @doc Open a new stream multiplexed over the session. +-spec wt_open_stream(pid(), bidi | uni) -> {ok, non_neg_integer()} | {error, term()}. +wt_open_stream(WtPid, Type) when is_pid(WtPid) -> + hackney_wt:open_stream(WtPid, Type). + +%% @doc Write to a stream (no FIN). +-spec wt_stream_send(pid(), non_neg_integer(), iodata()) -> ok | {error, term()}. +wt_stream_send(WtPid, StreamId, Data) when is_pid(WtPid) -> + hackney_wt:stream_send(WtPid, StreamId, Data). + +%% @doc Write to a stream, optionally closing the write side (FIN). +-spec wt_stream_send(pid(), non_neg_integer(), iodata(), fin | nofin) -> ok | {error, term()}. +wt_stream_send(WtPid, StreamId, Data, Fin) when is_pid(WtPid) -> + hackney_wt:stream_send(WtPid, StreamId, Data, Fin). + +%% @doc Receive the next chunk on a stream (passive mode). +-spec wt_stream_recv(pid(), non_neg_integer()) -> hackney_wt:stream_msg(). +wt_stream_recv(WtPid, StreamId) when is_pid(WtPid) -> + hackney_wt:stream_recv(WtPid, StreamId). + +-spec wt_stream_recv(pid(), non_neg_integer(), timeout()) -> hackney_wt:stream_msg(). +wt_stream_recv(WtPid, StreamId, Timeout) when is_pid(WtPid) -> + hackney_wt:stream_recv(WtPid, StreamId, Timeout). + +%% @doc Close a stream gracefully (send FIN). +-spec wt_close_stream(pid(), non_neg_integer()) -> ok | {error, term()}. +wt_close_stream(WtPid, StreamId) when is_pid(WtPid) -> + hackney_wt:close_stream(WtPid, StreamId). + +%% @doc Abruptly terminate a stream with an error code. +-spec wt_reset_stream(pid(), non_neg_integer(), non_neg_integer()) -> ok | {error, term()}. +wt_reset_stream(WtPid, StreamId, ErrorCode) when is_pid(WtPid) -> + hackney_wt:reset_stream(WtPid, StreamId, ErrorCode). + +%% @doc Ask the peer to stop sending on a stream. +-spec wt_stop_sending(pid(), non_neg_integer(), non_neg_integer()) -> ok | {error, term()}. +wt_stop_sending(WtPid, StreamId, ErrorCode) when is_pid(WtPid) -> + hackney_wt:stop_sending(WtPid, StreamId, ErrorCode). + +%% @doc Send an unreliable datagram. +-spec wt_send_datagram(pid(), iodata()) -> ok | {error, term()}. +wt_send_datagram(WtPid, Data) when is_pid(WtPid) -> + hackney_wt:send_datagram(WtPid, Data). + +%% @doc Return WebTransport session information. +-spec wt_session_info(pid()) -> {ok, map()} | {error, term()}. +wt_session_info(WtPid) when is_pid(WtPid) -> + hackney_wt:session_info(WtPid). + +%% @private Signal the WebTransport process to shut down, ignoring errors. +shutdown_wt(WtPid) -> + try exit(WtPid, shutdown) catch _:_ -> ok end. + +%% @private Reject CR/LF/NUL in the authority, request path, or any +%% caller-supplied header used in the WebTransport CONNECT request +%% (GHSA-f9vr analog). +valid_wt_fields(Host, Path, Headers) -> + Fields = [Host, Path | lists:flatmap(fun({N, V}) -> [N, V] end, Headers)], + case lists:any(fun has_ctl_bytes/1, Fields) of + true -> {error, invalid_handshake_header}; + false -> ok + end. + +has_ctl_bytes(Bin) when is_binary(Bin) -> + binary:match(Bin, [<<"\r">>, <<"\n">>, <<0>>]) =/= nomatch; +has_ctl_bytes(L) when is_list(L) -> + has_ctl_bytes(iolist_to_binary(L)); +has_ctl_bytes(_) -> + false. + +%% @private Translate hackney connect options into a webtransport:connect/4 +%% options map. Honours a caller-supplied CA (cacerts/cacertfile); when +%% verifying with no CA given, fall back to the bundled certifi store. +build_wt_connect_opts(Options, Headers) -> + SslOpts = proplists:get_value(ssl_options, Options, []), + Verify = proplists:get_value(verify, Options, + proplists:get_value(verify, SslOpts, verify_peer)), + Base = #{headers => Headers, verify => Verify}, + Base1 = case proplists:get_value(compat_mode, Options) of + undefined -> Base; + CompatMode -> Base#{compat_mode => CompatMode} + end, + Base2 = wt_ca_opts(Base1, SslOpts, Verify), + wt_cert_opts(Base2, SslOpts). + +%% @private CA trust store selection. +wt_ca_opts(Base, SslOpts, Verify) -> + case {proplists:get_value(cacertfile, SslOpts), + proplists:get_value(cacerts, SslOpts)} of + {undefined, undefined} when Verify =:= verify_peer -> + Base#{cacerts => certifi:cacerts()}; + {undefined, undefined} -> + Base; + {CAFile, _} when CAFile =/= undefined -> + Base#{cacertfile => CAFile}; + {_, CACerts} -> + Base#{cacerts => CACerts} + end. + +%% @private Optional client certificate / key. +wt_cert_opts(Base, SslOpts) -> + Base1 = case {proplists:get_value(certfile, SslOpts), + proplists:get_value(cert, SslOpts)} of + {CertFile, _} when CertFile =/= undefined -> Base#{certfile => CertFile}; + {_, Cert} when Cert =/= undefined -> Base#{cert => Cert}; + _ -> Base + end, + case {proplists:get_value(keyfile, SslOpts), + proplists:get_value(key, SslOpts)} of + {KeyFile, _} when KeyFile =/= undefined -> Base1#{keyfile => KeyFile}; + {_, Key} when Key =/= undefined -> Base1#{key => Key}; + _ -> Base1 + end. + %% @private Normalize WebSocket headers to {binary(), binary()} format normalize_ws_headers(Headers) -> [{hackney_bstr:to_binary(Name), hackney_bstr:to_binary(Value)} diff --git a/src/hackney_wt.erl b/src/hackney_wt.erl new file mode 100644 index 00000000..f55898d5 --- /dev/null +++ b/src/hackney_wt.erl @@ -0,0 +1,894 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. +%%% +%%% Copyright (c) 2026 Benoît Chesneau +%%% +%%% @doc gen_statem process for WebTransport client connections. +%%% +%%% This module mirrors {@link hackney_ws} so an application can move from +%%% WebSocket to WebTransport by swapping the `ws_' API prefix for `wt_'. +%%% It wraps an `erlang-webtransport' client session (HTTP/3 by default, +%%% HTTP/2 optional) and owns that session process. +%%% +%%% == Connection reuse and multiplexing == +%%% +%%% A WebTransport session is the analog of an HTTP/2 connection: one +%%% `hackney_wt' process owns a single session and many streams are +%%% multiplexed over it. `open_stream/2' opens as many bidirectional or +%%% unidirectional streams as the peer's flow control allows; each stream +%%% has its own send (`stream_send/3,4') and receive (`stream_recv/2,3') +%%% channel, keyed by stream id, exactly like the `h2_streams' map in +%%% `hackney_conn'. +%%% +%%% == Default message channel == +%%% +%%% WebTransport has no message framing of its own, so to stay +%%% interoperable with any server we do NOT invent a wire format. The +%%% `send/2' / `recv/1' convenience channel maps onto a single persistent +%%% bidirectional stream opened at connect time: `send/2' writes bytes to +%%% it and `recv/1' returns the next chunk received on it as +%%% `{binary, Data}'. Datagrams and data from server-opened streams are +%%% also surfaced on this channel as `{datagram, Data}' and +%%% `{stream, Id, Data}'. Because there is no framing, chunks are not +%%% guaranteed to align with send boundaries; an application that needs +%%% message boundaries must self-delimit. +%%% +%%% == Delivery modes == +%%% +%%% In passive mode (the default) data is buffered per channel and read +%%% with `recv'/`stream_recv'. In active mode every event is forwarded to +%%% the owner uniformly tagged with its stream id: +%%% `{hackney_wt, Conn, {binary, Data}}', `{hackney_wt, Conn, {datagram, +%%% Data}}', `{hackney_wt, Conn, {stream, Id, Data}}', `{hackney_wt, Conn, +%%% {stream_fin, Id, Data}}', `{hackney_wt, Conn, {stream_closed, Id, +%%% Reason}}' and `{hackney_wt, Conn, closed}'. +%%% +%%% States: +%%% +-module(hackney_wt). +-behaviour(gen_statem). + +%% API +-export([ + start_link/1, + connect/1, connect/2, + send/2, + recv/1, recv/2, + setopts/2, + close/1, close/2, + controlling_process/2, + open_stream/2, + stream_send/3, stream_send/4, + stream_recv/2, stream_recv/3, + close_stream/2, + reset_stream/3, + stop_sending/3, + send_datagram/2, + session_info/1 +]). + +%% gen_statem callbacks +-export([ + init/1, + callback_mode/0, + terminate/3, + code_change/4 +]). + +%% State functions +-export([ + idle/3, + connected/3, + closed/3 +]). + +-define(CONNECT_TIMEOUT, 8000). +-define(RECV_TIMEOUT, infinity). + +%% Bound the bytes buffered across all passive receive queues so a hostile +%% server cannot drive the client to OOM by flooding stream/datagram data +%% that is never consumed. `infinity' disables the cap. This complements +%% WebTransport flow control (which bounds in-flight bytes per stream) by +%% bounding the already-decoded queues; same intent as the WebSocket +%% client's GHSA-q8jg caps. +-define(DEFAULT_MAX_RECV_BUFFER, 16#4000000). %% 64 MiB + +%% A normalized message on the primary channel, handed to recv/2 or, in +%% active mode, to the owner. +-type wt_msg() :: {binary, binary()} + | {datagram, binary()} + | {stream, non_neg_integer(), binary()} + | {stream_fin, non_neg_integer(), binary()} + | {stream_closed, non_neg_integer(), term()}. + +%% Accepted by send/2. +-type wt_frame() :: {text, iodata()} + | {binary, iodata()} + | iodata() + | {datagram, iodata()} + | {stream, non_neg_integer(), iodata()} + | {stream, non_neg_integer(), iodata(), fin | nofin}. + +%% Returned by stream_recv/2,3. +-type stream_msg() :: {ok, binary()} + | {ok, {fin, binary()}} + | {error, term()}. + +-export_type([wt_msg/0, wt_frame/0, stream_msg/0]). + +%% Per-stream receive state for a client-opened stream (mirrors the role +%% of an HTTP/2 stream's pending caller in hackney_conn). +-record(stream, { + q = queue:new() :: queue:queue(), + from :: {pid(), reference()} | undefined, + %% undefined while open; set when the peer ends the stream + closed :: undefined | term() +}). + +-record(wt_data, { + %% Connection owner (linked via start_link, trap_exit handles death) + owner :: pid(), + + %% Connection identity + host :: string() | binary(), + port :: inet:port_number(), + transport = h3 :: h2 | h3, + path :: binary(), + + %% webtransport:connect/4 options map (TLS, headers, compat_mode, ...) + connect_opts = #{} :: map(), + connect_timeout = ?CONNECT_TIMEOUT :: timeout(), + recv_timeout = ?RECV_TIMEOUT :: timeout(), + + %% Delivery mode + active = false :: false | true | once, + + %% Underlying webtransport session and the persistent default stream + session :: pid() | undefined, + default_stream :: non_neg_integer() | undefined, + + %% Per-stream state for client-opened streams: StreamId => #stream{} + streams = #{} :: #{non_neg_integer() => #stream{}}, + + %% Primary channel buffering (default stream + datagrams + server streams) + recv_q = queue:new() :: queue:queue(), + recv_from :: {pid(), reference()} | undefined, + + %% Total buffered bytes across every queue, for the OOM cap + recv_bytes = 0 :: non_neg_integer(), + max_recv_buffer = ?DEFAULT_MAX_RECV_BUFFER :: non_neg_integer() | infinity, + + %% Reason recorded once the session ends + closed_reason :: term() +}). + +%%==================================================================== +%% API +%%==================================================================== + +%% @doc Start a WebTransport connection process. +%% Options: +%% +-spec start_link(map()) -> {ok, pid()} | {error, term()}. +start_link(Opts) when is_map(Opts) -> + gen_statem:start_link(?MODULE, [self(), Opts], []). + +%% @doc Establish the WebTransport session. Blocks until the CONNECT +%% completes or fails. +-spec connect(pid()) -> ok | {error, term()}. +connect(Pid) -> + connect(Pid, ?CONNECT_TIMEOUT). + +-spec connect(pid(), timeout()) -> ok | {error, term()}. +connect(Pid, Timeout) -> + %% The handshake is driven inside the process; the internal timeout + %% below bounds it, so the call itself waits indefinitely. + gen_statem:call(Pid, {connect, Timeout}, infinity). + +%% @doc Send on the connection. +%% Frame forms: +%% +-spec send(pid(), wt_frame()) -> ok | {error, term()}. +send(Pid, Frame) -> + gen_statem:call(Pid, {send, Frame}). + +%% @doc Receive the next message on the primary channel (passive mode only). +-spec recv(pid()) -> {ok, wt_msg()} | {error, term()}. +recv(Pid) -> + gen_statem:call(Pid, {recv, default}, infinity). + +-spec recv(pid(), timeout()) -> {ok, wt_msg()} | {error, term()}. +recv(Pid, Timeout) -> + gen_statem:call(Pid, {recv, Timeout}, infinity). + +%% @doc Set options. Supported: [{active, true|false|once}] +-spec setopts(pid(), list()) -> ok | {error, term()}. +setopts(Pid, Opts) -> + gen_statem:call(Pid, {setopts, Opts}). + +%% @doc Close the session gracefully (error code 0, no reason). +-spec close(pid()) -> ok. +close(Pid) -> + close(Pid, {0, <<>>}). + +-spec close(pid(), {non_neg_integer(), binary()}) -> ok. +close(Pid, {Code, Reason}) -> + gen_statem:cast(Pid, {close, Code, Reason}). + +%% @doc Assign a new controlling process. +-spec controlling_process(pid(), pid()) -> ok | {error, term()}. +controlling_process(Pid, NewOwner) -> + gen_statem:call(Pid, {controlling_process, NewOwner}). + +%% @doc Open a new stream multiplexed over the session. Returns its id. +-spec open_stream(pid(), bidi | uni) -> {ok, non_neg_integer()} | {error, term()}. +open_stream(Pid, Type) when Type =:= bidi; Type =:= uni -> + gen_statem:call(Pid, {open_stream, Type}). + +%% @doc Write to a stream (no FIN). +-spec stream_send(pid(), non_neg_integer(), iodata()) -> ok | {error, term()}. +stream_send(Pid, StreamId, Data) -> + gen_statem:call(Pid, {stream_send, StreamId, Data, nofin}). + +%% @doc Write to a stream, optionally closing the write side (FIN). +-spec stream_send(pid(), non_neg_integer(), iodata(), fin | nofin) -> ok | {error, term()}. +stream_send(Pid, StreamId, Data, Fin) when Fin =:= fin; Fin =:= nofin -> + gen_statem:call(Pid, {stream_send, StreamId, Data, Fin}). + +%% @doc Receive the next chunk on a client-opened stream (passive mode). +%% Returns `{ok, Data}', `{ok, {fin, Data}}' when the peer ends the +%% stream, or `{error, Reason}'. +-spec stream_recv(pid(), non_neg_integer()) -> stream_msg(). +stream_recv(Pid, StreamId) -> + gen_statem:call(Pid, {stream_recv, StreamId, default}, infinity). + +-spec stream_recv(pid(), non_neg_integer(), timeout()) -> stream_msg(). +stream_recv(Pid, StreamId, Timeout) -> + gen_statem:call(Pid, {stream_recv, StreamId, Timeout}, infinity). + +%% @doc Close a stream gracefully (send FIN). +-spec close_stream(pid(), non_neg_integer()) -> ok | {error, term()}. +close_stream(Pid, StreamId) -> + gen_statem:call(Pid, {close_stream, StreamId}). + +%% @doc Abruptly terminate a stream with an error code. +-spec reset_stream(pid(), non_neg_integer(), non_neg_integer()) -> ok | {error, term()}. +reset_stream(Pid, StreamId, ErrorCode) -> + gen_statem:call(Pid, {reset_stream, StreamId, ErrorCode}). + +%% @doc Ask the peer to stop sending on a stream. +-spec stop_sending(pid(), non_neg_integer(), non_neg_integer()) -> ok | {error, term()}. +stop_sending(Pid, StreamId, ErrorCode) -> + gen_statem:call(Pid, {stop_sending, StreamId, ErrorCode}). + +%% @doc Send an unreliable datagram. +-spec send_datagram(pid(), iodata()) -> ok | {error, term()}. +send_datagram(Pid, Data) -> + gen_statem:call(Pid, {send_datagram, Data}). + +%% @doc Return session information (transport, stream count, flow control). +-spec session_info(pid()) -> {ok, map()} | {error, term()}. +session_info(Pid) -> + gen_statem:call(Pid, session_info). + +%%==================================================================== +%% gen_statem callbacks +%%==================================================================== + +%% @private +callback_mode() -> + [state_functions, state_enter]. + +%% @private +init([Owner, Opts]) -> + process_flag(trap_exit, true), + Data = #wt_data{ + owner = Owner, + host = maps:get(host, Opts), + port = maps:get(port, Opts), + transport = maps:get(transport, Opts, h3), + path = maps:get(path, Opts, <<"/">>), + connect_opts = maps:get(connect_opts, Opts, #{}), + connect_timeout = maps:get(connect_timeout, Opts, ?CONNECT_TIMEOUT), + recv_timeout = maps:get(recv_timeout, Opts, ?RECV_TIMEOUT), + active = maps:get(active, Opts, false), + max_recv_buffer = maps:get(max_recv_buffer, Opts, ?DEFAULT_MAX_RECV_BUFFER) + }, + {ok, idle, Data}. + +%% @private +terminate(_Reason, _State, #wt_data{session = undefined}) -> + ok; +terminate(_Reason, _State, #wt_data{} = Data) -> + close_session_safe(Data), + ok. + +%% @private +code_change(_OldVsn, State, Data, _Extra) -> + {ok, State, Data}. + +%%==================================================================== +%% State: idle +%%==================================================================== + +idle(enter, _OldState, _Data) -> + keep_state_and_data; + +idle({call, From}, {connect, Timeout}, Data) -> + #wt_data{host = Host, port = Port, path = Path, + transport = Transport, connect_opts = COpts0} = Data, + COpts = COpts0#{transport => Transport, timeout => Timeout}, + case webtransport:connect(Host, Port, Path, COpts) of + {ok, Session} -> + %% Open the persistent default bidi stream eagerly; if the + %% server has not granted bidi credit yet, fall back to opening + %% it lazily on the first send. + DefaultStream = case webtransport:open_stream(Session, bidi) of + {ok, Sid} -> Sid; + {error, _} -> undefined + end, + Data1 = Data#wt_data{session = Session, default_stream = DefaultStream}, + {next_state, connected, Data1, [{reply, From, ok}]}; + {error, Reason} -> + {stop_and_reply, normal, [{reply, From, {error, Reason}}]} + end; + +idle({call, From}, _Request, _Data) -> + {keep_state_and_data, [{reply, From, {error, not_connected}}]}; + +idle(info, {'EXIT', Owner, _Reason}, #wt_data{owner = Owner}) -> + {stop, normal}; + +idle(_, _, _) -> + keep_state_and_data. + +%%==================================================================== +%% State: connected +%%==================================================================== + +connected(enter, _OldState, _Data) -> + keep_state_and_data; + +%% --- send ------------------------------------------------------------- +connected({call, From}, {send, Frame}, Data) -> + {Reply, Data1} = do_send(Frame, Data), + {keep_state, Data1, [{reply, From, Reply}]}; + +%% --- primary channel recv -------------------------------------------- +connected({call, From}, {recv, _Timeout}, #wt_data{active = Active}) + when Active =/= false -> + {keep_state_and_data, [{reply, From, {error, {active_mode, Active}}}]}; + +connected({call, From}, {recv, _Timeout}, #wt_data{recv_from = RF}) + when RF =/= undefined -> + {keep_state_and_data, [{reply, From, {error, recv_busy}}]}; + +connected({call, From}, {recv, Timeout0}, Data) -> + Timeout = recv_timeout(Timeout0, Data), + case dequeue(Data#wt_data.recv_q, Data) of + {value, Msg, Q1, Data1} -> + {keep_state, Data1#wt_data{recv_q = Q1}, [{reply, From, {ok, Msg}}]}; + {empty, _} -> + case Data#wt_data.closed_reason of + undefined -> + {keep_state, Data#wt_data{recv_from = From}, recv_timer(recv, Timeout)}; + Reason -> + {keep_state_and_data, [{reply, From, {error, Reason}}]} + end + end; + +%% --- per-stream recv -------------------------------------------------- +connected({call, From}, {stream_recv, _Sid, _Timeout}, #wt_data{active = Active}) + when Active =/= false -> + {keep_state_and_data, [{reply, From, {error, {active_mode, Active}}}]}; + +connected({call, From}, {stream_recv, Sid, Timeout0}, Data) -> + case maps:find(Sid, Data#wt_data.streams) of + error -> + {keep_state_and_data, [{reply, From, {error, unknown_stream}}]}; + {ok, #stream{from = F}} when F =/= undefined -> + {keep_state_and_data, [{reply, From, {error, recv_busy}}]}; + {ok, S} -> + do_stream_recv(From, Sid, S, recv_timeout(Timeout0, Data), Data) + end; + +%% --- options ---------------------------------------------------------- +connected({call, From}, {setopts, Opts}, Data) -> + case proplists:get_value(active, Opts) of + undefined -> + {keep_state_and_data, [{reply, From, ok}]}; + NewActive when NewActive =:= true; NewActive =:= false; NewActive =:= once -> + Data1 = apply_active(NewActive, Data), + {keep_state, Data1, [{reply, From, ok}]}; + _ -> + {keep_state_and_data, [{reply, From, {error, badarg}}]} + end; + +connected({call, From}, {controlling_process, NewOwner}, #wt_data{owner = OldOwner} = Data) -> + unlink(OldOwner), + link(NewOwner), + {keep_state, Data#wt_data{owner = NewOwner}, [{reply, From, ok}]}; + +%% --- native stream / datagram API ------------------------------------ +connected({call, From}, {open_stream, Type}, #wt_data{session = S} = Data) -> + case webtransport:open_stream(S, Type) of + {ok, Sid} = Ok -> + Streams = maps:put(Sid, #stream{}, Data#wt_data.streams), + {keep_state, Data#wt_data{streams = Streams}, [{reply, From, Ok}]}; + {error, _} = Err -> + {keep_state_and_data, [{reply, From, Err}]} + end; + +connected({call, From}, {stream_send, StreamId, SData, Fin}, #wt_data{session = S}) -> + {keep_state_and_data, [{reply, From, webtransport:send(S, StreamId, SData, Fin)}]}; + +connected({call, From}, {close_stream, StreamId}, #wt_data{session = S}) -> + {keep_state_and_data, [{reply, From, webtransport:close_stream(S, StreamId)}]}; + +connected({call, From}, {reset_stream, StreamId, Code}, #wt_data{session = S}) -> + {keep_state_and_data, [{reply, From, webtransport:reset_stream(S, StreamId, Code)}]}; + +connected({call, From}, {stop_sending, StreamId, Code}, #wt_data{session = S}) -> + {keep_state_and_data, [{reply, From, webtransport:stop_sending(S, StreamId, Code)}]}; + +connected({call, From}, {send_datagram, SData}, #wt_data{session = S}) -> + {keep_state_and_data, [{reply, From, webtransport:send_datagram(S, SData)}]}; + +connected({call, From}, session_info, #wt_data{session = S}) -> + {keep_state_and_data, [{reply, From, webtransport:session_info(S)}]}; + +connected({call, From}, _Request, _Data) -> + {keep_state_and_data, [{reply, From, {error, badrequest}}]}; + +%% --- close ------------------------------------------------------------ +connected(cast, {close, Code, Reason}, #wt_data{session = S} = Data) -> + _ = webtransport:close_session(S, Code, Reason), + {next_state, closed, Data#wt_data{closed_reason = closed}}; + +%% --- recv timeouts ---------------------------------------------------- +connected({timeout, recv}, recv, #wt_data{recv_from = From} = Data) + when From =/= undefined -> + {keep_state, Data#wt_data{recv_from = undefined}, [{reply, From, {error, timeout}}]}; +connected({timeout, recv}, recv, _Data) -> + keep_state_and_data; + +connected({timeout, {srecv, Sid}}, {srecv, Sid}, Data) -> + case maps:find(Sid, Data#wt_data.streams) of + {ok, #stream{from = From} = S} when From =/= undefined -> + Streams = maps:put(Sid, S#stream{from = undefined}, Data#wt_data.streams), + {keep_state, Data#wt_data{streams = Streams}, [{reply, From, {error, timeout}}]}; + _ -> + keep_state_and_data + end; + +%% --- session events --------------------------------------------------- +connected(info, {webtransport, Session, Event}, #wt_data{session = Session} = Data) -> + handle_wt_event(Event, Data); + +connected(info, {'EXIT', Session, Reason}, #wt_data{session = Session} = Data) -> + R = case Reason of normal -> closed; _ -> {session_down, Reason} end, + session_ended(R, Data); + +connected(info, {'EXIT', Owner, _Reason}, #wt_data{owner = Owner} = Data) -> + close_session_safe(Data), + {stop, normal}; + +connected(_, _, _) -> + keep_state_and_data. + +%%==================================================================== +%% State: closed +%%==================================================================== + +closed(enter, _OldState, _Data) -> + keep_state_and_data; + +%% Allow draining whatever was buffered before the session ended. +closed({call, From}, {recv, _Timeout}, Data) -> + case dequeue(Data#wt_data.recv_q, Data) of + {value, Msg, Q1, Data1} -> + {keep_state, Data1#wt_data{recv_q = Q1}, [{reply, From, {ok, Msg}}]}; + {empty, _} -> + {keep_state_and_data, [{reply, From, {error, closed_reason(Data)}}]} + end; + +closed({call, From}, {stream_recv, Sid, _Timeout}, Data) -> + case maps:find(Sid, Data#wt_data.streams) of + {ok, S} -> + case queue:out(S#stream.q) of + {{value, Item}, Q1} -> + Data1 = sub_bytes(item_size(Item), Data), + Streams = maps:put(Sid, S#stream{q = Q1}, Data1#wt_data.streams), + {keep_state, Data1#wt_data{streams = Streams}, + [{reply, From, stream_item_result(Item)}]}; + {empty, _} -> + {keep_state_and_data, [{reply, From, {error, closed_reason(Data)}}]} + end; + error -> + {keep_state_and_data, [{reply, From, {error, closed}}]} + end; + +closed({call, From}, {controlling_process, NewOwner}, #wt_data{owner = OldOwner} = Data) -> + unlink(OldOwner), + link(NewOwner), + {keep_state, Data#wt_data{owner = NewOwner}, [{reply, From, ok}]}; + +closed({call, From}, _Request, _Data) -> + {keep_state_and_data, [{reply, From, {error, closed}}]}; + +closed(cast, {close, _Code, _Reason}, _Data) -> + keep_state_and_data; + +closed(info, {'EXIT', Owner, _Reason}, #wt_data{owner = Owner}) -> + {stop, normal}; + +closed(_, _, _) -> + keep_state_and_data. + +%%==================================================================== +%% Internal: send +%%==================================================================== + +%% @private Map a send frame onto the webtransport session. +do_send({text, D}, Data) -> + send_default(D, Data); +do_send({binary, D}, Data) -> + send_default(D, Data); +do_send({datagram, D}, #wt_data{session = S} = Data) -> + {webtransport:send_datagram(S, D), Data}; +do_send({stream, StreamId, D}, #wt_data{session = S} = Data) -> + {webtransport:send(S, StreamId, D, nofin), Data}; +do_send({stream, StreamId, D, Fin}, #wt_data{session = S} = Data) + when Fin =:= fin; Fin =:= nofin -> + {webtransport:send(S, StreamId, D, Fin), Data}; +do_send(Ping, Data) when Ping =:= ping; Ping =:= pong -> + {{error, {unsupported_frame, Ping}}, Data}; +do_send({ping, _}, Data) -> + {{error, {unsupported_frame, ping}}, Data}; +do_send({pong, _}, Data) -> + {{error, {unsupported_frame, pong}}, Data}; +do_send(D, Data) when is_binary(D); is_list(D) -> + send_default(D, Data); +do_send(_Other, Data) -> + {{error, badarg}, Data}. + +%% @private Write to the persistent default stream, opening it lazily if it +%% was not granted at connect time. +send_default(D, #wt_data{session = S, default_stream = undefined} = Data) -> + case webtransport:open_stream(S, bidi) of + {ok, StreamId} -> + {webtransport:send(S, StreamId, D, nofin), Data#wt_data{default_stream = StreamId}}; + {error, _} = Err -> + {Err, Data} + end; +send_default(D, #wt_data{session = S, default_stream = StreamId} = Data) -> + {webtransport:send(S, StreamId, D, nofin), Data}. + +%%==================================================================== +%% Internal: inbound events +%%==================================================================== + +%% @private Classify a webtransport handler event and route it. Data on +%% the default stream becomes the ws-shaped `{binary, Data}'; data on a +%% client-opened stream goes to that stream's per-stream channel; +%% everything else (datagrams, server-opened streams) stays on the primary +%% channel with its native shape. +handle_wt_event(closed, Data) -> + session_ended(closed, Data); +handle_wt_event(Event, #wt_data{active = Active} = Data) when Active =/= false -> + %% Active mode: forward everything to the owner, uniformly tagged. + Data1 = clear_default_on_fin(Event, Data), + case active_msg(Event, Data) of + ignore -> + {keep_state, Data1}; + Msg -> + (Data1#wt_data.owner) ! {hackney_wt, self(), Msg}, + case Active of + once -> {keep_state, Data1#wt_data{active = false}}; + true -> {keep_state, Data1} + end + end; +handle_wt_event(Event, Data) -> + %% Passive mode: buffer per channel. + route_passive(Event, Data). + +%% @private Owner-facing message for active mode (ignore = drop). +active_msg({stream, Sid, _Type, D}, #wt_data{default_stream = Sid}) -> + nonempty_binary(D); +active_msg({stream_fin, Sid, _Type, D}, #wt_data{default_stream = Sid}) -> + nonempty_binary(D); +active_msg({stream, Sid, _Type, D}, _Data) -> + {stream, Sid, D}; +active_msg({stream_fin, Sid, _Type, D}, _Data) -> + {stream_fin, Sid, D}; +active_msg({datagram, D}, _Data) -> + {datagram, D}; +active_msg({stream_closed, Sid, Reason}, _Data) -> + {stream_closed, Sid, Reason}; +active_msg(_Other, _Data) -> + ignore. + +nonempty_binary(<<>>) -> ignore; +nonempty_binary(D) -> {binary, D}. + +%% @private Route an inbound event into the right passive queue. +route_passive({stream, Sid, _Type, D}, #wt_data{default_stream = Sid} = Data) -> + deliver_primary_nonempty({binary, D}, Data); +route_passive({stream_fin, Sid, _Type, D}, #wt_data{default_stream = Sid} = Data) -> + %% Server closed the default stream; drop our id so the next send opens + %% a fresh one. + deliver_primary_nonempty({binary, D}, Data#wt_data{default_stream = undefined}); +route_passive({stream, Sid, _Type, D}, Data) -> + case maps:is_key(Sid, Data#wt_data.streams) of + true -> deliver_stream(Sid, {data, D}, Data); + false -> deliver_primary({stream, Sid, D}, Data) + end; +route_passive({stream_fin, Sid, _Type, D}, Data) -> + case maps:is_key(Sid, Data#wt_data.streams) of + true -> deliver_stream(Sid, {fin, D}, Data); + false -> deliver_primary({stream_fin, Sid, D}, Data) + end; +route_passive({datagram, D}, Data) -> + deliver_primary({datagram, D}, Data); +route_passive({stream_closed, Sid, Reason}, Data) -> + case maps:is_key(Sid, Data#wt_data.streams) of + true -> deliver_stream(Sid, {closed, Reason}, Data); + false -> deliver_primary({stream_closed, Sid, Reason}, Data) + end; +route_passive(_Other, _Data) -> + keep_state_and_data. + +%% @private In active mode the default stream still has to be forgotten on +%% FIN so a later send reopens it. +clear_default_on_fin({stream_fin, Sid, _Type, _D}, #wt_data{default_stream = Sid} = Data) -> + Data#wt_data{default_stream = undefined}; +clear_default_on_fin(_Event, Data) -> + Data. + +%%==================================================================== +%% Internal: primary channel buffering +%%==================================================================== + +deliver_primary_nonempty({binary, <<>>}, _Data) -> + keep_state_and_data; +deliver_primary_nonempty(Msg, Data) -> + deliver_primary(Msg, Data). + +%% @private Hand a primary-channel message to a waiting reader or buffer it. +deliver_primary(Msg, #wt_data{recv_from = undefined} = Data) -> + enqueue_primary(Msg, Data); +deliver_primary(Msg, #wt_data{recv_from = From} = Data) -> + {keep_state, Data#wt_data{recv_from = undefined}, + [{reply, From, {ok, Msg}}, {{timeout, recv}, cancel}]}. + +enqueue_primary(Msg, #wt_data{recv_q = Q} = Data) -> + case add_bytes(msg_size(Msg), Data) of + {ok, Data1} -> + {keep_state, Data1#wt_data{recv_q = queue:in(Msg, Q)}}; + overflow -> + overflow(Data) + end. + +%%==================================================================== +%% Internal: per-stream buffering +%%==================================================================== + +%% @private Serve a per-stream recv from the buffer or wait for data. +do_stream_recv(From, Sid, #stream{q = Q} = S, Timeout, Data) -> + case queue:out(Q) of + {{value, Item}, Q1} -> + Data1 = sub_bytes(item_size(Item), Data), + Streams = maps:put(Sid, S#stream{q = Q1}, Data1#wt_data.streams), + {keep_state, Data1#wt_data{streams = Streams}, + [{reply, From, stream_item_result(Item)}]}; + {empty, _} -> + case S#stream.closed of + undefined -> + Streams = maps:put(Sid, S#stream{from = From}, Data#wt_data.streams), + {keep_state, Data#wt_data{streams = Streams}, + recv_timer({srecv, Sid}, Timeout)}; + Reason -> + {keep_state_and_data, [{reply, From, {error, map_stream_reason(Reason)}}]} + end + end. + +%% @private Hand a per-stream terminal event to a waiting reader or record it. +deliver_stream(Sid, {closed, Reason}, Data) -> + S = maps:get(Sid, Data#wt_data.streams), + case S#stream.from of + undefined -> + buffer_stream(Sid, S, {closed, Reason}, Data); + From -> + S1 = S#stream{from = undefined, closed = Reason}, + Streams = maps:put(Sid, S1, Data#wt_data.streams), + {keep_state, Data#wt_data{streams = Streams}, + [{reply, From, {error, map_stream_reason(Reason)}}, + {{timeout, {srecv, Sid}}, cancel}]} + end; +%% @private Hand a per-stream data/fin item to a waiting reader or buffer it. +deliver_stream(Sid, Item, Data) -> + S = maps:get(Sid, Data#wt_data.streams), + case S#stream.from of + undefined -> + buffer_stream(Sid, S, Item, Data); + From -> + S1 = S#stream{from = undefined, closed = closed_after(Item, S#stream.closed)}, + Streams = maps:put(Sid, S1, Data#wt_data.streams), + {keep_state, Data#wt_data{streams = Streams}, + [{reply, From, stream_item_result(Item)}, {{timeout, {srecv, Sid}}, cancel}]} + end. + +buffer_stream(Sid, S, {closed, Reason}, Data) -> + %% No bytes to buffer; just record the terminal state. + Streams = maps:put(Sid, S#stream{closed = Reason}, Data#wt_data.streams), + {keep_state, Data#wt_data{streams = Streams}}; +buffer_stream(Sid, S, Item, Data) -> + case add_bytes(item_size(Item), Data) of + {ok, Data1} -> + S1 = S#stream{q = queue:in(Item, S#stream.q), + closed = closed_after(Item, S#stream.closed)}, + Streams = maps:put(Sid, S1, Data1#wt_data.streams), + {keep_state, Data1#wt_data{streams = Streams}}; + overflow -> + overflow(Data) + end. + +%% @private A delivered FIN means the stream is half-closed for reading +%% once the queue drains. +closed_after({fin, _}, undefined) -> normal; +closed_after(_Item, Closed) -> Closed. + +stream_item_result({data, D}) -> {ok, D}; +stream_item_result({fin, D}) -> {ok, {fin, D}}. + +map_stream_reason(normal) -> closed; +map_stream_reason(Reason) -> Reason. + +item_size({data, D}) -> byte_size(D); +item_size({fin, D}) -> byte_size(D); +item_size({closed, _}) -> 0. + +%%==================================================================== +%% Internal: buffer accounting and lifecycle +%%==================================================================== + +%% @private Pop a primary-channel message, adjusting the byte counter. +dequeue(Q, Data) -> + case queue:out(Q) of + {{value, Msg}, Q1} -> + {value, Msg, Q1, sub_bytes(msg_size(Msg), Data)}; + {empty, _} = E -> + E + end. + +add_bytes(_Size, #wt_data{max_recv_buffer = infinity} = Data) -> + {ok, Data}; +add_bytes(Size, #wt_data{recv_bytes = B, max_recv_buffer = Max} = Data) -> + NewBytes = B + Size, + case NewBytes > Max of + true -> overflow; + false -> {ok, Data#wt_data{recv_bytes = NewBytes}} + end. + +sub_bytes(Size, #wt_data{recv_bytes = B} = Data) -> + Data#wt_data{recv_bytes = max(0, B - Size)}. + +%% @private Receive buffer cap exceeded: tear the session down. +overflow(Data) -> + close_session_safe(Data), + _ = maybe_notify_error(recv_buffer_overflow, Data), + {next_state, closed, Data#wt_data{closed_reason = recv_buffer_overflow}}. + +msg_size({binary, B}) -> byte_size(B); +msg_size({datagram, B}) -> byte_size(B); +msg_size({stream, _, B}) -> byte_size(B); +msg_size({stream_fin, _, B}) -> byte_size(B); +msg_size({stream_closed, _, _}) -> 0. + +%% @private Apply a new active mode, flushing buffered messages into active +%% delivery as we switch. +apply_active(false, Data) -> + Data#wt_data{active = false}; +apply_active(true, Data) -> + flush_all(Data#wt_data{active = true}); +apply_active(once, #wt_data{owner = Owner} = Data) -> + case dequeue(Data#wt_data.recv_q, Data) of + {value, Msg, Q1, Data1} -> + Owner ! {hackney_wt, self(), Msg}, + %% A single buffered message satisfies `once'; stay passive. + Data1#wt_data{recv_q = Q1, active = false}; + {empty, _} -> + Data#wt_data{active = once} + end. + +%% @private Forward every buffered message (primary + per-stream) to the +%% owner, then run with empty queues. +flush_all(#wt_data{owner = Owner, recv_q = Q, streams = Streams} = Data) -> + lists:foreach(fun(Msg) -> Owner ! {hackney_wt, self(), Msg} end, queue:to_list(Q)), + Streams1 = maps:map( + fun(Sid, #stream{q = SQ} = S) -> + lists:foreach( + fun(Item) -> Owner ! {hackney_wt, self(), stream_active_msg(Sid, Item)} end, + queue:to_list(SQ)), + S#stream{q = queue:new()} + end, Streams), + Data#wt_data{recv_q = queue:new(), recv_bytes = 0, streams = Streams1}. + +stream_active_msg(Sid, {data, D}) -> {stream, Sid, D}; +stream_active_msg(Sid, {fin, D}) -> {stream_fin, Sid, D}; +stream_active_msg(Sid, {closed, R}) -> {stream_closed, Sid, R}. + +%% @private Handle the session ending (graceful close or crash). Reply to +%% any waiting reader, notify the owner in active mode, and move to closed +%% while keeping buffered data drainable. +session_ended(Reason, Data0) -> + {Data1, Actions1} = fail_primary_reader(Reason, Data0), + {Data2, Actions2} = fail_stream_readers(Reason, Data1), + _ = case Data2#wt_data.active of + false -> ok; + _ -> maybe_notify_error(Reason, Data2) + end, + {next_state, closed, Data2#wt_data{closed_reason = Reason}, Actions1 ++ Actions2}. + +fail_primary_reader(_Reason, #wt_data{recv_from = undefined} = Data) -> + {Data, []}; +fail_primary_reader(Reason, #wt_data{recv_from = From} = Data) -> + {Data#wt_data{recv_from = undefined}, + [{reply, From, {error, Reason}}, {{timeout, recv}, cancel}]}. + +fail_stream_readers(Reason, #wt_data{streams = Streams} = Data) -> + maps:fold( + fun(Sid, #stream{from = From} = S, {DAcc, AAcc}) when From =/= undefined -> + Streams1 = maps:put(Sid, S#stream{from = undefined}, DAcc#wt_data.streams), + {DAcc#wt_data{streams = Streams1}, + [{reply, From, {error, Reason}}, {{timeout, {srecv, Sid}}, cancel} | AAcc]}; + (_Sid, _S, Acc) -> + Acc + end, {Data, []}, Streams). + +%% @private In active mode, surface a graceful close as `closed' and any +%% other reason as an error. +maybe_notify_error(closed, #wt_data{owner = Owner}) -> + Owner ! {hackney_wt, self(), closed}; +maybe_notify_error(Reason, #wt_data{owner = Owner}) -> + Owner ! {hackney_wt_error, self(), Reason}. + +closed_reason(#wt_data{closed_reason = undefined}) -> closed; +closed_reason(#wt_data{closed_reason = R}) -> R. + +%% @private gen_statem timeout action for a recv, or none for infinity. +recv_timer(_Name, infinity) -> []; +recv_timer(Name, Timeout) -> [{{timeout, Name}, Timeout, Name}]. + +%% @private Resolve the effective recv timeout (`default' uses the option). +recv_timeout(default, #wt_data{recv_timeout = T}) -> T; +recv_timeout(T, _Data) -> T. + +%% @private Best-effort session close. +close_session_safe(#wt_data{session = undefined}) -> + ok; +close_session_safe(#wt_data{session = Session}) -> + try webtransport:close_session(Session) catch _:_ -> ok end, + ok. diff --git a/test/hackney_wt_tests.erl b/test/hackney_wt_tests.erl new file mode 100644 index 00000000..2b4aff42 --- /dev/null +++ b/test/hackney_wt_tests.erl @@ -0,0 +1,251 @@ +%%% -*- erlang -*- +%%% +%%% WebTransport client tests for hackney. +%%% +%%% Unit tests (URL parsing, scheme validation, header-injection guard) run +%%% without a server. Integration tests spin up a local HTTP/3 WebTransport +%%% echo server (erlang-webtransport) and exercise the wt_* API against it; +%%% they skip gracefully if a test certificate or the listener cannot be +%%% created. +-module(hackney_wt_tests). + +-include_lib("eunit/include/eunit.hrl"). +-include("hackney_lib.hrl"). + +-define(LISTENER, hackney_wt_test_listener). + +%%==================================================================== +%% Unit tests (no server) +%%==================================================================== + +wt_url_parsing_test_() -> + [ + {"Parse https:// URL", + fun() -> + URL = hackney_url:parse_url(<<"https://example.com/wt">>), + ?assertEqual(https, URL#hackney_url.scheme), + ?assertEqual(443, URL#hackney_url.port), + ?assertEqual("example.com", URL#hackney_url.host), + ?assertEqual(<<"/wt">>, URL#hackney_url.path) + end}, + {"Parse https:// URL with custom port and query", + fun() -> + URL = hackney_url:parse_url(<<"https://example.com:8443/wt?room=1">>), + ?assertEqual(8443, URL#hackney_url.port), + ?assertEqual(<<"/wt">>, URL#hackney_url.path), + ?assertEqual(<<"room=1">>, URL#hackney_url.qs) + end} + ]. + +wt_scheme_validation_test_() -> + [ + {"Reject ws:// scheme", + fun() -> + ?assertError({invalid_webtransport_scheme, ws}, + hackney:wt_connect("ws://localhost/wt")) + end}, + {"Reject http:// scheme", + fun() -> + ?assertError({invalid_webtransport_scheme, http}, + hackney:wt_connect("http://localhost/wt")) + end} + ]. + +%% GHSA-f9vr analog: a header carrying CR/LF must be refused before any +%% connection attempt. +wt_header_injection_test_() -> + [ + {"CRLF in header value rejected", + fun() -> + Result = hackney:wt_connect("https://localhost:1/wt", + [{headers, [{<<"x-test">>, <<"bad\r\nInjected: 1">>}]}]), + ?assertEqual({error, invalid_handshake_header}, Result) + end}, + {"NUL in header name rejected", + fun() -> + Result = hackney:wt_connect("https://localhost:1/wt", + [{headers, [{<<"x\0evil">>, <<"v">>}]}]), + ?assertEqual({error, invalid_handshake_header}, Result) + end} + ]. + +%%==================================================================== +%% Integration tests (local HTTP/3 WebTransport echo server) +%%==================================================================== + +wt_integration_test_() -> + {setup, + fun start_wt_server/0, + fun stop_wt_server/1, + fun integration_tests/1}. + +integration_tests(#{ok := false, reason := Reason}) -> + [{"WebTransport integration skipped", + fun() -> ?debugFmt("WebTransport integration skipped: ~p", [Reason]) end}]; +integration_tests(#{ok := true, url := URL}) -> + [ + {"Connect and close", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + ?assert(is_process_alive(C)), + ok = hackney:wt_close(C) + end}}, + {"Default-stream echo (binary)", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + ok = hackney:wt_send(C, {binary, <<"hello">>}), + ?assertEqual({ok, {binary, <<"hello">>}}, hackney:wt_recv(C, 10000)), + ok = hackney:wt_close(C) + end}}, + {"Default-stream echo (text alias)", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + ok = hackney:wt_send(C, {text, <<"hi there">>}), + ?assertEqual({ok, {binary, <<"hi there">>}}, hackney:wt_recv(C, 10000)), + ok = hackney:wt_close(C) + end}}, + {"Datagram echo", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + ok = hackney:wt_send_datagram(C, <<"ping">>), + ?assertEqual({ok, {datagram, <<"ping">>}}, recv_datagram(C, 10000)), + ok = hackney:wt_close(C) + end}}, + {"Explicit multiplexed stream", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + {ok, Sid} = hackney:wt_open_stream(C, bidi), + ?assert(is_integer(Sid)), + ok = hackney:wt_stream_send(C, Sid, <<"abc">>), + ?assertEqual(<<"abc">>, stream_bytes(hackney:wt_stream_recv(C, Sid, 10000))), + ok = hackney:wt_close(C) + end}}, + {"Two streams multiplexed over one session", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + {ok, S1} = hackney:wt_open_stream(C, bidi), + {ok, S2} = hackney:wt_open_stream(C, bidi), + ?assertNotEqual(S1, S2), + ok = hackney:wt_stream_send(C, S1, <<"one">>), + ok = hackney:wt_stream_send(C, S2, <<"two">>), + ?assertEqual(<<"one">>, stream_bytes(hackney:wt_stream_recv(C, S1, 10000))), + ?assertEqual(<<"two">>, stream_bytes(hackney:wt_stream_recv(C, S2, 10000))), + ok = hackney:wt_close(C) + end}}, + {"Active mode delivers default-stream data", + {timeout, 30, fun() -> + {ok, C} = connect(URL, [{active, true}]), + ok = hackney:wt_send(C, {binary, <<"active">>}), + receive + {hackney_wt, C, {binary, <<"active">>}} -> ok + after 10000 -> + ?assert(false) + end, + ok = hackney:wt_close(C) + end}}, + {"recv in active mode is rejected", + {timeout, 30, fun() -> + {ok, C} = connect(URL, [{active, true}]), + ?assertEqual({error, {active_mode, true}}, hackney:wt_recv(C, 100)), + ok = hackney:wt_close(C) + end}}, + {"session_info", + {timeout, 30, fun() -> + {ok, C} = connect(URL), + {ok, Info} = hackney:wt_session_info(C), + ?assert(is_map(Info)), + ?assertEqual(h3, maps:get(transport, Info)), + ?assert(maps:is_key(stream_count, Info)), + ok = hackney:wt_close(C) + end}} + ]. + +%%==================================================================== +%% Helpers +%%==================================================================== + +connect(URL) -> + connect(URL, []). + +connect(URL, Extra) -> + hackney:wt_connect(URL, [{verify, verify_none}, {connect_timeout, 10000} | Extra]). + +%% Read until a datagram is seen (a stray default-stream chunk could in +%% principle arrive first; datagrams on loopback normally arrive directly). +recv_datagram(C, Timeout) -> + case hackney:wt_recv(C, Timeout) of + {ok, {datagram, _}} = R -> R; + {ok, _Other} -> recv_datagram(C, Timeout); + {error, _} = E -> E + end. + +stream_bytes({ok, Data}) when is_binary(Data) -> Data; +stream_bytes({ok, {fin, Data}}) -> Data; +stream_bytes(Other) -> Other. + +start_wt_server() -> + {ok, _} = application:ensure_all_started(hackney), + case make_cert() of + {ok, CertFile, KeyFile} -> + Port = free_port(), + Opts = #{ + transport => h3, + port => Port, + certfile => CertFile, + keyfile => KeyFile, + handler => mock_wt_handler + }, + try webtransport:start_listener(?LISTENER, Opts) of + {ok, _Pid} -> + timer:sleep(200), + URL = "https://localhost:" ++ integer_to_list(Port) ++ "/wt", + #{ok => true, url => URL, started => true}; + {error, Reason} -> + #{ok => false, reason => {listener, Reason}} + catch + Class:Err -> + #{ok => false, reason => {listener_crash, Class, Err}} + end; + {error, Reason} -> + #{ok => false, reason => {cert, Reason}} + end. + +stop_wt_server(#{started := true}) -> + catch webtransport:stop_listener(?LISTENER), + ok; +stop_wt_server(_) -> + ok. + +free_port() -> + {ok, S} = gen_tcp:listen(0, [{reuseaddr, true}]), + {ok, Port} = inet:port(S), + gen_tcp:close(S), + Port. + +%% Generate a short-lived self-signed cert with openssl; returns an error +%% tuple (which causes the integration tests to skip) if anything fails. +make_cert() -> + Dir = test_tmp_dir(), + KeyFile = filename:join(Dir, "wt_test_key.pem"), + CertFile = filename:join(Dir, "wt_test_cert.pem"), + KeyCmd = lists:flatten(io_lib:format( + "openssl genrsa -out ~s 2048 2>/dev/null", [KeyFile])), + _ = os:cmd(KeyCmd), + CertCmd = lists:flatten(io_lib:format( + "openssl req -new -x509 -key ~s -out ~s -days 1 -subj \"/CN=localhost\" 2>/dev/null", + [KeyFile, CertFile])), + _ = os:cmd(CertCmd), + case {filelib:is_regular(KeyFile), filelib:is_regular(CertFile)} of + {true, true} -> {ok, CertFile, KeyFile}; + _ -> {error, openssl_unavailable} + end. + +test_tmp_dir() -> + Base = case os:getenv("TMPDIR") of + false -> "/tmp"; + "" -> "/tmp"; + T -> T + end, + Dir = filename:join(Base, "hackney_wt_test"), + _ = filelib:ensure_dir(filename:join(Dir, "x")), + Dir. diff --git a/test/mock_wt_handler.erl b/test/mock_wt_handler.erl new file mode 100644 index 00000000..5f8cce4b --- /dev/null +++ b/test/mock_wt_handler.erl @@ -0,0 +1,50 @@ +%%% -*- erlang -*- +%%% +%%% Echo handler for hackney WebTransport client tests. +%%% +%%% Implements the webtransport_handler behaviour and echoes back any data +%%% received on streams or datagrams, mirroring the erlang-webtransport +%%% test echo handler. +-module(mock_wt_handler). +-behaviour(webtransport_handler). + +-export([init/3, handle_stream/4, handle_stream_fin/4, + handle_datagram/2, handle_stream_closed/3, terminate/2]). + +-record(state, { + session :: pid(), + streams = #{} :: #{non_neg_integer() => binary()} +}). + +init(Session, _Request, _Opts) -> + {ok, #state{session = Session}}. + +%% Echo bidi data chunks back as they arrive. +handle_stream(StreamId, Type, Data, #state{streams = Streams} = State) -> + Existing = maps:get(StreamId, Streams, <<>>), + State1 = State#state{streams = Streams#{StreamId => <>}}, + case Type of + bidi when Data =/= <<>> -> + {ok, State1, [{send, StreamId, Data}]}; + _ -> + {ok, State1} + end. + +%% On FIN, echo the accumulated data with a FIN for bidi streams. +handle_stream_fin(StreamId, Type, Data, #state{streams = Streams} = State) -> + Existing = maps:get(StreamId, Streams, <<>>), + All = <>, + State1 = State#state{streams = maps:remove(StreamId, Streams)}, + case Type of + bidi -> {ok, State1, [{send, StreamId, All, fin}]}; + uni -> {ok, State1} + end. + +handle_datagram(Data, State) -> + {ok, State, [{send_datagram, Data}]}. + +handle_stream_closed(_StreamId, _Reason, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok.