diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index d2536189d..9439809b2 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -69,6 +69,8 @@ async def main(): logger = logging.getLogger(__name__) +DEFAULT_READ_EOF_DRAIN_TIMEOUT_SECONDS = 1.0 + LifespanResultT = TypeVar("LifespanResultT", default=Any) _ParamsT = TypeVar("_ParamsT", bound=BaseModel, default=BaseModel) @@ -406,6 +408,13 @@ async def run( # the initialization lifecycle, but can do so with any available node # rather than requiring initialization for each connection. stateless: bool = False, + # When True, treat read EOF as a half-close and allow in-flight handlers + # to drain their responses via the still-open write stream (e.g. stdio + # with bash-redirected stdin). + drain_on_read_close: bool = False, + # Maximum time to wait for in-flight handlers to drain after read EOF. + # None means wait indefinitely. + read_eof_drain_timeout_seconds: float | None = DEFAULT_READ_EOF_DRAIN_TIMEOUT_SECONDS, ) -> None: async with self.lifespan(self) as lifespan_context: dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( @@ -416,6 +425,8 @@ async def run( # the next request (spec says SHOULD NOT, not MUST NOT) sees # the initialized state instead of failing the init-gate. inline_methods=frozenset({"initialize"}), + close_write_stream_on_read_close=not drain_on_read_close, + read_eof_drain_timeout_seconds=read_eof_drain_timeout_seconds, ) runner = ServerRunner( server=self, diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index fdb69571d..647bb8c57 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -848,6 +848,7 @@ async def run_stdio_async(self) -> None: read_stream, write_stream, self._lowlevel_server.create_initialization_options(), + drain_on_read_close=True, ) async def run_sse_async( # pragma: no cover diff --git a/src/mcp/shared/jsonrpc_dispatcher.py b/src/mcp/shared/jsonrpc_dispatcher.py index 457e6b6f7..52de5a305 100644 --- a/src/mcp/shared/jsonrpc_dispatcher.py +++ b/src/mcp/shared/jsonrpc_dispatcher.py @@ -23,6 +23,7 @@ import contextvars import logging from collections.abc import Awaitable, Callable, Mapping +from contextlib import AsyncExitStack from dataclasses import dataclass, field from typing import Any, Generic, Literal, TypeVar, cast, overload @@ -226,6 +227,8 @@ def __init__( peer_cancel_mode: PeerCancelMode = "interrupt", raise_handler_exceptions: bool = False, inline_methods: frozenset[str] = frozenset(), + close_write_stream_on_read_close: bool = True, + read_eof_drain_timeout_seconds: float | None = None, ) -> None: ... @overload def __init__( @@ -237,6 +240,8 @@ def __init__( peer_cancel_mode: PeerCancelMode = "interrupt", raise_handler_exceptions: bool = False, inline_methods: frozenset[str] = frozenset(), + close_write_stream_on_read_close: bool = True, + read_eof_drain_timeout_seconds: float | None = None, ) -> None: ... def __init__( self, @@ -247,6 +252,8 @@ def __init__( peer_cancel_mode: PeerCancelMode = "interrupt", raise_handler_exceptions: bool = False, inline_methods: frozenset[str] = frozenset(), + close_write_stream_on_read_close: bool = True, + read_eof_drain_timeout_seconds: float | None = None, ) -> None: self._read_stream = read_stream self._write_stream = write_stream @@ -259,6 +266,8 @@ def __init__( ) self._peer_cancel_mode: PeerCancelMode = peer_cancel_mode self._raise_handler_exceptions = raise_handler_exceptions + self._close_write_stream_on_read_close = close_write_stream_on_read_close + self._read_eof_drain_timeout_seconds = read_eof_drain_timeout_seconds # Request methods handled inline in the read loop (awaited before the # next message is dequeued) instead of spawned concurrently. Use for # methods whose side effects must be observable to the next message, @@ -400,13 +409,17 @@ async def run( `await tg.start(dispatcher.run, ...)` resumes when `send_raw_request` is usable. """ + normal_eof = False try: async with anyio.create_task_group() as tg: self._tg = tg self._running = True task_status.started() try: - async with self._read_stream, self._write_stream: + async with AsyncExitStack() as stack: + await stack.enter_async_context(self._read_stream) + if self._close_write_stream_on_read_close: + await stack.enter_async_context(self._write_stream) try: async for item in self._read_stream: # Duck-typed: `_context_streams.ContextReceiveStream` @@ -425,20 +438,26 @@ async def run( # (callers outside this task group) with CONNECTION_CLOSED. self._running = False self._fan_out_closed() + normal_eof = True finally: - # Transport closed: cancel in-flight handlers. Without this - # the task-group join waits for them, and a handler that - # outlives its caller (its request timed out client-side, or - # the client disconnected mid-call) would keep `run()` from - # returning forever. Same behaviour as `Server.run()` before - # the dispatcher rework. - tg.cancel_scope.cancel() + if not normal_eof or self._close_write_stream_on_read_close: + # Transport closed abnormally: cancel in-flight handlers. + # On normal EOF, let already-received handlers drain + # their responses before the task group exits. + tg.cancel_scope.cancel() + elif self._read_eof_drain_timeout_seconds is None: + pass + else: + tg.cancel_scope.deadline = anyio.current_time() + self._read_eof_drain_timeout_seconds finally: # Covers the cancel/crash paths where the inline fan-out above is # never reached. Idempotent. self._running = False self._tg = None self._fan_out_closed() + if not self._close_write_stream_on_read_close: + with anyio.CancelScope(shield=True): + await self._write_stream.aclose() async def _dispatch( self, diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 50597e10e..cd4d6b31e 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -148,14 +148,24 @@ def __init__( write_stream: WriteStream[SessionMessage], # If none, reading will never time out read_timeout_seconds: float | None = None, + # When True, closing/EOF on the read stream closes the write stream too. + # + # For full-duplex transports (e.g., stdio), an input EOF can be a + # half-close: the peer is done sending requests but still expects + # responses on the output stream. In that case, callers may opt out so + # in-flight handlers can drain their responses before shutdown. + close_write_stream_on_read_close: bool = True, ) -> None: self._read_stream = read_stream self._write_stream = write_stream self._response_streams = {} self._request_id = 0 self._session_read_timeout_seconds = read_timeout_seconds + self._close_write_stream_on_read_close = close_write_stream_on_read_close self._progress_callbacks = {} self._exit_stack = AsyncExitStack() + self._exit_stack.push_async_callback(self._read_stream.aclose) + self._exit_stack.push_async_callback(self._write_stream.aclose) async def __aenter__(self) -> Self: self._task_group = anyio.create_task_group() @@ -291,7 +301,10 @@ def _receive_notification_adapter(self) -> TypeAdapter[ReceiveNotificationT]: raise NotImplementedError async def _receive_loop(self) -> None: - async with self._read_stream, self._write_stream: + async with AsyncExitStack() as stack: + await stack.enter_async_context(self._read_stream) + if self._close_write_stream_on_read_close: + await stack.enter_async_context(self._write_stream) try: async def _handle_session_message(message: SessionMessage) -> None: diff --git a/tests/server/test_cancel_handling.py b/tests/server/test_cancel_handling.py index cff5a37c1..32088abe6 100644 --- a/tests/server/test_cancel_handling.py +++ b/tests/server/test_cancel_handling.py @@ -19,6 +19,7 @@ InitializeRequestParams, JSONRPCNotification, JSONRPCRequest, + JSONRPCResponse, ListToolsResult, PaginatedRequestParams, TextContent, @@ -100,17 +101,79 @@ async def first_request(): @pytest.mark.anyio -async def test_server_cancels_in_flight_handlers_on_transport_close(): - """When the transport closes mid-request, server.run() must cancel in-flight - handlers rather than join on them. +async def test_server_drains_in_flight_handlers_on_transport_read_eof(): + """When the transport's read side hits EOF (e.g., stdio stdin closes), the + server must drain already-started handlers so their responses reach the + peer via the still-open write side.""" + handler_started = anyio.Event() + handler_allowed_to_finish = anyio.Event() + server_run_returned = anyio.Event() - Without the cancel, the task group waits for the handler, which then tries - to respond through a write stream that _receive_loop already closed, - raising ClosedResourceError and crashing server.run() with exit code 1. + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + handler_started.set() + await handler_allowed_to_finish.wait() + return CallToolResult(content=[TextContent(type="text", text="ok")]) - This drives server.run() with raw memory streams because InMemoryTransport - wraps it in its own finally-cancel (_memory.py) which masks the bug. - """ + server = Server("test", on_call_tool=handle_call_tool) + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run( + server_read, + server_write, + server.create_initialization_options(), + drain_on_read_close=True, + read_eof_drain_timeout_seconds=None, + ) + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocol_version=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + client_info=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(init_req)) + await from_server.receive() # init response + await to_server.send(SessionMessage(initialized)) + await to_server.send(SessionMessage(call_req)) + + await handler_started.wait() + + # Close the server's input stream — this is what stdin EOF does. + # server.run()'s incoming_messages loop ends, finally-cancel fires, + # handler gets CancelledError, server.run() returns. + await to_server.aclose() + + handler_allowed_to_finish.set() + + response = await from_server.receive() + assert isinstance(response.message, JSONRPCResponse) + assert response.message.id == 2 + + await server_run_returned.wait() + + +@pytest.mark.anyio +async def test_server_bounds_drain_on_read_eof_when_handler_never_finishes(): handler_started = anyio.Event() handler_cancelled = anyio.Event() server_run_returned = anyio.Event() @@ -121,7 +184,6 @@ async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestPar await anyio.sleep_forever() finally: handler_cancelled.set() - # unreachable: sleep_forever only exits via cancellation raise AssertionError # pragma: no cover server = Server("test", on_call_tool=handle_call_tool) @@ -129,6 +191,128 @@ async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestPar to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + async def run_server(): + await server.run( + server_read, + server_write, + server.create_initialization_options(), + drain_on_read_close=True, + read_eof_drain_timeout_seconds=0.05, + ) + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocol_version=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + client_info=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + + with anyio.fail_after(2): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(init_req)) + await from_server.receive() # init response + await to_server.send(SessionMessage(initialized)) + await to_server.send(SessionMessage(call_req)) + + await handler_started.wait() + await to_server.aclose() + + await server_run_returned.wait() + + assert handler_cancelled.is_set() + + +@pytest.mark.anyio +async def test_server_reraises_handler_cancellation_when_server_is_cancelled(): + """If the server task is cancelled (e.g. KeyboardInterrupt), in-flight + request handlers will get cancelled too. Cancellation must be re-raised so + the task group can unwind cleanly.""" + handler_started = anyio.Event() + server_run_returned = anyio.Event() + cancel_scope = anyio.CancelScope() + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + handler_started.set() + await anyio.sleep_forever() + raise AssertionError # pragma: no cover + + server = Server("test", on_call_tool=handle_call_tool) + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + try: + with cancel_scope: + await server.run(server_read, server_write, server.create_initialization_options()) + finally: + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocol_version=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + client_info=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(init_req)) + await from_server.receive() # init response + await to_server.send(SessionMessage(initialized)) + await to_server.send(SessionMessage(call_req)) + + await handler_started.wait() + cancel_scope.cancel() + await server_run_returned.wait() + + +@pytest.mark.anyio +async def test_server_drops_response_when_write_stream_closes_mid_request(): + """If the write side closes while a handler is in-flight, responding may + raise (ClosedResourceError/BrokenResourceError). The handler task should + exit without crashing the server.""" + handler_started = anyio.Event() + allow_finish = anyio.Event() + server_run_returned = anyio.Event() + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + handler_started.set() + await allow_finish.wait() + return CallToolResult(content=[TextContent(type="text", text="ok")]) + + server = Server("test", on_call_tool=handle_call_tool) + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + async def run_server(): await server.run(server_read, server_write, server.create_initialization_options()) server_run_returned.set() @@ -161,16 +345,13 @@ async def run_server(): await to_server.send(SessionMessage(call_req)) await handler_started.wait() + await server_write.aclose() - # Close the server's input stream — this is what stdin EOF does. - # server.run()'s incoming_messages loop ends, finally-cancel fires, - # handler gets CancelledError, server.run() returns. + allow_finish.set() await to_server.aclose() await server_run_returned.wait() - assert handler_cancelled.is_set() - @pytest.mark.anyio async def test_server_handles_transport_close_with_pending_server_to_client_requests(): diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 054a157b3..8f97c679a 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -8,10 +8,27 @@ import anyio import pytest +from mcp.server import Server, ServerRequestContext from mcp.server.mcpserver import MCPServer from mcp.server.stdio import stdio_server from mcp.shared.message import SessionMessage -from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter +from mcp.types import ( + LATEST_PROTOCOL_VERSION, + CallToolRequestParams, + CallToolResult, + ClientCapabilities, + Implementation, + InitializeRequestParams, + JSONRPCMessage, + JSONRPCNotification, + JSONRPCRequest, + JSONRPCResponse, + ListToolsResult, + PaginatedRequestParams, + TextContent, + Tool, + jsonrpc_message_adapter, +) @pytest.mark.anyio @@ -169,3 +186,87 @@ async def lifespan(server: MCPServer) -> AsyncIterator[None]: assert events == ["setup", "cleanup"] response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + + +@pytest.mark.anyio +async def test_stdio_server_drains_in_flight_responses_on_stdin_eof(): + """When stdin reaches EOF (e.g., bash-redirected input), already-received + requests must still be able to emit their responses on stdout.""" + stdin = io.StringIO() + stdout = io.StringIO() + + tool_started_count = 0 + both_tools_started = anyio.Event() + allow_tools_to_finish = anyio.Event() + + async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult: + return ListToolsResult(tools=[Tool(name="slow", description="test", input_schema={})]) + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + nonlocal tool_started_count + tool_started_count += 1 + if tool_started_count == 2: + both_tools_started.set() + await allow_tools_to_finish.wait() + return CallToolResult(content=[TextContent(type="text", text="ok")]) + + server = Server("test", on_list_tools=handle_list_tools, on_call_tool=handle_call_tool) + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=0, + method="initialize", + params=InitializeRequestParams( + protocol_version=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + client_info=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + list_tools = JSONRPCRequest(jsonrpc="2.0", id=10, method="tools/list") + call_1 = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + call_2 = JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + + for message in (init_req, initialized, list_tools, call_1, call_2): + stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n") + stdin.seek(0) + + async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( + read_stream, + write_stream, + ): + with anyio.fail_after(5): + async with anyio.create_task_group() as tg: # pragma: no branch + + async def run_server() -> None: + await server.run( + read_stream, + write_stream, + server.create_initialization_options(), + drain_on_read_close=True, + ) + + tg.start_soon(run_server) + await both_tools_started.wait() + allow_tools_to_finish.set() + + stdout.seek(0) + output_lines = [line.strip() for line in stdout.readlines()] + messages = [jsonrpc_message_adapter.validate_json(line) for line in output_lines] + ids: set[int | str] = set() + for message in messages: + assert isinstance(message, JSONRPCResponse) + ids.add(message.id) + + assert 1 in ids + assert 2 in ids diff --git a/tests/shared/test_jsonrpc_dispatcher.py b/tests/shared/test_jsonrpc_dispatcher.py index b2a24c87d..39efaf8b4 100644 --- a/tests/shared/test_jsonrpc_dispatcher.py +++ b/tests/shared/test_jsonrpc_dispatcher.py @@ -249,6 +249,62 @@ async def drive() -> None: s2c_recv.close() +@pytest.mark.anyio +async def test_run_closes_write_stream_after_clean_eof_without_drain_timeout(): + c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( + c2s_recv, + s2c_send, + close_write_stream_on_read_close=False, + read_eof_drain_timeout_seconds=None, + ) + on_request, on_notify = echo_handlers(Recorder()) + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, c2s_send, c2s_recv, s2c_send, s2c_recv: + await tg.start(server.run, on_request, on_notify) + c2s_send.close() + with pytest.raises(anyio.EndOfStream): # pragma: no branch + await s2c_recv.receive() + + +@pytest.mark.anyio +async def test_run_drains_in_flight_handlers_on_clean_eof_without_timeout(): + c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( + c2s_recv, + s2c_send, + close_write_stream_on_read_close=False, + read_eof_drain_timeout_seconds=None, + ) + handler_started = anyio.Event() + handler_allowed_to_finish = anyio.Event() + + async def handle_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]: + handler_started.set() + await handler_allowed_to_finish.wait() + return {"drained": True} + + async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None: + raise NotImplementedError + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, c2s_send, c2s_recv, s2c_send, s2c_recv: + await tg.start(server.run, handle_request, on_notify) + await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="x", params=None))) + await handler_started.wait() + c2s_send.close() + handler_allowed_to_finish.set() + + response = await s2c_recv.receive() + assert isinstance(response, SessionMessage) + assert isinstance(response.message, JSONRPCResponse) + assert response.message.id == 1 + assert response.message.result == {"drained": True} + + @pytest.mark.anyio async def test_run_closes_write_stream_on_exit(): """run() enters both streams; the write end is released on EOF.""" diff --git a/tests/shared/test_session.py b/tests/shared/test_session.py index 8a53b0819..d1df0fa3b 100644 --- a/tests/shared/test_session.py +++ b/tests/shared/test_session.py @@ -1,5 +1,8 @@ +from typing import Any + import anyio import pytest +from pydantic import TypeAdapter from mcp import Client, types from mcp.client.session import ClientSession @@ -7,7 +10,7 @@ from mcp.shared.exceptions import MCPError from mcp.shared.memory import create_client_server_memory_streams from mcp.shared.message import SessionMessage -from mcp.shared.session import RequestResponder +from mcp.shared.session import BaseSession, RequestId, RequestResponder from mcp.types import ( PARSE_ERROR, CancelledNotification, @@ -16,6 +19,7 @@ EmptyResult, ErrorData, JSONRPCError, + JSONRPCNotification, JSONRPCRequest, JSONRPCResponse, ServerNotification, @@ -291,6 +295,40 @@ async def mock_server(): await ev_response.wait() +@pytest.mark.anyio +async def test_receive_loop_can_leave_write_stream_open_on_read_eof(): + class TestSession(BaseSession[Any, Any, Any, Any, Any]): + async def _send_response(self, request_id: RequestId, response: Any | ErrorData) -> None: + raise NotImplementedError # pragma: no cover + + @property + def _receive_request_adapter(self) -> TypeAdapter[Any]: + return TypeAdapter(object) # pragma: no cover + + @property + def _receive_notification_adapter(self) -> TypeAdapter[Any]: + return TypeAdapter(object) # pragma: no cover + + read_send, read_receive = anyio.create_memory_object_stream[SessionMessage | Exception](1) + write_send, write_receive = anyio.create_memory_object_stream[SessionMessage](1) + session = TestSession(read_receive, write_send, close_write_stream_on_read_close=False) + receive_loop_returned = anyio.Event() + + async def drive_receive_loop() -> None: + await session._receive_loop() # pyright: ignore[reportPrivateUsage] + receive_loop_returned.set() + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, read_send, read_receive, write_send, write_receive: + tg.start_soon(drive_receive_loop) + await read_send.aclose() + await receive_loop_returned.wait() + + marker = SessionMessage(message=JSONRPCNotification(jsonrpc="2.0", method="still-open")) + await write_send.send(marker) + assert await write_receive.receive() is marker + + @pytest.mark.anyio async def test_null_id_error_surfaced_via_message_handler(): """Test that a JSONRPCError with id=None is surfaced to the message handler.