Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from google.cloud import _storage_v2
from google.cloud._storage_v2.types import BidiWriteObjectRedirectedError
from google.cloud._storage_v2.types.storage import BidiWriteObjectRequest
from google.cloud.storage import Blob
from google.cloud.storage.asyncio.async_grpc_client import (
AsyncGrpcClient,
)
Expand Down Expand Up @@ -107,6 +108,7 @@ def __init__(
client: AsyncGrpcClient,
bucket_name: str,
object_name: str,
blob: Optional[Blob] = None,
generation: Optional[int] = None,
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
writer_options: Optional[dict] = None,
Expand Down Expand Up @@ -185,6 +187,7 @@ def __init__(
self.object_name = object_name
self.write_handle = write_handle
self.generation = generation
self.blob = blob

self.write_obj_stream: Optional[_AsyncWriteObjectStream] = None
self._is_stream_open: bool = False
Expand Down Expand Up @@ -297,6 +300,7 @@ async def _do_open():
client=self.client.grpc_client,
bucket_name=self.bucket_name,
object_name=self.object_name,
blob=self.blob,
generation_number=self.generation,
write_handle=self.write_handle,
routing_token=self._routing_token,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.api_core.bidi_async import AsyncBidiRpc

from google.cloud import _storage_v2
from google.cloud.storage import Blob
from google.cloud.storage.asyncio import _utils
from google.cloud.storage.asyncio.async_abstract_object_stream import (
_AsyncAbstractObjectStream,
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
client: AsyncGrpcClient.grpc_client,
bucket_name: str,
object_name: str,
blob: Optional[Blob] = None,
generation_number: Optional[int] = None, # None means new object
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
routing_token: Optional[str] = None,
Expand All @@ -83,7 +85,7 @@ def __init__(
self.client: AsyncGrpcClient.grpc_client = client
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
self.routing_token: Optional[str] = routing_token

self.blob: Optional[Blob] = blob
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"

self.rpc = self.client._client._transport._wrapped_methods[
Expand Down Expand Up @@ -118,10 +120,19 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
# if `generation_number` == 0 new object will be created only if there
# isn't any existing object.
if self.generation_number is None or self.generation_number == 0:
resource_params = {
"name": self.object_name,
"bucket": self._full_bucket_name,
}
Comment on lines +123 to +126
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The indentation of the resource_params dictionary is incorrect and will cause a SyntaxError. The keys and values should be indented inside the dictionary definition. Additionally, ensure that the dictionary is programmatically sorted to align with repository standards for dictionary key ordering.

            resource_params = dict(
                sorted(
                    {
                        "name": self.object_name,
                        "bucket": self._full_bucket_name,
                    }.items()
                )
            )
References
  1. Python code must follow PEP-8 indentation standards to avoid SyntaxErrors. (link)
  2. To ensure dictionary keys remain sorted without manual effort, programmatically sort the dictionary instead of relying on manual ordering in the code.

if self.blob:
if self.blob.content_type:
resource_params["content_type"] = self.blob.content_type
if self.blob.metadata:
resource_params["metadata"] = self.blob.metadata
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
write_object_spec=_storage_v2.WriteObjectSpec(
resource=_storage_v2.Object(
name=self.object_name, bucket=self._full_bucket_name
**resource_params
),
appendable=True,
if_generation_match=self.generation_number,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2026 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import asyncio

from google.cloud.storage.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)
from google.cloud.storage import Blob
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient


async def storage_create_and_write_appendable_object(
    bucket_name, object_name, grpc_client=None
):
    """Create an appendable object in a zonal bucket and append data to it.

    Args:
        bucket_name: Name of the (zonal) Cloud Storage bucket.
        object_name: Name of the appendable object to create.
        grpc_client: Optional existing ``AsyncGrpcClient`` to reuse; intended
            for testing. A new client is created when omitted.
    """

    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # A Blob acts as a metadata template here: fields set on it (such as
    # content_type) are applied to the object when the writer opens the stream.
    blob = Blob.from_uri(f"gs://{bucket_name}/{object_name}")
    blob.content_type = "text/plain"

    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        blob=blob,
        generation=0,  # throws `FailedPrecondition` if object already exists.
    )
    # This creates a new appendable object of size 0 and opens it for appending.
    await writer.open()

    # Appends data to the object.
    # You can perform `.append` multiple times as needed; each call appends
    # to the end of the object.
    await writer.append(b"Some data")

    # Once all appends are done, close the gRPC bidirectional stream.
    # finalize_on_close=True finalizes the object so no further appends occur.
    new_object = await writer.close(finalize_on_close=True)
    print(new_object)
    print(new_object.size)
    print(new_object.content_type)
    print(
        f"Appended object {object_name} created of size {writer.persisted_size} bytes."
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # required=True makes argparse fail fast with a usage message instead of
    # letting None propagate into the gRPC request path.
    parser.add_argument(
        "--bucket_name", required=True, help="Your Cloud Storage bucket name."
    )
    parser.add_argument(
        "--object_name", required=True, help="Your Cloud Storage object name."
    )

    args = parser.parse_args()

    asyncio.run(
        storage_create_and_write_appendable_object(
            bucket_name=args.bucket_name,
            object_name=args.object_name,
        )
    )
Loading