From 581caf3b2ee6b482a3cf05c4f3f65be1457880d6 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 27 May 2026 15:48:46 +0200 Subject: [PATCH] ref(tasks): Remove base64 encoding for bytes parameters in tasks Pass bytes directly to tasks instead of base64-encoding them, now that taskbroker uses msgpack which supports bytes natively. - assemble_download: page_token now accepts bytes | str - fulfill_cross_region_export_request: encrypt_with_public_key now accepts bytes | str Both tasks handle legacy base64 strings for backwards compatibility during rollout. ref STREAM-1011 --- src/sentry/data_export/tasks.py | 36 ++++++++++--------- .../services/relocation_export/impl.py | 3 +- src/sentry/relocation/tasks/process.py | 8 +++-- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/sentry/data_export/tasks.py b/src/sentry/data_export/tasks.py index a785b0adee31ce..04e4217c05831a 100644 --- a/src/sentry/data_export/tasks.py +++ b/src/sentry/data_export/tasks.py @@ -100,11 +100,11 @@ def _sentry_metric_attrs( return attrs -def _page_token_b64_from_processor( +def _page_token_from_processor( processor: IssuesByTagProcessor | DiscoverProcessor | ExploreProcessor, -) -> str | None: +) -> bytes | None: if isinstance(processor, TraceItemFullExportProcessor) and processor.page_token is not None: - return base64.b64encode(processor.page_token).decode("ascii") + return processor.page_token return None @@ -162,7 +162,7 @@ def export_chunk_to_stored_blobs( export_limit: int, environment_id: int | None, first_page: bool = True, - page_token: str | None = None, + page_token: bytes | str | None = None, offset: int = 0, bytes_written: int = 0, batch_size: int = SNUBA_MAX_RESULTS, @@ -174,7 +174,7 @@ def export_chunk_to_stored_blobs( data_export, environment_id, output_mode, - page_token_b64=page_token, + page_token=page_token, ) with tempfile.TemporaryFile(mode="w+b") as tf: @@ -240,7 +240,7 @@ def _schedule_retry( base_bytes_written: int, environment_id: int | None, export_retries: int, - page_token: str | None, + page_token: bytes | str | None, delay_retry: bool = False, ) -> None: assemble_download.apply_async( @@ -280,7 +280,7 @@ def _schedule_next_task( "bytes_written": bytes_written, "environment_id": environment_id, "export_retries": export_retries, - "page_token": _page_token_b64_from_processor(processor), + "page_token": _page_token_from_processor(processor), } should_continue = next_offset < export_limit and ( (isinstance(processor, TraceItemFullExportProcessor) and processor.page_token is not None) @@ -325,7 +325,7 @@ def assemble_download( environment_id: int | None = None, export_retries: int = DEFAULT_EXPORT_RETRIES, *, - page_token: str | None = None, + page_token: bytes | str | None = None, **kwargs: Any, ) -> None: # The API response to export the data contains the ID which you can use @@ -573,7 +573,7 @@ def get_processor( environment_id: int | None, output_mode: OutputMode, *, - page_token_b64: str | None = None, + page_token: bytes | str | None = None, ) -> IssuesByTagProcessor | DiscoverProcessor | ExploreProcessor | TraceItemFullExportProcessor: try: if data_export.query_type == ExportQueryType.ISSUES_BY_TAG: @@ -597,17 +597,21 @@ def get_processor( output_mode=output_mode, ) elif data_export.query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT: - page_token: bytes | None = None - if page_token_b64: - try: - page_token = base64.b64decode(page_token_b64) - except (ValueError, TypeError) as e: - raise ExportError("Invalid export trace item pagination state.") from e + page_token_bytes: bytes | None = None + if page_token is not None: + # Handle both bytes (new) and base64 string (legacy) page tokens + if isinstance(page_token, str): + try: + page_token_bytes = base64.b64decode(page_token) + except (ValueError, TypeError) as e: + raise ExportError("Invalid export trace item pagination state.") from e + else: + page_token_bytes = page_token return TraceItemFullExportProcessor( explore_query=data_export.query_info, organization=data_export.organization, output_mode=output_mode, - page_token=page_token, + page_token=page_token_bytes, ) else: diff --git a/src/sentry/relocation/services/relocation_export/impl.py b/src/sentry/relocation/services/relocation_export/impl.py index 2a6447e62f4905..0c9e2ed7366cb5 100644 --- a/src/sentry/relocation/services/relocation_export/impl.py +++ b/src/sentry/relocation/services/relocation_export/impl.py @@ -3,7 +3,6 @@ # in modules such as this one where hybrid cloud data models or service classes are # defined, because we want to reflect on type annotations and avoid forward references. -import base64 import logging from datetime import UTC, datetime from io import BytesIO @@ -64,7 +63,7 @@ def request_new_export( requesting_region_name, replying_region_name, org_slug, - base64.b64encode(encrypt_with_public_key).decode("utf8"), + encrypt_with_public_key, int(round(datetime.now(tz=UTC).timestamp())), ] ) diff --git a/src/sentry/relocation/tasks/process.py b/src/sentry/relocation/tasks/process.py index f229c1a482e191..9b7341f075cea6 100644 --- a/src/sentry/relocation/tasks/process.py +++ b/src/sentry/relocation/tasks/process.py @@ -319,7 +319,7 @@ def fulfill_cross_region_export_request( requesting_cell_name: str, replying_cell_name: str, org_slug: str, - encrypt_with_public_key: str, + encrypt_with_public_key: bytes | str, # Unix timestamp, in seconds. scheduled_at: int, ) -> None: @@ -334,7 +334,11 @@ def fulfill_cross_region_export_request( """ from sentry.relocation.tasks.transfer import process_relocation_transfer_region - encrypt_with_public_key_bytes = base64.b64decode(encrypt_with_public_key.encode("utf8")) + # Handle both bytes (new) and base64 string (legacy) + if isinstance(encrypt_with_public_key, str): + encrypt_with_public_key_bytes = base64.b64decode(encrypt_with_public_key.encode("utf8")) + else: + encrypt_with_public_key_bytes = encrypt_with_public_key logger_data = { "uuid": uuid_str,