Skip to content

Commit edcf3a2

Browse files
authored
MPT-21666 stream statement charges as JSONL (#333)
πŸ€– AI-generated PR β€” Please review carefully. ## What was done Add line-by-line JSONL/NDJSON streaming for statement charges so large charge sets can be consumed without buffering full pages into memory. - Add a `stream()` context-manager primitive to `HTTPClient`/`AsyncHTTPClient` that opens a streaming response (`Accept: application/jsonl`) and follows the redirect to the charge blob automatically. - Add `StreamJSONLMixin`/`AsyncStreamJSONLMixin` that iterate the response lines and yield one parsed model per line; wire it into `StatementChargesService`, exposing `client.billing.statements.charges(id).stream()`. - Populate the `StatementCharge` model with the full Charge schema fields from the MPT OpenAPI spec (typed scalars + nested `BaseModel` refs). - Add unit and e2e coverage; ignore `.claude/` local artifacts. ## Testing Unit and e2e coverage added for the streaming primitive, the JSONL mixins, and the statement charges service. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> Closes [MPT-21666](https://softwareone.atlassian.net/browse/MPT-21666) - Add JSONL/NDJSON streaming for statement charges to stream large charge sets without buffering full pages - Introduce HTTPClient.stream and AsyncHTTPClient.stream context managers that open streaming responses with Accept: application/jsonl and follow charge-blob redirects - Add StreamJSONLMixin and AsyncStreamJSONLMixin to iterate JSONL response lines and yield one parsed model per line - Wire streaming mixins into StatementChargesService and AsyncStatementChargesService so callers can use client.billing.statements.charges(id).stream() - Expand StatementCharge model with full Charge schema fields (typed scalars and nested BaseModel references) - Add unit and end-to-end tests for the streaming primitives, JSONL mixins, and statement charges service - Update .gitignore to ignore local .claude/ artifacts <!-- end of auto-generated comment: release notes by coderabbit.ai --> [MPT-21666]: https://softwareone.atlassian.net/browse/MPT-21666?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
2 parents 5a32fdb + 21579d3 commit edcf3a2

13 files changed

Lines changed: 455 additions & 5 deletions

File tree

β€Ž.gitignoreβ€Ž

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,6 @@ make/local.mk
174174

175175
# E2E report
176176
e2e-report.xml
177+
178+
# Claude Code local artifacts
179+
.claude/

β€Žmpt_api_client/constants.pyβ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
APPLICATION_JSON = "application/json"
2+
APPLICATION_JSONL = "application/jsonl"
23
MIMETYPE_EXCEL_XLSX = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

β€Žmpt_api_client/http/async_client.pyβ€Ž

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import os
2+
from collections.abc import AsyncIterator
3+
from contextlib import asynccontextmanager
24
from typing import Any
35

46
from httpx import AsyncClient, HTTPError, RequestError
7+
from httpx import Response as HTTPXResponse
58
from httpx_retries import Retry, RetryTransport
69

710
from mpt_api_client.constants import APPLICATION_JSON
@@ -113,3 +116,48 @@ async def request( # noqa: WPS211
113116
status_code=response.status_code,
114117
content=response.content,
115118
)
119+
120+
@asynccontextmanager
121+
async def stream(
122+
self,
123+
method: str,
124+
url: str,
125+
*,
126+
headers: HeaderTypes | None = None,
127+
query_params: QueryParam | None = None,
128+
options: QueryOptions | None = None,
129+
) -> AsyncIterator[HTTPXResponse]:
130+
"""Open a streaming response without buffering its body fully in memory.
131+
132+
Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded
133+
response (e.g. ``aiter_lines()``). Redirects are followed automatically.
134+
135+
Args:
136+
method: HTTP method.
137+
url: URL to send the request to.
138+
headers: Request headers.
139+
query_params: Query parameters.
140+
options: Additional options for the request.
141+
142+
Yields:
143+
The open streaming response.
144+
145+
Raises:
146+
MPTError: If the request fails.
147+
MPTApiError: If the response contains an error.
148+
MPTHttpError: If the response contains an HTTP error.
149+
MPTMaxRetryError: If the request fails after maximum retry attempts.
150+
"""
151+
params_str = get_query_params(query_params, options)
152+
try:
153+
async with self.httpx_client.stream(
154+
method, url, params=params_str or None, headers=headers
155+
) as response:
156+
if response.is_error:
157+
await response.aread()
158+
handle_response_http_error(response)
159+
yield response
160+
except RequestError as err:
161+
raise MPTMaxRetryError(str(err), self._retries + 1) from err
162+
except HTTPError as err:
163+
raise MPTError(f"HTTP Error: {err}") from err

β€Žmpt_api_client/http/client.pyβ€Ž

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import json as json_package
22
import os
3+
from collections.abc import Iterator
4+
from contextlib import contextmanager
35
from typing import Any
46

57
from httpx import Client, HTTPError, RequestError
8+
from httpx import Response as HTTPXResponse
69
from httpx_retries import Retry, RetryTransport
710

811
from mpt_api_client.constants import APPLICATION_JSON
@@ -122,3 +125,48 @@ def request( # noqa: WPS211
122125
status_code=response.status_code,
123126
content=response.content,
124127
)
128+
129+
@contextmanager
130+
def stream(
131+
self,
132+
method: str,
133+
url: str,
134+
*,
135+
headers: HeaderTypes | None = None,
136+
query_params: QueryParam | None = None,
137+
options: QueryOptions | None = None,
138+
) -> Iterator[HTTPXResponse]:
139+
"""Open a streaming response without buffering its body fully in memory.
140+
141+
Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded
142+
response (e.g. ``iter_lines()``). Redirects are followed automatically.
143+
144+
Args:
145+
method: HTTP method.
146+
url: URL to send the request to.
147+
headers: Request headers.
148+
query_params: Query parameters.
149+
options: Additional options for the request.
150+
151+
Yields:
152+
The open streaming response.
153+
154+
Raises:
155+
MPTError: If the request fails.
156+
MPTApiError: If the response contains an error.
157+
MPTHttpError: If the response contains an HTTP error.
158+
MPTMaxRetryError: If the request fails after maximum retry attempts.
159+
"""
160+
params_str = get_query_params(query_params, options)
161+
try:
162+
with self.httpx_client.stream(
163+
method, url, params=params_str or None, headers=headers
164+
) as response:
165+
if response.is_error:
166+
response.read()
167+
handle_response_http_error(response)
168+
yield response
169+
except RequestError as err:
170+
raise MPTMaxRetryError(str(err), self._retries + 1) from err
171+
except HTTPError as err:
172+
raise MPTError(f"HTTP Error: {err}") from err

β€Žmpt_api_client/http/mixins/__init__.pyβ€Ž

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
ManagedResourceMixin,
2727
ModifiableResourceMixin,
2828
)
29+
from mpt_api_client.http.mixins.stream_jsonl_mixin import (
30+
AsyncStreamJSONLMixin,
31+
StreamJSONLMixin,
32+
)
2933
from mpt_api_client.http.mixins.update_file_mixin import (
3034
AsyncUpdateFileMixin,
3135
UpdateFileMixin,
@@ -44,6 +48,7 @@
4448
"AsyncGetMixin",
4549
"AsyncManagedResourceMixin",
4650
"AsyncModifiableResourceMixin",
51+
"AsyncStreamJSONLMixin",
4752
"AsyncUpdateFileMixin",
4853
"AsyncUpdateMixin",
4954
"CollectionMixin",
@@ -58,6 +63,7 @@
5863
"ManagedResourceMixin",
5964
"ModifiableResourceMixin",
6065
"QueryableMixin",
66+
"StreamJSONLMixin",
6167
"UpdateFileMixin",
6268
"UpdateMixin",
6369
]
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import json
2+
from collections.abc import AsyncIterator, Iterator
3+
4+
from mpt_api_client.constants import APPLICATION_JSONL
5+
from mpt_api_client.http.mixins.queryable_mixin import QueryableMixin
6+
from mpt_api_client.models import Model as BaseModel
7+
8+
9+
class StreamJSONLMixin[Model: BaseModel](QueryableMixin):
10+
"""Mixin providing JSONL (NDJSON) streaming of a collection line by line."""
11+
12+
def stream(self) -> Iterator[Model]:
13+
"""Stream resources from a JSONL endpoint, yielding one model per line.
14+
15+
Unlike ``iterate()``, which paginates and deserializes full pages, this
16+
consumes a ``application/jsonl`` response line by line without buffering the
17+
whole body in memory.
18+
19+
Yields:
20+
Resources, one per non-empty line of the response.
21+
"""
22+
with self.http_client.stream( # type: ignore[attr-defined]
23+
"GET",
24+
self.build_path(), # type: ignore[attr-defined]
25+
headers={"Accept": APPLICATION_JSONL},
26+
) as response:
27+
for line in response.iter_lines():
28+
if line.strip():
29+
yield self._model_class(json.loads(line)) # type: ignore[attr-defined]
30+
31+
32+
class AsyncStreamJSONLMixin[Model: BaseModel](QueryableMixin):
33+
"""Async mixin providing JSONL (NDJSON) streaming of a collection line by line."""
34+
35+
async def stream(self) -> AsyncIterator[Model]:
36+
"""Stream resources from a JSONL endpoint, yielding one model per line.
37+
38+
Unlike ``iterate()``, which paginates and deserializes full pages, this
39+
consumes a ``application/jsonl`` response line by line without buffering the
40+
whole body in memory.
41+
42+
Yields:
43+
Resources, one per non-empty line of the response.
44+
"""
45+
async with self.http_client.stream( # type: ignore[attr-defined]
46+
"GET",
47+
self.build_path(), # type: ignore[attr-defined]
48+
headers={"Accept": APPLICATION_JSONL},
49+
) as response:
50+
async for line in response.aiter_lines():
51+
if line.strip():
52+
yield self._model_class(json.loads(line)) # type: ignore[attr-defined]

β€Žmpt_api_client/resources/billing/statement_charges.pyβ€Ž

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,85 @@
22
from mpt_api_client.http.mixins import (
33
AsyncCollectionMixin,
44
AsyncGetMixin,
5+
AsyncStreamJSONLMixin,
56
CollectionMixin,
67
GetMixin,
8+
StreamJSONLMixin,
79
)
810
from mpt_api_client.models import Model
11+
from mpt_api_client.models.model import BaseModel
912

1013

1114
class StatementCharge(Model):
12-
"""Statement Charge resource."""
15+
"""Statement Charge resource.
16+
17+
Attributes:
18+
revision: Charge revision number.
19+
external_ids: External identifiers associated with the charge.
20+
search: Search-related details for the charge.
21+
period: Period during which the charge is applicable.
22+
quantity: Quantity associated with the charge.
23+
price: Pricing details for the charge.
24+
description: Description of the charge, if applicable.
25+
attributes: Additional attributes for the charge.
26+
billing_type: Billing type of the charge.
27+
statement_type: Type of statement associated with the charge.
28+
journal: Reference to the journal.
29+
ledger: Reference to the ledger.
30+
custom_ledger: Reference to the custom ledger.
31+
parent: Reference to the parent charge.
32+
upload: Upload status and details, visible to vendors or operations.
33+
processing: Processing status and details, visible to operations.
34+
licensee: Reference to the licensee.
35+
agreement: Reference to the agreement.
36+
subscription: Reference to the subscription.
37+
line: Agreement line associated with the charge, if applicable.
38+
order: Reference to the order.
39+
asset: Reference to the asset.
40+
item: Reference to the product item.
41+
authorization: Reference to the authorization.
42+
statement: Reference to the statement.
43+
buyer: Reference to the buyer.
44+
vendor: Reference to the vendor account.
45+
seller: Reference to the seller.
46+
erp_data: ERP-related data for the charge.
47+
split: Details about the charge split, if applicable.
48+
reconciliation: Reconciliation information for the charge.
49+
audit: Container for audit-related events for the charge.
50+
"""
51+
52+
revision: int | None
53+
external_ids: BaseModel | None
54+
search: BaseModel | None
55+
period: BaseModel | None
56+
quantity: float | None
57+
price: BaseModel | None
58+
description: BaseModel | None
59+
attributes: BaseModel | None
60+
billing_type: str | None
61+
statement_type: str | None
62+
journal: BaseModel | None
63+
ledger: BaseModel | None
64+
custom_ledger: BaseModel | None
65+
parent: BaseModel | None
66+
upload: BaseModel | None
67+
processing: BaseModel | None
68+
licensee: BaseModel | None
69+
agreement: BaseModel | None
70+
subscription: BaseModel | None
71+
line: BaseModel | None
72+
order: BaseModel | None
73+
asset: BaseModel | None
74+
item: BaseModel | None
75+
authorization: BaseModel | None
76+
statement: BaseModel | None
77+
buyer: BaseModel | None
78+
vendor: BaseModel | None
79+
seller: BaseModel | None
80+
erp_data: BaseModel | None
81+
split: BaseModel | None
82+
reconciliation: BaseModel | None
83+
audit: BaseModel | None
1384

1485

1586
class StatementChargesServiceConfig:
@@ -21,6 +92,7 @@ class StatementChargesServiceConfig:
2192

2293

2394
class StatementChargesService(
95+
StreamJSONLMixin[StatementCharge],
2496
GetMixin[StatementCharge],
2597
CollectionMixin[StatementCharge],
2698
Service[StatementCharge],
@@ -30,6 +102,7 @@ class StatementChargesService(
30102

31103

32104
class AsyncStatementChargesService(
105+
AsyncStreamJSONLMixin[StatementCharge],
33106
AsyncGetMixin[StatementCharge],
34107
AsyncCollectionMixin[StatementCharge],
35108
AsyncService[StatementCharge],

β€Žtests/e2e/billing/statement/charge/test_async_statement_charge.pyβ€Ž

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,10 @@ async def test_filter_statement_charges(statement_charges, statement_charge_id):
4242
result = [statement async for statement in filtered_charges.iterate()]
4343

4444
assert len(result) == 1
45+
46+
47+
async def test_stream_statement_charges_jsonl(statement_charges):
48+
result = [charge async for charge in statement_charges.stream()]
49+
50+
assert len(result) > 0
51+
assert all(charge.id for charge in result)

β€Žtests/e2e/billing/statement/charge/test_sync_statement_charge.pyβ€Ž

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,10 @@ def test_filter_statement_charges(statement_charges, statement_charge_id):
4242
result = list(filtered_charges.iterate())
4343

4444
assert len(result) == 1
45+
46+
47+
def test_stream_statement_charges_jsonl(statement_charges):
48+
result = list(statement_charges.stream())
49+
50+
assert len(result) > 0
51+
assert all(charge.id for charge in result)

0 commit comments

Comments
Β (0)