From 4131cd3487caab234947b8624275c384d7ececef Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 3 Jun 2026 22:56:00 +0900 Subject: [PATCH] Fix HTTP/1 idle streamable response disconnect HTTP/1 streamable response bodies were previously consumed through the generic body.read path for normal chunked responses. For Protocol::HTTP::Body::Streamable this hides the producing body block behind an internal scheduled fiber. If the body writes an initial chunk and then parks idle, the HTTP/1 server waits on the next body.read, the user body fiber waits independently, and a peer disconnect does not necessarily trigger another socket read or write. The body fiber can therefore remain parked indefinitely. Handle normal HTTP/1.1 streamable responses with a protocol-owned body task instead. The task calls body.call(stream) directly and writes chunks through a small ChunkedOutput adapter, preserving chunked transfer framing while giving the HTTP/1 connection ownership of the task lifecycle. A transient monitor watches for peer close using the existing non-consuming readable? probe and closes the connection only when the socket is no longer readable, avoiding false positives for request-body data or pipelined bytes. Non-streamable bodies, HEAD responses, HTTP/1.0, upgrades, and tunnels continue using the existing paths so ordinary responses do not pay the extra task cost. Add a regression test for issue #224 that opens an HTTP/1.1 streaming response, reads the first SSE chunk, closes the client socket while the body is idle, and asserts the body block unwinds. Closes https://github.com/socketry/async-http/issues/224 --- lib/async/http/protocol/http1/server.rb | 157 ++++++++++++++++++++---- test/async/http/protocol/http11.rb | 124 +++++++++++++------ 2 files changed, 215 insertions(+), 66 deletions(-) diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 96e6643..7d1c538 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -11,27 +11,126 @@ require_relative "finishable" require "console/event/failure" +require "protocol/http/body/stream" module Async module HTTP module Protocol module HTTP1 + # Writes chunks from a streamable response body using HTTP/1.1 chunked transfer encoding. + class ChunkedOutput + def initialize(connection, trailer = nil) + @connection = connection + @trailer = trailer + end + + def write(chunk) + return 0 if chunk.empty? + + @connection.write_chunk(chunk) + + return chunk.bytesize + end + + def close(error = nil) + if connection = @connection + @connection = nil + + connection.finish_chunked_body(@trailer, error) + end + end + + alias close_write close + end + # An HTTP/1 server connection that receives requests and sends responses. class Server < Connection # Initialize the HTTP/1 server connection. def initialize(...) super - + @ready = Async::Notification.new + @body_task = nil end - + # Called when the connection is closed, signalling any waiting tasks. def closed(error = nil) super - + + @body_task&.stop(error) @ready.signal end - + + # Write a single chunk of data to the connection. + def write_chunk(chunk) + @stream.write("#{chunk.bytesize.to_s(16).upcase}\r\n") + @stream.write(chunk) + @stream.write(CRLF) + + @stream.flush + end + + # Finish a chunked response body. + def finish_chunked_body(trailer = nil, error = nil) + return if closed? + + if trailer&.any? + @stream.write("0\r\n") + write_headers(trailer) + @stream.write("\r\n") + else + @stream.write("0\r\n\r\n") + end + + @stream.flush + ensure + send_end_stream! unless closed? + end + + private def monitor_body_task(task, body_task) + task.async(transient: true) do |subtask| + subtask.annotate("Monitoring HTTP/1 response body for peer close.") + + until closed? or body_task.finished? + @stream&.to_io&.wait_readable + + close unless @stream&.readable? + end + end + end + + private def write_streamable_body(task, version, input, body, head, trailer) + if head + write_body(version, body, head, trailer) + return + end + + write_connection_header(version) + @stream.write("transfer-encoding: chunked\r\n\r\n") + @stream.flush + + output = ChunkedOutput.new(self, trailer) + stream = ::Protocol::HTTP::Body::Stream.new(input, output) + + @body_task = task.async do |subtask| + subtask.annotate("Streaming HTTP/1 response body.") + + body.call(stream) + rescue => error + output.close(error) + raise + ensure + stream.close(error) + end + + monitor = monitor_body_task(task, @body_task) + + @body_task.wait + ensure + monitor&.stop + @body_task = nil + end + # Write a failure response with the given status code. # @parameter status [Integer] The HTTP status code to send. def fail_request(status) @@ -42,7 +141,7 @@ def fail_request(status) # At this point, there is very little we can do to recover: Console.debug(self, "Failed to write failure response!", error) end - + # Read the next incoming request from the connection. # @returns [Request | Nil] The next request, or `nil` if the connection is closed. def next_request @@ -51,25 +150,25 @@ def next_request elsif !idle? @ready.wait end - + # Read an incoming request: return unless request = Request.read(self) - + unless persistent?(request.version, request.method, request.headers) @persistent = false end - + return request rescue ::Protocol::HTTP1::BadRequest fail_request(400) # Conceivably we could retry here, but we don't really know how bad the error is, so it's better to just fail: raise end - + # Server loop. def each(task: Task.current) task.annotate("Reading #{self.version} requests for #{self.class}.") - + while request = next_request # We have received complete request (line + headers), so defer stop until the response is generated. task.defer_stop do @@ -77,31 +176,31 @@ def each(task: Task.current) finishable = Finishable.new(body) request.body = finishable end - + response = yield(request, self) version = request.version body = response&.body - + if hijacked? body&.close return end - + # If a response was generated, send it: if response trailer = response.headers.trailer! headers = response.headers.header - + # Some operations in this method are long running, that is, it's expected that `body.call(stream)` could literally run indefinitely. In order to facilitate garbage collection, we want to nullify as many local variables before calling the streaming body. This ensures that the garbage collection can clean up as much state as possible during the long running operation, so we don't retain objects that are no longer needed. - + if body and protocol = response.protocol # We force a 101 response if the protocol is upgraded - HTTP/2 CONNECT will return 200 for success, but this won't be understood by HTTP/1 clients: write_response(@version, 101, headers) - + # At this point, the request body is hijacked, so we don't want to call #finish below. request = nil response = nil - + if body.stream? return body.call(write_upgrade_body(protocol)) else @@ -110,11 +209,11 @@ def each(task: Task.current) elsif response.status == 101 # This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header. write_response(@version, response.status, headers) - + # Same as above: request = nil response = nil - + if body.stream? return body.call(write_tunnel_body(version)) else @@ -122,12 +221,12 @@ def each(task: Task.current) end else write_response(@version, response.status, headers) - + if request.connect? and response.success? # Same as above: request = nil response = nil - + if body.stream? return body.call(write_tunnel_body(version)) else @@ -135,25 +234,29 @@ def each(task: Task.current) end else head = request.head? - + # Same as above: request = nil response = nil - - write_body(version, body, head, trailer) + + if body&.stream? and version == ::Protocol::HTTP1::Connection::HTTP11 + write_streamable_body(task, version, finishable, body, head, trailer) + else + write_body(version, body, head, trailer) + end end end - + # We are done with the body: body = nil else # If the request failed to generate a response, it was an internal server error: write_response(@version, 500, {}) write_body(version, nil) - + request&.finish end - + if finishable finishable.wait(@persistent) else diff --git a/test/async/http/protocol/http11.rb b/test/async/http/protocol/http11.rb index e07afaf..9d24c1c 100755 --- a/test/async/http/protocol/http11.rb +++ b/test/async/http/protocol/http11.rb @@ -9,6 +9,7 @@ require "async/http/protocol/http11" require "async/http/a_protocol" +require "protocol/http/body/streamable" # Custom error class to track in tests class BodyWriteError < StandardError; end @@ -17,20 +18,20 @@ class BodyWriteError < StandardError; end class ErrorProneBody < ::Protocol::HTTP::Body::Readable def initialize(...) super(...) - + @error = nil @count = 0 end - + attr :error - + def read @count += 1 raise BodyWriteError, "error during write" if @count > 1 - + "Hello" end - + def close(error = nil) @error = error super() @@ -39,98 +40,143 @@ def close(error = nil) describe Async::HTTP::Protocol::HTTP11 do it_behaves_like Async::HTTP::AProtocol - + with "#as_json" do include Sus::Fixtures::Async::HTTP::ServerContext let(:protocol) {subject} - + it "generates a JSON representation" do response = client.get("/") connection = response.connection - + expect(connection.as_json).to be =~ /Async::HTTP::Protocol::HTTP1::Client negotiated HTTP/ ensure response&.close end - + it "generates a JSON string" do response = client.get("/") connection = response.connection - + expect(JSON.dump(connection)).to be == connection.to_json ensure response&.close end end - + with "server" do include Sus::Fixtures::Async::HTTP::ServerContext - + let(:protocol) {subject} - + with "error during body write" do let(:body) {ErrorProneBody.new} - + let(:app) do Protocol::HTTP::Middleware.for do |request| Protocol::HTTP::Response[200, {}, body] end end - + it "handles error in ensure block without NameError" do response = client.get("/") - + expect do response.read end.to raise_exception(EOFError) - + expect(body.error).to be_a(BodyWriteError) end end - + with "bad requests" do def around current = Console.logger.level Console.logger.fatal! - + super ensure Console.logger.level = current end - + it "should fail cleanly when path is empty" do response = client.get("") - + expect(response.status).to be == 400 end end - + + with "idle streaming response body" do + let(:body_closed) {Async::Variable.new} + + let(:app) do + body_closed = self.body_closed + + Protocol::HTTP::Middleware.for do |request| + body = Protocol::HTTP::Body::Streamable.response(request) do |stream| + stream.write(": connected\n\n") + sleep + ensure + body_closed.value = true + stream&.close + end + + Protocol::HTTP::Response[200, {"content-type" => "text/event-stream"}, body] + end + end + + it "closes the body when the client disconnects while the body is idle" do + peer = client_endpoint.connect + + peer.write("GET / HTTP/1.1\r\nhost: localhost\r\naccept: text/event-stream\r\n\r\n") + peer.flush + + buffer = String.new + + until buffer.include?(": connected\n\n") + buffer << peer.readpartial(1024) + end + + peer.close + + result = Async::Task.current.with_timeout(1.0) do + body_closed.wait + rescue Async::TimeoutError + nil + end + + expect(result).to be == true + ensure + peer&.close + end + end + with "head request" do let(:app) do Protocol::HTTP::Middleware.for do |request| Protocol::HTTP::Response[200, {}, ["Hello", "World"]] end end - + it "doesn't reply with body" do 5.times do response = client.head("/") - + expect(response).to be(:success?) expect(response.version).to be == "HTTP/1.1" expect(response.body).to be(:empty?) expect(response.reason).to be == "OK" - + response.read end end end - + with "raw response" do let(:app) do Protocol::HTTP::Middleware.for do |request| peer = request.hijack! - + peer.write( "#{request.version} 200 It worked!\r\n" + "connection: close\r\n" + @@ -138,31 +184,31 @@ def around "Hello World!" ) peer.close - + nil end end - + it "reads raw response" do response = client.get("/") - + expect(response.read).to be == "Hello World!" end - + it "has access to the http reason phrase" do response = client.head("/") - + expect(response.reason).to be == "It worked!" end end - + with "full hijack with empty response" do let(:body) {::Protocol::HTTP::Body::Buffered.new([], 0)} - + let(:app) do ::Protocol::HTTP::Middleware.for do |request| peer = request.hijack! - + peer.write( "#{request.version} 200 It worked!\r\n" + "connection: close\r\n" + @@ -170,16 +216,16 @@ def around "Hello World!" ) peer.close - + ::Protocol::HTTP::Response[-1, {}, body] end end - + it "works properly" do expect(body).to receive(:close) - + response = client.get("/") - + expect(response.read).to be == "Hello World!" end end