From 73a8e0b0863bf37e23b8e181f4800a3f53678de8 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 16:28:13 +0200 Subject: [PATCH 01/22] feat(py-client): Add Multipart Upload API --- .../python/src/objectstore_client/__init__.py | 12 +- .../python/src/objectstore_client/client.py | 115 ++++++-- .../python/src/objectstore_client/errors.py | 22 ++ .../python/src/objectstore_client/metrics.py | 21 ++ .../src/objectstore_client/multipart.py | 248 ++++++++++++++++++ clients/python/tests/test_e2e.py | 196 +++++++++++++- clients/python/tests/test_multipart.py | 126 +++++++++ 7 files changed, 719 insertions(+), 21 deletions(-) create mode 100644 clients/python/src/objectstore_client/errors.py create mode 100644 clients/python/src/objectstore_client/multipart.py create mode 100644 clients/python/tests/test_multipart.py diff --git a/clients/python/src/objectstore_client/__init__.py b/clients/python/src/objectstore_client/__init__.py index 41435258..a195220a 100644 --- a/clients/python/src/objectstore_client/__init__.py +++ b/clients/python/src/objectstore_client/__init__.py @@ -2,10 +2,10 @@ from objectstore_client.client import ( Client, GetResponse, - RequestError, Session, Usecase, ) +from objectstore_client.errors import RequestError from objectstore_client.metadata import ( Compression, ExpirationPolicy, @@ -14,13 +14,23 @@ TimeToLive, ) from objectstore_client.metrics import MetricsBackend, NoOpMetricsBackend +from objectstore_client.multipart import ( + CompletePart, + MultipartCompleteError, + MultipartUpload, + PartInfo, +) from objectstore_client.utils import parse_accept_encoding __all__ = [ "Client", + "CompletePart", "Usecase", "Session", "GetResponse", + "MultipartCompleteError", + "MultipartUpload", + "PartInfo", "RequestError", "Compression", "ExpirationPolicy", diff --git a/clients/python/src/objectstore_client/client.py b/clients/python/src/objectstore_client/client.py index 74f18243..5a2d63a6 100644 --- a/clients/python/src/objectstore_client/client.py +++ b/clients/python/src/objectstore_client/client.py @@ -13,6 +13,8 @@ from objectstore_client import utils from objectstore_client.auth import Permission, TokenGenerator, TokenProvider +from objectstore_client.errors import RequestError as _RequestError +from objectstore_client.errors import raise_for_status from objectstore_client.metadata import ( HEADER_EXPIRATION, HEADER_META_PREFIX, @@ -27,23 +29,19 @@ NoOpMetricsBackend, measure_storage_operation, ) +from objectstore_client.multipart import MultipartUpload from objectstore_client.scope import Scope +__all__ = ["Client", "GetResponse", "RequestError", "Session", "Usecase"] + +RequestError = _RequestError + class GetResponse(NamedTuple): metadata: Metadata payload: IO[bytes] -class RequestError(Exception): - """Exception raised if an API call to Objectstore fails.""" - - def __init__(self, message: str, status: int, response: str): - super().__init__(message) - self.status = status - self.response = response - - class Usecase: """ An identifier for a workload in Objectstore, along with defaults to use for all @@ -281,6 +279,25 @@ def _make_url(self, key: str | None, full: bool = False) -> str: return f"http://{self._pool.host}:{self._pool.port}{path}" return path + def _make_multipart_url( + self, + action: str | None, + key: str | None, + query: str | None = None, + ) -> str: + if action == "parts": + resource = "objects:multipart:parts" + elif action == "complete": + resource = "objects:multipart:complete" + else: + resource = "objects:multipart" + + relative_path = f"/v1/{resource}/{self._usecase.name}/{self._scope}/{key or ''}" + path = self._base_path.rstrip("/") + relative_path + if query: + return f"{path}?{query}" + return path + def put( self, contents: bytes | IO[bytes], @@ -429,6 +446,76 @@ def object_url(self, key: str) -> str: """ return self._make_url(key, full=True) + def initiate_multipart_upload( + self, + *, + key: str | None = None, + compression: Compression | Literal["none"] | None = None, + content_type: str | None = None, + metadata: dict[str, str] | None = None, + expiration_policy: ExpirationPolicy | None = None, + origin: str | None = None, + ) -> MultipartUpload: + """Initiate a multipart upload. + + Returns a :class:`~objectstore_client.multipart.MultipartUpload` handle + that can be used to upload parts, list parts, complete, or abort. + + **Important:** unlike :meth:`put`, the ``compression`` parameter only + records the compression algorithm in the object's metadata. It does + **not** auto-compress part data. The caller is responsible for + compressing each part in accordance with the chosen algorithm before + passing it to + :meth:`~objectstore_client.multipart.MultipartUpload.upload_part`. + """ + if compression and compression not in ("none", "zstd"): + raise ValueError(f"Invalid compression: {compression}") + + headers = self._make_headers() + + compression = compression or self._usecase._compression + if compression and compression != "none": + headers["Content-Encoding"] = compression + + if content_type: + headers["Content-Type"] = content_type + + expiration_policy = expiration_policy or self._usecase._expiration_policy + if expiration_policy: + headers[HEADER_EXPIRATION] = format_expiration(expiration_policy) + + if origin: + headers[HEADER_ORIGIN] = origin + + if metadata: + for k, v in metadata.items(): + headers[f"{HEADER_META_PREFIX}{k}"] = v + + if key == "": + key = None + + with measure_storage_operation( + self._metrics_backend, "multipart_initiate", self._usecase.name + ): + response = self._pool.request( + "POST" if not key else "PUT", + self._make_multipart_url(None, key), + headers=headers, + preload_content=True, + decode_content=True, + ) + raise_for_status(response) + res = response.json() + return MultipartUpload(self, res["key"], res["upload_id"]) + + def resume_multipart_upload(self, key: str, upload_id: str) -> MultipartUpload: + """Reconstruct a multipart upload handle from a prior initiate. + + This does not make any network calls. Use it to resume an upload + after a process restart or to continue an upload started elsewhere. + """ + return MultipartUpload(self, key, upload_id) + def delete(self, key: str) -> None: """ Deletes the blob with the given `key`. @@ -444,13 +531,3 @@ def delete(self, key: str) -> None: headers=headers, ) raise_for_status(response) - - -def raise_for_status(response: urllib3.BaseHTTPResponse) -> None: - if response.status >= 400: - res = (response.data or response.read() or b"").decode("utf-8", "replace") - raise RequestError( - f"Objectstore request failed with status {response.status}", - response.status, - res, - ) diff --git a/clients/python/src/objectstore_client/errors.py b/clients/python/src/objectstore_client/errors.py new file mode 100644 index 00000000..099ffa17 --- /dev/null +++ b/clients/python/src/objectstore_client/errors.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import urllib3 + + +class RequestError(Exception): + """Exception raised if an API call to Objectstore fails.""" + + def __init__(self, message: str, status: int, response: str): + super().__init__(message) + self.status = status + self.response = response + + +def raise_for_status(response: urllib3.BaseHTTPResponse) -> None: + if response.status >= 400: + res = (response.data or response.read() or b"").decode("utf-8", "replace") + raise RequestError( + f"Objectstore request failed with status {response.status}", + response.status, + res, + ) diff --git a/clients/python/src/objectstore_client/metrics.py b/clients/python/src/objectstore_client/metrics.py index 801875ca..1096c7da 100644 --- a/clients/python/src/objectstore_client/metrics.py +++ b/clients/python/src/objectstore_client/metrics.py @@ -83,6 +83,7 @@ def __init__(self, backend: MetricsBackend, operation: str, usecase: str): # These may be set during or after the enclosed operation self.start: int | None = None self.elapsed: float | None = None + self.size: int | None = None self.uncompressed_size: int | None = None self.compressed_size: int | None = None self.compression: str = "unknown" @@ -101,6 +102,13 @@ def record_uncompressed_size(self, value: int) -> None: ) self.uncompressed_size = value + def record_size(self, value: int) -> None: + tags = {"usecase": self.usecase} + self.backend.distribution( + f"storage.{self.operation}.size", value, tags=tags, unit="byte" + ) + self.size = value + def record_compressed_size(self, value: int, compression: str = "unknown") -> None: tags = {"usecase": self.usecase, "compression": compression} self.backend.distribution( @@ -124,6 +132,19 @@ def maybe_record_throughputs(self) -> None: if not self.elapsed or self.elapsed <= 0: return None + if self.size: + tags = {"usecase": self.usecase} + self.backend.distribution( + f"storage.{self.operation}.throughput", + self.size / self.elapsed, + tags=tags, + ) + self.backend.distribution( + f"storage.{self.operation}.inverse_throughput", + self.elapsed / self.size, + tags=tags, + ) + sizes = [] if self.uncompressed_size: sizes.append((self.uncompressed_size, "none")) diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py new file mode 100644 index 00000000..502e01e4 --- /dev/null +++ b/clients/python/src/objectstore_client/multipart.py @@ -0,0 +1,248 @@ +from __future__ import annotations + +import base64 +import json +from collections.abc import Sequence +from dataclasses import dataclass +from datetime import datetime +from io import BytesIO +from typing import IO, TYPE_CHECKING +from urllib.parse import urlencode + +from objectstore_client.errors import RequestError, raise_for_status +from objectstore_client.metrics import measure_storage_operation + +if TYPE_CHECKING: + from objectstore_client.client import Session + + +@dataclass +class CompletePart: + """A reference to an uploaded part, used when completing a multipart upload.""" + + part_number: int + etag: str + + +@dataclass +class PartInfo: + """Information about an uploaded part, as returned by list_parts.""" + + part_number: int + etag: str + last_modified: datetime + size: int + + def to_complete_part(self) -> CompletePart: + return CompletePart(part_number=self.part_number, etag=self.etag) + + +class MultipartCompleteError(RequestError): + """Error returned in the body of a multipart complete response. + + Multipart assembly failures are conveyed inside a 200 JSON response body, + but ordinary HTTP 4xx/5xx errors may still be returned before assembly + starts. + """ + + def __init__(self, code: str, message: str): + super().__init__( + f"Multipart complete failed ({code}): {message}", + status=200, + response=message, + ) + self.code = code + + +class MultipartUpload: + """Handle for an in-progress multipart upload. + + Create via :meth:`Session.initiate_multipart_upload` or + :meth:`Session.resume_multipart_upload`. + """ + + def __init__(self, session: Session, key: str, upload_id: str): + self._session = session + self._key = key + self._upload_id = upload_id + + @property + def key(self) -> str: + return self._key + + @property + def upload_id(self) -> str: + return self._upload_id + + def upload_part( + self, + contents: bytes | IO[bytes], + part_number: int, + *, + content_length: int | None = None, + content_md5: bytes | None = None, + ) -> CompletePart: + """Upload a single part. + + Unlike :meth:`Session.put`, this does **not** automatically compress + the contents. If the multipart upload was initiated with compression, + the caller must compress each part before passing it here. + + Args: + contents: The part data. When ``bytes``, ``content_length`` is + derived automatically. When ``IO[bytes]``, ``content_length`` + **must** be provided. + part_number: 1-indexed part number. + content_length: Required when *contents* is a stream. + content_md5: Optional raw 16-byte MD5 digest of the (possibly + compressed) contents. Base64-encoded internally for the + ``Content-MD5`` header. + """ + if isinstance(contents, bytes): + content_length = len(contents) + body: bytes | IO[bytes] = BytesIO(contents) + else: + if content_length is None: + raise ValueError("content_length is required when contents is a stream") + body = contents + + if content_md5 is not None and len(content_md5) != 16: + raise ValueError("content_md5 must be exactly 16 bytes") + + headers = self._session._make_headers() + headers["Content-Length"] = str(content_length) + + if content_md5 is not None: + headers["Content-MD5"] = base64.b64encode(content_md5).decode("ascii") + + query = urlencode( + {"upload_id": self._upload_id, "part_number": str(part_number)} + ) + url = self._session._make_multipart_url("parts", self._key, query) + + with measure_storage_operation( + self._session._metrics_backend, + "multipart.put_part", + self._session._usecase.name, + ) as metric_emitter: + response = self._session._pool.request( + "PUT", + url, + body=body, + headers=headers, + preload_content=True, + decode_content=True, + ) + raise_for_status(response) + res = response.json() + metric_emitter.record_size(content_length) + return CompletePart(part_number=part_number, etag=res["etag"]) + + def list_parts(self) -> list[PartInfo]: + """List all uploaded parts, auto-paginating.""" + all_parts: list[PartInfo] = [] + marker: int | None = None + + while True: + params: dict[str, str] = {"upload_id": self._upload_id} + if marker is not None: + params["part_number_marker"] = str(marker) + + query = urlencode(params) + url = self._session._make_multipart_url("parts", self._key, query) + headers = self._session._make_headers() + + response = self._session._pool.request( + "GET", + url, + headers=headers, + preload_content=True, + ) + raise_for_status(response) + data = response.json() + + for p in data["parts"]: + all_parts.append( + PartInfo( + part_number=p["part_number"], + etag=p["etag"], + last_modified=datetime.fromisoformat(p["last_modified"]), + size=p["size"], + ) + ) + + if not data["is_truncated"]: + return all_parts + + marker = data.get("next_part_number_marker") + if marker is None: + raise RequestError( + "Server returned is_truncated=true but no next_part_number_marker", + status=200, + response=str(data), + ) + + def abort(self) -> None: + """Abort this multipart upload, cleaning up server-side state.""" + query = urlencode({"upload_id": self._upload_id}) + url = self._session._make_multipart_url(None, self._key, query) + headers = self._session._make_headers() + + response = self._session._pool.request( + "DELETE", + url, + headers=headers, + ) + raise_for_status(response) + + def complete(self, parts: Sequence[CompletePart]) -> str: + """Complete the multipart upload, assembling all parts into the final object. + + Returns the final object key. + + Raises :class:`MultipartCompleteError` if the server reports an error + during assembly in a 200 response body. Ordinary non-2xx HTTP errors are + still raised as :class:`RequestError`. + """ + query = urlencode({"upload_id": self._upload_id}) + url = self._session._make_multipart_url("complete", self._key, query) + headers = self._session._make_headers() + headers["Content-Type"] = "application/json" + + request_body = json.dumps( + {"parts": [{"part_number": p.part_number, "etag": p.etag} for p in parts]} + ).encode("utf-8") + + with measure_storage_operation( + self._session._metrics_backend, + "multipart_complete", + self._session._usecase.name, + ): + response = self._session._pool.request( + "POST", + url, + body=request_body, + headers=headers, + preload_content=True, + decode_content=True, + ) + raise_for_status(response) + + # Successful completion responses may include keepalive whitespace. + raw = (response.data or b"").decode("utf-8").strip() + try: + data = json.loads(raw) + except json.JSONDecodeError: + raise RequestError( + "Failed to parse multipart complete response", + status=response.status, + response=raw, + ) + + if "error" in data: + raise MultipartCompleteError( + code=data["error"]["code"], + message=data["error"]["message"], + ) + + return data["key"] diff --git a/clients/python/tests/test_e2e.py b/clients/python/tests/test_e2e.py index 9632d489..207918e4 100644 --- a/clients/python/tests/test_e2e.py +++ b/clients/python/tests/test_e2e.py @@ -14,8 +14,9 @@ import zstandard from objectstore_client import Client, Usecase from objectstore_client.auth import Permission, TokenGenerator -from objectstore_client.client import RequestError +from objectstore_client.errors import RequestError from objectstore_client.metadata import TimeToLive +from objectstore_client.multipart import CompletePart, MultipartCompleteError from objectstore_client.scope import Scope TEST_EDDSA_KID: str = "test_kid" @@ -342,3 +343,196 @@ def test_connect_timeout() -> None: with pytest.raises(urllib3.exceptions.MaxRetryError): session.put(b"test data", compression="zstd") + + +def test_multipart_full_cycle_uncompressed(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-uncompressed") + assert upload.key == "mp-uncompressed" + assert upload.upload_id + + part1 = upload.upload_part(b"hello ", part_number=1) + part2 = upload.upload_part(b"world!", part_number=2) + + final_key = upload.complete([part1, part2]) + assert final_key == "mp-uncompressed" + + retrieved = session.get(final_key) + assert retrieved.payload.read() == b"hello world!" + + +def test_multipart_full_cycle_compressed(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload( + key="mp-compressed", + compression="zstd", + ) + + cctx = zstandard.ZstdCompressor() + compressed_part1 = cctx.compress(b"hello ") + compressed_part2 = cctx.compress(b"world!") + + part1 = upload.upload_part(compressed_part1, part_number=1) + part2 = upload.upload_part(compressed_part2, part_number=2) + + final_key = upload.complete([part1, part2]) + + # Verify raw compressed round-trip + retrieved = session.get(final_key, decompress=False) + assert retrieved.metadata.compression == "zstd" + raw = retrieved.payload.read() + assert raw == compressed_part1 + compressed_part2 + + # Verify transparent decompression + retrieved = session.get(final_key) + assert retrieved.metadata.compression is None + assert retrieved.payload.read() == b"hello world!" + + +def test_multipart_server_generated_key(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload() + assert upload.key + + part = upload.upload_part(b"data", part_number=1) + final_key = upload.complete([part]) + assert final_key + + retrieved = session.get(final_key) + assert retrieved.payload.read() == b"data" + + +def test_multipart_list_parts(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-list-parts") + + upload.upload_part(b"part-two", part_number=2) + upload.upload_part(b"part-one", part_number=1) + + parts = upload.list_parts() + assert len(parts) == 2 + + p1 = next(p for p in parts if p.part_number == 1) + p2 = next(p for p in parts if p.part_number == 2) + assert p1.size == 8 + assert p2.size == 8 + + upload.abort() + + +def test_multipart_abort(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-abort") + upload.upload_part(b"some data", part_number=1) + upload.abort() + + +def test_multipart_metadata_preserved(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload( + key="mp-metadata", + content_type="text/plain", + origin="203.0.113.42", + metadata={"my-key": "my-value"}, + ) + + part = upload.upload_part(b"payload", part_number=1) + final_key = upload.complete([part]) + + retrieved = session.get(final_key) + assert retrieved.metadata.content_type == "text/plain" + assert retrieved.metadata.origin == "203.0.113.42" + assert retrieved.metadata.custom.get("my-key") == "my-value" + + +def test_multipart_complete_with_bad_etag(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-bad-etag") + upload.upload_part(b"real data", part_number=1) + + with pytest.raises(MultipartCompleteError) as exc_info: + upload.complete([CompletePart(part_number=1, etag="bogus-etag")]) + + assert exc_info.value.code + assert exc_info.value.status == 200 + + +def test_multipart_resume(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-resume") + saved_key = upload.key + saved_upload_id = upload.upload_id + + upload.upload_part(b"first", part_number=1) + + # Simulate resuming from saved state + resumed = session.resume_multipart_upload(saved_key, saved_upload_id) + assert resumed.key == saved_key + assert resumed.upload_id == saved_upload_id + + resumed.upload_part(b"second", part_number=2) + + existing = resumed.list_parts() + assert len(existing) == 2 + + parts = [p.to_complete_part() for p in existing] + final_key = resumed.complete(parts) + + retrieved = session.get(final_key) + assert retrieved.payload.read() == b"firstsecond" diff --git a/clients/python/tests/test_multipart.py b/clients/python/tests/test_multipart.py new file mode 100644 index 00000000..2190c5a0 --- /dev/null +++ b/clients/python/tests/test_multipart.py @@ -0,0 +1,126 @@ +import json +from dataclasses import dataclass +from typing import Any + +import pytest +from objectstore_client import Client, Usecase +from objectstore_client.client import RequestError as ClientRequestError +from objectstore_client.errors import RequestError +from objectstore_client.metrics import Tags +from objectstore_client.multipart import CompletePart, MultipartUpload + + +@dataclass +class DistributionRecord: + name: str + value: int | float + tags: Tags | None + unit: str | None + + +class RecordingMetricsBackend: + def __init__(self) -> None: + self.distributions: list[DistributionRecord] = [] + + def increment( + self, + name: str, + value: int | float = 1, + tags: Tags | None = None, + ) -> None: + return None + + def gauge(self, name: str, value: int | float, tags: Tags | None = None) -> None: + return None + + def distribution( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + self.distributions.append(DistributionRecord(name, value, tags, unit)) + + +class FakeResponse: + def __init__( + self, + status: int, + *, + data: bytes = b"", + json_data: dict[str, Any] | None = None, + ) -> None: + self.status = status + self.data = data + self._json_data = json_data + + def read(self) -> bytes: + return self.data + + def json(self) -> dict[str, Any]: + if self._json_data is not None: + return self._json_data + return json.loads(self.data.decode("utf-8")) + + +def test_request_error_is_reexported_from_client_module() -> None: + assert ClientRequestError is RequestError + + +def test_multipart_complete_raises_http_errors_before_parsing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + client = Client("http://127.0.0.1:8888") + session = client.session(Usecase("testing"), org=1) + upload = MultipartUpload(session, "key", "upload-id") + + monkeypatch.setattr( + session._pool, + "request", + lambda *args, **kwargs: FakeResponse( + 403, data=b'{"detail":"missing or expired auth"}' + ), + ) + + with pytest.raises(RequestError) as exc_info: + upload.complete([CompletePart(part_number=1, etag="etag")]) + + assert exc_info.value.status == 403 + assert exc_info.value.response == '{"detail":"missing or expired auth"}' + + +def test_multipart_put_part_metrics_use_distinct_namespace_without_compression_tags( + monkeypatch: pytest.MonkeyPatch, +) -> None: + metrics_backend = RecordingMetricsBackend() + client = Client("http://127.0.0.1:8888", metrics_backend=metrics_backend) + session = client.session(Usecase("testing", compression="none"), org=1) + responses = iter( + [ + FakeResponse(200, json_data={"key": "key", "upload_id": "upload-id"}), + FakeResponse(200, json_data={"etag": "part-etag"}), + ] + ) + + monkeypatch.setattr( + session._pool, "request", lambda *args, **kwargs: next(responses) + ) + + upload = session.initiate_multipart_upload(key="key", compression="zstd") + part = upload.upload_part(b"compressed", part_number=1) + + assert part.etag == "part-etag" + size_metrics = [ + record + for record in metrics_backend.distributions + if record.name == "storage.multipart.put_part.size" + ] + assert size_metrics == [ + DistributionRecord( + "storage.multipart.put_part.size", + len(b"compressed"), + {"usecase": "testing"}, + "byte", + ) + ] From fce36f2d2b3cf0651e527b84606ae5580396e193 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 16:43:44 +0200 Subject: [PATCH 02/22] add README --- clients/python/README.md | 54 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/clients/python/README.md b/clients/python/README.md index 64b4c818..8dcf3829 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -91,6 +91,60 @@ Arbitrary key-value pairs can be attached to objects and retrieved on download. session.put(b"payload", metadata={"source": "upload-service"}) ``` +### Multipart Upload API + +For large objects, use multipart uploads to upload parts independently and then +assemble them into a final object. + +**Important:** unlike single-object uploads, multipart uploads do **not** +auto-compress. The `compression` passed to `initiate_multipart_upload` only sets +the object's metadata. If compression is enabled, you must compress each part +before calling `upload_part`. + +```python +import zstandard + +from objectstore_client import MultipartCompleteError + +upload = session.initiate_multipart_upload( + key="my-large-object", + compression="zstd", + metadata={"source": "upload-service"}, +) + +compressor = zstandard.ZstdCompressor() +part1 = upload.upload_part(compressor.compress(b"part1"), part_number=1) +part2 = upload.upload_part(compressor.compress(b"part2"), part_number=2) + +try: + key = upload.complete([part1, part2]) +except MultipartCompleteError: + upload.abort() + raise +``` + +You can also let the server generate the final object key: + +```python +upload = session.initiate_multipart_upload() +part = upload.upload_part(b"payload", part_number=1) +key = upload.complete([part]) +``` + +To resume an in-progress multipart upload after a process restart, persist the +`key` and `upload_id`, then reconstruct the upload handle later: + +```python +saved_key = upload.key +saved_upload_id = upload.upload_id + +resumed = session.resume_multipart_upload(saved_key, saved_upload_id) +existing_parts = resumed.list_parts() + +parts = [part.to_complete_part() for part in existing_parts] +key = resumed.complete(parts) +``` + ### Authentication If your Objectstore instance enforces authorization, you must configure authentication From c2bddccd5e8bf28136835d2b4b048bd0f4a81dea Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:01:45 +0200 Subject: [PATCH 03/22] improve --- clients/python/README.md | 16 ++- .../src/objectstore_client/multipart.py | 17 +-- clients/python/tests/test_e2e.py | 111 ++++++++++++++++-- clients/python/tests/test_multipart.py | 13 +- 4 files changed, 132 insertions(+), 25 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 8dcf3829..9362a18d 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -113,8 +113,18 @@ upload = session.initiate_multipart_upload( ) compressor = zstandard.ZstdCompressor() -part1 = upload.upload_part(compressor.compress(b"part1"), part_number=1) -part2 = upload.upload_part(compressor.compress(b"part2"), part_number=2) +compressed_part1 = compressor.compress(b"part1") +compressed_part2 = compressor.compress(b"part2") +part1 = upload.upload_part( + compressed_part1, + part_number=1, + content_length=len(compressed_part1), +) +part2 = upload.upload_part( + compressed_part2, + part_number=2, + content_length=len(compressed_part2), +) try: key = upload.complete([part1, part2]) @@ -127,7 +137,7 @@ You can also let the server generate the final object key: ```python upload = session.initiate_multipart_upload() -part = upload.upload_part(b"payload", part_number=1) +part = upload.upload_part(b"payload", part_number=1, content_length=len(b"payload")) key = upload.complete([part]) ``` diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index 502e01e4..d43dc659 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -79,7 +79,7 @@ def upload_part( contents: bytes | IO[bytes], part_number: int, *, - content_length: int | None = None, + content_length: int, content_md5: bytes | None = None, ) -> CompletePart: """Upload a single part. @@ -89,21 +89,22 @@ def upload_part( the caller must compress each part before passing it here. Args: - contents: The part data. When ``bytes``, ``content_length`` is - derived automatically. When ``IO[bytes]``, ``content_length`` - **must** be provided. + contents: The part data. part_number: 1-indexed part number. - content_length: Required when *contents* is a stream. + content_length: The length in bytes of the payload being uploaded. + When multipart compression is enabled, this must be the length + of the already-compressed part bytes. content_md5: Optional raw 16-byte MD5 digest of the (possibly compressed) contents. Base64-encoded internally for the ``Content-MD5`` header. """ if isinstance(contents, bytes): - content_length = len(contents) + if len(contents) != content_length: + raise ValueError( + "content_length must match the size of the provided bytes payload" + ) body: bytes | IO[bytes] = BytesIO(contents) else: - if content_length is None: - raise ValueError("content_length is required when contents is a stream") body = contents if content_md5 is not None and len(content_md5) != 16: diff --git a/clients/python/tests/test_e2e.py b/clients/python/tests/test_e2e.py index 207918e4..0642cafc 100644 --- a/clients/python/tests/test_e2e.py +++ b/clients/python/tests/test_e2e.py @@ -7,6 +7,7 @@ import time from collections.abc import Generator from datetime import timedelta +from io import BytesIO from pathlib import Path import pytest @@ -28,6 +29,16 @@ ) +class UnrewindableStream(BytesIO): + """Read-only stream that cannot report or restore position.""" + + def seek(self, offset: int, whence: int = 0) -> int: + raise OSError("stream is not seekable") + + def tell(self) -> int: + raise OSError("stream does not expose a stable position") + + class TestTokenGenerator: _instance: TokenGenerator | None = None @@ -358,13 +369,13 @@ def test_multipart_full_cycle_uncompressed(server_url: str) -> None: assert upload.key == "mp-uncompressed" assert upload.upload_id - part1 = upload.upload_part(b"hello ", part_number=1) - part2 = upload.upload_part(b"world!", part_number=2) + part1 = upload.upload_part(b"hello ", part_number=1, content_length=6) + part2 = upload.upload_part(b"world!", part_number=2, content_length=6) final_key = upload.complete([part1, part2]) assert final_key == "mp-uncompressed" - retrieved = session.get(final_key) + retrieved = session.get(final_key, decompress=False) assert retrieved.payload.read() == b"hello world!" @@ -386,8 +397,12 @@ def test_multipart_full_cycle_compressed(server_url: str) -> None: compressed_part1 = cctx.compress(b"hello ") compressed_part2 = cctx.compress(b"world!") - part1 = upload.upload_part(compressed_part1, part_number=1) - part2 = upload.upload_part(compressed_part2, part_number=2) + part1 = upload.upload_part( + compressed_part1, part_number=1, content_length=len(compressed_part1) + ) + part2 = upload.upload_part( + compressed_part2, part_number=2, content_length=len(compressed_part2) + ) final_key = upload.complete([part1, part2]) @@ -403,6 +418,76 @@ def test_multipart_full_cycle_compressed(server_url: str) -> None: assert retrieved.payload.read() == b"hello world!" +def test_multipart_streaming_part_upload_uncompressed(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-streaming-uncompressed") + + part1_payload = b"hello " + part2_payload = b"world!" + part1 = upload.upload_part( + UnrewindableStream(part1_payload), + part_number=1, + content_length=len(part1_payload), + ) + part2 = upload.upload_part( + UnrewindableStream(part2_payload), + part_number=2, + content_length=len(part2_payload), + ) + + final_key = upload.complete([part1, part2]) + + retrieved = session.get(final_key) + assert retrieved.payload.read() == b"hello world!" + + +def test_multipart_streaming_part_upload_compressed(server_url: str) -> None: + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload( + key="mp-streaming-compressed", + compression="zstd", + ) + + cctx = zstandard.ZstdCompressor() + compressed_part1 = cctx.compress(b"hello ") + compressed_part2 = cctx.compress(b"world!") + + part1 = upload.upload_part( + UnrewindableStream(compressed_part1), + part_number=1, + content_length=len(compressed_part1), + ) + part2 = upload.upload_part( + UnrewindableStream(compressed_part2), + part_number=2, + content_length=len(compressed_part2), + ) + + final_key = upload.complete([part1, part2]) + + retrieved = session.get(final_key, decompress=False) + assert retrieved.metadata.compression == "zstd" + assert retrieved.payload.read() == compressed_part1 + compressed_part2 + + retrieved = session.get(final_key) + assert retrieved.metadata.compression is None + assert retrieved.payload.read() == b"hello world!" + + def test_multipart_server_generated_key(server_url: str) -> None: client = Client(server_url, token=TestTokenGenerator.get()) usecase = Usecase( @@ -415,7 +500,7 @@ def test_multipart_server_generated_key(server_url: str) -> None: upload = session.initiate_multipart_upload() assert upload.key - part = upload.upload_part(b"data", part_number=1) + part = upload.upload_part(b"data", part_number=1, content_length=4) final_key = upload.complete([part]) assert final_key @@ -434,8 +519,8 @@ def test_multipart_list_parts(server_url: str) -> None: upload = session.initiate_multipart_upload(key="mp-list-parts") - upload.upload_part(b"part-two", part_number=2) - upload.upload_part(b"part-one", part_number=1) + upload.upload_part(b"part-two", part_number=2, content_length=8) + upload.upload_part(b"part-one", part_number=1, content_length=8) parts = upload.list_parts() assert len(parts) == 2 @@ -458,7 +543,7 @@ def test_multipart_abort(server_url: str) -> None: session = client.session(usecase, org=42, project=1337) upload = session.initiate_multipart_upload(key="mp-abort") - upload.upload_part(b"some data", part_number=1) + upload.upload_part(b"some data", part_number=1, content_length=9) upload.abort() @@ -478,7 +563,7 @@ def test_multipart_metadata_preserved(server_url: str) -> None: metadata={"my-key": "my-value"}, ) - part = upload.upload_part(b"payload", part_number=1) + part = upload.upload_part(b"payload", part_number=1, content_length=7) final_key = upload.complete([part]) retrieved = session.get(final_key) @@ -497,7 +582,7 @@ def test_multipart_complete_with_bad_etag(server_url: str) -> None: session = client.session(usecase, org=42, project=1337) upload = session.initiate_multipart_upload(key="mp-bad-etag") - upload.upload_part(b"real data", part_number=1) + upload.upload_part(b"real data", part_number=1, content_length=9) with pytest.raises(MultipartCompleteError) as exc_info: upload.complete([CompletePart(part_number=1, etag="bogus-etag")]) @@ -519,14 +604,14 @@ def test_multipart_resume(server_url: str) -> None: saved_key = upload.key saved_upload_id = upload.upload_id - upload.upload_part(b"first", part_number=1) + upload.upload_part(b"first", part_number=1, content_length=5) # Simulate resuming from saved state resumed = session.resume_multipart_upload(saved_key, saved_upload_id) assert resumed.key == saved_key assert resumed.upload_id == saved_upload_id - resumed.upload_part(b"second", part_number=2) + resumed.upload_part(b"second", part_number=2, content_length=6) existing = resumed.list_parts() assert len(existing) == 2 diff --git a/clients/python/tests/test_multipart.py b/clients/python/tests/test_multipart.py index 2190c5a0..cfee8914 100644 --- a/clients/python/tests/test_multipart.py +++ b/clients/python/tests/test_multipart.py @@ -68,6 +68,15 @@ def test_request_error_is_reexported_from_client_module() -> None: assert ClientRequestError is RequestError +def test_upload_part_validates_bytes_content_length() -> None: + client = Client("http://127.0.0.1:8888") + session = client.session(Usecase("testing"), org=1) + upload = MultipartUpload(session, "key", "upload-id") + + with pytest.raises(ValueError, match="content_length must match"): + upload.upload_part(b"payload", part_number=1, content_length=1) + + def test_multipart_complete_raises_http_errors_before_parsing( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -108,7 +117,9 @@ def test_multipart_put_part_metrics_use_distinct_namespace_without_compression_t ) upload = session.initiate_multipart_upload(key="key", compression="zstd") - part = upload.upload_part(b"compressed", part_number=1) + part = upload.upload_part( + b"compressed", part_number=1, content_length=len(b"compressed") + ) assert part.etag == "part-etag" size_metrics = [ From 7ace2c6e1bfd67e99222db5030a6d6126166ef8f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:06:27 +0200 Subject: [PATCH 04/22] improve --- clients/python/README.md | 2 +- clients/python/src/objectstore_client/__init__.py | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 9362a18d..45fa1505 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -104,7 +104,7 @@ before calling `upload_part`. ```python import zstandard -from objectstore_client import MultipartCompleteError +from objectstore_client.multipart import MultipartCompleteError upload = session.initiate_multipart_upload( key="my-large-object", diff --git a/clients/python/src/objectstore_client/__init__.py b/clients/python/src/objectstore_client/__init__.py index a195220a..fa26f417 100644 --- a/clients/python/src/objectstore_client/__init__.py +++ b/clients/python/src/objectstore_client/__init__.py @@ -14,23 +14,13 @@ TimeToLive, ) from objectstore_client.metrics import MetricsBackend, NoOpMetricsBackend -from objectstore_client.multipart import ( - CompletePart, - MultipartCompleteError, - MultipartUpload, - PartInfo, -) from objectstore_client.utils import parse_accept_encoding __all__ = [ "Client", - "CompletePart", "Usecase", "Session", "GetResponse", - "MultipartCompleteError", - "MultipartUpload", - "PartInfo", "RequestError", "Compression", "ExpirationPolicy", From 9b76bc7f9cf4006e7dca685472eecb49679fb874 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:16:57 +0200 Subject: [PATCH 05/22] concurrency --- clients/python/README.md | 37 +++++++++++++++++--------------- clients/python/tests/test_e2e.py | 32 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 45fa1505..291f5fb9 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -96,12 +96,13 @@ session.put(b"payload", metadata={"source": "upload-service"}) For large objects, use multipart uploads to upload parts independently and then assemble them into a final object. -**Important:** unlike single-object uploads, multipart uploads do **not** -auto-compress. The `compression` passed to `initiate_multipart_upload` only sets -the object's metadata. If compression is enabled, you must compress each part -before calling `upload_part`. +**Important:** unlike single-object uploads, multipart uploads do **not** auto-compress. +The caller must pre-compress each part according to the compression set as part of the metadata +when initiating the upload. ```python +from concurrent.futures import ThreadPoolExecutor + import zstandard from objectstore_client.multipart import MultipartCompleteError @@ -113,21 +114,23 @@ upload = session.initiate_multipart_upload( ) compressor = zstandard.ZstdCompressor() -compressed_part1 = compressor.compress(b"part1") -compressed_part2 = compressor.compress(b"part2") -part1 = upload.upload_part( - compressed_part1, - part_number=1, - content_length=len(compressed_part1), -) -part2 = upload.upload_part( - compressed_part2, - part_number=2, - content_length=len(compressed_part2), -) +chunks = [b"part1", b"part2", b"part3", b"part4"] + +def upload_part(part_number: int, data: bytes): + compressed = compressor.compress(data) + return upload.upload_part( + compressed, part_number=part_number, content_length=len(compressed) + ) + +with ThreadPoolExecutor(max_workers=4) as executor: + futures = [ + executor.submit(upload_part, i + 1, chunk) + for i, chunk in enumerate(chunks) + ] + parts = [f.result() for f in futures] try: - key = upload.complete([part1, part2]) + key = upload.complete(parts) except MultipartCompleteError: upload.abort() raise diff --git a/clients/python/tests/test_e2e.py b/clients/python/tests/test_e2e.py index 0642cafc..a6c53a7a 100644 --- a/clients/python/tests/test_e2e.py +++ b/clients/python/tests/test_e2e.py @@ -621,3 +621,35 @@ def test_multipart_resume(server_url: str) -> None: retrieved = session.get(final_key) assert retrieved.payload.read() == b"firstsecond" + + +def test_multipart_concurrent_part_uploads(server_url: str) -> None: + from concurrent.futures import ThreadPoolExecutor + + client = Client(server_url, token=TestTokenGenerator.get()) + usecase = Usecase( + "test-usecase", + compression="none", + expiration_policy=TimeToLive(timedelta(days=1)), + ) + session = client.session(usecase, org=42, project=1337) + + upload = session.initiate_multipart_upload(key="mp-concurrent") + + chunks = [f"chunk-{i}".encode() for i in range(8)] + + def upload_part(part_number: int, data: bytes) -> CompletePart: + return upload.upload_part( + data, part_number=part_number, content_length=len(data) + ) + + with ThreadPoolExecutor(max_workers=4) as executor: + futures = [ + executor.submit(upload_part, i + 1, chunk) for i, chunk in enumerate(chunks) + ] + parts = [f.result() for f in futures] + + final_key = upload.complete(parts) + + retrieved = session.get(final_key) + assert retrieved.payload.read() == b"".join(chunks) From dd2bc3243370b17cba0b3ae8ccb2bdb725df476d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:19:21 +0200 Subject: [PATCH 06/22] improve --- clients/python/src/objectstore_client/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/python/src/objectstore_client/client.py b/clients/python/src/objectstore_client/client.py index 5a2d63a6..b3e36291 100644 --- a/clients/python/src/objectstore_client/client.py +++ b/clients/python/src/objectstore_client/client.py @@ -32,8 +32,6 @@ from objectstore_client.multipart import MultipartUpload from objectstore_client.scope import Scope -__all__ = ["Client", "GetResponse", "RequestError", "Session", "Usecase"] - RequestError = _RequestError From a7dcf3eeb09014878257c77430fd245a25498e9a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:22:03 +0200 Subject: [PATCH 07/22] improve --- .../python/src/objectstore_client/metrics.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/clients/python/src/objectstore_client/metrics.py b/clients/python/src/objectstore_client/metrics.py index 1096c7da..6c7e59d0 100644 --- a/clients/python/src/objectstore_client/metrics.py +++ b/clients/python/src/objectstore_client/metrics.py @@ -132,27 +132,18 @@ def maybe_record_throughputs(self) -> None: if not self.elapsed or self.elapsed <= 0: return None + sizes: list[tuple[int, str | None]] = [] if self.size: - tags = {"usecase": self.usecase} - self.backend.distribution( - f"storage.{self.operation}.throughput", - self.size / self.elapsed, - tags=tags, - ) - self.backend.distribution( - f"storage.{self.operation}.inverse_throughput", - self.elapsed / self.size, - tags=tags, - ) - - sizes = [] + sizes.append((self.size, None)) if self.uncompressed_size: sizes.append((self.uncompressed_size, "none")) if self.compressed_size: sizes.append((self.compressed_size, self.compression)) for size, compression in sizes: - tags = {"usecase": self.usecase, "compression": compression} + tags: dict[str, str] = {"usecase": self.usecase} + if compression is not None: + tags["compression"] = compression self.backend.distribution( f"storage.{self.operation}.throughput", size / self.elapsed, tags=tags ) From a73638e7d5edb932151d4c3ee6e2ea9804f3331a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:25:43 +0200 Subject: [PATCH 08/22] improve --- clients/python/src/objectstore_client/client.py | 3 --- clients/python/tests/test_multipart.py | 5 ----- 2 files changed, 8 deletions(-) diff --git a/clients/python/src/objectstore_client/client.py b/clients/python/src/objectstore_client/client.py index b3e36291..af8c0dcb 100644 --- a/clients/python/src/objectstore_client/client.py +++ b/clients/python/src/objectstore_client/client.py @@ -13,7 +13,6 @@ from objectstore_client import utils from objectstore_client.auth import Permission, TokenGenerator, TokenProvider -from objectstore_client.errors import RequestError as _RequestError from objectstore_client.errors import raise_for_status from objectstore_client.metadata import ( HEADER_EXPIRATION, @@ -32,8 +31,6 @@ from objectstore_client.multipart import MultipartUpload from objectstore_client.scope import Scope -RequestError = _RequestError - class GetResponse(NamedTuple): metadata: Metadata diff --git a/clients/python/tests/test_multipart.py b/clients/python/tests/test_multipart.py index cfee8914..131a11e5 100644 --- a/clients/python/tests/test_multipart.py +++ b/clients/python/tests/test_multipart.py @@ -4,7 +4,6 @@ import pytest from objectstore_client import Client, Usecase -from objectstore_client.client import RequestError as ClientRequestError from objectstore_client.errors import RequestError from objectstore_client.metrics import Tags from objectstore_client.multipart import CompletePart, MultipartUpload @@ -64,10 +63,6 @@ def json(self) -> dict[str, Any]: return json.loads(self.data.decode("utf-8")) -def test_request_error_is_reexported_from_client_module() -> None: - assert ClientRequestError is RequestError - - def test_upload_part_validates_bytes_content_length() -> None: client = Client("http://127.0.0.1:8888") session = client.session(Usecase("testing"), org=1) From 4c412a9a6f944b2bbf83de20f3410a93552d7517 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 17:32:28 +0200 Subject: [PATCH 09/22] improve --- .../python/src/objectstore_client/client.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/clients/python/src/objectstore_client/client.py b/clients/python/src/objectstore_client/client.py index af8c0dcb..69af615e 100644 --- a/clients/python/src/objectstore_client/client.py +++ b/clients/python/src/objectstore_client/client.py @@ -441,6 +441,22 @@ def object_url(self, key: str) -> str: """ return self._make_url(key, full=True) + def delete(self, key: str) -> None: + """ + Deletes the blob with the given `key`. + """ + + headers = self._make_headers() + with measure_storage_operation( + self._metrics_backend, "delete", self._usecase.name + ): + response = self._pool.request( + "DELETE", + self._make_url(key), + headers=headers, + ) + raise_for_status(response) + def initiate_multipart_upload( self, *, @@ -510,19 +526,3 @@ def resume_multipart_upload(self, key: str, upload_id: str) -> MultipartUpload: after a process restart or to continue an upload started elsewhere. """ return MultipartUpload(self, key, upload_id) - - def delete(self, key: str) -> None: - """ - Deletes the blob with the given `key`. - """ - - headers = self._make_headers() - with measure_storage_operation( - self._metrics_backend, "delete", self._usecase.name - ): - response = self._pool.request( - "DELETE", - self._make_url(key), - headers=headers, - ) - raise_for_status(response) From 4b51aa1e0b5de193ac07e38cd7f508ee2bd730d9 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 18:02:42 +0200 Subject: [PATCH 10/22] improve --- .../python/src/objectstore_client/client.py | 20 +++++---- .../src/objectstore_client/multipart.py | 43 +++++++++---------- clients/python/tests/test_e2e.py | 40 ++++++++--------- clients/python/tests/test_multipart.py | 4 +- 4 files changed, 53 insertions(+), 54 deletions(-) diff --git a/clients/python/src/objectstore_client/client.py b/clients/python/src/objectstore_client/client.py index 69af615e..a24b0397 100644 --- a/clients/python/src/objectstore_client/client.py +++ b/clients/python/src/objectstore_client/client.py @@ -467,16 +467,16 @@ def initiate_multipart_upload( expiration_policy: ExpirationPolicy | None = None, origin: str | None = None, ) -> MultipartUpload: - """Initiate a multipart upload. + """ + Initiates a multipart upload. Returns a :class:`~objectstore_client.multipart.MultipartUpload` handle that can be used to upload parts, list parts, complete, or abort. **Important:** unlike :meth:`put`, the ``compression`` parameter only - records the compression algorithm in the object's metadata. It does - **not** auto-compress part data. The caller is responsible for - compressing each part in accordance with the chosen algorithm before - passing it to + records the compression algorithm in the object's metadata. + The caller is responsible for compressing each part in accordance with the + chosen algorithm before passing it to :meth:`~objectstore_client.multipart.MultipartUpload.upload_part`. """ if compression and compression not in ("none", "zstd"): @@ -506,7 +506,7 @@ def initiate_multipart_upload( key = None with measure_storage_operation( - self._metrics_backend, "multipart_initiate", self._usecase.name + self._metrics_backend, "multipart.initiate", self._usecase.name ): response = self._pool.request( "POST" if not key else "PUT", @@ -520,9 +520,11 @@ def initiate_multipart_upload( return MultipartUpload(self, res["key"], res["upload_id"]) def resume_multipart_upload(self, key: str, upload_id: str) -> MultipartUpload: - """Reconstruct a multipart upload handle from a prior initiate. + """ + Reconstructs a multipart upload handle. - This does not make any network calls. Use it to resume an upload - after a process restart or to continue an upload started elsewhere. + This does not make any network calls. + Use it to resume an upload after a process restart or to + continue an upload started elsewhere. """ return MultipartUpload(self, key, upload_id) diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index d43dc659..f3653d2b 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -26,7 +26,7 @@ class CompletePart: @dataclass class PartInfo: - """Information about an uploaded part, as returned by list_parts.""" + """Information about an uploaded part""" part_number: int etag: str @@ -38,12 +38,7 @@ def to_complete_part(self) -> CompletePart: class MultipartCompleteError(RequestError): - """Error returned in the body of a multipart complete response. - - Multipart assembly failures are conveyed inside a 200 JSON response body, - but ordinary HTTP 4xx/5xx errors may still be returned before assembly - starts. - """ + """Error returned as part of a multipart:complete response's body.""" def __init__(self, code: str, message: str): super().__init__( @@ -55,7 +50,8 @@ def __init__(self, code: str, message: str): class MultipartUpload: - """Handle for an in-progress multipart upload. + """ + Handle for an in-progress multipart upload. Create via :meth:`Session.initiate_multipart_upload` or :meth:`Session.resume_multipart_upload`. @@ -74,34 +70,37 @@ def key(self) -> str: def upload_id(self) -> str: return self._upload_id - def upload_part( + def put_part( self, contents: bytes | IO[bytes], - part_number: int, *, + part_number: int, content_length: int, content_md5: bytes | None = None, ) -> CompletePart: - """Upload a single part. + """ + Uploads a single part. - Unlike :meth:`Session.put`, this does **not** automatically compress - the contents. If the multipart upload was initiated with compression, - the caller must compress each part before passing it here. + IMPORTANT: Unlike :meth:`Session.put`, this does **not** + automatically compress `contents`. + The caller must pre-compress each part according to the + compression set as part of the metadata when initiating + the upload. Args: - contents: The part data. + contents: The part data. If this upload was initiated + with compression, this must be pre-compressed. part_number: 1-indexed part number. - content_length: The length in bytes of the payload being uploaded. - When multipart compression is enabled, this must be the length - of the already-compressed part bytes. - content_md5: Optional raw 16-byte MD5 digest of the (possibly - compressed) contents. Base64-encoded internally for the - ``Content-MD5`` header. + content_length: The length in bytes of the payload + being uploaded. If this upload was initiated with + compression, this must be the post-compression + length. + content_md5: Optional raw MD5 digest of `contents`. """ if isinstance(contents, bytes): if len(contents) != content_length: raise ValueError( - "content_length must match the size of the provided bytes payload" + "content_length must match the size of the provided payload" ) body: bytes | IO[bytes] = BytesIO(contents) else: diff --git a/clients/python/tests/test_e2e.py b/clients/python/tests/test_e2e.py index a6c53a7a..d4d69a02 100644 --- a/clients/python/tests/test_e2e.py +++ b/clients/python/tests/test_e2e.py @@ -369,8 +369,8 @@ def test_multipart_full_cycle_uncompressed(server_url: str) -> None: assert upload.key == "mp-uncompressed" assert upload.upload_id - part1 = upload.upload_part(b"hello ", part_number=1, content_length=6) - part2 = upload.upload_part(b"world!", part_number=2, content_length=6) + part1 = upload.put_part(b"hello ", part_number=1, content_length=6) + part2 = upload.put_part(b"world!", part_number=2, content_length=6) final_key = upload.complete([part1, part2]) assert final_key == "mp-uncompressed" @@ -397,10 +397,10 @@ def test_multipart_full_cycle_compressed(server_url: str) -> None: compressed_part1 = cctx.compress(b"hello ") compressed_part2 = cctx.compress(b"world!") - part1 = upload.upload_part( + part1 = upload.put_part( compressed_part1, part_number=1, content_length=len(compressed_part1) ) - part2 = upload.upload_part( + part2 = upload.put_part( compressed_part2, part_number=2, content_length=len(compressed_part2) ) @@ -431,12 +431,12 @@ def test_multipart_streaming_part_upload_uncompressed(server_url: str) -> None: part1_payload = b"hello " part2_payload = b"world!" - part1 = upload.upload_part( + part1 = upload.put_part( UnrewindableStream(part1_payload), part_number=1, content_length=len(part1_payload), ) - part2 = upload.upload_part( + part2 = upload.put_part( UnrewindableStream(part2_payload), part_number=2, content_length=len(part2_payload), @@ -466,12 +466,12 @@ def test_multipart_streaming_part_upload_compressed(server_url: str) -> None: compressed_part1 = cctx.compress(b"hello ") compressed_part2 = cctx.compress(b"world!") - part1 = upload.upload_part( + part1 = upload.put_part( UnrewindableStream(compressed_part1), part_number=1, content_length=len(compressed_part1), ) - part2 = upload.upload_part( + part2 = upload.put_part( UnrewindableStream(compressed_part2), part_number=2, content_length=len(compressed_part2), @@ -500,7 +500,7 @@ def test_multipart_server_generated_key(server_url: str) -> None: upload = session.initiate_multipart_upload() assert upload.key - part = upload.upload_part(b"data", part_number=1, content_length=4) + part = upload.put_part(b"data", part_number=1, content_length=4) final_key = upload.complete([part]) assert final_key @@ -519,8 +519,8 @@ def test_multipart_list_parts(server_url: str) -> None: upload = session.initiate_multipart_upload(key="mp-list-parts") - upload.upload_part(b"part-two", part_number=2, content_length=8) - upload.upload_part(b"part-one", part_number=1, content_length=8) + upload.put_part(b"part-two", part_number=2, content_length=8) + upload.put_part(b"part-one", part_number=1, content_length=8) parts = upload.list_parts() assert len(parts) == 2 @@ -543,7 +543,7 @@ def test_multipart_abort(server_url: str) -> None: session = client.session(usecase, org=42, project=1337) upload = session.initiate_multipart_upload(key="mp-abort") - upload.upload_part(b"some data", part_number=1, content_length=9) + upload.put_part(b"some data", part_number=1, content_length=9) upload.abort() @@ -563,7 +563,7 @@ def test_multipart_metadata_preserved(server_url: str) -> None: metadata={"my-key": "my-value"}, ) - part = upload.upload_part(b"payload", part_number=1, content_length=7) + part = upload.put_part(b"payload", part_number=1, content_length=7) final_key = upload.complete([part]) retrieved = session.get(final_key) @@ -582,7 +582,7 @@ def test_multipart_complete_with_bad_etag(server_url: str) -> None: session = client.session(usecase, org=42, project=1337) upload = session.initiate_multipart_upload(key="mp-bad-etag") - upload.upload_part(b"real data", part_number=1, content_length=9) + upload.put_part(b"real data", part_number=1, content_length=9) with pytest.raises(MultipartCompleteError) as exc_info: upload.complete([CompletePart(part_number=1, etag="bogus-etag")]) @@ -604,14 +604,14 @@ def test_multipart_resume(server_url: str) -> None: saved_key = upload.key saved_upload_id = upload.upload_id - upload.upload_part(b"first", part_number=1, content_length=5) + upload.put_part(b"first", part_number=1, content_length=5) # Simulate resuming from saved state resumed = session.resume_multipart_upload(saved_key, saved_upload_id) assert resumed.key == saved_key assert resumed.upload_id == saved_upload_id - resumed.upload_part(b"second", part_number=2, content_length=6) + resumed.put_part(b"second", part_number=2, content_length=6) existing = resumed.list_parts() assert len(existing) == 2 @@ -638,14 +638,12 @@ def test_multipart_concurrent_part_uploads(server_url: str) -> None: chunks = [f"chunk-{i}".encode() for i in range(8)] - def upload_part(part_number: int, data: bytes) -> CompletePart: - return upload.upload_part( - data, part_number=part_number, content_length=len(data) - ) + def put_part(part_number: int, data: bytes) -> CompletePart: + return upload.put_part(data, part_number=part_number, content_length=len(data)) with ThreadPoolExecutor(max_workers=4) as executor: futures = [ - executor.submit(upload_part, i + 1, chunk) for i, chunk in enumerate(chunks) + executor.submit(put_part, i + 1, chunk) for i, chunk in enumerate(chunks) ] parts = [f.result() for f in futures] diff --git a/clients/python/tests/test_multipart.py b/clients/python/tests/test_multipart.py index 131a11e5..1bca8280 100644 --- a/clients/python/tests/test_multipart.py +++ b/clients/python/tests/test_multipart.py @@ -69,7 +69,7 @@ def test_upload_part_validates_bytes_content_length() -> None: upload = MultipartUpload(session, "key", "upload-id") with pytest.raises(ValueError, match="content_length must match"): - upload.upload_part(b"payload", part_number=1, content_length=1) + upload.put_part(b"payload", part_number=1, content_length=1) def test_multipart_complete_raises_http_errors_before_parsing( @@ -112,7 +112,7 @@ def test_multipart_put_part_metrics_use_distinct_namespace_without_compression_t ) upload = session.initiate_multipart_upload(key="key", compression="zstd") - part = upload.upload_part( + part = upload.put_part( b"compressed", part_number=1, content_length=len(b"compressed") ) From 125f42abd7bdd81c238eec3215bc32282bcacd72 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 12 May 2026 18:09:31 +0200 Subject: [PATCH 11/22] fix: address CI failures and review feedback - Fix sphinx doc build: suppress ambiguous cross-reference warnings for re-exported symbols and use fully qualified references - Fix inconsistent metric operation name: multipart_complete -> multipart.complete - Fix line-too-long and typo in docstrings --- clients/python/docs/conf.py | 2 ++ clients/python/src/objectstore_client/multipart.py | 11 ++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/clients/python/docs/conf.py b/clients/python/docs/conf.py index b0c4d7d3..8c2b0c7d 100644 --- a/clients/python/docs/conf.py +++ b/clients/python/docs/conf.py @@ -25,3 +25,5 @@ "undoc-members": True, "show-inheritance": True, } + +suppress_warnings = ["ref.python"] diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index f3653d2b..770b6cc8 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -53,8 +53,8 @@ class MultipartUpload: """ Handle for an in-progress multipart upload. - Create via :meth:`Session.initiate_multipart_upload` or - :meth:`Session.resume_multipart_upload`. + Create via :meth:`~objectstore_client.client.Session.initiate_multipart_upload` or + :meth:`~objectstore_client.client.Session.resume_multipart_upload`. """ def __init__(self, session: Session, key: str, upload_id: str): @@ -81,8 +81,9 @@ def put_part( """ Uploads a single part. - IMPORTANT: Unlike :meth:`Session.put`, this does **not** - automatically compress `contents`. + IMPORTANT: Unlike + :meth:`~objectstore_client.client.Session.put`, + this does **not** automatically compress `contents`. The caller must pre-compress each part according to the compression set as part of the metadata when initiating the upload. @@ -215,7 +216,7 @@ def complete(self, parts: Sequence[CompletePart]) -> str: with measure_storage_operation( self._session._metrics_backend, - "multipart_complete", + "multipart.complete", self._session._usecase.name, ): response = self._session._pool.request( From f2f1ddd2c51276dfac0047502f3c41a5b6cd92bc Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 13 May 2026 14:27:34 +0200 Subject: [PATCH 12/22] improve --- clients/python/src/objectstore_client/multipart.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index 770b6cc8..b701c558 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -210,8 +210,13 @@ def complete(self, parts: Sequence[CompletePart]) -> str: headers = self._session._make_headers() headers["Content-Type"] = "application/json" + sorted_parts = sorted(parts, key=lambda p: p.part_number) request_body = json.dumps( - {"parts": [{"part_number": p.part_number, "etag": p.etag} for p in parts]} + { + "parts": [ + {"part_number": p.part_number, "etag": p.etag} for p in sorted_parts + ] + } ).encode("utf-8") with measure_storage_operation( From acb64e5bf4065b0e7ef026af8a457456ba7ac5fa Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 12:15:20 +0200 Subject: [PATCH 13/22] improve --- clients/python/docs/conf.py | 2 - clients/python/tests/test_multipart.py | 73 -------------------------- 2 files changed, 75 deletions(-) diff --git a/clients/python/docs/conf.py b/clients/python/docs/conf.py index 8c2b0c7d..b0c4d7d3 100644 --- a/clients/python/docs/conf.py +++ b/clients/python/docs/conf.py @@ -25,5 +25,3 @@ "undoc-members": True, "show-inheritance": True, } - -suppress_warnings = ["ref.python"] diff --git a/clients/python/tests/test_multipart.py b/clients/python/tests/test_multipart.py index 1bca8280..1b3966dd 100644 --- a/clients/python/tests/test_multipart.py +++ b/clients/python/tests/test_multipart.py @@ -1,47 +1,12 @@ import json -from dataclasses import dataclass from typing import Any import pytest from objectstore_client import Client, Usecase from objectstore_client.errors import RequestError -from objectstore_client.metrics import Tags from objectstore_client.multipart import CompletePart, MultipartUpload -@dataclass -class DistributionRecord: - name: str - value: int | float - tags: Tags | None - unit: str | None - - -class RecordingMetricsBackend: - def __init__(self) -> None: - self.distributions: list[DistributionRecord] = [] - - def increment( - self, - name: str, - value: int | float = 1, - tags: Tags | None = None, - ) -> None: - return None - - def gauge(self, name: str, value: int | float, tags: Tags | None = None) -> None: - return None - - def distribution( - self, - name: str, - value: int | float, - tags: Tags | None = None, - unit: str | None = None, - ) -> None: - self.distributions.append(DistributionRecord(name, value, tags, unit)) - - class FakeResponse: def __init__( self, @@ -92,41 +57,3 @@ def test_multipart_complete_raises_http_errors_before_parsing( assert exc_info.value.status == 403 assert exc_info.value.response == '{"detail":"missing or expired auth"}' - - -def test_multipart_put_part_metrics_use_distinct_namespace_without_compression_tags( - monkeypatch: pytest.MonkeyPatch, -) -> None: - metrics_backend = RecordingMetricsBackend() - client = Client("http://127.0.0.1:8888", metrics_backend=metrics_backend) - session = client.session(Usecase("testing", compression="none"), org=1) - responses = iter( - [ - FakeResponse(200, json_data={"key": "key", "upload_id": "upload-id"}), - FakeResponse(200, json_data={"etag": "part-etag"}), - ] - ) - - monkeypatch.setattr( - session._pool, "request", lambda *args, **kwargs: next(responses) - ) - - upload = session.initiate_multipart_upload(key="key", compression="zstd") - part = upload.put_part( - b"compressed", part_number=1, content_length=len(b"compressed") - ) - - assert part.etag == "part-etag" - size_metrics = [ - record - for record in metrics_backend.distributions - if record.name == "storage.multipart.put_part.size" - ] - assert size_metrics == [ - DistributionRecord( - "storage.multipart.put_part.size", - len(b"compressed"), - {"usecase": "testing"}, - "byte", - ) - ] From ac14da9c11d782f5844c93259e09668cb67ca4d2 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 12:26:56 +0200 Subject: [PATCH 14/22] fix: restore suppress_warnings for sphinx docs build The ref.python suppression is needed because Session is re-exported from __init__.py, causing ambiguous cross-reference warnings with -W. Also update auto-generated rst to include errors and multipart modules. --- clients/python/docs/conf.py | 4 ++++ clients/python/docs/objectstore_client.rst | 16 ++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/clients/python/docs/conf.py b/clients/python/docs/conf.py index b0c4d7d3..3e342004 100644 --- a/clients/python/docs/conf.py +++ b/clients/python/docs/conf.py @@ -25,3 +25,7 @@ "undoc-members": True, "show-inheritance": True, } + +# Session is re-exported from __init__.py and defined in client.py, +# causing ambiguous cross-reference warnings with -W. +suppress_warnings = ["ref.python"] diff --git a/clients/python/docs/objectstore_client.rst b/clients/python/docs/objectstore_client.rst index d45997fd..28fdf9a9 100644 --- a/clients/python/docs/objectstore_client.rst +++ b/clients/python/docs/objectstore_client.rst @@ -25,6 +25,14 @@ objectstore\_client.client module :show-inheritance: :undoc-members: +objectstore\_client.errors module +--------------------------------- + +.. automodule:: objectstore_client.errors + :members: + :show-inheritance: + :undoc-members: + objectstore\_client.metadata module ----------------------------------- @@ -41,6 +49,14 @@ objectstore\_client.metrics module :show-inheritance: :undoc-members: +objectstore\_client.multipart module +------------------------------------ + +.. automodule:: objectstore_client.multipart + :members: + :show-inheritance: + :undoc-members: + objectstore\_client.scope module -------------------------------- From ae2a5e8848691a6e856234bc5a35ed6db9545c48 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 12:29:25 +0200 Subject: [PATCH 15/22] fix: resolve sphinx ambiguous cross-references at the source Set __module__ on re-exported classes in __init__.py so Sphinx sees a single canonical target, instead of suppressing all ref.python warnings. --- clients/python/docs/conf.py | 4 ---- clients/python/src/objectstore_client/__init__.py | 6 ++++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/python/docs/conf.py b/clients/python/docs/conf.py index 3e342004..b0c4d7d3 100644 --- a/clients/python/docs/conf.py +++ b/clients/python/docs/conf.py @@ -25,7 +25,3 @@ "undoc-members": True, "show-inheritance": True, } - -# Session is re-exported from __init__.py and defined in client.py, -# causing ambiguous cross-reference warnings with -W. -suppress_warnings = ["ref.python"] diff --git a/clients/python/src/objectstore_client/__init__.py b/clients/python/src/objectstore_client/__init__.py index fa26f417..9e27fa8d 100644 --- a/clients/python/src/objectstore_client/__init__.py +++ b/clients/python/src/objectstore_client/__init__.py @@ -16,6 +16,12 @@ from objectstore_client.metrics import MetricsBackend, NoOpMetricsBackend from objectstore_client.utils import parse_accept_encoding +# Set canonical module so Sphinx doesn't see ambiguous cross-references +# between e.g. objectstore_client.Session and objectstore_client.client.Session. +for _sym in list(locals().values()): + if isinstance(_sym, type) and _sym.__module__.startswith("objectstore_client."): + _sym.__module__ = __name__ + __all__ = [ "Client", "Usecase", From f5e6af3ea07f3f42de50822f65d3a9806cdf2cba Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 14:26:35 +0200 Subject: [PATCH 16/22] fix: properly resolve sphinx ambiguous cross-reference warning Clear __all__ on the top-level package during doc generation so re-exported symbols only have a single Sphinx target (under their defining submodule). Replaces both the suppress_warnings blanket and the __module__ hack. --- clients/python/docs/conf.py | 7 +++++++ clients/python/src/objectstore_client/__init__.py | 6 ------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/clients/python/docs/conf.py b/clients/python/docs/conf.py index b0c4d7d3..60e36fec 100644 --- a/clients/python/docs/conf.py +++ b/clients/python/docs/conf.py @@ -25,3 +25,10 @@ "undoc-members": True, "show-inheritance": True, } + + +import objectstore_client # noqa: E402 + +# Prevent autodoc from documenting re-exported symbols in the top-level package, +# so each class has a single Sphinx target under its defining submodule. +objectstore_client.__all__ = [] diff --git a/clients/python/src/objectstore_client/__init__.py b/clients/python/src/objectstore_client/__init__.py index 9e27fa8d..fa26f417 100644 --- a/clients/python/src/objectstore_client/__init__.py +++ b/clients/python/src/objectstore_client/__init__.py @@ -16,12 +16,6 @@ from objectstore_client.metrics import MetricsBackend, NoOpMetricsBackend from objectstore_client.utils import parse_accept_encoding -# Set canonical module so Sphinx doesn't see ambiguous cross-references -# between e.g. objectstore_client.Session and objectstore_client.client.Session. -for _sym in list(locals().values()): - if isinstance(_sym, type) and _sym.__module__.startswith("objectstore_client."): - _sym.__module__ = __name__ - __all__ = [ "Client", "Usecase", From 25378ff7c9a35cd0bb444eab92b03f77bca99085 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 14:33:44 +0200 Subject: [PATCH 17/22] fix: use targeted suppress_warnings for sphinx ambiguous cross-refs ref.python is the most specific suppression Sphinx supports for the duplicate-target warning caused by re-exports in __init__.py. --- clients/python/docs/conf.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/clients/python/docs/conf.py b/clients/python/docs/conf.py index 60e36fec..8b1c9450 100644 --- a/clients/python/docs/conf.py +++ b/clients/python/docs/conf.py @@ -26,9 +26,7 @@ "show-inheritance": True, } - -import objectstore_client # noqa: E402 - -# Prevent autodoc from documenting re-exported symbols in the top-level package, -# so each class has a single Sphinx target under its defining submodule. -objectstore_client.__all__ = [] +# Re-exported symbols in __init__.py create duplicate Sphinx targets +# (e.g. objectstore_client.Session vs objectstore_client.client.Session). +# This is the most specific suppression Sphinx supports for that warning. +suppress_warnings = ["ref.python"] From a090c4ccd7d7fd32816c164042d3ff2e27602d4f Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 14:50:17 +0200 Subject: [PATCH 18/22] improve --- clients/python/src/objectstore_client/multipart.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index b701c558..159be641 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -42,7 +42,7 @@ class MultipartCompleteError(RequestError): def __init__(self, code: str, message: str): super().__init__( - f"Multipart complete failed ({code}): {message}", + f"Multipart upload completion failed ({code}): {message}", status=200, response=message, ) From 7859ba286002d3457f53a7ef76387b226387de6a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 14:57:08 +0200 Subject: [PATCH 19/22] improve --- .../python/src/objectstore_client/multipart.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index 159be641..40d6d05c 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -140,7 +140,7 @@ def put_part( return CompletePart(part_number=part_number, etag=res["etag"]) def list_parts(self) -> list[PartInfo]: - """List all uploaded parts, auto-paginating.""" + """Lists all uploaded parts.""" all_parts: list[PartInfo] = [] marker: int | None = None @@ -184,7 +184,7 @@ def list_parts(self) -> list[PartInfo]: ) def abort(self) -> None: - """Abort this multipart upload, cleaning up server-side state.""" + """Aborts this multipart upload, cleaning up server-side state.""" query = urlencode({"upload_id": self._upload_id}) url = self._session._make_multipart_url(None, self._key, query) headers = self._session._make_headers() @@ -197,13 +197,13 @@ def abort(self) -> None: raise_for_status(response) def complete(self, parts: Sequence[CompletePart]) -> str: - """Complete the multipart upload, assembling all parts into the final object. + """Completes the multipart upload, assembling all parts into the final object. Returns the final object key. Raises :class:`MultipartCompleteError` if the server reports an error - during assembly in a 200 response body. Ordinary non-2xx HTTP errors are - still raised as :class:`RequestError`. + during assembly, or :class:`RequestError` if the server returns a non-2XX + response. """ query = urlencode({"upload_id": self._upload_id}) url = self._session._make_multipart_url("complete", self._key, query) @@ -234,16 +234,11 @@ def complete(self, parts: Sequence[CompletePart]) -> str: ) raise_for_status(response) - # Successful completion responses may include keepalive whitespace. raw = (response.data or b"").decode("utf-8").strip() try: data = json.loads(raw) except json.JSONDecodeError: - raise RequestError( - "Failed to parse multipart complete response", - status=response.status, - response=raw, - ) + raise ValueError("Failed to parse multipart complete response") if "error" in data: raise MultipartCompleteError( From 6922ed81023f19072162090fea56fcb2b99d01de Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 15:08:29 +0200 Subject: [PATCH 20/22] improve --- clients/python/README.md | 8 -------- 1 file changed, 8 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 291f5fb9..78701ec4 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -136,14 +136,6 @@ except MultipartCompleteError: raise ``` -You can also let the server generate the final object key: - -```python -upload = session.initiate_multipart_upload() -part = upload.upload_part(b"payload", part_number=1, content_length=len(b"payload")) -key = upload.complete([part]) -``` - To resume an in-progress multipart upload after a process restart, persist the `key` and `upload_id`, then reconstruct the upload handle later: From 070e8795b87532d35a59fa31b554589a96831423 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 15:11:48 +0200 Subject: [PATCH 21/22] improve --- clients/python/README.md | 3 +-- clients/python/src/objectstore_client/multipart.py | 5 +---- clients/python/tests/test_e2e.py | 3 +-- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 78701ec4..c8516401 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -146,8 +146,7 @@ saved_upload_id = upload.upload_id resumed = session.resume_multipart_upload(saved_key, saved_upload_id) existing_parts = resumed.list_parts() -parts = [part.to_complete_part() for part in existing_parts] -key = resumed.complete(parts) +key = resumed.complete(existing_parts) ``` ### Authentication diff --git a/clients/python/src/objectstore_client/multipart.py b/clients/python/src/objectstore_client/multipart.py index 40d6d05c..62ee8134 100644 --- a/clients/python/src/objectstore_client/multipart.py +++ b/clients/python/src/objectstore_client/multipart.py @@ -33,9 +33,6 @@ class PartInfo: last_modified: datetime size: int - def to_complete_part(self) -> CompletePart: - return CompletePart(part_number=self.part_number, etag=self.etag) - class MultipartCompleteError(RequestError): """Error returned as part of a multipart:complete response's body.""" @@ -196,7 +193,7 @@ def abort(self) -> None: ) raise_for_status(response) - def complete(self, parts: Sequence[CompletePart]) -> str: + def complete(self, parts: Sequence[CompletePart | PartInfo]) -> str: """Completes the multipart upload, assembling all parts into the final object. Returns the final object key. diff --git a/clients/python/tests/test_e2e.py b/clients/python/tests/test_e2e.py index d4d69a02..7401fd69 100644 --- a/clients/python/tests/test_e2e.py +++ b/clients/python/tests/test_e2e.py @@ -616,8 +616,7 @@ def test_multipart_resume(server_url: str) -> None: existing = resumed.list_parts() assert len(existing) == 2 - parts = [p.to_complete_part() for p in existing] - final_key = resumed.complete(parts) + final_key = resumed.complete(existing) retrieved = session.get(final_key) assert retrieved.payload.read() == b"firstsecond" From 078aaa5df3a5aa0f7d8d291705362170346e54ad Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 15 May 2026 15:13:39 +0200 Subject: [PATCH 22/22] improve --- clients/python/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/python/README.md b/clients/python/README.md index c8516401..7f80e617 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -146,7 +146,9 @@ saved_upload_id = upload.upload_id resumed = session.resume_multipart_upload(saved_key, saved_upload_id) existing_parts = resumed.list_parts() -key = resumed.complete(existing_parts) +# Upload missing parts... + +key = resumed.complete(new_parts + existing_parts) ``` ### Authentication