-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
165 lines (145 loc) · 5.75 KB
/
Copy pathclient.py
File metadata and controls
165 lines (145 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import json as json_package
import os
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
from httpx import Client, HTTPError, RequestError
from httpx import Response as HTTPXResponse
from httpx_retries import Retry, RetryTransport
from mpt_api_client.constants import APPLICATION_JSON
from mpt_api_client.exceptions import MPTError, MPTMaxRetryError
from mpt_api_client.http.client_utils import get_query_params, validate_base_url
from mpt_api_client.http.query_options import QueryOptions
from mpt_api_client.http.request_response_utils import handle_response_http_error
from mpt_api_client.http.types import HeaderTypes, QueryParam, RequestFiles, Response
from mpt_api_client.models import ResourceData
if TYPE_CHECKING:
from mpt_api_client.auth.base import Authentication
def json_to_file_payload(resource_data: ResourceData | None) -> bytes:
"""Convert resource data to file payload."""
if resource_data is None:
resource_data = {}
return json_package.dumps(
resource_data, ensure_ascii=False, separators=(",", ":"), allow_nan=False
).encode("utf-8")
class HTTPClient:
"""Sync HTTP client for interacting with SoftwareOne Marketplace Platform API."""
def __init__(
self,
*,
authentication: "Authentication",
base_url: str | None = None,
timeout: float = 20.0,
retries: int = 5,
):
self._retries = retries
retry = Retry(
total=self._retries,
allowed_methods={"DELETE", "GET", "HEAD", "OPTIONS", "POST", "PUT", "PATCH"},
)
transport = RetryTransport(retry=retry)
base_url = validate_base_url(base_url or os.getenv("MPT_API_BASE_URL"))
authentication.configure(base_url=base_url, timeout=timeout, retries=self._retries)
self.httpx_client = Client(
base_url=base_url,
headers={"User-Agent": "swo-marketplace-client/1.0"},
auth=authentication,
timeout=timeout,
transport=transport,
follow_redirects=True,
)
def request( # noqa: WPS211
self,
method: str,
url: str,
*,
files: RequestFiles | None = None,
json: Any | None = None,
query_params: QueryParam | None = None,
headers: HeaderTypes | None = None,
json_file_key: str = "_attachment_data",
force_multipart: bool = False,
options: QueryOptions | None = None,
) -> Response:
"""Perform an HTTP request.
Args:
method: HTTP method.
url: URL to send the request to.
files: Request files.
json: Request JSON data.
query_params: Query parameters.
headers: Request headers.
json_file_key: json file name for data when sending a multipart request.
force_multipart: force multipart request even if file is not provided.
options: Additional options for the request.
Returns:
Response object.
Raises:
MPTError: If the request fails.
MPTApiError: If the response contains an error.
MPTHttpError: If the response contains an HTTP error.
"""
files = dict(files or {})
if force_multipart or (files and json):
files[json_file_key] = (None, json_to_file_payload(json), APPLICATION_JSON)
json = None
params_str = get_query_params(query_params, options)
try:
response = self.httpx_client.request(
method,
url,
files=files,
json=json,
params=params_str or None,
headers=headers,
)
except RequestError as err:
raise MPTMaxRetryError(str(err), self._retries + 1) from err
except HTTPError as err:
raise MPTError(f"HTTP Error: {err}") from err
handle_response_http_error(response)
return Response(
headers=dict(response.headers),
status_code=response.status_code,
content=response.content,
)
@contextmanager
def stream(
self,
method: str,
url: str,
*,
headers: HeaderTypes | None = None,
query_params: QueryParam | None = None,
options: QueryOptions | None = None,
) -> Iterator[HTTPXResponse]:
"""Open a streaming response without buffering its body fully in memory.
Useful for JSONL/NDJSON endpoints; callers consume the body via the yielded
response (e.g. ``iter_lines()``). Redirects are followed automatically.
Args:
method: HTTP method.
url: URL to send the request to.
headers: Request headers.
query_params: Query parameters.
options: Additional options for the request.
Yields:
The open streaming response.
Raises:
MPTError: If the request fails.
MPTApiError: If the response contains an error.
MPTHttpError: If the response contains an HTTP error.
MPTMaxRetryError: If the request fails after maximum retry attempts.
"""
params_str = get_query_params(query_params, options)
try:
with self.httpx_client.stream(
method, url, params=params_str or None, headers=headers
) as response:
if response.is_error:
response.read()
handle_response_http_error(response)
yield response
except RequestError as err:
raise MPTMaxRetryError(str(err), self._retries + 1) from err
except HTTPError as err:
raise MPTError(f"HTTP Error: {err}") from err