Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 96 additions & 16 deletions packages/gooddata-pandas/src/gooddata_pandas/arrow_convertor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

import logging
from typing import Callable

import orjson
import pandas
from gooddata_sdk.type_converter import AttributeConverterStore

from gooddata_pandas.arrow_types import TypesMapper

Expand Down Expand Up @@ -42,6 +44,63 @@

_REQUIRED_SCHEMA_KEYS = (_META_XTAB, _META_MODEL, _META_VIEW)

logger = logging.getLogger(__name__)


def read_model_labels(table: pa.Table) -> dict:
    """Extract the ``labels`` mapping from the ``x-gdc-model-v1`` Arrow schema metadata.

    Falls back to an empty dict when the schema has no metadata at all, or when
    the ``x-gdc-model-v1`` key is missing, so callers can consume the result
    unconditionally without None-checks.
    """
    metadata = table.schema.metadata
    if not metadata:
        # No schema metadata attached to the table at all.
        return {}
    raw_model = metadata.get(b"x-gdc-model-v1")
    if raw_model is None:
        return {}
    return orjson.loads(raw_model).get("labels", {})


def _get_date_converter_for_label(label_id: str, model_labels: dict):
"""Return a type Converter for date-granularity labels, or None for plain text attributes.

Reads the ``granularity`` field from Arrow model metadata (``x-gdc-model-v1``) and
looks up the matching converter in ``AttributeConverterStore``.

- ``DAY`` / ``MONTH`` / ``YEAR`` → ``DateConverter`` (→ ``pandas.Timestamp`` via external fn)
- ``WEEK`` / ``QUARTER`` → ``StringConverter`` (no-op)
- ``MINUTE`` / ``HOUR`` → ``DatetimeConverter``
- No granularity (text attrs) → ``None`` (caller skips conversion)
"""
info = model_labels.get(label_id, {})
granularity = info.get("granularity")
if not granularity:
return None
return AttributeConverterStore.find_converter("DATE", granularity.upper())


def convert_label_values(label_id: str, values: list, model_labels: dict) -> list:
    """Convert raw Arrow attribute values according to the label's date granularity.

    Mirrors the behaviour of the non-Arrow execution path
    (``AttributeConverterStore`` in ``_typed_attribute_value``):

    - ``DAY`` / ``MONTH`` / ``YEAR`` granularity → ``pandas.Timestamp``
    - ``WEEK`` / ``QUARTER`` → ``str`` (unchanged)
    - no granularity (text attributes) → the **same object** is returned untouched

    ``None`` entries are never converted and pass through as-is.

    Args:
        label_id: Arrow column name / GoodData label local ID.
        values: Raw values from ``table.column(label_id).to_pylist()``.
        model_labels: The ``labels`` dict from ``x-gdc-model-v1`` schema metadata
            (as returned by :func:`read_model_labels`).

    Returns:
        Converted list, or the original *values* object when no conversion applies.
    """
    converter = _get_date_converter_for_label(label_id, model_labels)
    if converter is None:
        # Text attribute: hand back the original list object unchanged.
        return values
    converted = []
    for value in values:
        converted.append(None if value is None else converter.to_external_type(value))
    return converted


def build_metric_field_index(table: pa.Table) -> dict[int, str]:
"""Return {metric_dimension_index: arrow_field_name} from the table schema.
Expand Down Expand Up @@ -77,9 +136,14 @@ def _parse_schema_metadata(table: pa.Table) -> dict:
raise ValueError(
"Arrow table has no schema metadata. Expected GoodData metadata keys: " + ", ".join(_REQUIRED_SCHEMA_KEYS)
)
schema_meta = {
k.decode(): orjson.loads(v) for k, v in table.schema.metadata.items() if k.decode() in _REQUIRED_SCHEMA_KEYS
}
schema_meta = {}
for _k, _v in table.schema.metadata.items():
try:
_k_str = _k.decode()
except UnicodeDecodeError:
continue
if _k_str in _REQUIRED_SCHEMA_KEYS:
schema_meta[_k_str] = orjson.loads(_v)
missing = [k for k in _REQUIRED_SCHEMA_KEYS if k not in schema_meta]
if missing:
raise ValueError(
Expand Down Expand Up @@ -186,10 +250,15 @@ def _build_inline_index(
totals_meta = xtab_meta.get("totalsMetadata", {})
total_ref_vals: list = [None] * table.num_rows
if totals_meta:
for field in table.schema:
if field.name.startswith(_COL_TOTAL_REF_PREFIX):
total_ref_vals = table.column(field.name).to_pylist()
break
total_ref_cols = [f.name for f in table.schema if f.name.startswith(_COL_TOTAL_REF_PREFIX)]
if total_ref_cols:
if len(total_ref_cols) > 1:
logger.warning(
"Arrow table has %d __total_ref* columns; only %r is used for aggregation names.",
len(total_ref_cols),
total_ref_cols[0],
Comment on lines +253 to +259
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder — did this actually happen? Multiple total-ref columns would mean something is badly broken on the backend...

Being defensive is fine, no problem here, but if you ever run into this warning, there may be a more serious problem in the xtab implementation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it hasn't happened yet — this is just a defensive "what if" warning, so as you say, if it happens in the future we should react to it. The original implementation silently picked the first column with the prefix.

)
total_ref_vals = table.column(total_ref_cols[0]).to_pylist()

# Precompute per-row aggregation name and kept-label set for total rows.
agg_for_row: list[str | None] = [None] * table.num_rows
Expand All @@ -212,16 +281,17 @@ def _build_inline_index(
values = table.column(lid).to_pylist()
processed = []
for i, v in enumerate(values):
if row_types[i] != 0 and isinstance(v, str):
if ref in kept_labels_for_row[i]:
# Outer label kept as real attribute value in a subtotal row.
processed.append(v)
elif v == "":
# Aggregated level left empty by the server — fill with agg name.
processed.append(agg_for_row[i] if agg_for_row[i] else v)
if row_types[i] != 0:
if isinstance(v, str):
if ref in kept_labels_for_row[i]:
processed.append(v)
elif v == "":
processed.append(agg_for_row[i] if agg_for_row[i] else v)
else:
processed.append(v.upper())
else:
# Aggregation function marker (e.g. 'sum') — uppercase it.
processed.append(v.upper())
# Non-string value in a total row — replace with the aggregation name when available.
processed.append(agg_for_row[i] if agg_for_row[i] is not None else v)
else:
processed.append(v)
arrays.append(processed)
Expand Down Expand Up @@ -410,6 +480,11 @@ def _label_ids_in_dim(dim: dict) -> set:
(dim for dim in execution_dims if col_ref_label_ids <= _label_ids_in_dim(dim)),
{},
)
if not col_dim and execution_dims:
logger.warning(
"No execution dimension contains column label IDs %s; column_totals_indexes will be empty.",
col_ref_label_ids,
)
else:
col_dim = next(
(dim for dim in execution_dims if any("measureGroupHeaders" in h for h in dim.get("headers", []))),
Expand Down Expand Up @@ -486,6 +561,11 @@ def _label_ids_in_dim(dim: dict) -> set:
(dim for dim in execution_dims if ref_label_ids <= _label_ids_in_dim(dim)),
{},
)
if not row_dim and execution_dims:
logger.warning(
"No execution dimension contains row label IDs %s; row_totals_indexes will be empty.",
ref_label_ids,
)
else:
# Metrics-only: the dimension containing measureGroupHeaders is the output-row dim.
row_dim = next(
Expand Down
4 changes: 4 additions & 0 deletions packages/gooddata-pandas/src/gooddata_pandas/arrow_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ class ArrowConfig:
custom_mapping is not provided.
custom_mapping: Arrow type → pandas dtype mapping dict. Only used when
types_mapper=TypesMapper.CUSTOM, ignored otherwise.
max_bytes: Optional byte-size limit for the Arrow response body. When set,
``read_result_arrow`` raises ``ResultSizeBytesLimitExceeded`` if the
raw IPC payload exceeds this value before parsing begins.
"""

self_destruct: bool = False
types_mapper: TypesMapper = TypesMapper.DEFAULT
custom_mapping: dict | None = field(default=None)
max_bytes: int | None = field(default=None)
35 changes: 26 additions & 9 deletions packages/gooddata-pandas/src/gooddata_pandas/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
)
from gooddata_sdk.utils import IdObjType

try:
from gooddata_pandas.arrow_convertor import build_metric_field_index
except ImportError:
pass # Only needed when use_arrow=True; callers guard with _ARROW_AVAILABLE checks

from gooddata_pandas.utils import (
ColumnsDef,
IndexDef,
Expand All @@ -34,6 +29,12 @@
get_catalog_attributes_for_extract,
)

_ARROW_IMPORT_ERROR: ImportError | None = None
try:
from gooddata_pandas.arrow_convertor import build_metric_field_index, convert_label_values, read_model_labels
except ImportError as _e: # pragma: no cover
_ARROW_IMPORT_ERROR = _e # pragma: no cover


class ExecutionDefinitionBuilder:
_DEFAULT_INDEX_NAME: str = "0"
Expand Down Expand Up @@ -429,20 +430,30 @@ def _extract_from_arrow(
col_to_attr_idx: dict[str, int],
col_to_metric_idx: dict[str, int],
index_to_attr_idx: dict[str, int],
max_bytes: int | None = None,
) -> tuple[dict, dict]:
"""
Arrow-path extraction for indexed() / not_indexed().

Reads the full result in one shot via the binary endpoint, then slices columns
by Arrow field name (metrics) or label id (attributes). No catalog fetch needed.
by Arrow field name (metrics) or label id (attributes).

Date-granularity attribute columns (year/month/day) are converted to
``pandas.Timestamp`` to match the behaviour of the non-Arrow path.
Week and quarter values remain as strings (same as non-Arrow).
"""
table = execution.bare_exec_response.read_result_arrow()
if _ARROW_IMPORT_ERROR is not None:
raise ImportError(
"pyarrow is required for Arrow support. Install it with: pip install gooddata-pandas[arrow]"
) from _ARROW_IMPORT_ERROR
table = execution.bare_exec_response.read_result_arrow(max_bytes=max_bytes)
exec_def = execution.exec_def

if table.num_rows == 0:
return {col: [] for col in cols}, {idx: [] for idx in index_to_attr_idx}

metric_dim_idx_to_field = build_metric_field_index(table)
model_labels = read_model_labels(table)

data: dict[str, list] = {}
for col in cols:
Expand All @@ -451,12 +462,14 @@ def _extract_from_arrow(
data[col] = table.column(field_name).to_pylist()
else:
attr = exec_def.attributes[col_to_attr_idx[col]]
data[col] = table.column(attr.label.id).to_pylist()
label_id = attr.label.id
data[col] = convert_label_values(label_id, table.column(label_id).to_pylist(), model_labels)

index: dict[str, list] = {}
for idx_name, attr_idx in index_to_attr_idx.items():
attr = exec_def.attributes[attr_idx]
index[idx_name] = table.column(attr.label.id).to_pylist()
label_id = attr.label.id
index[idx_name] = convert_label_values(label_id, table.column(label_id).to_pylist(), model_labels)

return data, index

Expand All @@ -471,6 +484,7 @@ def compute_and_extract(
is_cancellable: bool = False,
result_page_len: int | None = None,
use_arrow: bool = False,
max_bytes: int | None = None,
) -> tuple[dict, dict]:
"""
Convenience function that computes and extracts data from the execution response.
Expand All @@ -489,6 +503,8 @@ def compute_and_extract(
Defaults to 1000. Larger values can improve performance for large result sets.
use_arrow (bool, optional): When True, fetches the result via the Arrow IPC binary
endpoint in one shot instead of paginating through JSON. Requires pyarrow.
max_bytes (Optional[int]): Maximum response body size in bytes for the Arrow path.
Raises ResultSizeBytesLimitExceeded when exceeded. Ignored when use_arrow=False.

Returns:
tuple: A tuple containing the following dictionaries:
Expand Down Expand Up @@ -522,6 +538,7 @@ def compute_and_extract(
col_to_attr_idx,
col_to_metric_idx,
index_to_attr_idx,
max_bytes=max_bytes,
)
elif not exec_def.has_attributes():
return _extract_for_metrics_only(execution, cols, col_to_metric_idx), dict()
Expand Down
6 changes: 4 additions & 2 deletions packages/gooddata-pandas/src/gooddata_pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def indexed(
is_cancellable=is_cancellable,
result_page_len=result_page_len,
use_arrow=use_arrow,
max_bytes=self._arrow_config.max_bytes if use_arrow else None,
)

_idx = make_pandas_index(index)
Expand Down Expand Up @@ -210,6 +211,7 @@ def not_indexed(
is_cancellable=is_cancellable,
result_page_len=result_page_len,
use_arrow=use_arrow,
max_bytes=self._arrow_config.max_bytes if use_arrow else None,
)

return pandas.DataFrame(data=data)
Expand Down Expand Up @@ -539,7 +541,7 @@ def for_exec_def_arrow(
on_execution_submitted(execution)

exec_response = execution.bare_exec_response
table = exec_response.read_result_arrow()
table = exec_response.read_result_arrow(max_bytes=self._arrow_config.max_bytes)
return self._table_to_df_and_metadata(table, exec_response, label_overrides, grand_totals_position)

def for_arrow_table(
Expand Down Expand Up @@ -684,7 +686,7 @@ def for_exec_result_id(
result_cache_metadata.execution_response, _check_type=False
),
)
table = exec_response.read_result_arrow()
table = exec_response.read_result_arrow(max_bytes=self._arrow_config.max_bytes)
return self._table_to_df_and_metadata(table, exec_response, label_overrides, grand_totals_position)

return convert_execution_response_to_dataframe(
Expand Down
Loading
Loading