Skip to content

Commit d816bac

Browse files
PKPM 构力结构claude
andcommitted
fix(streamable-http): drain SSE response to EOF instead of closing early
The client closed the SSE response stream immediately after the JSON-RPC response/error event arrived (`await response.aclose()` / `break`), in three places: - _handle_sse_response (POST response) - _handle_reconnection (resumed GET stream) - _handle_resumption_request (explicit resumption GET stream) Aborting the stream before EOF leaves the underlying keepalive connection un-drained. With some servers this stalls the next request that reuses the connection by a fixed delay (~260ms observed against a FastMCP streamable-http server hosted on a background thread inside a desktop app; 37x slower per call). Drain the stream to its natural EOF instead: the caller is still unblocked as soon as the response event is routed to read_stream, and the connection is returned to the pool cleanly. Cancellation/shutdown still tears the stream down promptly because CancelledError propagates out of aiter_sse() and the enclosing `async with` closes the response. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 2472563 commit d816bac

1 file changed

Lines changed: 24 additions & 12 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,15 +241,17 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
241241
logger.debug("Resumption GET SSE connection established")
242242

243243
async for sse in event_source.aiter_sse(): # pragma: no branch
244-
is_complete = await self._handle_sse_event(
244+
await self._handle_sse_event(
245245
sse,
246246
ctx.read_stream_writer,
247247
original_request_id,
248248
ctx.metadata.on_resumption_token_update if ctx.metadata else None,
249249
)
250-
if is_complete:
251-
await event_source.response.aclose()
252-
break
250+
# Drain to EOF rather than breaking on completion (see
251+
# _handle_sse_response): breaking leaves the enclosing
252+
# `async with aconnect_sse(...)` to close an un-drained stream,
253+
# which can stall the next request that reuses the connection.
254+
# Cancellation still tears the stream down promptly.
253255

254256
async def _handle_post_request(self, ctx: RequestContext) -> None:
255257
"""Handle a POST request with response processing."""
@@ -340,6 +342,8 @@ async def _handle_sse_response(
340342
assert isinstance(ctx.session_message.message, JSONRPCRequest)
341343
original_request_id = ctx.session_message.message.id
342344

345+
response_complete = False
346+
343347
try:
344348
event_source = EventSource(response)
345349
async for sse in event_source.aiter_sse(): # pragma: no branch
@@ -358,16 +362,20 @@ async def _handle_sse_response(
358362
resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None),
359363
is_initialization=is_initialization,
360364
)
361-
# If the SSE event indicates completion, like returning response/error
362-
# break the loop
365+
# Once the response/error arrives the caller is unblocked, but we keep
366+
# iterating so the SSE body is drained to EOF before the connection is
367+
# released. Closing the response early (await response.aclose()) leaves
368+
# the keepalive connection un-drained, which can stall the next request
369+
# that reuses it. Cancellation still tears down promptly: CancelledError
370+
# propagates out of aiter_sse() and the caller's
371+
# `async with client.stream(...)` closes the response.
363372
if is_complete:
364-
await response.aclose()
365-
return # Normal completion, no reconnect needed
373+
response_complete = True
366374
except Exception:
367375
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
368376

369-
# Stream ended without response - reconnect if we received an event with ID
370-
if last_event_id is not None: # pragma: no branch
377+
# Stream ended without a response - reconnect if we received an event with ID
378+
if not response_complete and last_event_id is not None:
371379
logger.info("SSE stream disconnected, reconnecting...")
372380
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
373381

@@ -404,6 +412,7 @@ async def _handle_reconnection(
404412
# Track for potential further reconnection
405413
reconnect_last_event_id: str = last_event_id
406414
reconnect_retry_ms = retry_interval_ms
415+
response_complete = False
407416

408417
async for sse in event_source.aiter_sse():
409418
if sse.id: # pragma: no branch
@@ -417,9 +426,12 @@ async def _handle_reconnection(
417426
original_request_id,
418427
ctx.metadata.on_resumption_token_update if ctx.metadata else None,
419428
)
429+
# Drain to EOF instead of closing early (see _handle_sse_response).
420430
if is_complete:
421-
await event_source.response.aclose()
422-
return
431+
response_complete = True
432+
433+
if response_complete:
434+
return
423435

424436
# Stream ended again without response - reconnect again (reset attempt counter)
425437
logger.info("SSE stream disconnected, reconnecting...")

0 commit comments

Comments
 (0)