|
48 | 48 | CallToolRequestParams, |
49 | 49 | CallToolResult, |
50 | 50 | InitializeResult, |
| 51 | + JSONRPCError, |
| 52 | + JSONRPCNotification, |
51 | 53 | JSONRPCRequest, |
| 54 | + JSONRPCResponse, |
52 | 55 | ListToolsResult, |
53 | 56 | PaginatedRequestParams, |
54 | 57 | ReadResourceRequestParams, |
@@ -902,6 +905,60 @@ async def test_streamable_http_client_error_handling(initialized_client_session: |
902 | 905 | assert "Unknown resource: unknown://test-error" in exc_info.value.error.message |
903 | 906 |
|
904 | 907 |
|
| 908 | +@pytest.mark.anyio |
| 909 | +async def test_streamable_http_request_error_does_not_close_writer() -> None: |
| 910 | + async def handler(request: httpx.Request) -> httpx.Response: |
| 911 | + body = json.loads(request.content) |
| 912 | + if body["method"] == "tools/list": |
| 913 | + raise httpx.ConnectError("boom", request=request) |
| 914 | + |
| 915 | + return httpx.Response( |
| 916 | + 200, |
| 917 | + headers={"content-type": "application/json"}, |
| 918 | + json={"jsonrpc": "2.0", "id": body["id"], "result": {}}, |
| 919 | + request=request, |
| 920 | + ) |
| 921 | + |
| 922 | + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: |
| 923 | + async with streamable_http_client("http://testserver/mcp", http_client=client) as (read_stream, write_stream): |
| 924 | + await write_stream.send(SessionMessage(JSONRPCRequest(jsonrpc="2.0", id="bad", method="tools/list"))) |
| 925 | + |
| 926 | + with anyio.fail_after(1): |
| 927 | + error_message = await read_stream.receive() |
| 928 | + |
| 929 | + assert isinstance(error_message, SessionMessage) |
| 930 | + assert isinstance(error_message.message, JSONRPCError) |
| 931 | + assert error_message.message.id == "bad" |
| 932 | + assert error_message.message.error.code == types.INTERNAL_ERROR |
| 933 | + |
| 934 | + await write_stream.send(SessionMessage(JSONRPCRequest(jsonrpc="2.0", id="ok", method="ping"))) |
| 935 | + |
| 936 | + with anyio.fail_after(1): |
| 937 | + response_message = await read_stream.receive() |
| 938 | + |
| 939 | + assert isinstance(response_message, SessionMessage) |
| 940 | + assert isinstance(response_message.message, JSONRPCResponse) |
| 941 | + assert response_message.message.id == "ok" |
| 942 | + |
| 943 | + |
| 944 | +@pytest.mark.anyio |
| 945 | +async def test_streamable_http_notification_error_still_closes_writer() -> None: |
| 946 | + request_seen = anyio.Event() |
| 947 | + |
| 948 | + async def handler(request: httpx.Request) -> httpx.Response: |
| 949 | + request_seen.set() |
| 950 | + raise httpx.ConnectError("boom", request=request) |
| 951 | + |
| 952 | + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: |
| 953 | + async with streamable_http_client("http://testserver/mcp", http_client=client) as (_, write_stream): |
| 954 | + await write_stream.send( |
| 955 | + SessionMessage(JSONRPCNotification(jsonrpc="2.0", method="notifications/cancelled")) |
| 956 | + ) |
| 957 | + |
| 958 | + with anyio.fail_after(1): # pragma: no branch |
| 959 | + await request_seen.wait() |
| 960 | + |
| 961 | + |
905 | 962 | @pytest.mark.anyio |
906 | 963 | async def test_streamable_http_client_session_persistence(basic_app: Starlette) -> None: |
907 | 964 | """The session persists across multiple requests on one connection.""" |
|
0 commit comments