diff --git a/.gitignore b/.gitignore index 0e321204..35e2d68e 100644 --- a/.gitignore +++ b/.gitignore @@ -174,3 +174,6 @@ make/local.mk # E2E report e2e-report.xml + +# Claude Code local artifacts +.claude/ diff --git a/mpt_api_client/constants.py b/mpt_api_client/constants.py index c0b1552d..e512d738 100644 --- a/mpt_api_client/constants.py +++ b/mpt_api_client/constants.py @@ -1,2 +1,3 @@ APPLICATION_JSON = "application/json" +APPLICATION_JSONL = "application/jsonl" MIMETYPE_EXCEL_XLSX = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" diff --git a/mpt_api_client/http/async_client.py b/mpt_api_client/http/async_client.py index 46a24ded..f03e4b9c 100644 --- a/mpt_api_client/http/async_client.py +++ b/mpt_api_client/http/async_client.py @@ -1,7 +1,10 @@ import os +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager from typing import Any from httpx import AsyncClient, HTTPError, RequestError +from httpx import Response as HTTPXResponse from httpx_retries import Retry, RetryTransport from mpt_api_client.constants import APPLICATION_JSON @@ -113,3 +116,48 @@ async def request( # noqa: WPS211 status_code=response.status_code, content=response.content, ) + + @asynccontextmanager + async def stream( + self, + method: str, + url: str, + *, + headers: HeaderTypes | None = None, + query_params: QueryParam | None = None, + options: QueryOptions | None = None, + ) -> AsyncIterator[HTTPXResponse]: + """Open a streaming response without buffering its body fully in memory. + + Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded + response (e.g. ``aiter_lines()``). Redirects are followed automatically. + + Args: + method: HTTP method. + url: URL to send the request to. + headers: Request headers. + query_params: Query parameters. + options: Additional options for the request. + + Yields: + The open streaming response. + + Raises: + MPTError: If the request fails. + MPTApiError: If the response contains an error. + MPTHttpError: If the response contains an HTTP error. + MPTMaxRetryError: If the request fails after maximum retry attempts. + """ + params_str = get_query_params(query_params, options) + try: + async with self.httpx_client.stream( + method, url, params=params_str or None, headers=headers + ) as response: + if response.is_error: + await response.aread() + handle_response_http_error(response) + yield response + except RequestError as err: + raise MPTMaxRetryError(str(err), self._retries + 1) from err + except HTTPError as err: + raise MPTError(f"HTTP Error: {err}") from err diff --git a/mpt_api_client/http/client.py b/mpt_api_client/http/client.py index 8e6e9517..5862e096 100644 --- a/mpt_api_client/http/client.py +++ b/mpt_api_client/http/client.py @@ -1,8 +1,11 @@ import json as json_package import os +from collections.abc import Iterator +from contextlib import contextmanager from typing import Any from httpx import Client, HTTPError, RequestError +from httpx import Response as HTTPXResponse from httpx_retries import Retry, RetryTransport from mpt_api_client.constants import APPLICATION_JSON @@ -122,3 +125,48 @@ def request( # noqa: WPS211 status_code=response.status_code, content=response.content, ) + + @contextmanager + def stream( + self, + method: str, + url: str, + *, + headers: HeaderTypes | None = None, + query_params: QueryParam | None = None, + options: QueryOptions | None = None, + ) -> Iterator[HTTPXResponse]: + """Open a streaming response without buffering its body fully in memory. + + Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded + response (e.g. ``iter_lines()``). Redirects are followed automatically. + + Args: + method: HTTP method. + url: URL to send the request to. + headers: Request headers. + query_params: Query parameters. + options: Additional options for the request. + + Yields: + The open streaming response. + + Raises: + MPTError: If the request fails. + MPTApiError: If the response contains an error. + MPTHttpError: If the response contains an HTTP error. + MPTMaxRetryError: If the request fails after maximum retry attempts. + """ + params_str = get_query_params(query_params, options) + try: + with self.httpx_client.stream( + method, url, params=params_str or None, headers=headers + ) as response: + if response.is_error: + response.read() + handle_response_http_error(response) + yield response + except RequestError as err: + raise MPTMaxRetryError(str(err), self._retries + 1) from err + except HTTPError as err: + raise MPTError(f"HTTP Error: {err}") from err diff --git a/mpt_api_client/http/mixins/__init__.py b/mpt_api_client/http/mixins/__init__.py index 84af78a1..70aec21e 100644 --- a/mpt_api_client/http/mixins/__init__.py +++ b/mpt_api_client/http/mixins/__init__.py @@ -26,6 +26,10 @@ ManagedResourceMixin, ModifiableResourceMixin, ) +from mpt_api_client.http.mixins.stream_jsonl_mixin import ( + AsyncStreamJSONLMixin, + StreamJSONLMixin, +) from mpt_api_client.http.mixins.update_file_mixin import ( AsyncUpdateFileMixin, UpdateFileMixin, @@ -44,6 +48,7 @@ "AsyncGetMixin", "AsyncManagedResourceMixin", "AsyncModifiableResourceMixin", + "AsyncStreamJSONLMixin", "AsyncUpdateFileMixin", "AsyncUpdateMixin", "CollectionMixin", @@ -58,6 +63,7 @@ "ManagedResourceMixin", "ModifiableResourceMixin", "QueryableMixin", + "StreamJSONLMixin", "UpdateFileMixin", "UpdateMixin", ] diff --git a/mpt_api_client/http/mixins/stream_jsonl_mixin.py b/mpt_api_client/http/mixins/stream_jsonl_mixin.py new file mode 100644 index 00000000..f3322865 --- /dev/null +++ b/mpt_api_client/http/mixins/stream_jsonl_mixin.py @@ -0,0 +1,52 @@ +import json +from collections.abc import AsyncIterator, Iterator + +from mpt_api_client.constants import APPLICATION_JSONL +from mpt_api_client.http.mixins.queryable_mixin import QueryableMixin +from mpt_api_client.models import Model as BaseModel + + +class StreamJSONLMixin[Model: BaseModel](QueryableMixin): + """Mixin providing JSONL (NDJSON) streaming of a collection line by line.""" + + def stream(self) -> Iterator[Model]: + """Stream resources from a JSONL endpoint, yielding one model per line. + + Unlike ``iterate()``, which paginates and deserializes full pages, this + consumes a ``application/jsonl`` response line by line without buffering the + whole body in memory. + + Yields: + Resources, one per non-empty line of the response. + """ + with self.http_client.stream( # type: ignore[attr-defined] + "GET", + self.build_path(), # type: ignore[attr-defined] + headers={"Accept": APPLICATION_JSONL}, + ) as response: + for line in response.iter_lines(): + if line.strip(): + yield self._model_class(json.loads(line)) # type: ignore[attr-defined] + + +class AsyncStreamJSONLMixin[Model: BaseModel](QueryableMixin): + """Async mixin providing JSONL (NDJSON) streaming of a collection line by line.""" + + async def stream(self) -> AsyncIterator[Model]: + """Stream resources from a JSONL endpoint, yielding one model per line. + + Unlike ``iterate()``, which paginates and deserializes full pages, this + consumes a ``application/jsonl`` response line by line without buffering the + whole body in memory. + + Yields: + Resources, one per non-empty line of the response. + """ + async with self.http_client.stream( # type: ignore[attr-defined] + "GET", + self.build_path(), # type: ignore[attr-defined] + headers={"Accept": APPLICATION_JSONL}, + ) as response: + async for line in response.aiter_lines(): + if line.strip(): + yield self._model_class(json.loads(line)) # type: ignore[attr-defined] diff --git a/mpt_api_client/resources/billing/statement_charges.py b/mpt_api_client/resources/billing/statement_charges.py index 61fe1bd2..e46f7bd4 100644 --- a/mpt_api_client/resources/billing/statement_charges.py +++ b/mpt_api_client/resources/billing/statement_charges.py @@ -2,14 +2,85 @@ from mpt_api_client.http.mixins import ( AsyncCollectionMixin, AsyncGetMixin, + AsyncStreamJSONLMixin, CollectionMixin, GetMixin, + StreamJSONLMixin, ) from mpt_api_client.models import Model +from mpt_api_client.models.model import BaseModel class StatementCharge(Model): - """Statement Charge resource.""" + """Statement Charge resource. + + Attributes: + revision: Charge revision number. + external_ids: External identifiers associated with the charge. + search: Search-related details for the charge. + period: Period during which the charge is applicable. + quantity: Quantity associated with the charge. + price: Pricing details for the charge. + description: Description of the charge, if applicable. + attributes: Additional attributes for the charge. + billing_type: Billing type of the charge. + statement_type: Type of statement associated with the charge. + journal: Reference to the journal. + ledger: Reference to the ledger. + custom_ledger: Reference to the custom ledger. + parent: Reference to the parent charge. + upload: Upload status and details, visible to vendors or operations. + processing: Processing status and details, visible to operations. + licensee: Reference to the licensee. + agreement: Reference to the agreement. + subscription: Reference to the subscription. + line: Agreement line associated with the charge, if applicable. + order: Reference to the order. + asset: Reference to the asset. + item: Reference to the product item. + authorization: Reference to the authorization. + statement: Reference to the statement. + buyer: Reference to the buyer. + vendor: Reference to the vendor account. + seller: Reference to the seller. + erp_data: ERP-related data for the charge. + split: Details about the charge split, if applicable. + reconciliation: Reconciliation information for the charge. + audit: Container for audit-related events for the charge. + """ + + revision: int | None + external_ids: BaseModel | None + search: BaseModel | None + period: BaseModel | None + quantity: float | None + price: BaseModel | None + description: BaseModel | None + attributes: BaseModel | None + billing_type: str | None + statement_type: str | None + journal: BaseModel | None + ledger: BaseModel | None + custom_ledger: BaseModel | None + parent: BaseModel | None + upload: BaseModel | None + processing: BaseModel | None + licensee: BaseModel | None + agreement: BaseModel | None + subscription: BaseModel | None + line: BaseModel | None + order: BaseModel | None + asset: BaseModel | None + item: BaseModel | None + authorization: BaseModel | None + statement: BaseModel | None + buyer: BaseModel | None + vendor: BaseModel | None + seller: BaseModel | None + erp_data: BaseModel | None + split: BaseModel | None + reconciliation: BaseModel | None + audit: BaseModel | None class StatementChargesServiceConfig: @@ -21,6 +92,7 @@ class StatementChargesServiceConfig: class StatementChargesService( + StreamJSONLMixin[StatementCharge], GetMixin[StatementCharge], CollectionMixin[StatementCharge], Service[StatementCharge], @@ -30,6 +102,7 @@ class StatementChargesService( class AsyncStatementChargesService( + AsyncStreamJSONLMixin[StatementCharge], AsyncGetMixin[StatementCharge], AsyncCollectionMixin[StatementCharge], AsyncService[StatementCharge], diff --git a/tests/e2e/billing/statement/charge/test_async_statement_charge.py b/tests/e2e/billing/statement/charge/test_async_statement_charge.py index 56549b68..4553992e 100644 --- a/tests/e2e/billing/statement/charge/test_async_statement_charge.py +++ b/tests/e2e/billing/statement/charge/test_async_statement_charge.py @@ -42,3 +42,10 @@ async def test_filter_statement_charges(statement_charges, statement_charge_id): result = [statement async for statement in filtered_charges.iterate()] assert len(result) == 1 + + +async def test_stream_statement_charges_jsonl(statement_charges): + result = [charge async for charge in statement_charges.stream()] + + assert len(result) > 0 + assert all(charge.id for charge in result) diff --git a/tests/e2e/billing/statement/charge/test_sync_statement_charge.py b/tests/e2e/billing/statement/charge/test_sync_statement_charge.py index e1833796..ae1d0fb0 100644 --- a/tests/e2e/billing/statement/charge/test_sync_statement_charge.py +++ b/tests/e2e/billing/statement/charge/test_sync_statement_charge.py @@ -42,3 +42,10 @@ def test_filter_statement_charges(statement_charges, statement_charge_id): result = list(filtered_charges.iterate()) assert len(result) == 1 + + +def test_stream_statement_charges_jsonl(statement_charges): + result = list(statement_charges.stream()) + + assert len(result) > 0 + assert all(charge.id for charge in result) diff --git a/tests/unit/http/mixins/test_stream_jsonl_mixin.py b/tests/unit/http/mixins/test_stream_jsonl_mixin.py new file mode 100644 index 00000000..21722726 --- /dev/null +++ b/tests/unit/http/mixins/test_stream_jsonl_mixin.py @@ -0,0 +1,77 @@ +import httpx +import pytest +import respx + +from mpt_api_client import RQLQuery +from mpt_api_client.http import AsyncService, Service +from mpt_api_client.http.mixins import AsyncStreamJSONLMixin, StreamJSONLMixin +from tests.unit.conftest import API_URL, DummyModel + +JSONL_BODY = b'{"id": "ID-1", "name": "Charge 1"}\n\n{"id": "ID-2", "name": "Charge 2"}\n' + + +class DummyStreamService( # noqa: WPS215 + StreamJSONLMixin[DummyModel], + Service[DummyModel], +): + _endpoint = "/api/v1/charges" + _model_class = DummyModel + + +class AsyncDummyStreamService( # noqa: WPS215 + AsyncStreamJSONLMixin[DummyModel], + AsyncService[DummyModel], +): + _endpoint = "/api/v1/charges" + _model_class = DummyModel + + +@pytest.fixture +def stream_service(http_client): + return DummyStreamService(http_client=http_client) + + +@pytest.fixture +def async_stream_service(async_http_client): + return AsyncDummyStreamService(http_client=async_http_client) + + +@respx.mock +def test_stream_yields_models(stream_service): + route = respx.get(f"{API_URL}/api/v1/charges").mock( + return_value=httpx.Response(httpx.codes.OK, content=JSONL_BODY) + ) + + result = list(stream_service.stream()) + + request = route.calls[0].request + assert [charge.id for charge in result] == ["ID-1", "ID-2"] + assert all(isinstance(charge, DummyModel) for charge in result) + assert request.headers["Accept"] == "application/jsonl" + + +@respx.mock +def test_stream_applies_query_filters(stream_service): + route = respx.get(f"{API_URL}/api/v1/charges").mock( + return_value=httpx.Response(httpx.codes.OK, content=JSONL_BODY) + ) + + result = list(stream_service.filter(RQLQuery(status="active")).stream()) + + request = route.calls[0].request + assert result + assert "status" in request.url.query.decode() + + +@respx.mock +async def test_async_stream_yields_models(async_stream_service): + route = respx.get(f"{API_URL}/api/v1/charges").mock( + return_value=httpx.Response(httpx.codes.OK, content=JSONL_BODY) + ) + + result = [charge async for charge in async_stream_service.stream()] + + request = route.calls[0].request + assert [charge.id for charge in result] == ["ID-1", "ID-2"] + assert all(isinstance(charge, DummyModel) for charge in result) + assert request.headers["Accept"] == "application/jsonl" diff --git a/tests/unit/http/test_async_client.py b/tests/unit/http/test_async_client.py index 7b60a446..1211eeb6 100644 --- a/tests/unit/http/test_async_client.py +++ b/tests/unit/http/test_async_client.py @@ -5,7 +5,7 @@ import respx from httpx import ConnectTimeout, Request, Response, codes -from mpt_api_client.exceptions import MPTError +from mpt_api_client.exceptions import MPTAPIError, MPTError, MPTMaxRetryError from mpt_api_client.http.async_client import AsyncHTTPClient from mpt_api_client.http.query_options import QueryOptions from tests.unit.conftest import API_TOKEN, API_URL @@ -85,6 +85,43 @@ async def test_async_http_call_failure(async_http_client): assert timeout_route.call_count == 6 +async def drain_async_stream(stream_cm): + async with stream_cm as response: + return [line async for line in response.aiter_lines()] + + +@respx.mock +async def test_async_stream_yields_lines(async_http_client): + body = b'{"id": "ID-1"}\n{"id": "ID-2"}\n' + streamed = Response(200, content=body) + stream_route = respx.get(f"{API_URL}/charges").mock(return_value=streamed) + + result = await drain_async_stream( + async_http_client.stream("GET", "/charges", headers={"Accept": "application/jsonl"}) + ) + + request = stream_route.calls[0].request + assert result == ['{"id": "ID-1"}', '{"id": "ID-2"}'] + assert request.headers["Accept"] == "application/jsonl" + + +@respx.mock +async def test_async_stream_error_status(async_http_client): + not_found = Response(404, json={"message": "Not Found"}) + respx.get(f"{API_URL}/charges").mock(return_value=not_found) + + with pytest.raises(MPTAPIError, match=r"404"): + await drain_async_stream(async_http_client.stream("GET", "/charges")) + + +@respx.mock +async def test_async_stream_conn_error(async_http_client): + respx.get(f"{API_URL}/charges").mock(side_effect=ConnectTimeout("Mock Timeout")) + + with pytest.raises(MPTMaxRetryError): + await drain_async_stream(async_http_client.stream("GET", "/charges")) + + async def test_http_call_with_json_and_files(mocker, async_http_client, mock_httpx_response): # noqa: WPS210 json_data = {"foo": "bar"} files = {"file": ("test.txt", io.StringIO("file content"), "text/plain")} diff --git a/tests/unit/http/test_client.py b/tests/unit/http/test_client.py index 56f022b8..2d14ed86 100644 --- a/tests/unit/http/test_client.py +++ b/tests/unit/http/test_client.py @@ -5,7 +5,7 @@ import respx from httpx import ConnectTimeout, Response, codes -from mpt_api_client.exceptions import MPTMaxRetryError +from mpt_api_client.exceptions import MPTAPIError, MPTMaxRetryError from mpt_api_client.http.client import HTTPClient from mpt_api_client.http.query_options import QueryOptions from tests.unit.conftest import API_TOKEN, API_URL @@ -110,6 +110,43 @@ def test_http_call_force_multipart(mocker, http_client): assert payload_tuple[1].decode() == '{"foo":"bar"}' +def drain_stream(stream_cm): + with stream_cm as response: + return list(response.iter_lines()) + + +@respx.mock +def test_stream_yields_lines(http_client): + body = b'{"id": "ID-1"}\n{"id": "ID-2"}\n' + streamed = Response(200, content=body) + stream_route = respx.get(f"{API_URL}/charges").mock(return_value=streamed) + + result = drain_stream( + http_client.stream("GET", "/charges", headers={"Accept": "application/jsonl"}) + ) + + request = stream_route.calls[0].request + assert result == ['{"id": "ID-1"}', '{"id": "ID-2"}'] + assert request.headers["Accept"] == "application/jsonl" + + +@respx.mock +def test_stream_raises_on_error_status(http_client): + not_found = Response(404, json={"message": "Not Found"}) + respx.get(f"{API_URL}/charges").mock(return_value=not_found) + + with pytest.raises(MPTAPIError, match=r"404"): + drain_stream(http_client.stream("GET", "/charges")) + + +@respx.mock +def test_stream_raises_on_connection_error(http_client): + respx.get(f"{API_URL}/charges").mock(side_effect=ConnectTimeout("Mock Timeout")) + + with pytest.raises(MPTMaxRetryError): + drain_stream(http_client.stream("GET", "/charges")) + + def test_request_with_render(mocker, http_client, mock_httpx_response): parent_request = mocker.patch.object( http_client.httpx_client, "request", autospec=True, return_value=mock_httpx_response diff --git a/tests/unit/resources/billing/test_statement_charges.py b/tests/unit/resources/billing/test_statement_charges.py index 4e5a1c37..48b20ef5 100644 --- a/tests/unit/resources/billing/test_statement_charges.py +++ b/tests/unit/resources/billing/test_statement_charges.py @@ -1,7 +1,9 @@ import pytest +from mpt_api_client.models.model import BaseModel from mpt_api_client.resources.billing.statement_charges import ( AsyncStatementChargesService, + StatementCharge, StatementChargesService, ) @@ -36,15 +38,67 @@ def test_async_endpoint(async_statement_charges_service): assert result is True -@pytest.mark.parametrize("method", ["get"]) +@pytest.mark.parametrize("method", ["get", "stream"]) def test_methods_present(statement_charges_service, method): result = hasattr(statement_charges_service, method) assert result is True -@pytest.mark.parametrize("method", ["get"]) +@pytest.mark.parametrize("method", ["get", "stream"]) def test_async_methods_present(async_statement_charges_service, method): result = hasattr(async_statement_charges_service, method) assert result is True + + +@pytest.fixture +def charge_data(): + return { + "id": "CHG-0001", + "revision": 3, + "quantity": 10.5, + "billingType": "automated", + "statementType": "Standard", + "externalIds": {"invoice": "INV12345"}, + "price": {"unitPP": 10, "PPx1": 8.33}, + "period": {"start": "2025-01-01", "end": "2025-12-31"}, + "description": {"value1": "desc-1"}, + "statement": {"id": "STM-0000-0001"}, + "journal": {"id": "BJO-0001"}, + "audit": {"created": {"at": "2024-01-01T00:00:00Z"}}, + } + + +def test_charge_primitive_fields(charge_data): + result = StatementCharge(charge_data) + + assert result.to_dict() == charge_data + + +def test_charge_typed_scalar_fields(charge_data): # noqa: WPS218 + result = StatementCharge(charge_data) + + assert result.id == "CHG-0001" + assert result.revision == 3 + assert result.quantity == pytest.approx(10.5) + assert result.billing_type == "automated" + assert result.statement_type == "Standard" + + +def test_charge_nested_fields(charge_data): # noqa: WPS218 + result = StatementCharge(charge_data) + + assert isinstance(result.external_ids, BaseModel) + assert isinstance(result.price, BaseModel) + assert isinstance(result.period, BaseModel) + assert isinstance(result.statement, BaseModel) + assert isinstance(result.audit, BaseModel) + + +def test_charge_optional_fields_absent(): + result = StatementCharge({"id": "CHG-0001"}) + + assert result.id == "CHG-0001" + assert not hasattr(result, "price") + assert not hasattr(result, "external_ids")