Skip to content

Commit f465183

Browse files
committed
Better HTTP/1 connection handling.
1 parent 6d6fb1a commit f465183

File tree

7 files changed

+45
-38
lines changed

7 files changed

+45
-38
lines changed

async-http.gemspec

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Gem::Specification.new do |spec|
2929
spec.add_dependency "io-endpoint", "~> 0.11"
3030
spec.add_dependency "io-stream", "~> 0.4"
3131
spec.add_dependency "protocol-http", "~> 0.37"
32-
spec.add_dependency "protocol-http1", "~> 0.25"
33-
spec.add_dependency "protocol-http2", "~> 0.18"
32+
spec.add_dependency "protocol-http1", "~> 0.27"
33+
spec.add_dependency "protocol-http2", "~> 0.19"
3434
spec.add_dependency "traces", ">= 0.10"
3535
end

lib/async/http/protocol/http1/client.rb

+12-9
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ def initialize(...)
1818

1919
attr_accessor :pool
2020

21-
def closed!
21+
def closed(error = nil)
2222
super
2323

2424
if pool = @pool
2525
@pool = nil
26+
# If the connection is not reusable, this will retire it from the connection pool and invoke `#close`.
2627
pool.release(self)
2728
end
2829
end
@@ -50,30 +51,32 @@ def call(request, task: Task.current)
5051
task.async(annotation: "Upgrading request...") do
5152
# If this fails, this connection will be closed.
5253
write_upgrade_body(protocol, body)
54+
rescue => error
55+
self.close(error)
5356
end
5457
elsif request.connect?
5558
task.async(annotation: "Tunnneling request...") do
5659
write_tunnel_body(@version, body)
60+
rescue => error
61+
self.close(error)
5762
end
5863
else
5964
task.async(annotation: "Streaming request...") do
6065
# Once we start writing the body, we can't recover if the request fails. That's because the body might be generated dynamically, streaming, etc.
6166
write_body(@version, body, false, trailer)
67+
rescue => error
68+
self.close(error)
6269
end
6370
end
6471
elsif protocol = request.protocol
6572
write_upgrade_body(protocol)
6673
else
67-
write_body(@version, body, false, trailer)
74+
write_body(@version, request.body, false, trailer)
6875
end
6976

70-
response = Response.read(self, request)
71-
72-
return response
73-
rescue
74-
# This will ensure that #reusable? returns false.
75-
self.close
76-
77+
return Response.read(self, request)
78+
rescue => error
79+
self.close(error)
7780
raise
7881
end
7982
end

lib/async/http/protocol/http1/connection.rb

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def http2?
4343

4444
def read_line?
4545
@stream.read_until(CRLF)
46+
rescue Errno::ECONNRESET
47+
return nil
4648
end
4749

4850
def read_line

lib/async/http/protocol/http1/finishable.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ def read
3232
end
3333

3434
def close(error = nil)
35+
super
36+
3537
unless @closed.resolved?
3638
@error = error
3739
@closed.value = true
3840
end
39-
40-
super
4141
end
4242

4343
def wait(persistent = true)

lib/async/http/protocol/http1/server.rb

+19-17
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def initialize(...)
2222
@ready = Async::Notification.new
2323
end
2424

25-
def closed!
25+
def closed(error = nil)
2626
super
2727

2828
@ready.signal
@@ -38,14 +38,12 @@ def fail_request(status)
3838
end
3939

4040
def next_request
41-
# Wait for the connection to become idle before reading the next request:
42-
unless idle?
41+
if closed?
42+
return nil
43+
elsif !idle?
4344
@ready.wait
4445
end
4546

46-
# The default is true.
47-
return unless @persistent
48-
4947
# Read an incoming request:
5048
return unless request = Request.read(self)
5149

@@ -90,37 +88,41 @@ def each(task: Task.current)
9088
# We force a 101 response if the protocol is upgraded - HTTP/2 CONNECT will return 200 for success, but this won't be understood by HTTP/1 clients:
9189
write_response(@version, 101, response.headers)
9290

93-
stream = write_upgrade_body(protocol)
94-
9591
# At this point, the request body is hijacked, so we don't want to call #finish below.
9692
request = nil
9793
response = nil
9894

99-
# In the case of streaming, `finishable` should wrap a `Remainder` body, which we can safely discard later on.
100-
body.call(stream)
95+
if body.stream?
96+
return body.call(write_upgrade_body(protocol))
97+
else
98+
write_upgrade_body(protocol, body)
99+
end
101100
elsif response.status == 101
102101
# This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header.
103102
write_response(@version, response.status, response.headers)
104103

105-
stream = write_tunnel_body(version)
106-
107104
# Same as above:
108105
request = nil
109106
response = nil
110107

111-
body&.call(stream)
108+
if body.stream?
109+
return body.call(write_tunnel_body(version))
110+
else
111+
write_tunnel_body(version, body)
112+
end
112113
else
113114
write_response(@version, response.status, response.headers)
114115

115116
if request.connect? and response.success?
116-
stream = write_tunnel_body(version)
117-
118117
# Same as above:
119118
request = nil
120119
response = nil
121120

122-
# We must return here as no further request processing can be done:
123-
return body.call(stream)
121+
if body.stream?
122+
return body.call(write_tunnel_body(version))
123+
else
124+
write_tunnel_body(version, body)
125+
end
124126
else
125127
head = request.head?
126128

lib/async/http/protocol/http2/stream.rb

+8-6
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ def process_headers(frame)
6262
end
6363

6464
# TODO this might need to be in an ensure block:
65-
if @input and frame.end_stream?
66-
@input.close_write
65+
if input = @input and frame.end_stream?
6766
@input = nil
67+
input.close_write
6868
end
6969
rescue ::Protocol::HTTP2::HeaderError => error
7070
Console.logger.debug(self, error)
@@ -123,6 +123,8 @@ def send_body(body, trailer = nil)
123123

124124
# Called when the output terminates normally.
125125
def finish_output(error = nil)
126+
return if self.closed?
127+
126128
trailer = @output&.trailer
127129

128130
@output = nil
@@ -152,14 +154,14 @@ def window_updated(size)
152154
def closed(error)
153155
super
154156

155-
if @input
156-
@input.close_write(error)
157+
if input = @input
157158
@input = nil
159+
input.close_write(error)
158160
end
159161

160-
if @output
161-
@output.stop(error)
162+
if output = @output
162163
@output = nil
164+
output.stop(error)
163165
end
164166

165167
if pool = @pool and @connection

test/async/http/proxy.rb

-2
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,6 @@
153153
upstream.write(chunk)
154154
upstream.flush
155155
end
156-
rescue Async::Wrapper::Cancelled
157-
#ignore
158156
ensure
159157
Console.logger.debug(self) {"Finished writing to upstream..."}
160158
upstream.close_write

0 commit comments

Comments
 (0)