Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,6 @@ make/local.mk

# E2E report
e2e-report.xml

# Claude Code local artifacts
.claude/
1 change: 1 addition & 0 deletions mpt_api_client/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
APPLICATION_JSON = "application/json"
APPLICATION_JSONL = "application/jsonl"
MIMETYPE_EXCEL_XLSX = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
48 changes: 48 additions & 0 deletions mpt_api_client/http/async_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from httpx import AsyncClient, HTTPError, RequestError
from httpx import Response as HTTPXResponse
from httpx_retries import Retry, RetryTransport

from mpt_api_client.constants import APPLICATION_JSON
Expand Down Expand Up @@ -113,3 +116,48 @@ async def request( # noqa: WPS211
status_code=response.status_code,
content=response.content,
)

@asynccontextmanager
async def stream(
self,
method: str,
url: str,
*,
headers: HeaderTypes | None = None,
query_params: QueryParam | None = None,
options: QueryOptions | None = None,
) -> AsyncIterator[HTTPXResponse]:
"""Open a streaming response without buffering its body fully in memory.

Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded
response (e.g. ``aiter_lines()``). Redirects are followed automatically.

Args:
method: HTTP method.
url: URL to send the request to.
headers: Request headers.
query_params: Query parameters.
options: Additional options for the request.

Yields:
The open streaming response.

Raises:
MPTError: If the request fails.
MPTApiError: If the response contains an error.
MPTHttpError: If the response contains an HTTP error.
MPTMaxRetryError: If the request fails after maximum retry attempts.
"""
params_str = get_query_params(query_params, options)
try:
async with self.httpx_client.stream(
method, url, params=params_str or None, headers=headers
) as response:
if response.is_error:
await response.aread()
handle_response_http_error(response)
yield response
except RequestError as err:
raise MPTMaxRetryError(str(err), self._retries + 1) from err
except HTTPError as err:
raise MPTError(f"HTTP Error: {err}") from err
48 changes: 48 additions & 0 deletions mpt_api_client/http/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json as json_package
import os
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any

from httpx import Client, HTTPError, RequestError
from httpx import Response as HTTPXResponse
from httpx_retries import Retry, RetryTransport

from mpt_api_client.constants import APPLICATION_JSON
Expand Down Expand Up @@ -122,3 +125,48 @@ def request( # noqa: WPS211
status_code=response.status_code,
content=response.content,
)

@contextmanager
def stream(
self,
method: str,
url: str,
*,
headers: HeaderTypes | None = None,
query_params: QueryParam | None = None,
options: QueryOptions | None = None,
) -> Iterator[HTTPXResponse]:
"""Open a streaming response without buffering its body fully in memory.

Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded
response (e.g. ``iter_lines()``). Redirects are followed automatically.

Args:
method: HTTP method.
url: URL to send the request to.
headers: Request headers.
query_params: Query parameters.
options: Additional options for the request.

Yields:
The open streaming response.

Raises:
MPTError: If the request fails.
MPTApiError: If the response contains an error.
MPTHttpError: If the response contains an HTTP error.
MPTMaxRetryError: If the request fails after maximum retry attempts.
"""
params_str = get_query_params(query_params, options)
try:
with self.httpx_client.stream(
method, url, params=params_str or None, headers=headers
) as response:
if response.is_error:
response.read()
handle_response_http_error(response)
yield response
except RequestError as err:
raise MPTMaxRetryError(str(err), self._retries + 1) from err
except HTTPError as err:
raise MPTError(f"HTTP Error: {err}") from err
6 changes: 6 additions & 0 deletions mpt_api_client/http/mixins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
ManagedResourceMixin,
ModifiableResourceMixin,
)
from mpt_api_client.http.mixins.stream_jsonl_mixin import (
AsyncStreamJSONLMixin,
StreamJSONLMixin,
)
from mpt_api_client.http.mixins.update_file_mixin import (
AsyncUpdateFileMixin,
UpdateFileMixin,
Expand All @@ -44,6 +48,7 @@
"AsyncGetMixin",
"AsyncManagedResourceMixin",
"AsyncModifiableResourceMixin",
"AsyncStreamJSONLMixin",
"AsyncUpdateFileMixin",
"AsyncUpdateMixin",
"CollectionMixin",
Expand All @@ -58,6 +63,7 @@
"ManagedResourceMixin",
"ModifiableResourceMixin",
"QueryableMixin",
"StreamJSONLMixin",
"UpdateFileMixin",
"UpdateMixin",
]
52 changes: 52 additions & 0 deletions mpt_api_client/http/mixins/stream_jsonl_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import json
from collections.abc import AsyncIterator, Iterator

from mpt_api_client.constants import APPLICATION_JSONL
from mpt_api_client.http.mixins.queryable_mixin import QueryableMixin
from mpt_api_client.models import Model as BaseModel


class StreamJSONLMixin[Model: BaseModel](QueryableMixin):
"""Mixin providing JSONL (NDJSON) streaming of a collection line by line."""

def stream(self) -> Iterator[Model]:
"""Stream resources from a JSONL endpoint, yielding one model per line.

Unlike ``iterate()``, which paginates and deserializes full pages, this
consumes a ``application/jsonl`` response line by line without buffering the
whole body in memory.

Yields:
Resources, one per non-empty line of the response.
"""
with self.http_client.stream( # type: ignore[attr-defined]
"GET",
self.build_path(), # type: ignore[attr-defined]
headers={"Accept": APPLICATION_JSONL},
) as response:
for line in response.iter_lines():
if line.strip():
yield self._model_class(json.loads(line)) # type: ignore[attr-defined]


class AsyncStreamJSONLMixin[Model: BaseModel](QueryableMixin):
"""Async mixin providing JSONL (NDJSON) streaming of a collection line by line."""

async def stream(self) -> AsyncIterator[Model]:
"""Stream resources from a JSONL endpoint, yielding one model per line.

Unlike ``iterate()``, which paginates and deserializes full pages, this
consumes a ``application/jsonl`` response line by line without buffering the
whole body in memory.

Yields:
Resources, one per non-empty line of the response.
"""
async with self.http_client.stream( # type: ignore[attr-defined]
"GET",
self.build_path(), # type: ignore[attr-defined]
headers={"Accept": APPLICATION_JSONL},
) as response:
async for line in response.aiter_lines():
if line.strip():
yield self._model_class(json.loads(line)) # type: ignore[attr-defined]
75 changes: 74 additions & 1 deletion mpt_api_client/resources/billing/statement_charges.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,85 @@
from mpt_api_client.http.mixins import (
AsyncCollectionMixin,
AsyncGetMixin,
AsyncStreamJSONLMixin,
CollectionMixin,
GetMixin,
StreamJSONLMixin,
)
from mpt_api_client.models import Model
from mpt_api_client.models.model import BaseModel


class StatementCharge(Model):
"""Statement Charge resource."""
"""Statement Charge resource.

Attributes:
revision: Charge revision number.
external_ids: External identifiers associated with the charge.
search: Search-related details for the charge.
period: Period during which the charge is applicable.
quantity: Quantity associated with the charge.
price: Pricing details for the charge.
description: Description of the charge, if applicable.
attributes: Additional attributes for the charge.
billing_type: Billing type of the charge.
statement_type: Type of statement associated with the charge.
journal: Reference to the journal.
ledger: Reference to the ledger.
custom_ledger: Reference to the custom ledger.
parent: Reference to the parent charge.
upload: Upload status and details, visible to vendors or operations.
processing: Processing status and details, visible to operations.
licensee: Reference to the licensee.
agreement: Reference to the agreement.
subscription: Reference to the subscription.
line: Agreement line associated with the charge, if applicable.
order: Reference to the order.
asset: Reference to the asset.
item: Reference to the product item.
authorization: Reference to the authorization.
statement: Reference to the statement.
buyer: Reference to the buyer.
vendor: Reference to the vendor account.
seller: Reference to the seller.
erp_data: ERP-related data for the charge.
split: Details about the charge split, if applicable.
reconciliation: Reconciliation information for the charge.
audit: Container for audit-related events for the charge.
"""

revision: int | None
external_ids: BaseModel | None
search: BaseModel | None
period: BaseModel | None
quantity: float | None
price: BaseModel | None
description: BaseModel | None
attributes: BaseModel | None
billing_type: str | None
statement_type: str | None
journal: BaseModel | None
ledger: BaseModel | None
custom_ledger: BaseModel | None
parent: BaseModel | None
upload: BaseModel | None
processing: BaseModel | None
licensee: BaseModel | None
agreement: BaseModel | None
subscription: BaseModel | None
line: BaseModel | None
order: BaseModel | None
asset: BaseModel | None
item: BaseModel | None
authorization: BaseModel | None
statement: BaseModel | None
buyer: BaseModel | None
vendor: BaseModel | None
seller: BaseModel | None
erp_data: BaseModel | None
split: BaseModel | None
reconciliation: BaseModel | None
audit: BaseModel | None


class StatementChargesServiceConfig:
Expand All @@ -21,6 +92,7 @@ class StatementChargesServiceConfig:


class StatementChargesService(
StreamJSONLMixin[StatementCharge],
GetMixin[StatementCharge],
CollectionMixin[StatementCharge],
Service[StatementCharge],
Expand All @@ -30,6 +102,7 @@ class StatementChargesService(


class AsyncStatementChargesService(
AsyncStreamJSONLMixin[StatementCharge],
AsyncGetMixin[StatementCharge],
AsyncCollectionMixin[StatementCharge],
AsyncService[StatementCharge],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ async def test_filter_statement_charges(statement_charges, statement_charge_id):
result = [statement async for statement in filtered_charges.iterate()]

assert len(result) == 1


async def test_stream_statement_charges_jsonl(statement_charges):
result = [charge async for charge in statement_charges.stream()]

assert len(result) > 0
assert all(charge.id for charge in result)
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ def test_filter_statement_charges(statement_charges, statement_charge_id):
result = list(filtered_charges.iterate())

assert len(result) == 1


def test_stream_statement_charges_jsonl(statement_charges):
result = list(statement_charges.stream())

assert len(result) > 0
assert all(charge.id for charge in result)
Loading