diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 96e6643..7d1c538 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -11,27 +11,126 @@ require_relative "finishable" require "console/event/failure" +require "protocol/http/body/stream" module Async module HTTP module Protocol module HTTP1 + # Writes chunks from a streamable response body using HTTP/1.1 chunked transfer encoding. + class ChunkedOutput + def initialize(connection, trailer = nil) + @connection = connection + @trailer = trailer + end + + def write(chunk) + return 0 if chunk.empty? + + @connection.write_chunk(chunk) + + return chunk.bytesize + end + + def close(error = nil) + if connection = @connection + @connection = nil + + connection.finish_chunked_body(@trailer, error) + end + end + + alias close_write close + end + # An HTTP/1 server connection that receives requests and sends responses. class Server < Connection # Initialize the HTTP/1 server connection. def initialize(...) super - + @ready = Async::Notification.new + @body_task = nil end - + # Called when the connection is closed, signalling any waiting tasks. def closed(error = nil) super - + + @body_task&.stop(error) @ready.signal end - + + # Write a single chunk of data to the connection. + def write_chunk(chunk) + @stream.write("#{chunk.bytesize.to_s(16).upcase}\r\n") + @stream.write(chunk) + @stream.write(CRLF) + + @stream.flush + end + + # Finish a chunked response body. + def finish_chunked_body(trailer = nil, error = nil) + return if closed? + + if trailer&.any? + @stream.write("0\r\n") + write_headers(trailer) + @stream.write("\r\n") + else + @stream.write("0\r\n\r\n") + end + + @stream.flush + ensure + send_end_stream! unless closed? + end + + private def monitor_body_task(task, body_task) + task.async(transient: true) do |subtask| + subtask.annotate("Monitoring HTTP/1 response body for peer close.") + + until closed? or body_task.finished? + @stream&.to_io&.wait_readable + + close unless @stream&.readable? + end + end + end + + private def write_streamable_body(task, version, input, body, head, trailer) + if head + write_body(version, body, head, trailer) + return + end + + write_connection_header(version) + @stream.write("transfer-encoding: chunked\r\n\r\n") + @stream.flush + + output = ChunkedOutput.new(self, trailer) + stream = ::Protocol::HTTP::Body::Stream.new(input, output) + + @body_task = task.async do |subtask| + subtask.annotate("Streaming HTTP/1 response body.") + + body.call(stream) + rescue => error + output.close(error) + raise + ensure + stream.close(error) + end + + monitor = monitor_body_task(task, @body_task) + + @body_task.wait + ensure + monitor&.stop + @body_task = nil + end + # Write a failure response with the given status code. # @parameter status [Integer] The HTTP status code to send. def fail_request(status) @@ -42,7 +141,7 @@ def fail_request(status) # At this point, there is very little we can do to recover: Console.debug(self, "Failed to write failure response!", error) end - + # Read the next incoming request from the connection. # @returns [Request | Nil] The next request, or `nil` if the connection is closed. def next_request @@ -51,25 +150,25 @@ def next_request elsif !idle? @ready.wait end - + # Read an incoming request: return unless request = Request.read(self) - + unless persistent?(request.version, request.method, request.headers) @persistent = false end - + return request rescue ::Protocol::HTTP1::BadRequest fail_request(400) # Conceivably we could retry here, but we don't really know how bad the error is, so it's better to just fail: raise end - + # Server loop. def each(task: Task.current) task.annotate("Reading #{self.version} requests for #{self.class}.") - + while request = next_request # We have received complete request (line + headers), so defer stop until the response is generated. task.defer_stop do @@ -77,31 +176,31 @@ def each(task: Task.current) finishable = Finishable.new(body) request.body = finishable end - + response = yield(request, self) version = request.version body = response&.body - + if hijacked? body&.close return end - + # If a response was generated, send it: if response trailer = response.headers.trailer! headers = response.headers.header - + # Some operations in this method are long running, that is, it's expected that `body.call(stream)` could literally run indefinitely. In order to facilitate garbage collection, we want to nullify as many local variables before calling the streaming body. This ensures that the garbage collection can clean up as much state as possible during the long running operation, so we don't retain objects that are no longer needed. - + if body and protocol = response.protocol # We force a 101 response if the protocol is upgraded - HTTP/2 CONNECT will return 200 for success, but this won't be understood by HTTP/1 clients: write_response(@version, 101, headers) - + # At this point, the request body is hijacked, so we don't want to call #finish below. request = nil response = nil - + if body.stream? return body.call(write_upgrade_body(protocol)) else @@ -110,11 +209,11 @@ def each(task: Task.current) elsif response.status == 101 # This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header. write_response(@version, response.status, headers) - + # Same as above: request = nil response = nil - + if body.stream? return body.call(write_tunnel_body(version)) else @@ -122,12 +221,12 @@ def each(task: Task.current) end else write_response(@version, response.status, headers) - + if request.connect? and response.success? # Same as above: request = nil response = nil - + if body.stream? return body.call(write_tunnel_body(version)) else @@ -135,25 +234,29 @@ def each(task: Task.current) end else head = request.head? - + # Same as above: request = nil response = nil - - write_body(version, body, head, trailer) + + if body&.stream? and version == ::Protocol::HTTP1::Connection::HTTP11 + write_streamable_body(task, version, finishable, body, head, trailer) + else + write_body(version, body, head, trailer) + end end end - + # We are done with the body: body = nil else # If the request failed to generate a response, it was an internal server error: write_response(@version, 500, {}) write_body(version, nil) - + request&.finish end - + if finishable finishable.wait(@persistent) else diff --git a/test/async/http/protocol/http11.rb b/test/async/http/protocol/http11.rb index e07afaf..9d24c1c 100755 --- a/test/async/http/protocol/http11.rb +++ b/test/async/http/protocol/http11.rb @@ -9,6 +9,7 @@ require "async/http/protocol/http11" require "async/http/a_protocol" +require "protocol/http/body/streamable" # Custom error class to track in tests class BodyWriteError < StandardError; end @@ -17,20 +18,20 @@ class BodyWriteError < StandardError; end class ErrorProneBody < ::Protocol::HTTP::Body::Readable def initialize(...) super(...) - + @error = nil @count = 0 end - + attr :error - + def read @count += 1 raise BodyWriteError, "error during write" if @count > 1 - + "Hello" end - + def close(error = nil) @error = error super() @@ -39,98 +40,143 @@ def close(error = nil) describe Async::HTTP::Protocol::HTTP11 do it_behaves_like Async::HTTP::AProtocol - + with "#as_json" do include Sus::Fixtures::Async::HTTP::ServerContext let(:protocol) {subject} - + it "generates a JSON representation" do response = client.get("/") connection = response.connection - + expect(connection.as_json).to be =~ /Async::HTTP::Protocol::HTTP1::Client negotiated HTTP/ ensure response&.close end - + it "generates a JSON string" do response = client.get("/") connection = response.connection - + expect(JSON.dump(connection)).to be == connection.to_json ensure response&.close end end - + with "server" do include Sus::Fixtures::Async::HTTP::ServerContext - + let(:protocol) {subject} - + with "error during body write" do let(:body) {ErrorProneBody.new} - + let(:app) do Protocol::HTTP::Middleware.for do |request| Protocol::HTTP::Response[200, {}, body] end end - + it "handles error in ensure block without NameError" do response = client.get("/") - + expect do response.read end.to raise_exception(EOFError) - + expect(body.error).to be_a(BodyWriteError) end end - + with "bad requests" do def around current = Console.logger.level Console.logger.fatal! - + super ensure Console.logger.level = current end - + it "should fail cleanly when path is empty" do response = client.get("") - + expect(response.status).to be == 400 end end - + + with "idle streaming response body" do + let(:body_closed) {Async::Variable.new} + + let(:app) do + body_closed = self.body_closed + + Protocol::HTTP::Middleware.for do |request| + body = Protocol::HTTP::Body::Streamable.response(request) do |stream| + stream.write(": connected\n\n") + sleep + ensure + body_closed.value = true + stream&.close + end + + Protocol::HTTP::Response[200, {"content-type" => "text/event-stream"}, body] + end + end + + it "closes the body when the client disconnects while the body is idle" do + peer = client_endpoint.connect + + peer.write("GET / HTTP/1.1\r\nhost: localhost\r\naccept: text/event-stream\r\n\r\n") + peer.flush + + buffer = String.new + + until buffer.include?(": connected\n\n") + buffer << peer.readpartial(1024) + end + + peer.close + + result = Async::Task.current.with_timeout(1.0) do + body_closed.wait + rescue Async::TimeoutError + nil + end + + expect(result).to be == true + ensure + peer&.close + end + end + with "head request" do let(:app) do Protocol::HTTP::Middleware.for do |request| Protocol::HTTP::Response[200, {}, ["Hello", "World"]] end end - + it "doesn't reply with body" do 5.times do response = client.head("/") - + expect(response).to be(:success?) expect(response.version).to be == "HTTP/1.1" expect(response.body).to be(:empty?) expect(response.reason).to be == "OK" - + response.read end end end - + with "raw response" do let(:app) do Protocol::HTTP::Middleware.for do |request| peer = request.hijack! - + peer.write( "#{request.version} 200 It worked!\r\n" + "connection: close\r\n" + @@ -138,31 +184,31 @@ def around "Hello World!" ) peer.close - + nil end end - + it "reads raw response" do response = client.get("/") - + expect(response.read).to be == "Hello World!" end - + it "has access to the http reason phrase" do response = client.head("/") - + expect(response.reason).to be == "It worked!" end end - + with "full hijack with empty response" do let(:body) {::Protocol::HTTP::Body::Buffered.new([], 0)} - + let(:app) do ::Protocol::HTTP::Middleware.for do |request| peer = request.hijack! - + peer.write( "#{request.version} 200 It worked!\r\n" + "connection: close\r\n" + @@ -170,16 +216,16 @@ def around "Hello World!" ) peer.close - + ::Protocol::HTTP::Response[-1, {}, body] end end - + it "works properly" do expect(body).to receive(:close) - + response = client.get("/") - + expect(response.read).to be == "Hello World!" end end