diff --git a/packages/bigframes/.python-version b/packages/bigframes/.python-version new file mode 100644 index 000000000000..95ed564f82b7 --- /dev/null +++ b/packages/bigframes/.python-version @@ -0,0 +1 @@ +3.14.2 diff --git a/packages/bigframes/bigframes/bigquery/_operations/ai.py b/packages/bigframes/bigframes/bigquery/_operations/ai.py index 7a509d4f95ff..6164c863b391 100644 --- a/packages/bigframes/bigframes/bigquery/_operations/ai.py +++ b/packages/bigframes/bigframes/bigquery/_operations/ai.py @@ -1003,7 +1003,7 @@ def _separate_context_and_series( if isinstance(prompt, series.Series): if prompt.dtype == dtypes.OBJ_REF_DTYPE: # Multi-model support - return [None], [prompt.blob.read_url()] + return [None], [prompt._blob._read_url()] return [None], [prompt] prompt_context: List[str | None] = [] @@ -1040,7 +1040,7 @@ def _convert_series( if result.dtype == dtypes.OBJ_REF_DTYPE: # Support multimodel - return result.blob.read_url() + return result._blob._read_url() return result diff --git a/packages/bigframes/bigframes/blob/_functions.py b/packages/bigframes/bigframes/blob/_functions.py index 5114f60058c1..3869416d1244 100644 --- a/packages/bigframes/bigframes/blob/_functions.py +++ b/packages/bigframes/bigframes/blob/_functions.py @@ -124,605 +124,3 @@ def udf(self): # TODO(b/404605969): remove cleanups when UDF fixes dataset deletion. 
self._session._function_session._update_temp_artifacts(udf_name, "") return self._session.read_gbq_function(udf_name) - - -def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: - try: - import io - import json - - import requests - from PIL import ExifTags, Image - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - - response = session.get(src_url, timeout=30) - response.raise_for_status() - bts = response.content - - image = Image.open(io.BytesIO(bts)) - exif_data = image.getexif() - exif_dict = {} - - if exif_data: - for tag, value in exif_data.items(): - tag_name = ExifTags.TAGS.get(tag, tag) - # Convert non-serializable types to strings - try: - json.dumps(value) - exif_dict[tag_name] = value - except (TypeError, ValueError): - exif_dict[tag_name] = str(value) - - if verbose: - return json.dumps({"status": "", "content": json.dumps(exif_dict)}) - else: - return json.dumps(exif_dict) - - except Exception as e: - # Return error as JSON with error field - error_result = {"status": f"{type(e).__name__}: {str(e)}", "content": "{}"} - if verbose: - return json.dumps(error_result) - else: - return "{}" - - -exif_func_def = FunctionDef(exif_func, ["pillow", "requests"]) - - -# Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string. 
-def image_blur_func( - src_obj_ref_rt: str, - dst_obj_ref_rt: str, - ksize_x: int, - ksize_y: int, - ext: str, - verbose: bool, -) -> typing.Optional[str]: - try: - import json - - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - ext = ext or ".jpeg" - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - - response = session.get(src_url, timeout=30) - response.raise_for_status() # Raise exception for HTTP errors - bts = response.content - - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - - if img is None: - raise ValueError( - "Failed to decode image - possibly corrupted or unsupported format" - ) - - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - - success, encoded = cv.imencode(ext, img_blurred) - if not success: - raise ValueError(f"Failed to encode image with extension {ext}") - - bts = encoded.tobytes() - - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext - - put_response = session.put( - url=dst_url, - data=bts, - headers={"Content-Type": content_type}, - timeout=30, - ) - put_response.raise_for_status() - - if verbose: - return json.dumps({"status": "", "content": dst_obj_ref_rt}) - else: - return dst_obj_ref_rt - - except Exception as e: - if verbose: - error_result = { - "status": f"Error: {type(e).__name__}: {str(e)}", - "content": "", - } - return json.dumps(error_result) - else: - return None - - -image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) - - -def image_blur_to_bytes_func( - src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str, verbose: bool -) 
-> str: - import base64 - import json - - try: - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - ext = ext or ".jpeg" - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - - response = session.get(src_url, timeout=30) - response.raise_for_status() - bts = response.content - - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - if img is None: - raise ValueError( - "Failed to decode image - possibly corrupted or unsupported format" - ) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - success, encoded = cv.imencode(ext, img_blurred) - if not success: - raise ValueError(f"Failed to encode image with extension {ext}") - content = encoded.tobytes() - - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict = {"status": "", "content": encoded_content} - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - except Exception as e: - status = f"Error: {type(e).__name__}: {str(e)}" - encoded_content = base64.b64encode(b"").decode("utf-8") - result_dict = {"status": status, "content": encoded_content} - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - -image_blur_to_bytes_def = FunctionDef( - image_blur_to_bytes_func, ["opencv-python", "numpy", "requests"] -) - - -def image_resize_func( - src_obj_ref_rt: str, - dst_obj_ref_rt: str, - dsize_x: int, - dsize_y: int, - fx: float, - fy: float, - ext: str, - verbose: bool, -) -> typing.Optional[str]: - try: - import json - - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - ext = ext or ".jpeg" - - src_obj_ref_rt_json = 
json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - - response = session.get(src_url, timeout=30) - response.raise_for_status() - bts = response.content - - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - if img is None: - raise ValueError( - "Failed to decode image - possibly corrupted or unsupported format" - ) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - - success, encoded = cv.imencode(ext, img_resized) - if not success: - raise ValueError(f"Failed to encode image with extension {ext}") - bts = encoded.tobytes() - - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext - - put_response = session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) - put_response.raise_for_status() - - if verbose: - return json.dumps({"status": "", "content": dst_obj_ref_rt}) - else: - return dst_obj_ref_rt - - except Exception as e: - if verbose: - error_result = { - "status": f"Error: {type(e).__name__}: {str(e)}", - "content": "", - } - return json.dumps(error_result) - else: - return None - - -image_resize_def = FunctionDef( - image_resize_func, ["opencv-python", "numpy", "requests"] -) - - -def image_resize_to_bytes_func( - src_obj_ref_rt: str, - dsize_x: int, - dsize_y: int, - fx: float, - fy: float, - ext: str, - verbose: bool, -) -> str: - import base64 - import json - - try: - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - ext = ext or ".jpeg" - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - - response = 
session.get(src_url, timeout=30) - response.raise_for_status() - bts = response.content - - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - if img is None: - raise ValueError( - "Failed to decode image - possibly corrupted or unsupported format" - ) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - success, encoded = cv.imencode(ext, img_resized) - if not success: - raise ValueError(f"Failed to encode image with extension {ext}") - content = encoded.tobytes() - - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict = {"status": "", "content": encoded_content} - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - except Exception as e: - status = f"Error: {type(e).__name__}: {str(e)}" - encoded_content = base64.b64encode(b"").decode("utf-8") - result_dict = {"status": status, "content": encoded_content} - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - -image_resize_to_bytes_def = FunctionDef( - image_resize_to_bytes_func, ["opencv-python", "numpy", "requests"] -) - - -def image_normalize_func( - src_obj_ref_rt: str, - dst_obj_ref_rt: str, - alpha: float, - beta: float, - norm_type: str, - ext: str, - verbose: bool, -) -> typing.Optional[str]: - try: - import json - - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - ext = ext or ".jpeg" - - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - - response = session.get(src_url, timeout=30) - response.raise_for_status() - 
bts = response.content - - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - if img is None: - raise ValueError( - "Failed to decode image - possibly corrupted or unsupported format" - ) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) - - success, encoded = cv.imencode(ext, img_normalized) - if not success: - raise ValueError(f"Failed to encode image with extension {ext}") - bts = encoded.tobytes() - - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext - - put_response = session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) - put_response.raise_for_status() - - if verbose: - return json.dumps({"status": "", "content": dst_obj_ref_rt}) - else: - return dst_obj_ref_rt - - except Exception as e: - if verbose: - error_result = { - "status": f"Error: {type(e).__name__}: {str(e)}", - "content": "", - } - return json.dumps(error_result) - else: - return None - - -image_normalize_def = FunctionDef( - image_normalize_func, ["opencv-python", "numpy", "requests"] -) - - -def image_normalize_to_bytes_func( - src_obj_ref_rt: str, - alpha: float, - beta: float, - norm_type: str, - ext: str, - verbose: bool, -) -> str: - import base64 - import json - - try: - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - ext = ext or ".jpeg" - - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - - response = session.get(src_url, timeout=30) - response.raise_for_status() - bts = response.content - - nparr = 
np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - if img is None: - raise ValueError( - "Failed to decode image - possibly corrupted or unsupported format" - ) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) - success, encoded = cv.imencode(ext, img_normalized) - if not success: - raise ValueError(f"Failed to encode image with extension {ext}") - content = encoded.tobytes() - - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict = {"status": "", "content": encoded_content} - - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - except Exception as e: - status = f"Error: {type(e).__name__}: {str(e)}" - encoded_content = base64.b64encode(b"").decode("utf-8") - result_dict = {"status": status, "content": encoded_content} - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - -image_normalize_to_bytes_def = FunctionDef( - image_normalize_to_bytes_func, ["opencv-python", "numpy", "requests"] -) - - -# Extracts all text from a PDF url -def pdf_extract_func(src_obj_ref_rt: str, verbose: bool) -> str: - try: - import io - import json - - import requests - from pypdf import PdfReader # type: ignore - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - - response = session.get(src_url, timeout=30, stream=True) - response.raise_for_status() - pdf_bytes = response.content - - pdf_file = io.BytesIO(pdf_bytes) - reader = PdfReader(pdf_file, strict=False) - - all_text = "" - for page in reader.pages: - page_extract_text = page.extract_text() - if page_extract_text: - all_text += page_extract_text - - result_dict = {"status": "", "content": all_text} - - except Exception as e: - result_dict = {"status": 
str(e), "content": ""} - - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] - - -pdf_extract_def = FunctionDef( - pdf_extract_func, ["pypdf>=5.3.1,<6.0.0", "requests", "cryptography==43.0.3"] -) - - -# Extracts text from a PDF url and chunks it simultaneously -def pdf_chunk_func( - src_obj_ref_rt: str, chunk_size: int, overlap_size: int, verbose: bool -) -> str: - try: - import io - import json - - import requests - from pypdf import PdfReader # type: ignore - from requests import adapters - - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - - response = session.get(src_url, timeout=30, stream=True) - response.raise_for_status() - pdf_bytes = response.content - - pdf_file = io.BytesIO(pdf_bytes) - reader = PdfReader(pdf_file, strict=False) - # extract and chunk text simultaneously - all_text_chunks = [] - curr_chunk = "" - for page in reader.pages: - page_text = page.extract_text() - if page_text: - curr_chunk += page_text - # split the accumulated text into chunks of a specific size with overlaop - # this loop implements a sliding window approach to create chunks - while len(curr_chunk) >= chunk_size: - split_idx = curr_chunk.rfind(" ", 0, chunk_size) - if split_idx == -1: - split_idx = chunk_size - actual_chunk = curr_chunk[:split_idx] - all_text_chunks.append(actual_chunk) - overlap = curr_chunk[split_idx + 1 : split_idx + 1 + overlap_size] - curr_chunk = overlap + curr_chunk[split_idx + 1 + overlap_size :] - if curr_chunk: - all_text_chunks.append(curr_chunk) - - result_dict = {"status": "", "content": all_text_chunks} - - except Exception as e: - result_dict = {"status": str(e), "content": []} - - if verbose: - return json.dumps(result_dict) - else: - return json.dumps(result_dict["content"]) - - -pdf_chunk_def = FunctionDef( - pdf_chunk_func, ["pypdf>=5.3.1,<6.0.0", 
"requests", "cryptography==43.0.3"] -) diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index b89360c691d3..b0ea81e003e1 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -833,7 +833,7 @@ def _get_display_df_and_blob_cols(self) -> tuple[DataFrame, list[str]]: df = self.copy() for col in blob_cols: # TODO(garrettwu): Not necessary to get access urls for all the rows. Update when having a to get URLs from local data. - df[col] = df[col].blob._get_runtime(mode="R", with_metadata=True) + df[col] = df[col]._blob._get_runtime(mode="R", with_metadata=True) return df, blob_cols def _repr_mimebundle_(self, include=None, exclude=None): diff --git a/packages/bigframes/bigframes/ml/llm.py b/packages/bigframes/bigframes/ml/llm.py index bcf59d591f8e..d9e228c90c9f 100644 --- a/packages/bigframes/bigframes/ml/llm.py +++ b/packages/bigframes/bigframes/ml/llm.py @@ -397,7 +397,7 @@ def predict( # TODO(garrettwu): remove transform to ObjRefRuntime when BQML supports ObjRef as input if X["content"].dtype == dtypes.OBJ_REF_DTYPE: - X["content"] = X["content"].blob._get_runtime("R", with_metadata=True) + X["content"] = X["content"]._blob._get_runtime("R", with_metadata=True) options: dict = {} @@ -731,7 +731,7 @@ def predict( isinstance(item, bigframes.series.Series) and item.dtype == dtypes.OBJ_REF_DTYPE ): - item = item.blob._get_runtime("R", with_metadata=True) + item = item._blob._get_runtime("R", with_metadata=True) df_prompt[label] = item df_prompt = df_prompt.drop(columns="bigframes_placeholder_col") diff --git a/packages/bigframes/bigframes/operations/blob.py b/packages/bigframes/bigframes/operations/blob.py index b9a33af2d1ed..9cd7dd0db291 100644 --- a/packages/bigframes/bigframes/operations/blob.py +++ b/packages/bigframes/bigframes/operations/blob.py @@ -14,18 +14,10 @@ from __future__ import annotations -import os -import warnings -from typing import Literal, Optional, 
Union, cast - -import pandas as pd -import requests import bigframes.dataframe -import bigframes.exceptions as bfe import bigframes.operations as ops import bigframes.series -from bigframes import clients, dtypes from bigframes.core.logging import log_adapter FILE_FOLDER_REGEX = r"^.*\/(.*)$" @@ -33,134 +25,17 @@ @log_adapter.class_logger -class BlobAccessor: +class _BlobAccessor: """ - Blob functions for Series and Index. - - .. note:: - BigFrames Blob is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the - Service Specific Terms(https://cloud.google.com/terms/service-terms#1). Pre-GA products and features are available "as is" - and might have limited support. For more information, see the launch stage descriptions - (https://cloud.google.com/products#product-launch-stages). + Internal blob functions for Series and Index. """ def __init__(self, data: bigframes.series.Series): self._data = data - def uri(self) -> bigframes.series.Series: - """URIs of the Blob. - - Returns: - bigframes.series.Series: URIs as string.""" - s = bigframes.series.Series(self._data._block) - - return s.struct.field("uri") - - def authorizer(self) -> bigframes.series.Series: - """Authorizers of the Blob. - - Returns: - bigframes.series.Series: Autorithers(connection) as string.""" - s = bigframes.series.Series(self._data._block) - - return s.struct.field("authorizer") - - def version(self) -> bigframes.series.Series: - """Versions of the Blob. - - Returns: - bigframes.series.Series: Version as string.""" - # version must be retrieved after fetching metadata - return self._data._apply_unary_op(ops.obj_fetch_metadata_op).struct.field( - "version" - ) - - def metadata(self) -> bigframes.series.Series: - """Retrieve the metadata of the Blob. - - Returns: - bigframes.series.Series: JSON metadata of the Blob. Contains fields: content_type, md5_hash, size and updated(time). 
- """ - series_to_check = bigframes.series.Series(self._data._block) - # Check if it's a struct series from a verbose operation - if dtypes.is_struct_like(series_to_check.dtype): - pyarrow_dtype = series_to_check.dtype.pyarrow_dtype - if "content" in [field.name for field in pyarrow_dtype]: - content_field_type = pyarrow_dtype.field("content").type - content_bf_type = dtypes.arrow_dtype_to_bigframes_dtype( - content_field_type - ) - if content_bf_type == dtypes.OBJ_REF_DTYPE: - series_to_check = series_to_check.struct.field("content") - details_json = series_to_check._apply_unary_op( - ops.obj_fetch_metadata_op - ).struct.field("details") - import bigframes.bigquery as bbq - - return bbq.json_extract(details_json, "$.gcs_metadata").rename("metadata") - - def content_type(self) -> bigframes.series.Series: - """Retrieve the content type of the Blob. - - Returns: - bigframes.series.Series: string of the content type.""" - return ( - self.metadata() - ._apply_unary_op(ops.JSONValue(json_path="$.content_type")) - .rename("content_type") - ) - - def md5_hash(self) -> bigframes.series.Series: - """Retrieve the md5 hash of the Blob. - - Returns: - bigframes.series.Series: string of the md5 hash.""" - return ( - self.metadata() - ._apply_unary_op(ops.JSONValue(json_path="$.md5_hash")) - .rename("md5_hash") - ) - - def size(self) -> bigframes.series.Series: - """Retrieve the file size of the Blob. - - Returns: - bigframes.series.Series: file size in bytes.""" - return ( - self.metadata() - ._apply_unary_op(ops.JSONValue(json_path="$.size")) - .rename("size") - .astype("Int64") - ) - - def updated(self) -> bigframes.series.Series: - """Retrieve the updated time of the Blob. 
- - Returns: - bigframes.series.Series: updated time as UTC datetime.""" - import bigframes.pandas as bpd - - updated = ( - self.metadata() - ._apply_unary_op(ops.JSONValue(json_path="$.updated")) - .rename("updated") - .astype("Int64") - ) - - return bpd.to_datetime(updated, unit="us", utc=True) - def _get_runtime( self, mode: str, with_metadata: bool = False ) -> bigframes.series.Series: - """Retrieve the ObjectRefRuntime as JSON. - - Args: - mode (str): mode for the URLs, "R" for read, "RW" for read & write. - metadata (bool, default False): whether to fetch the metadata in the ObjectRefRuntime. - - Returns: - bigframes.series.Series: ObjectRefRuntime JSON. - """ s = ( self._data._apply_unary_op(ops.obj_fetch_metadata_op) if with_metadata @@ -169,913 +44,7 @@ def _get_runtime( return s._apply_unary_op(ops.ObjGetAccessUrl(mode=mode)) - def _df_apply_udf( - self, df: bigframes.dataframe.DataFrame, udf - ) -> bigframes.series.Series: - # Catch and rethrow function axis=1 warning to be more user-friendly. 
- with warnings.catch_warnings(record=True) as catched_warnings: - s = df.apply(udf, axis=1) - for w in catched_warnings: - if isinstance(w.message, bfe.FunctionAxisOnePreviewWarning): - warnings.warn( - "Blob Functions use bigframes DataFrame Managed function with axis=1 senario, which is a preview feature.", - category=w.category, - stacklevel=2, - ) - else: - warnings.warn_explicit( - message=w.message, - category=w.category, - filename=w.filename, - lineno=w.lineno, - source=w.source, - ) - - return s - - def _apply_udf_or_raise_error( - self, df: bigframes.dataframe.DataFrame, udf, operation_name: str - ) -> bigframes.series.Series: - """Helper to apply UDF with consistent error handling.""" - try: - res = self._df_apply_udf(df, udf) - except Exception as e: - raise RuntimeError(f"{operation_name} UDF execution failed: {e}") from e - - if res is None: - raise RuntimeError(f"{operation_name} returned None result") - - return res - - def read_url(self) -> bigframes.series.Series: - """Retrieve the read URL of the Blob. - - Returns: - bigframes.series.Series: Read only URLs.""" + def _read_url(self) -> bigframes.series.Series: return self._get_runtime(mode="R")._apply_unary_op( ops.JSONValue(json_path="$.access_urls.read_url") ) - - def write_url(self) -> bigframes.series.Series: - """Retrieve the write URL of the Blob. - - Returns: - bigframes.series.Series: Writable URLs.""" - return self._get_runtime(mode="RW")._apply_unary_op( - ops.JSONValue(json_path="$.access_urls.write_url") - ) - - def display( - self, - n: int = 3, - *, - content_type: str = "", - width: Optional[int] = None, - height: Optional[int] = None, - ): - """Display the blob content in the IPython Notebook environment. Only works for image type now. - - Args: - n (int, default 3): number of sample blob objects to display. - content_type (str, default ""): content type of the blob. If unset, use the blob metadata of the storage. Possible values are "image", "audio" and "video". 
- width (int or None, default None): width in pixels that the image/video are constrained to. If unset, use the global setting in bigframes.options.display.blob_display_width, otherwise image/video's original size or ratio is used. No-op for other content types. - height (int or None, default None): height in pixels that the image/video are constrained to. If unset, use the global setting in bigframes.options.display.blob_display_height, otherwise image/video's original size or ratio is used. No-op for other content types. - """ - import IPython.display as ipy_display - - width = width or bigframes.options.display.blob_display_width - height = height or bigframes.options.display.blob_display_height - - # col name doesn't matter here. Rename to avoid column name conflicts - df = bigframes.series.Series(self._data._block).rename("blob_col").to_frame() - - df["read_url"] = df["blob_col"].blob.read_url() - - if content_type: - df["content_type"] = content_type - else: - df["content_type"] = df["blob_col"].blob.content_type() - - pandas_df, _, query_job = df._block.retrieve_repr_request_results(n) - df._set_internal_query_job(query_job) - - def display_single_url( - read_url: Union[str, pd._libs.missing.NAType], - content_type: Union[str, pd._libs.missing.NAType], - ): - if pd.isna(read_url): - ipy_display.display("") - return - - if pd.isna(content_type): # display as raw data or error - response = requests.get(read_url) - ipy_display.display(response.content) - return - - content_type = cast(str, content_type).casefold() - - if content_type.startswith("image"): - ipy_display.display( - ipy_display.Image(url=read_url, width=width, height=height) - ) - elif content_type.startswith("audio"): - # using url somehow doesn't work with audios - response = requests.get(read_url) - ipy_display.display(ipy_display.Audio(response.content)) - elif content_type.startswith("video"): - ipy_display.display( - ipy_display.Video(read_url, width=width, height=height) - ) - else: # 
display as raw data - response = requests.get(read_url) - ipy_display.display(response.content) - - for _, row in pandas_df.iterrows(): - display_single_url(row["read_url"], row["content_type"]) - - @property - def session(self): - return self._data._block.session - - def _resolve_connection(self, connection: Optional[str] = None) -> str: - """Resovle the BigQuery connection. - - Args: - connection (str or None, default None): BQ connection used for - function internet transactions, and the output blob if "dst" is - str. If None, uses default connection of the session. - - Returns: - str: the resolved BigQuery connection string in the format: - "project.location.connection_id". - - Raises: - ValueError: If the connection cannot be resolved to a valid string. - """ - connection = connection or self._data._block.session.bq_connection - return clients.get_canonical_bq_connection_id( - connection, - default_project=self._data._block.session._project, - default_location=self._data._block.session._location, - ) - - def get_runtime_json_str( - self, mode: str = "R", *, with_metadata: bool = False - ) -> bigframes.series.Series: - """Get the runtime (contains signed URL to access gcs data) and apply the ToJSONSTring transformation. - - Args: - mode(str or str, default "R"): the mode for accessing the runtime. - Default to "R". Possible values are "R" (read-only) and - "RW" (read-write) - with_metadata (bool, default False): whether to include metadata - in the JSON string. Default to False. - - Returns: - str: the runtime object in the JSON string. - """ - runtime = self._get_runtime(mode=mode, with_metadata=with_metadata) - return runtime._apply_unary_op(ops.ToJSONString()) - - def exif( - self, - *, - engine: Literal[None, "pillow"] = None, - connection: Optional[str] = None, - max_batching_rows: int = 8192, - container_cpu: Union[float, int] = 0.33, - container_memory: str = "512Mi", - verbose: bool = False, - ) -> bigframes.series.Series: - """Extract EXIF data. 
Now only support image types. - - Args: - engine ('pillow' or None, default None): The engine (bigquery or third party library) used for the function. The value must be specified. - connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session. - max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. - container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. - container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. - - Returns: - bigframes.series.Series: JSON series of key-value pairs if verbose=False, or struct with status and content if verbose=True. - - Raises: - ValueError: If engine is not 'pillow'. - RuntimeError: If EXIF extraction fails or returns invalid structure. 
- """ - if engine is None or engine.casefold() != "pillow": - raise ValueError("Must specify the engine, supported value is 'pillow'.") - - import bigframes.bigquery as bbq - import bigframes.blob._functions as blob_func - import bigframes.pandas as bpd - - connection = self._resolve_connection(connection) - df = self.get_runtime_json_str(mode="R").to_frame() - df["verbose"] = verbose - - exif_udf = blob_func.TransformFunction( - blob_func.exif_func_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - res = self._apply_udf_or_raise_error(df, exif_udf, "EXIF extraction") - - if verbose: - try: - exif_content_series = bbq.parse_json( - res._apply_unary_op(ops.JSONValue(json_path="$.content")) - ).rename("exif_content") - exif_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - except Exception as e: - raise RuntimeError(f"Failed to parse EXIF JSON result: {e}") from e - results_df = bpd.DataFrame( - {"status": exif_status_series, "content": exif_content_series} - ) - results_struct = bbq.struct(results_df).rename("exif_results") - return results_struct - else: - try: - return bbq.parse_json(res) - except Exception as e: - raise RuntimeError(f"Failed to parse EXIF JSON result: {e}") from e - - def image_blur( - self, - ksize: tuple[int, int], - *, - engine: Literal[None, "opencv"] = None, - dst: Optional[Union[str, bigframes.series.Series]] = None, - connection: Optional[str] = None, - max_batching_rows: int = 8192, - container_cpu: Union[float, int] = 0.33, - container_memory: str = "512Mi", - verbose: bool = False, - ) -> bigframes.series.Series: - """Blurs images. - - Args: - ksize (tuple(int, int)): Kernel size. - engine ('opencv' or None, default None): The engine (bigquery or third party library) used for the function. The value must be specified. 
- dst (str or bigframes.series.Series or None, default None): Output destination. Can be one of: - str: GCS folder str. The output filenames are the same as the input files. - blob Series: The output file paths are determined by the uris of the blob Series. - None: Output to BQ as bytes. - Encoding is determined by the extension of the output filenames (or input filenames if doesn't have output filenames). If filename doesn't have an extension, use ".jpeg" for encoding. - connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session. - max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. - container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. - container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. - - Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. - - Raises: - ValueError: If engine is not 'opencv' or parameters are invalid. - RuntimeError: If image blur operation fails. 
- """ - if engine is None or engine.casefold() != "opencv": - raise ValueError("Must specify the engine, supported value is 'opencv'.") - - import bigframes.bigquery as bbq - import bigframes.blob._functions as blob_func - import bigframes.pandas as bpd - - connection = self._resolve_connection(connection) - df = self.get_runtime_json_str(mode="R").to_frame() - - if dst is None: - ext = self.uri().str.extract(FILE_EXT_REGEX) - - image_blur_udf = blob_func.TransformFunction( - blob_func.image_blur_to_bytes_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - df["ksize_x"], df["ksize_y"] = ksize - df["ext"] = ext # type: ignore - df["verbose"] = verbose - res = self._apply_udf_or_raise_error(df, image_blur_udf, "Image blur") - - if verbose: - blurred_content_b64_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - blurred_content_series = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[blurred_content_b64_series] - ) - blurred_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - results_df = bpd.DataFrame( - {"status": blurred_status_series, "content": blurred_content_series} - ) - results_struct = bbq.struct(results_df).rename("blurred_results") - return results_struct - else: - blurred_bytes = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[res] - ).rename("blurred_bytes") - return blurred_bytes - - if isinstance(dst, str): - dst = os.path.join(dst, "") - # Replace src folder with dst folder, keep the file names. 
- dst_uri = self.uri().str.replace(FILE_FOLDER_REGEX, rf"{dst}\1", regex=True) - dst = cast( - bigframes.series.Series, dst_uri.str.to_blob(connection=connection) - ) - - ext = dst.blob.uri().str.extract(FILE_EXT_REGEX) - - image_blur_udf = blob_func.TransformFunction( - blob_func.image_blur_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - dst_rt = dst.blob.get_runtime_json_str(mode="RW") - - df = df.join(dst_rt, how="outer") - df["ksize_x"], df["ksize_y"] = ksize - df["ext"] = ext # type: ignore - df["verbose"] = verbose - - res = self._apply_udf_or_raise_error(df, image_blur_udf, "Image blur") - res.cache() # to execute the udf - - if verbose: - blurred_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - results_df = bpd.DataFrame( - { - "status": blurred_status_series, - "content": dst.blob.uri().str.to_blob( - connection=self._resolve_connection(connection) - ), - } - ) - results_struct = bbq.struct(results_df).rename("blurred_results") - return results_struct - else: - return dst - - def image_resize( - self, - dsize: tuple[int, int] = (0, 0), - *, - engine: Literal[None, "opencv"] = None, - fx: float = 0.0, - fy: float = 0.0, - dst: Optional[Union[str, bigframes.series.Series]] = None, - connection: Optional[str] = None, - max_batching_rows: int = 8192, - container_cpu: Union[float, int] = 0.33, - container_memory: str = "512Mi", - verbose: bool = False, - ): - """Resize images. - - Args: - dsize (tuple(int, int), default (0, 0)): Destination size. If set to 0, fx and fy parameters determine the size. - engine ('opencv' or None, default None): The engine (bigquery or third party library) used for the function. The value must be specified. - fx (float, default 0.0): scale factor along the horizontal axis. If set to 0.0, dsize parameter determines the output size. 
- fy (float, defalut 0.0): scale factor along the vertical axis. If set to 0.0, dsize parameter determines the output size. - dst (str or bigframes.series.Series or None, default None): Output destination. Can be one of: - str: GCS folder str. The output filenames are the same as the input files. - blob Series: The output file paths are determined by the uris of the blob Series. - None: Output to BQ as bytes. - Encoding is determined by the extension of the output filenames (or input filenames if doesn't have output filenames). If filename doesn't have an extension, use ".jpeg" for encoding. - connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session. - max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. - container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. - container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. - - Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. - - Raises: - ValueError: If engine is not 'opencv' or parameters are invalid. - RuntimeError: If image resize operation fails. - """ - if engine is None or engine.casefold() != "opencv": - raise ValueError("Must specify the engine, supported value is 'opencv'.") - - dsize_set = dsize[0] > 0 and dsize[1] > 0 - fsize_set = fx > 0.0 and fy > 0.0 - if not dsize_set ^ fsize_set: - raise ValueError( - "Only one of dsize or (fx, fy) parameters must be set. And the set values must be positive. 
" - ) - - import bigframes.bigquery as bbq - import bigframes.blob._functions as blob_func - import bigframes.pandas as bpd - - connection = self._resolve_connection(connection) - df = self.get_runtime_json_str(mode="R").to_frame() - - if dst is None: - ext = self.uri().str.extract(FILE_EXT_REGEX) - - image_resize_udf = blob_func.TransformFunction( - blob_func.image_resize_to_bytes_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - df["dsize_x"], df["dsize_y"] = dsize - df["fx"], df["fy"] = fx, fy - df["ext"] = ext # type: ignore - df["verbose"] = verbose - res = self._apply_udf_or_raise_error(df, image_resize_udf, "Image resize") - - if verbose: - resized_content_b64_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - resized_content_series = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[resized_content_b64_series] - ) - - resized_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - results_df = bpd.DataFrame( - {"status": resized_status_series, "content": resized_content_series} - ) - results_struct = bbq.struct(results_df).rename("resized_results") - return results_struct - else: - resized_bytes = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[res] - ).rename("resized_bytes") - return resized_bytes - - if isinstance(dst, str): - dst = os.path.join(dst, "") - # Replace src folder with dst folder, keep the file names. 
- dst_uri = self.uri().str.replace(FILE_FOLDER_REGEX, rf"{dst}\1", regex=True) - dst = cast( - bigframes.series.Series, dst_uri.str.to_blob(connection=connection) - ) - - ext = dst.blob.uri().str.extract(FILE_EXT_REGEX) - - image_resize_udf = blob_func.TransformFunction( - blob_func.image_resize_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - dst_rt = dst.blob.get_runtime_json_str(mode="RW") - - df = df.join(dst_rt, how="outer") - df["dsize_x"], df["dsize_y"] = dsize - df["fx"], df["fy"] = fx, fy - df["ext"] = ext # type: ignore - df["verbose"] = verbose - - res = self._apply_udf_or_raise_error(df, image_resize_udf, "Image resize") - res.cache() # to execute the udf - - if verbose: - resized_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - results_df = bpd.DataFrame( - { - "status": resized_status_series, - "content": dst.blob.uri().str.to_blob( - connection=self._resolve_connection(connection) - ), - } - ) - results_struct = bbq.struct(results_df).rename("resized_results") - return results_struct - else: - return dst - - def image_normalize( - self, - *, - engine: Literal[None, "opencv"] = None, - alpha: float = 1.0, - beta: float = 0.0, - norm_type: str = "l2", - dst: Optional[Union[str, bigframes.series.Series]] = None, - connection: Optional[str] = None, - max_batching_rows: int = 8192, - container_cpu: Union[float, int] = 0.33, - container_memory: str = "512Mi", - verbose: bool = False, - ) -> bigframes.series.Series: - """Normalize images. - - Args: - engine ('opencv' or None, default None): The engine (bigquery or third party library) used for the function. The value must be specified. - alpha (float, default 1.0): Norm value to normalize to or the lower range boundary in case of the range normalization. 
- beta (float, default 0.0): Upper range boundary in case of the range normalization; it is not used for the norm normalization. - norm_type (str, default "l2"): Normalization type. Accepted values are "inf", "l1", "l2" and "minmax". - dst (str or bigframes.series.Series or None, default None): Output destination. Can be one of: - str: GCS folder str. The output filenames are the same as the input files. - blob Series: The output file paths are determined by the uris of the blob Series. - None: Output to BQ as bytes. - Encoding is determined by the extension of the output filenames (or input filenames if doesn't have output filenames). If filename doesn't have an extension, use ".jpeg" for encoding. - connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session. - max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. - container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. - container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. - - Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. - - Raises: - ValueError: If engine is not 'opencv' or parameters are invalid. - RuntimeError: If image normalize operation fails. 
- """ - if engine is None or engine.casefold() != "opencv": - raise ValueError("Must specify the engine, supported value is 'opencv'.") - - import bigframes.bigquery as bbq - import bigframes.blob._functions as blob_func - import bigframes.pandas as bpd - - connection = self._resolve_connection(connection) - df = self.get_runtime_json_str(mode="R").to_frame() - - if dst is None: - ext = self.uri().str.extract(FILE_EXT_REGEX) - - image_normalize_udf = blob_func.TransformFunction( - blob_func.image_normalize_to_bytes_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - df["alpha"] = alpha - df["beta"] = beta - df["norm_type"] = norm_type - df["ext"] = ext # type: ignore - df["verbose"] = verbose - res = self._apply_udf_or_raise_error( - df, image_normalize_udf, "Image normalize" - ) - - if verbose: - normalized_content_b64_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - normalized_bytes = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[normalized_content_b64_series] - ) - normalized_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - results_df = bpd.DataFrame( - {"status": normalized_status_series, "content": normalized_bytes} - ) - results_struct = bbq.struct(results_df).rename("normalized_results") - return results_struct - else: - normalized_bytes = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[res] - ).rename("normalized_bytes") - return normalized_bytes - - if isinstance(dst, str): - dst = os.path.join(dst, "") - # Replace src folder with dst folder, keep the file names. 
- dst_uri = self.uri().str.replace(FILE_FOLDER_REGEX, rf"{dst}\1", regex=True) - dst = cast( - bigframes.series.Series, dst_uri.str.to_blob(connection=connection) - ) - - ext = dst.blob.uri().str.extract(FILE_EXT_REGEX) - - image_normalize_udf = blob_func.TransformFunction( - blob_func.image_normalize_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - dst_rt = dst.blob.get_runtime_json_str(mode="RW") - - df = df.join(dst_rt, how="outer") - df["alpha"] = alpha - df["beta"] = beta - df["norm_type"] = norm_type - df["ext"] = ext # type: ignore - df["verbose"] = verbose - - res = self._apply_udf_or_raise_error(df, image_normalize_udf, "Image normalize") - res.cache() # to execute the udf - - if verbose: - normalized_status_series = res._apply_unary_op( - ops.JSONValue(json_path="$.status") - ) - results_df = bpd.DataFrame( - { - "status": normalized_status_series, - "content": dst.blob.uri().str.to_blob( - connection=self._resolve_connection(connection) - ), - } - ) - results_struct = bbq.struct(results_df).rename("normalized_results") - return results_struct - else: - return dst - - def pdf_extract( - self, - *, - engine: Literal[None, "pypdf"] = None, - connection: Optional[str] = None, - max_batching_rows: int = 1, - container_cpu: Union[float, int] = 2, - container_memory: str = "1Gi", - verbose: bool = False, - ) -> bigframes.series.Series: - """Extracts text from PDF URLs and saves the text as string. - - Args: - engine ('pypdf' or None, default None): The engine (bigquery or third party library) used for the function. The value must be specified. - connection (str or None, default None): BQ connection used for - function internet transactions, and the output blob if "dst" - is str. If None, uses default connection of the session. 
- max_batching_rows (int, default 1): Max number of rows per batch - send to cloud run to execute the function. - container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. - container_memory (str, default "1Gi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default "False"): controls the verbosity of the output. - When set to True, both error messages and the extracted content - are displayed. Conversely, when set to False, only the extracted - content is presented, suppressing error messages. - - Returns: - bigframes.series.Series: str or struct[str, str], - depend on the "verbose" parameter. - Contains the extracted text from the PDF file. - Includes error messages if verbosity is enabled. - - Raises: - ValueError: If engine is not 'pypdf'. - RuntimeError: If PDF extraction fails or returns invalid structure. - """ - if engine is None or engine.casefold() != "pypdf": - raise ValueError("Must specify the engine, supported value is 'pypdf'.") - - import bigframes.bigquery as bbq - import bigframes.blob._functions as blob_func - import bigframes.pandas as bpd - - connection = self._resolve_connection(connection) - - pdf_extract_udf = blob_func.TransformFunction( - blob_func.pdf_extract_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - df = self.get_runtime_json_str(mode="R").to_frame() - df["verbose"] = verbose - - res = self._apply_udf_or_raise_error(df, pdf_extract_udf, "PDF extraction") - - if verbose: - # Extract content with error handling - try: - content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - except Exception as e: - raise RuntimeError( - f"Failed to extract content field from PDF result: {e}" - ) from e - try: - status_series = 
res._apply_unary_op(ops.JSONValue(json_path="$.status")) - except Exception as e: - raise RuntimeError( - f"Failed to extract status field from PDF result: {e}" - ) from e - - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = bbq.struct(res_df).rename("extracted_results") - return struct_series - else: - return res.rename("extracted_content") - - def pdf_chunk( - self, - *, - engine: Literal[None, "pypdf"] = None, - connection: Optional[str] = None, - chunk_size: int = 2000, - overlap_size: int = 200, - max_batching_rows: int = 1, - container_cpu: Union[float, int] = 2, - container_memory: str = "1Gi", - verbose: bool = False, - ) -> bigframes.series.Series: - """Extracts and chunks text from PDF URLs and saves the text as - arrays of strings. - - Args: - engine ('pypdf' or None, default None): The engine (bigquery or third party library) used for the function. The value must be specified. - connection (str or None, default None): BQ connection used for - function internet transactions, and the output blob if "dst" - is str. If None, uses default connection of the session. - chunk_size (int, default 2000): the desired size of each text chunk - (number of characters). - overlap_size (int, default 200): the number of overlapping characters - between consective chunks. The helps to ensure context is - perserved across chunk boundaries. - max_batching_rows (int, default 1): Max number of rows per batch - send to cloud run to execute the function. - container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. - container_memory (str, default "1Gi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default "False"): controls the verbosity of the output. - When set to True, both error messages and the extracted content - are displayed. 
Conversely, when set to False, only the extracted - content is presented, suppressing error messages. - - Returns: - bigframe.series.Series: array[str] or struct[str, array[str]], - depend on the "verbose" parameter. - where each string is a chunk of text extracted from PDF. - Includes error messages if verbosity is enabled. - - Raises: - ValueError: If engine is not 'pypdf'. - RuntimeError: If PDF chunking fails or returns invalid structure. - """ - if engine is None or engine.casefold() != "pypdf": - raise ValueError("Must specify the engine, supported value is 'pypdf'.") - - import bigframes.bigquery as bbq - import bigframes.blob._functions as blob_func - import bigframes.pandas as bpd - - connection = self._resolve_connection(connection) - - if chunk_size <= 0: - raise ValueError("chunk_size must be a positive integer.") - if overlap_size < 0: - raise ValueError("overlap_size must be a non-negative integer.") - if overlap_size >= chunk_size: - raise ValueError("overlap_size must be smaller than chunk_size.") - - pdf_chunk_udf = blob_func.TransformFunction( - blob_func.pdf_chunk_def, - session=self._data._block.session, - connection=connection, - max_batching_rows=max_batching_rows, - container_cpu=container_cpu, - container_memory=container_memory, - ).udf() - - df = self.get_runtime_json_str(mode="R").to_frame() - df["chunk_size"] = chunk_size - df["overlap_size"] = overlap_size - df["verbose"] = verbose - - res = self._apply_udf_or_raise_error(df, pdf_chunk_udf, "PDF chunking") - - try: - content_series = bbq.json_extract_string_array(res, "$.content") - except Exception as e: - raise RuntimeError( - f"Failed to extract content array from PDF chunk result: {e}" - ) from e - - if verbose: - try: - status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - except Exception as e: - raise RuntimeError( - f"Failed to extract status field from PDF chunk result: {e}" - ) from e - - results_df = bpd.DataFrame( - {"status": status_series, "content": 
content_series} - ) - resultes_struct = bbq.struct(results_df).rename("chunked_results") - return resultes_struct - else: - return bbq.json_extract_string_array(res, "$").rename("chunked_content") - - def audio_transcribe( - self, - *, - engine: Literal["bigquery"] = "bigquery", - connection: Optional[str] = None, - model_name: Optional[ - Literal[ - "gemini-2.0-flash-001", - "gemini-2.0-flash-lite-001", - ] - ] = None, - verbose: bool = False, - ) -> bigframes.series.Series: - """ - Transcribe audio content using a Gemini multimodal model. - - Args: - engine ('bigquery'): The engine (bigquery or third party library) used for the function. - connection (str or None, default None): BQ connection used for - function internet transactions, and the output blob if "dst" - is str. If None, uses default connection of the session. - model_name (str): The model for natural language tasks. Accepted - values are "gemini-2.0-flash-lite-001", and "gemini-2.0-flash-001". - See "https://ai.google.dev/gemini-api/docs/models" for model choices. - verbose (bool, default "False"): controls the verbosity of the output. - When set to True, both error messages and the transcribed content - are displayed. Conversely, when set to False, only the transcribed - content is presented, suppressing error messages. - - Returns: - bigframes.series.Series: str or struct[str, str], - depend on the "verbose" parameter. - Contains the transcribed text from the audio file. - Includes error messages if verbosity is enabled. - - Raises: - ValueError: If engine is not 'bigquery'. - RuntimeError: If the transcription result structure is invalid. - """ - if engine.casefold() != "bigquery": - raise ValueError("Must specify the engine, supported value is 'bigquery'.") - - import bigframes.bigquery as bbq - import bigframes.pandas as bpd - - # col name doesn't matter here. 
Rename to avoid column name conflicts - audio_series = bigframes.series.Series(self._data._block) - - prompt_text = "**Task:** Transcribe the provided audio. **Instructions:** - Your response must contain only the verbatim transcription of the audio. - Do not include any introductory text, summaries, or conversational filler in your response. The output should begin directly with the first word of the audio." - - # Convert the audio series to the runtime representation required by the model. - audio_runtime = audio_series.blob._get_runtime("R", with_metadata=True) - - transcribed_results = bbq.ai.generate( - prompt=(prompt_text, audio_runtime), - connection_id=connection, - endpoint=model_name, - model_params={"generationConfig": {"temperature": 0.0}}, - ) - - # Validate that the result is not None - if transcribed_results is None: - raise RuntimeError("Transcription returned None result") - - transcribed_content_series = transcribed_results.struct.field("result").rename( - "transcribed_content" - ) - - if verbose: - transcribed_status_series = transcribed_results.struct.field("status") - results_df = bpd.DataFrame( - { - "status": transcribed_status_series, - "content": transcribed_content_series, - } - ) - results_struct = bbq.struct(results_df).rename("transcription_results") - return results_struct - else: - return transcribed_content_series.rename("transcribed_content") diff --git a/packages/bigframes/bigframes/operations/strings.py b/packages/bigframes/bigframes/operations/strings.py index 26ff2616a1b7..a5b9944424b0 100644 --- a/packages/bigframes/bigframes/operations/strings.py +++ b/packages/bigframes/bigframes/operations/strings.py @@ -305,6 +305,18 @@ def join(self, sep: str) -> T: ops.ArrayReduceOp(aggregation=agg_ops.StringAggOp(sep=sep)) ) + def _to_blob(self, connection: Optional[str] = None) -> T: + import bigframes.core.blocks + + if hasattr(self._data, "_block") and isinstance( + self._data._block, bigframes.core.blocks.Block + ): + session = 
self._data._block.session + else: + raise ValueError(f"{self._to_blob.__name__} is only supported via Series.str") + connection = session._create_bq_connection(connection=connection) + return self._data._apply_binary_op(connection, ops.obj_make_ref_op) + def to_blob(self, connection: Optional[str] = None) -> T: """Create a BigFrames Blob series from a series of URIs. @@ -325,16 +337,15 @@ def to_blob(self, connection: Optional[str] = None) -> T: bigframes.series.Series: Blob Series. """ - import bigframes.core.blocks + import warnings + import bigframes.exceptions as bfe - if hasattr(self._data, "_block") and isinstance( - self._data._block, bigframes.core.blocks.Block - ): - session = self._data._block.session - else: - raise ValueError("to_blob is only supported via Series.str") - connection = session._create_bq_connection(connection=connection) - return self._data._apply_binary_op(connection, ops.obj_make_ref_op) + warnings.warn( + "Series.str.to_blob is deprecated and will be removed in a future release. Use bigframes.bigquery.obj functions instead.", + category=bfe.ApiDeprecationWarning, + stacklevel=2, + ) + return self._to_blob(connection) def _parse_flags(flags: int) -> Optional[str]: diff --git a/packages/bigframes/bigframes/series.py b/packages/bigframes/bigframes/series.py index fbcc949855c2..17addef1ab0a 100644 --- a/packages/bigframes/bigframes/series.py +++ b/packages/bigframes/bigframes/series.py @@ -321,16 +321,8 @@ def list(self) -> lists.ListAccessor: return lists.ListAccessor(self) @property - def blob(self) -> blob.BlobAccessor: - """ - Accessor for Blob operations. - """ - warnings.warn( - "The blob accessor is deprecated and will be removed in a future release. 
Use bigframes.bigquery.obj functions instead.", - category=bfe.ApiDeprecationWarning, - stacklevel=2, - ) - return blob.BlobAccessor(self) + def _blob(self) -> blob._BlobAccessor: + return blob._BlobAccessor(self) @property @validations.requires_ordering() diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index a6bb3041764c..a025256f2b1e 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -2248,12 +2248,17 @@ def from_glob_path( bigframes.pandas.DataFrame: Result BigFrames DataFrame. """ + warnings.warn( + "from_glob_path is deprecated and will be removed in a future release. Use read_gbq with 'ref' column instead.", + category=bfe.ApiDeprecationWarning, + stacklevel=2, + ) # TODO(garrettwu): switch to pseudocolumn when b/374988109 is done. connection = self._create_bq_connection(connection=connection) table = self._create_object_table(path, connection) - s = self._loader.read_gbq_table(table)["uri"].str.to_blob(connection) + s = self._loader.read_gbq_table(table)["uri"].str._to_blob(connection) return s.rename(name).to_frame() def _create_bq_connection( @@ -2312,7 +2317,7 @@ def read_gbq_object_table( table = self.bqclient.get_table(object_table) connection = table._properties["externalDataConfiguration"]["connectionId"] - s = self._loader.read_gbq_table(object_table)["uri"].str.to_blob(connection) + s = self._loader.read_gbq_table(object_table)["uri"].str._to_blob(connection) return s.rename(name).to_frame() # ========================================================================= diff --git a/packages/bigframes/bigframes/session/polars_executor.py b/packages/bigframes/bigframes/session/polars_executor.py index 43e3609ac3c1..06c7fcb925c4 100644 --- a/packages/bigframes/bigframes/session/polars_executor.py +++ b/packages/bigframes/bigframes/session/polars_executor.py @@ -122,7 +122,7 @@ def _is_node_polars_executable(node: 
nodes.BigFrameNode): return False for expr in node._node_expressions: if isinstance(expr, agg_expressions.Aggregation): - if not type(expr.op) in _COMPATIBLE_AGG_OPS: + if type(expr.op) not in _COMPATIBLE_AGG_OPS: return False if isinstance(expr, expression.Expression): if not set(map(type, _get_expr_ops(expr))).issubset(_COMPATIBLE_SCALAR_OPS): diff --git a/packages/bigframes/docs/templates/toc.yml b/packages/bigframes/docs/templates/toc.yml index 5d043fd85f2a..562b857fee5c 100644 --- a/packages/bigframes/docs/templates/toc.yml +++ b/packages/bigframes/docs/templates/toc.yml @@ -87,9 +87,6 @@ uid: bigframes.operations.lists.ListAccessor - name: PlotAccessor uid: bigframes.operations.plotting.PlotAccessor - - name: BlobAccessor - uid: bigframes.operations.blob.BlobAccessor - status: beta name: Series - name: Window uid: bigframes.core.window.Window diff --git a/packages/bigframes/notebooks/kaggle/vector-search-with-bigframes-over-national-jukebox.ipynb b/packages/bigframes/notebooks/kaggle/vector-search-with-bigframes-over-national-jukebox.ipynb index fe2d567d1b31..3fd66abcbb44 100644 --- a/packages/bigframes/notebooks/kaggle/vector-search-with-bigframes-over-national-jukebox.ipynb +++ b/packages/bigframes/notebooks/kaggle/vector-search-with-bigframes-over-national-jukebox.ipynb @@ -1,23 +1,8 @@ { "cells": [ { + "id": "c62e292f", "cell_type": "markdown", - "metadata": { - "@deathbeds/jupyterlab-fonts": { - "styles": { - "": { - "body[data-jp-deck-mode='presenting'] &": { - "zoom": "194%" - } - } - } - }, - "editable": true, - "slideshow": { - "slide_type": "subslide" - }, - "tags": [] - }, "source": [ "# Creating a searchable index of the National Jukebox\n", "\n", @@ -35,42 +20,42 @@ "To follow along, you'll need a Google Cloud project\n", "\n", "* Go to https://cloud.google.com/free to start a free trial." 
- ] - }, - { - "cell_type": "markdown", + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { "": { "body[data-jp-deck-mode='presenting'] &": { - "z-index": "0", - "zoom": "216%" + "zoom": "194%" } } } }, + "editable": true, "slideshow": { - "slide_type": "slide" - } + "slide_type": "subslide" + }, + "tags": [] }, + "execution_count": null + }, + { + "id": "7dc312a4", + "cell_type": "markdown", "source": [ "The National Jukebox is a project of the USA Library of Congress to provide access to thousands of acoustic sound recordings from the very earliest days of the commercial record industry.\n", "\n", "* Learn more at https://www.loc.gov/collections/national-jukebox/about-this-collection/\n", "\n", "\"recording" - ] - }, - { - "cell_type": "markdown", + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { "": { "body[data-jp-deck-mode='presenting'] &": { "z-index": "0", - "zoom": "181%" + "zoom": "216%" } } } @@ -79,6 +64,11 @@ "slide_type": "slide" } }, + "execution_count": null + }, + { + "id": "07dcae4b", + "cell_type": "markdown", "source": [ "\n", "To search the National Jukebox, we combine powerful features of BigQuery:\n", @@ -96,16 +86,14 @@ "3. BigQuery DataFrames to use Python instead of SQL.\n", "\n", " https://cloud.google.com/bigquery/docs/bigquery-dataframes-introduction" - ] - }, - { - "cell_type": "markdown", + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { "": { "body[data-jp-deck-mode='presenting'] &": { - "zoom": "275%" + "z-index": "0", + "zoom": "181%" } } } @@ -114,15 +102,38 @@ "slide_type": "slide" } }, + "execution_count": null + }, + { + "id": "8dd2ddab", + "cell_type": "markdown", "source": [ "## Getting started with BigQuery DataFrames (bigframes)\n", "\n", "Install the bigframes package." 
- ] + ], + "metadata": { + "@deathbeds/jupyterlab-fonts": { + "styles": { + "": { + "body[data-jp-deck-mode='presenting'] &": { + "zoom": "275%" + } + } + } + }, + "slideshow": { + "slide_type": "slide" + } + }, + "execution_count": null }, { + "id": "96cda443", "cell_type": "code", - "execution_count": null, + "source": [ + "%pip install --upgrade bigframes google-cloud-automl google-cloud-translate google-ai-generativelanguage tensorflow " + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -142,13 +153,17 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "%pip install --upgrade bigframes google-cloud-automl google-cloud-translate google-ai-generativelanguage tensorflow " - ] + "execution_count": null, + "outputs": [] }, { + "id": "acf12472", "cell_type": "markdown", + "source": [ + "**Important:** restart the kernel by going to \"Run -> Restart & clear cell outputs\" before continuing.\n", + "\n", + "Configure bigframes to use your GCP project. First, go to \"Add-ons -> Google Cloud SDK\" and click the \"Attach\" button. Then," + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -161,15 +176,17 @@ } } }, - "source": [ - "**Important:** restart the kernel by going to \"Run -> Restart & clear cell outputs\" before continuing.\n", - "\n", - "Configure bigframes to use your GCP project. First, go to \"Add-ons -> Google Cloud SDK\" and click the \"Attach\" button. 
Then," - ] + "execution_count": null }, { + "id": "fd321077", "cell_type": "code", - "execution_count": null, + "source": [ + "from kaggle_secrets import UserSecretsClient\n", + "user_secrets = UserSecretsClient()\n", + "user_credential = user_secrets.get_gcloud_credential()\n", + "user_secrets.set_tensorflow_credential(user_credential)" + ], "metadata": { "execution": { "iopub.execute_input": "2025-08-14T15:53:08.494636Z", @@ -180,17 +197,21 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "from kaggle_secrets import UserSecretsClient\n", - "user_secrets = UserSecretsClient()\n", - "user_credential = user_secrets.get_gcloud_credential()\n", - "user_secrets.set_tensorflow_credential(user_credential)" - ] + "execution_count": null, + "outputs": [] }, { + "id": "4d837a34", "cell_type": "code", - "execution_count": null, + "source": [ + "import bigframes._config\n", + "import bigframes.pandas as bpd\n", + "\n", + "bpd.options.bigquery.location = \"US\"\n", + "\n", + "# Set to your GCP project ID.\n", + "bpd.options.bigquery.project = \"swast-scratch\"" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -210,19 +231,17 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "import bigframes._config\n", - "import bigframes.pandas as bpd\n", - "\n", - "bpd.options.bigquery.location = \"US\"\n", - "\n", - "# Set to your GCP project ID.\n", - "bpd.options.bigquery.project = \"swast-scratch\"" - ] + "execution_count": null, + "outputs": [] }, { + "id": "008f0a87", "cell_type": "markdown", + "source": [ + "## Reading data\n", + "\n", + "BigQuery DataFrames can read data from BigQuery, GCS, or even local sources. With `engine=\"bigquery\"`, BigQuery's distributed processing reads the file without it ever having to reach your local Python environment." 
+ ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -237,15 +256,19 @@ "slide_type": "slide" } }, - "source": [ - "## Reading data\n", - "\n", - "BigQuery DataFrames can read data from BigQuery, GCS, or even local sources. With `engine=\"bigquery\"`, BigQuery's distributed processing reads the file without it ever having to reach your local Python environment." - ] + "execution_count": null }, { + "id": "9a4b35ab", "cell_type": "code", - "execution_count": null, + "source": [ + "df = bpd.read_json(\n", + " \"gs://cloud-samples-data/third-party/usa-loc-national-jukebox/jukebox.jsonl\",\n", + " engine=\"bigquery\",\n", + " orient=\"records\",\n", + " lines=True,\n", + ")" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -265,19 +288,16 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "df = bpd.read_json(\n", - " \"gs://cloud-samples-data/third-party/usa-loc-national-jukebox/jukebox.jsonl\",\n", - " engine=\"bigquery\",\n", - " orient=\"records\",\n", - " lines=True,\n", - ")" - ] + "execution_count": null, + "outputs": [] }, { + "id": "e00dcb01", "cell_type": "code", - "execution_count": null, + "source": [ + "# Use `peek()` instead of `head()` to see arbitrary rows rather than the \"first\" rows.\n", + "df.peek()" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -300,15 +320,15 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "# Use `peek()` instead of `head()` to see arbitrary rows rather than the \"first\" rows.\n", - "df.peek()" - ] + "execution_count": null, + "outputs": [] }, { + "id": "335511be", "cell_type": "code", - "execution_count": null, + "source": [ + "df.shape" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -328,14 +348,18 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "df.shape" - ] + "execution_count": null, + "outputs": [] }, { + "id": "595126a1", "cell_type": "code", - "execution_count": null, + "source": [ + "# For the purposes of a demo, select 
only a subset of rows.\n", + "df = df.sample(n=250)\n", + "df.cache()\n", + "df.shape" + ], "metadata": { "execution": { "iopub.execute_input": "2025-08-14T15:55:55.448664Z", @@ -346,17 +370,32 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "# For the purposes of a demo, select only a subset of rows.\n", - "df = df.sample(n=250)\n", - "df.cache()\n", - "df.shape" - ] + "execution_count": null, + "outputs": [] }, { + "id": "cbd59dd9", "cell_type": "code", - "execution_count": null, + "source": [ + "# As a side effect of how I extracted the song information from the HTML DOM,\n", + "# we ended up with lists in places where we only expect one item.\n", + "#\n", + "# We can \"explode\" to flatten these lists.\n", + "flattened = df.explode([\n", + " \"Recording Repository\",\n", + " \"Recording Label\",\n", + " \"Recording Take Number\",\n", + " \"Recording Date\",\n", + " \"Recording Matrix Number\",\n", + " \"Recording Catalog Number\",\n", + " \"Media Size\",\n", + " \"Recording Location\",\n", + " \"Summary\",\n", + " \"Rights Advisory\",\n", + " \"Title\",\n", + "])\n", + "flattened.peek()" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -379,31 +418,15 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "# As a side effect of how I extracted the song information from the HTML DOM,\n", - "# we ended up with lists in places where we only expect one item.\n", - "#\n", - "# We can \"explode\" to flatten these lists.\n", - "flattened = df.explode([\n", - " \"Recording Repository\",\n", - " \"Recording Label\",\n", - " \"Recording Take Number\",\n", - " \"Recording Date\",\n", - " \"Recording Matrix Number\",\n", - " \"Recording Catalog Number\",\n", - " \"Media Size\",\n", - " \"Recording Location\",\n", - " \"Summary\",\n", - " \"Rights Advisory\",\n", - " \"Title\",\n", - "])\n", - "flattened.peek()" - ] + "execution_count": null, + "outputs": [] }, { + "id": "84548649", "cell_type": "code", - "execution_count": null, + "source": [ 
+ "flattened.shape" + ], "metadata": { "execution": { "iopub.execute_input": "2025-08-14T15:56:06.546531Z", @@ -414,13 +437,15 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "flattened.shape" - ] + "execution_count": null, + "outputs": [] }, { + "id": "8be3127f", "cell_type": "markdown", + "source": [ + "To access unstructured data from BigQuery, create a URI pointing to a file in Google Cloud Storage (GCS). Then, construct a \"blob\" (also known as an \"Object Ref\" in BigQuery terms) so that BigQuery can read from GCS." + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -437,13 +462,20 @@ }, "tags": [] }, - "source": [ - "To access unstructured data from BigQuery, create a URI pointing to a file in Google Cloud Storage (GCS). Then, construct a \"blob\" (also known as an \"Object Ref\" in BigQuery terms) so that BigQuery can read from GCS." - ] + "execution_count": null }, { + "id": "31277e21", "cell_type": "code", - "execution_count": null, + "source": [ + "flattened = flattened.assign(**{\n", + " \"GCS Prefix\": \"gs://cloud-samples-data/third-party/usa-loc-national-jukebox/\",\n", + " \"GCS Stub\": flattened['URL'].str.extract(r'/(jukebox-[0-9]+)/'),\n", + "})\n", + "flattened[\"GCS URI\"] = flattened[\"GCS Prefix\"] + flattened[\"GCS Stub\"] + \".mp3\"\n", + "# Note: str.to_blob is deprecated.\n", + "flattened[\"GCS Blob\"] = flattened[\"GCS URI\"].str.to_blob()" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -468,18 +500,15 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "flattened = flattened.assign(**{\n", - " \"GCS Prefix\": \"gs://cloud-samples-data/third-party/usa-loc-national-jukebox/\",\n", - " \"GCS Stub\": flattened['URL'].str.extract(r'/(jukebox-[0-9]+)/'),\n", - "})\n", - "flattened[\"GCS URI\"] = flattened[\"GCS Prefix\"] + flattened[\"GCS Stub\"] + \".mp3\"\n", - "flattened[\"GCS Blob\"] = flattened[\"GCS URI\"].str.to_blob()" - ] + "execution_count": null, + "outputs": [] }, { + 
"id": "d27756f5", "cell_type": "markdown", + "source": [ + "BigQuery (and BigQuery DataFrames) provide access to powerful models and multimodal capabilities. Here, we transcribe audio to text." + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -496,13 +525,20 @@ }, "tags": [] }, - "source": [ - "BigQuery (and BigQuery DataFrames) provide access to powerful models and multimodal capabilities. Here, we transcribe audio to text." - ] + "execution_count": null }, { + "id": "d1f7ad46", "cell_type": "code", - "execution_count": null, + "source": [ + "# Note: .blob.audio_transcribe is removed. This cell will fail.\n", + "# Use bigframes.bigquery.ai.generate instead.\n", + "flattened[\"Transcription\"] = flattened[\"GCS Blob\"].blob.audio_transcribe(\n", + " model_name=\"gemini-2.0-flash-001\",\n", + " verbose=True,\n", + ")\n", + "flattened[\"Transcription\"]" + ], "metadata": { "editable": true, "execution": { @@ -518,17 +554,15 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "flattened[\"Transcription\"] = flattened[\"GCS Blob\"].blob.audio_transcribe(\n", - " model_name=\"gemini-2.0-flash-001\",\n", - " verbose=True,\n", - ")\n", - "flattened[\"Transcription\"]" - ] + "execution_count": null, + "outputs": [] }, { + "id": "1575c468", "cell_type": "markdown", + "source": [ + "Sometimes the model has transient errors. Check the status column to see if there are errors." + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -543,13 +577,16 @@ "slide_type": "slide" } }, - "source": [ - "Sometimes the model has transient errors. Check the status column to see if there are errors." 
- ] + "execution_count": null }, { + "id": "e53c7a0b", "cell_type": "code", - "execution_count": null, + "source": [ + "print(f\"Successful rows: {(flattened['Transcription'].struct.field('status') == '').sum()}\")\n", + "print(f\"Failed rows: {(flattened['Transcription'].struct.field('status') != '').sum()}\")\n", + "flattened.shape" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -574,16 +611,16 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "print(f\"Successful rows: {(flattened['Transcription'].struct.field('status') == '').sum()}\")\n", - "print(f\"Failed rows: {(flattened['Transcription'].struct.field('status') != '').sum()}\")\n", - "flattened.shape" - ] + "execution_count": null, + "outputs": [] }, { + "id": "3629f4af", "cell_type": "code", - "execution_count": null, + "source": [ + "# Show transcribed lyrics.\n", + "flattened[\"Transcription\"].struct.field(\"content\")" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -603,15 +640,19 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "# Show transcribed lyrics.\n", - "flattened[\"Transcription\"].struct.field(\"content\")" - ] + "execution_count": null, + "outputs": [] }, { + "id": "09ef6c3d", "cell_type": "code", - "execution_count": null, + "source": [ + "# Find all instrumentatal songs\n", + "instrumental = flattened[flattened[\"Transcription\"].struct.field(\"content\") == \"\"]\n", + "print(instrumental.shape)\n", + "song = instrumental.peek(1)\n", + "song" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -634,18 +675,22 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "# Find all instrumentatal songs\n", - "instrumental = flattened[flattened[\"Transcription\"].struct.field(\"content\") == \"\"]\n", - "print(instrumental.shape)\n", - "song = instrumental.peek(1)\n", - "song" - ] + "execution_count": null, + "outputs": [] }, { + "id": "cf15986a", "cell_type": "code", - "execution_count": null, + "source": [ + 
"import gcsfs\n", + "import IPython.display\n", + "\n", + "fs = gcsfs.GCSFileSystem(project='bigframes-dev')\n", + "with fs.open(song[\"GCS URI\"].iloc[0]) as song_file:\n", + " song_bytes = song_file.read()\n", + "\n", + "IPython.display.Audio(song_bytes)" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -670,20 +715,19 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "import gcsfs\n", - "import IPython.display\n", - "\n", - "fs = gcsfs.GCSFileSystem(project='bigframes-dev')\n", - "with fs.open(song[\"GCS URI\"].iloc[0]) as song_file:\n", - " song_bytes = song_file.read()\n", - "\n", - "IPython.display.Audio(song_bytes)" - ] + "execution_count": null, + "outputs": [] }, { + "id": "778d0ac3", "cell_type": "markdown", + "source": [ + "## Creating a searchable index\n", + "\n", + "To be able to search by semantics rather than just text, generate embeddings and then create an index to efficiently search these.\n", + "\n", + "See also, this example: https://github.com/googleapis/python-bigquery-dataframes/blob/main/notebooks/generative_ai/bq_dataframes_llm_vector_search.ipynb" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -698,17 +742,16 @@ "slide_type": "slide" } }, - "source": [ - "## Creating a searchable index\n", - "\n", - "To be able to search by semantics rather than just text, generate embeddings and then create an index to efficiently search these.\n", - "\n", - "See also, this example: https://github.com/googleapis/python-bigquery-dataframes/blob/main/notebooks/generative_ai/bq_dataframes_llm_vector_search.ipynb" - ] + "execution_count": null }, { + "id": "de7e4e11", "cell_type": "code", - "execution_count": null, + "source": [ + "from bigframes.ml.llm import TextEmbeddingGenerator\n", + "\n", + "text_model = TextEmbeddingGenerator(model_name=\"text-multilingual-embedding-002\")" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -728,16 +771,21 @@ }, "trusted": true }, - "outputs": [], 
- "source": [ - "from bigframes.ml.llm import TextEmbeddingGenerator\n", - "\n", - "text_model = TextEmbeddingGenerator(model_name=\"text-multilingual-embedding-002\")" - ] + "execution_count": null, + "outputs": [] }, { + "id": "4acfb495", "cell_type": "code", - "execution_count": null, + "source": [ + "df_to_index = (\n", + " flattened\n", + " .assign(content=flattened[\"Transcription\"].struct.field(\"content\"))\n", + " [flattened[\"Transcription\"].struct.field(\"content\") != \"\"]\n", + ")\n", + "embedding = text_model.predict(df_to_index)\n", + "embedding.peek(1)" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -757,20 +805,18 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "df_to_index = (\n", - " flattened\n", - " .assign(content=flattened[\"Transcription\"].struct.field(\"content\"))\n", - " [flattened[\"Transcription\"].struct.field(\"content\") != \"\"]\n", - ")\n", - "embedding = text_model.predict(df_to_index)\n", - "embedding.peek(1)" - ] + "execution_count": null, + "outputs": [] }, { + "id": "a49d1dde", "cell_type": "code", - "execution_count": null, + "source": [ + "# Check the status column to look for errors.\n", + "print(f\"Successful rows: {(embedding['ml_generate_embedding_status'] == '').sum()}\")\n", + "print(f\"Failed rows: {(embedding['ml_generate_embedding_status'] != '').sum()}\")\n", + "embedding.shape" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -795,16 +841,15 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "# Check the status column to look for errors.\n", - "print(f\"Successful rows: {(embedding['ml_generate_embedding_status'] == '').sum()}\")\n", - "print(f\"Failed rows: {(embedding['ml_generate_embedding_status'] != '').sum()}\")\n", - "embedding.shape" - ] + "execution_count": null, + "outputs": [] }, { + "id": "15a5bfd3", "cell_type": "markdown", + "source": [ + "We're now ready to save this to a table." 
+ ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -816,13 +861,15 @@ } } }, - "source": [ - "We're now ready to save this to a table." - ] + "execution_count": null }, { + "id": "8b49384c", "cell_type": "code", - "execution_count": null, + "source": [ + "embedding_table_id = f\"{bpd.options.bigquery.project}.kaggle.national_jukebox\"\n", + "embedding.to_gbq(embedding_table_id, if_exists=\"replace\")" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -842,14 +889,20 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "embedding_table_id = f\"{bpd.options.bigquery.project}.kaggle.national_jukebox\"\n", - "embedding.to_gbq(embedding_table_id, if_exists=\"replace\")" - ] + "execution_count": null, + "outputs": [] }, { + "id": "810c77d5", "cell_type": "markdown", + "source": [ + "## Searching the database\n", + "\n", + "To search by semantics, we:\n", + "\n", + "1. Turn our search string into an embedding using the same model as our index.\n", + "2. Find the closest matches to the search string." + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -864,18 +917,17 @@ "slide_type": "slide" } }, - "source": [ - "## Searching the database\n", - "\n", - "To search by semantics, we:\n", - "\n", - "1. Turn our search string into an embedding using the same model as our index.\n", - "2. Find the closest matches to the search string." 
- ] + "execution_count": null }, { + "id": "fb63ad94", "cell_type": "code", - "execution_count": null, + "source": [ + "import bigframes.pandas as bpd\n", + "\n", + "df_written = bpd.read_gbq(embedding_table_id)\n", + "df_written.peek(1)" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -898,17 +950,22 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "import bigframes.pandas as bpd\n", - "\n", - "df_written = bpd.read_gbq(embedding_table_id)\n", - "df_written.peek(1)" - ] + "execution_count": null, + "outputs": [] }, { + "id": "f19c88d3", "cell_type": "code", - "execution_count": null, + "source": [ + "from bigframes.ml.llm import TextEmbeddingGenerator\n", + "\n", + "search_string = \"walking home\"\n", + "\n", + "text_model = TextEmbeddingGenerator(model_name=\"text-multilingual-embedding-002\")\n", + "search_df = bpd.DataFrame([search_string], columns=['search_string'])\n", + "search_embedding = text_model.predict(search_df)\n", + "search_embedding" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -928,21 +985,24 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "from bigframes.ml.llm import TextEmbeddingGenerator\n", - "\n", - "search_string = \"walking home\"\n", - "\n", - "text_model = TextEmbeddingGenerator(model_name=\"text-multilingual-embedding-002\")\n", - "search_df = bpd.DataFrame([search_string], columns=['search_string'])\n", - "search_embedding = text_model.predict(search_df)\n", - "search_embedding" - ] + "execution_count": null, + "outputs": [] }, { + "id": "06f0312e", "cell_type": "code", - "execution_count": null, + "source": [ + "import bigframes.bigquery as bbq\n", + "\n", + "vector_search_results = bbq.vector_search(\n", + " base_table=f\"swast-scratch.scipy2025.national_jukebox\",\n", + " column_to_search=\"ml_generate_embedding_result\",\n", + " query=search_embedding,\n", + " distance_type=\"COSINE\",\n", + " query_column_to_search=\"ml_generate_embedding_result\",\n", + " top_k=5,\n", + 
")" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -967,23 +1027,15 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "import bigframes.bigquery as bbq\n", - "\n", - "vector_search_results = bbq.vector_search(\n", - " base_table=f\"swast-scratch.scipy2025.national_jukebox\",\n", - " column_to_search=\"ml_generate_embedding_result\",\n", - " query=search_embedding,\n", - " distance_type=\"COSINE\",\n", - " query_column_to_search=\"ml_generate_embedding_result\",\n", - " top_k=5,\n", - ")" - ] + "execution_count": null, + "outputs": [] }, { + "id": "fae3fcae", "cell_type": "code", - "execution_count": null, + "source": [ + "vector_search_results.dtypes" + ], "metadata": { "execution": { "iopub.execute_input": "2025-08-14T16:05:50.566930Z", @@ -994,14 +1046,16 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "vector_search_results.dtypes" - ] + "execution_count": null, + "outputs": [] }, { + "id": "38423dde", "cell_type": "code", - "execution_count": null, + "source": [ + "results = vector_search_results[[\"Title\", \"Summary\", \"Names\", \"GCS URI\", \"Transcription\", \"distance\"]].sort_values(\"distance\").to_pandas()\n", + "results" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -1024,15 +1078,15 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "results = vector_search_results[[\"Title\", \"Summary\", \"Names\", \"GCS URI\", \"Transcription\", \"distance\"]].sort_values(\"distance\").to_pandas()\n", - "results" - ] + "execution_count": null, + "outputs": [] }, { + "id": "37a1dfbd", "cell_type": "code", - "execution_count": null, + "source": [ + "print(results[\"Transcription\"].struct.field(\"content\").iloc[0])" + ], "metadata": { "@deathbeds/jupyterlab-fonts": { "styles": { @@ -1052,14 +1106,22 @@ }, "trusted": true }, - "outputs": [], - "source": [ - "print(results[\"Transcription\"].struct.field(\"content\").iloc[0])" - ] + "execution_count": null, + "outputs": [] }, { + "id": 
"a4748e0f", "cell_type": "code", - "execution_count": null, + "source": [ + "import gcsfs\n", + "import IPython.display\n", + "\n", + "fs = gcsfs.GCSFileSystem(project='bigframes-dev')\n", + "with fs.open(results[\"GCS URI\"].iloc[0]) as song_file:\n", + " song_bytes = song_file.read()\n", + "\n", + "IPython.display.Audio(song_bytes)" + ], "metadata": { "editable": true, "execution": { @@ -1076,26 +1138,18 @@ "tags": [], "trusted": true }, - "outputs": [], - "source": [ - "import gcsfs\n", - "import IPython.display\n", - "\n", - "fs = gcsfs.GCSFileSystem(project='bigframes-dev')\n", - "with fs.open(results[\"GCS URI\"].iloc[0]) as song_file:\n", - " song_bytes = song_file.read()\n", - "\n", - "IPython.display.Audio(song_bytes)" - ] + "execution_count": null, + "outputs": [] }, { + "id": "ff22e7eb", "cell_type": "code", - "execution_count": null, + "source": [], "metadata": { "trusted": true }, - "outputs": [], - "source": [] + "execution_count": null, + "outputs": [] } ], "metadata": { @@ -1132,6 +1186,6 @@ "version": "3.11.13" } }, - "nbformat": 4, - "nbformat_minor": 4 + "nbformat_minor": 4, + "nbformat": 4 } diff --git a/packages/bigframes/tests/system/large/blob/test_function.py b/packages/bigframes/tests/system/large/blob/test_function.py deleted file mode 100644 index bc09baf268d1..000000000000 --- a/packages/bigframes/tests/system/large/blob/test_function.py +++ /dev/null @@ -1,853 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -import traceback -import uuid -from typing import Generator - -import pandas as pd -import pytest -from google.cloud import storage - -import bigframes -import bigframes.pandas as bpd -from bigframes import dtypes - -pytest.skip("Skipping blob tests due to b/481790217", allow_module_level=True) - - -@pytest.fixture(scope="function") -def images_output_folder() -> Generator[str, None, None]: - id = uuid.uuid4().hex - folder = os.path.join("gs://bigframes_blob_test/output/", id) - yield folder - - # clean up - try: - cloud_storage_client = storage.Client() - bucket = cloud_storage_client.bucket("bigframes_blob_test") - blobs = bucket.list_blobs(prefix="output/" + id) - for blob in blobs: - blob.delete() - except Exception as exc: - traceback.print_exception(type(exc), exc, None) - - -@pytest.fixture(scope="function") -def images_output_uris(images_output_folder: str) -> list[str]: - return [ - os.path.join(images_output_folder, "img0.jpg"), - os.path.join(images_output_folder, "img1.jpg"), - ] - - -def test_blob_exif( - bq_connection: str, - session: bigframes.Session, -): - exif_image_df = session.from_glob_path( - "gs://bigframes_blob_test/images_exif/*", - name="blob_col", - connection=bq_connection, - ) - - actual = exif_image_df["blob_col"].blob.exif( - engine="pillow", connection=bq_connection, verbose=False - ) - expected = bpd.Series( - ['{"ExifOffset": 47, "Make": "MyCamera"}'], - session=session, - dtype=dtypes.JSON_DTYPE, - ) - pd.testing.assert_series_equal( - actual.to_pandas(), - expected.to_pandas(), - check_dtype=False, - check_index_type=False, - ) - - -def test_blob_exif_verbose( - bq_connection: str, - session: bigframes.Session, -): - exif_image_df = session.from_glob_path( - "gs://bigframes_blob_test/images_exif/*", - name="blob_col", - connection=bq_connection, - ) - - actual = 
exif_image_df["blob_col"].blob.exif( - engine="pillow", connection=bq_connection, verbose=True - ) - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - content_series = actual_exploded["content"] - assert content_series.dtype == dtypes.JSON_DTYPE - - -def test_blob_image_blur_to_series( - images_mm_df: bpd.DataFrame, - bq_connection: str, - images_output_uris: list[str], - session: bigframes.Session, -): - series = bpd.Series(images_output_uris, session=session).str.to_blob( - connection=bq_connection - ) - - actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=False - ) - - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) - - # verify the files exist - assert not actual.blob.size().isna().any() - - -def test_blob_image_blur_to_series_verbose( - images_mm_df: bpd.DataFrame, - bq_connection: str, - images_output_uris: list[str], - session: bigframes.Session, -): - series = bpd.Series(images_output_uris, session=session).str.to_blob( - connection=bq_connection - ) - - actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=True - ) - - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - # Content should be blob objects for 
GCS destination - # verify the files exist - assert not actual.blob.size().isna().any() - - -def test_blob_image_blur_to_folder( - images_mm_df: bpd.DataFrame, - bq_connection: str, - images_output_folder: str, - images_output_uris: list[str], -): - actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), - dst=images_output_folder, - connection=bq_connection, - engine="opencv", - verbose=False, - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) - - # verify the files exist - assert not actual.blob.size().isna().any() - - -def test_blob_image_blur_to_folder_verbose( - images_mm_df: bpd.DataFrame, - bq_connection: str, - images_output_folder: str, - images_output_uris: list[str], -): - actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), - dst=images_output_folder, - connection=bq_connection, - engine="opencv", - verbose=True, - ) - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - content_series = actual_exploded["content"] - # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") - - # verify the files exist - assert not actual.blob.size().isna().any() - - -def test_blob_image_blur_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): - actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), connection=bq_connection, engine="opencv", verbose=False - ) - - assert isinstance(actual, bpd.Series) - assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE - - -def 
def test_blob_image_blur_to_bq_verbose(images_mm_df: bpd.DataFrame, bq_connection: str):
    """image_blur(verbose=True) without a GCS dst yields STRUCT<status, content> rows."""
    result = images_mm_df["blob_col"].blob.image_blur(
        (8, 8), connection=bq_connection, engine="opencv", verbose=True
    )

    assert isinstance(result, bpd.Series)
    assert len(result) == 2

    # Verbose output is a struct series with a status string and raw bytes content.
    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE
    assert exploded["content"].dtype == dtypes.BYTES_DTYPE


def test_blob_image_resize_to_series(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_uris: list[str],
    session: bigframes.Session,
):
    """image_resize with a blob-Series dst returns blobs referencing those URIs."""
    dst_series = bpd.Series(images_output_uris, session=session).str.to_blob(
        connection=bq_connection
    )

    result = images_mm_df["blob_col"].blob.image_resize(
        (200, 300),
        dst=dst_series,
        connection=bq_connection,
        engine="opencv",
        verbose=False,
    )

    expected_df = pd.DataFrame(
        {
            "uri": images_output_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )
    pd.testing.assert_frame_equal(
        result.struct.explode().to_pandas(),
        expected_df,
        check_dtype=False,
        check_index_type=False,
    )

    # verify the files exist
    assert not result.blob.size().isna().any()


def test_blob_image_resize_to_series_verbose(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_uris: list[str],
    session: bigframes.Session,
):
    """Verbose image_resize to a blob-Series dst yields status strings plus blob content."""
    dst_series = bpd.Series(images_output_uris, session=session).str.to_blob(
        connection=bq_connection
    )

    result = images_mm_df["blob_col"].blob.image_resize(
        (200, 300),
        dst=dst_series,
        connection=bq_connection,
        engine="opencv",
        verbose=True,
    )

    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE

    # Content should be blob objects for GCS destination
    assert hasattr(exploded["content"], "blob")

    # verify the files exist
    assert not result.blob.size().isna().any()


def test_blob_image_resize_to_folder(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_folder: str,
    images_output_uris: list[str],
):
    """image_resize with a folder dst writes one output object per input blob."""
    result = images_mm_df["blob_col"].blob.image_resize(
        (200, 300),
        dst=images_output_folder,
        connection=bq_connection,
        engine="opencv",
        verbose=False,
    )

    expected_df = pd.DataFrame(
        {
            "uri": images_output_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )
    pd.testing.assert_frame_equal(
        result.struct.explode().to_pandas(),
        expected_df,
        check_dtype=False,
        check_index_type=False,
    )

    # verify the files exist
    assert not result.blob.size().isna().any()


def test_blob_image_resize_to_folder_verbose(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_folder: str,
    images_output_uris: list[str],
):
    """Verbose image_resize to a folder dst: struct output whose content is blobs."""
    result = images_mm_df["blob_col"].blob.image_resize(
        (200, 300),
        dst=images_output_folder,
        connection=bq_connection,
        engine="opencv",
        verbose=True,
    )

    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE

    # Content should be blob objects for GCS destination
    content_series = exploded["content"]
    assert hasattr(content_series, "blob")

    # verify the files exist
    assert not content_series.blob.size().isna().any()
def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str):
    """image_resize without a dst returns the raw image bytes inline."""
    result = images_mm_df["blob_col"].blob.image_resize(
        (200, 300), connection=bq_connection, engine="opencv", verbose=False
    )

    assert isinstance(result, bpd.Series)
    assert len(result) == 2
    assert result.dtype == dtypes.BYTES_DTYPE


def test_blob_image_resize_to_bq_verbose(
    images_mm_df: bpd.DataFrame, bq_connection: str
):
    """Verbose image_resize without a dst yields STRUCT<status STRING, content BYTES>."""
    result = images_mm_df["blob_col"].blob.image_resize(
        (200, 300), connection=bq_connection, engine="opencv", verbose=True
    )

    assert isinstance(result, bpd.Series)
    assert len(result) == 2

    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE
    assert exploded["content"].dtype == dtypes.BYTES_DTYPE


def test_blob_image_normalize_to_series(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_uris: list[str],
    session: bigframes.Session,
):
    """image_normalize with a blob-Series dst returns blobs referencing those URIs."""
    dst_series = bpd.Series(images_output_uris, session=session).str.to_blob(
        connection=bq_connection
    )

    result = images_mm_df["blob_col"].blob.image_normalize(
        alpha=50.0,
        beta=150.0,
        norm_type="minmax",
        dst=dst_series,
        connection=bq_connection,
        engine="opencv",
        verbose=False,
    )

    expected_df = pd.DataFrame(
        {
            "uri": images_output_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )
    pd.testing.assert_frame_equal(
        result.struct.explode().to_pandas(),
        expected_df,
        check_dtype=False,
        check_index_type=False,
    )

    # verify the files exist
    assert not result.blob.size().isna().any()


def test_blob_image_normalize_to_series_verbose(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_uris: list[str],
    session: bigframes.Session,
):
    """Verbose image_normalize to a blob-Series dst yields status + blob content."""
    dst_series = bpd.Series(images_output_uris, session=session).str.to_blob(
        connection=bq_connection
    )

    result = images_mm_df["blob_col"].blob.image_normalize(
        alpha=50.0,
        beta=150.0,
        norm_type="minmax",
        dst=dst_series,
        connection=bq_connection,
        engine="opencv",
        verbose=True,
    )

    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE

    # Content should be blob objects for GCS destination
    assert hasattr(exploded["content"], "blob")


def test_blob_image_normalize_to_folder(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_folder: str,
    images_output_uris: list[str],
):
    """image_normalize with a folder dst writes one output object per input blob."""
    result = images_mm_df["blob_col"].blob.image_normalize(
        alpha=50.0,
        beta=150.0,
        norm_type="minmax",
        dst=images_output_folder,
        connection=bq_connection,
        engine="opencv",
        verbose=False,
    )

    expected_df = pd.DataFrame(
        {
            "uri": images_output_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )
    pd.testing.assert_frame_equal(
        result.struct.explode().to_pandas(),
        expected_df,
        check_dtype=False,
        check_index_type=False,
    )

    # verify the files exist
    assert not result.blob.size().isna().any()


def test_blob_image_normalize_to_folder_verbose(
    images_mm_df: bpd.DataFrame,
    bq_connection: str,
    images_output_folder: str,
    images_output_uris: list[str],
):
    """Verbose image_normalize to a folder dst: struct output whose content is blobs."""
    result = images_mm_df["blob_col"].blob.image_normalize(
        alpha=50.0,
        beta=150.0,
        norm_type="minmax",
        dst=images_output_folder,
        connection=bq_connection,
        engine="opencv",
        verbose=True,
    )

    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE

    # Content should be blob objects for GCS destination
    assert hasattr(exploded["content"], "blob")


def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str):
    """image_normalize without a dst returns the raw image bytes inline."""
    result = images_mm_df["blob_col"].blob.image_normalize(
        alpha=50.0,
        beta=150.0,
        norm_type="minmax",
        connection=bq_connection,
        engine="opencv",
        verbose=False,
    )

    assert isinstance(result, bpd.Series)
    assert len(result) == 2
    assert result.dtype == dtypes.BYTES_DTYPE


def test_blob_image_normalize_to_bq_verbose(
    images_mm_df: bpd.DataFrame, bq_connection: str
):
    """Verbose image_normalize without a dst yields STRUCT<status STRING, content BYTES>."""
    result = images_mm_df["blob_col"].blob.image_normalize(
        alpha=50.0,
        beta=150.0,
        norm_type="minmax",
        connection=bq_connection,
        engine="opencv",
        verbose=True,
    )

    assert isinstance(result, bpd.Series)
    assert len(result) == 2

    assert hasattr(result, "struct")
    exploded = result.struct.explode()
    assert "status" in exploded.columns
    assert "content" in exploded.columns

    assert exploded["status"].dtype == dtypes.STRING_DTYPE
    assert exploded["content"].dtype == dtypes.BYTES_DTYPE
def test_blob_pdf_extract(
    pdf_mm_df: bpd.DataFrame,
    bq_connection: str,
):
    """pdf_extract(verbose=False) yields plain text close to the reference content."""
    actual = (
        pdf_mm_df["pdf"]
        .blob.pdf_extract(connection=bq_connection, verbose=False, engine="pypdf")
        .explode()
        .to_pandas()
    )

    # check relative length
    expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes."
    expected_len = len(expected_text)

    # Skip empty rows (e.g. failed extractions) and compare the first real result.
    actual_text = actual[actual != ""].iloc[0]
    actual_len = len(actual_text)

    relative_length_tolerance = 0.25
    min_acceptable_len = expected_len * (1 - relative_length_tolerance)
    max_acceptable_len = expected_len * (1 + relative_length_tolerance)
    assert min_acceptable_len <= actual_len <= max_acceptable_len, (
        f"Item (verbose=False): Extracted text length {actual_len} is outside the acceptable range "
        f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. "
        f"Expected reference length was {expected_len}. "
    )

    # check for major keywords
    major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"]
    for keyword in major_keywords:
        assert keyword.lower() in actual_text.lower(), (
            f"Item (verbose=False): Expected keyword '{keyword}' not found in extracted text. "
        )


def test_blob_pdf_extract_verbose(
    pdf_mm_df: bpd.DataFrame,
    bq_connection: str,
):
    """pdf_extract(verbose=True) wraps text in status/content structs; check a success row."""
    actual = (
        pdf_mm_df["pdf"]
        .blob.pdf_extract(connection=bq_connection, verbose=True, engine="pypdf")
        .explode()
        .to_pandas()
    )

    # check relative length
    expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes."
    expected_len = len(expected_text)

    # The first entry is for a file that doesn't exist, so we check the second one
    successful_results = actual[actual.apply(lambda x: x["status"] == "")]
    actual_text = successful_results.apply(lambda x: x["content"]).iloc[0]
    actual_len = len(actual_text)

    relative_length_tolerance = 0.25
    min_acceptable_len = expected_len * (1 - relative_length_tolerance)
    max_acceptable_len = expected_len * (1 + relative_length_tolerance)
    assert min_acceptable_len <= actual_len <= max_acceptable_len, (
        f"Item (verbose=True): Extracted text length {actual_len} is outside the acceptable range "
        f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. "
        f"Expected reference length was {expected_len}. "
    )

    # check for major keywords
    major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"]
    for keyword in major_keywords:
        assert keyword.lower() in actual_text.lower(), (
            f"Item (verbose=True): Expected keyword '{keyword}' not found in extracted text. "
        )


def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, bq_connection: str):
    """pdf_chunk splits the document; the rejoined chunks approximate the full text."""
    actual = (
        pdf_mm_df["pdf"]
        .blob.pdf_chunk(
            connection=bq_connection,
            chunk_size=50,
            overlap_size=10,
            verbose=False,
            engine="pypdf",
        )
        .explode()
        .to_pandas()
    )

    # check relative length
    expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes."
    expected_len = len(expected_text)

    # First entry is NA
    actual_text = "".join(actual.dropna())
    actual_len = len(actual_text)

    relative_length_tolerance = 0.25
    min_acceptable_len = expected_len * (1 - relative_length_tolerance)
    max_acceptable_len = expected_len * (1 + relative_length_tolerance)
    assert min_acceptable_len <= actual_len <= max_acceptable_len, (
        f"Item (verbose=False): Extracted text length {actual_len} is outside the acceptable range "
        f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. "
        f"Expected reference length was {expected_len}. "
    )

    # check for major keywords
    major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"]
    for keyword in major_keywords:
        assert keyword.lower() in actual_text.lower(), (
            f"Item (verbose=False): Expected keyword '{keyword}' not found in extracted text. "
        )


def test_blob_pdf_chunk_verbose(pdf_mm_df: bpd.DataFrame, bq_connection: str):
    """Verbose pdf_chunk: join the chunk list from the first successful struct row."""
    actual = (
        pdf_mm_df["pdf"]
        .blob.pdf_chunk(
            connection=bq_connection,
            chunk_size=50,
            overlap_size=10,
            verbose=True,
            engine="pypdf",
        )
        .explode()
        .to_pandas()
    )

    # check relative length
    expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes."
    expected_len = len(expected_text)

    # The first entry is for a file that doesn't exist, so we check the second one
    successful_results = actual[actual.apply(lambda x: x["status"] == "")]
    actual_text = "".join(successful_results.apply(lambda x: x["content"]).iloc[0])
    actual_len = len(actual_text)

    relative_length_tolerance = 0.25
    min_acceptable_len = expected_len * (1 - relative_length_tolerance)
    max_acceptable_len = expected_len * (1 + relative_length_tolerance)
    assert min_acceptable_len <= actual_len <= max_acceptable_len, (
        f"Item (verbose=True): Extracted text length {actual_len} is outside the acceptable range "
        f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. "
        f"Expected reference length was {expected_len}. "
    )

    # check for major keywords
    major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"]
    for keyword in major_keywords:
        assert keyword.lower() in actual_text.lower(), (
            f"Item (verbose=True): Expected keyword '{keyword}' not found in extracted text. "
        )
@pytest.mark.parametrize(
    "model_name",
    [
        "gemini-2.0-flash-001",
        "gemini-2.0-flash-lite-001",
    ],
)
def test_blob_transcribe(
    audio_mm_df: bpd.DataFrame,
    model_name: str,
):
    """audio_transcribe(verbose=False) returns text close to the reference transcript."""
    actual = (
        audio_mm_df["audio"]
        .blob.audio_transcribe(
            model_name=model_name,  # type: ignore
            verbose=False,
        )
        .to_pandas()
    )

    # check relative length
    expected_text = "Now, as all books not primarily intended as picture-books consist principally of types composed to form letterpress"
    expected_len = len(expected_text)

    actual_text = actual[0]

    if pd.isna(actual_text) or actual_text == "":
        # Ensure the tests are robust to flakes in the model, which isn't
        # particularly useful information for the bigframes team.
        logging.warning(f"blob_transcribe() model {model_name} verbose=False failure")
        return

    actual_len = len(actual_text)

    relative_length_tolerance = 0.2
    min_acceptable_len = expected_len * (1 - relative_length_tolerance)
    max_acceptable_len = expected_len * (1 + relative_length_tolerance)
    assert min_acceptable_len <= actual_len <= max_acceptable_len, (
        f"Item (verbose=False): Transcribed text length {actual_len} is outside the acceptable range "
        f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. "
        f"Expected reference length was {expected_len}. "
    )

    # check for major keywords
    major_keywords = ["book", "picture"]
    for keyword in major_keywords:
        assert keyword.lower() in actual_text.lower(), (
            f"Item (verbose=False): Expected keyword '{keyword}' not found in transcribed text. "
        )


@pytest.mark.parametrize(
    "model_name",
    [
        "gemini-2.0-flash-001",
        "gemini-2.0-flash-lite-001",
    ],
)
def test_blob_transcribe_verbose(
    audio_mm_df: bpd.DataFrame,
    model_name: str,
):
    """audio_transcribe(verbose=True) wraps the transcript in a struct's content field."""
    actual = (
        audio_mm_df["audio"]
        .blob.audio_transcribe(
            model_name=model_name,  # type: ignore
            verbose=True,
        )
        .to_pandas()
    )

    # check relative length
    expected_text = "Now, as all books not primarily intended as picture-books consist principally of types composed to form letterpress"
    expected_len = len(expected_text)

    actual_text = actual[0]["content"]

    if pd.isna(actual_text) or actual_text == "":
        # Ensure the tests are robust to flakes in the model, which isn't
        # particularly useful information for the bigframes team.
        logging.warning(f"blob_transcribe() model {model_name} verbose=True failure")
        return

    actual_len = len(actual_text)

    relative_length_tolerance = 0.2
    min_acceptable_len = expected_len * (1 - relative_length_tolerance)
    max_acceptable_len = expected_len * (1 + relative_length_tolerance)
    assert min_acceptable_len <= actual_len <= max_acceptable_len, (
        f"Item (verbose=True): Transcribed text length {actual_len} is outside the acceptable range "
        f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. "
        f"Expected reference length was {expected_len}. "
    )

    # check for major keywords
    major_keywords = ["book", "picture"]
    for keyword in major_keywords:
        assert keyword.lower() in actual_text.lower(), (
            f"Item (verbose=True): Expected keyword '{keyword}' not found in transcribed text. "
        )
from unittest import mock

import pandas as pd
import pytest

import bigframes
import bigframes.pandas as bpd

# Whole module is disabled pending b/481790217.
pytest.skip("Skipping blob tests due to b/481790217", allow_module_level=True)


idisplay = pytest.importorskip("IPython.display")


def test_blob_create_from_uri_str(
    bq_connection: str, session: bigframes.Session, images_uris
):
    """str.to_blob turns URI strings into blob structs bound to the connection."""
    uri_series = bpd.Series(images_uris, session=session)
    blob_series = uri_series.str.to_blob(connection=bq_connection)

    pd_blob_df = blob_series.struct.explode().to_pandas()
    expected_pd_df = pd.DataFrame(
        {
            "uri": images_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )

    pd.testing.assert_frame_equal(
        pd_blob_df, expected_pd_df, check_dtype=False, check_index_type=False
    )


def test_blob_create_from_glob_path(
    bq_connection: str, session: bigframes.Session, images_gcs_path, images_uris
):
    """from_glob_path enumerates GCS objects into a blob column."""
    blob_df = session.from_glob_path(
        images_gcs_path, connection=bq_connection, name="blob_col"
    )
    pd_blob_df = (
        blob_df["blob_col"]
        .struct.explode()
        .to_pandas()
        .sort_values("uri")
        .reset_index(drop=True)
    )

    expected_df = pd.DataFrame(
        {
            "uri": images_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )

    pd.testing.assert_frame_equal(
        pd_blob_df, expected_df, check_dtype=False, check_index_type=False
    )


def test_blob_create_read_gbq_object_table(
    bq_connection: str, session: bigframes.Session, images_gcs_path, images_uris
):
    """read_gbq_object_table loads an existing object table as a blob column."""
    obj_table = session._create_object_table(images_gcs_path, bq_connection)

    blob_df = session.read_gbq_object_table(obj_table, name="blob_col")
    pd_blob_df = (
        blob_df["blob_col"]
        .struct.explode()
        .to_pandas()
        .sort_values("uri")
        .reset_index(drop=True)
    )
    expected_df = pd.DataFrame(
        {
            "uri": images_uris,
            "version": [None, None],
            "authorizer": [bq_connection.casefold()] * 2,
            "details": [None, None],
        }
    )

    pd.testing.assert_frame_equal(
        pd_blob_df, expected_df, check_dtype=False, check_index_type=False
    )


def test_display_images(monkeypatch, images_mm_df: bpd.DataFrame):
    """blob.display() renders each image via IPython.display.Image."""
    mock_display = mock.Mock()
    monkeypatch.setattr(idisplay, "display", mock_display)

    images_mm_df["blob_col"].blob.display()

    for call in mock_display.call_args_list:
        args, _ = call
        assert isinstance(args[0], idisplay.Image)


def test_display_nulls(
    monkeypatch,
    bq_connection: str,
    session: bigframes.Session,
):
    """blob.display() degrades to empty strings for all-null blob series."""
    uri_series = bpd.Series([None, None, None], dtype="string", session=session)
    blob_series = uri_series.str.to_blob(connection=bq_connection)
    mock_display = mock.Mock()
    monkeypatch.setattr(idisplay, "display", mock_display)

    blob_series.blob.display()

    for call in mock_display.call_args_list:
        args, _ = call
        assert args[0] == ""
import pandas as pd
import pytest

import bigframes.dtypes as dtypes
import bigframes.pandas as bpd

# Whole module is disabled pending b/481790217.
pytest.skip("Skipping blob tests due to b/481790217", allow_module_level=True)


def test_blob_uri(images_uris: list[str], images_mm_df: bpd.DataFrame):
    """blob.uri() round-trips the GCS object URIs."""
    actual = images_mm_df["blob_col"].blob.uri().to_pandas()
    expected = pd.Series(images_uris, name="uri")

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )


def test_blob_authorizer(images_mm_df: bpd.DataFrame, bq_connection: str):
    """blob.authorizer() reports the (casefolded) connection id."""
    actual = images_mm_df["blob_col"].blob.authorizer().to_pandas()
    expected = pd.Series([bq_connection.casefold()] * 2, name="authorizer")

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )


def test_blob_version(images_mm_df: bpd.DataFrame):
    """blob.version() surfaces the GCS object generation numbers."""
    actual = images_mm_df["blob_col"].blob.version().to_pandas()
    expected = pd.Series(["1753907851152593", "1753907851111538"], name="version")

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )


def test_blob_metadata(images_mm_df: bpd.DataFrame):
    """blob.metadata() is a JSON column of content_type/md5/size/updated."""
    actual = images_mm_df["blob_col"].blob.metadata().to_pandas()
    expected = pd.Series(
        [
            (
                '{"content_type":"image/jpeg",'
                '"md5_hash":"e130ad042261a1883cd2cc06831cf748",'
                '"size":338390,'
                '"updated":1753907851000000}'
            ),
            (
                '{"content_type":"image/jpeg",'
                '"md5_hash":"e2ae3191ff2b809fd0935f01a537c650",'
                '"size":43333,'
                '"updated":1753907851000000}'
            ),
        ],
        name="metadata",
        dtype=dtypes.JSON_DTYPE,
    )
    expected.index = expected.index.astype(dtypes.INT_DTYPE)
    pd.testing.assert_series_equal(actual, expected)


def test_blob_content_type(images_mm_df: bpd.DataFrame):
    """blob.content_type() extracts the MIME type per object."""
    actual = images_mm_df["blob_col"].blob.content_type().to_pandas()
    expected = pd.Series(["image/jpeg", "image/jpeg"], name="content_type")

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )


def test_blob_md5_hash(images_mm_df: bpd.DataFrame):
    """blob.md5_hash() extracts the object checksums."""
    actual = images_mm_df["blob_col"].blob.md5_hash().to_pandas()
    expected = pd.Series(
        ["e130ad042261a1883cd2cc06831cf748", "e2ae3191ff2b809fd0935f01a537c650"],
        name="md5_hash",
    )

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )


def test_blob_size(images_mm_df: bpd.DataFrame):
    """blob.size() extracts the object sizes in bytes."""
    actual = images_mm_df["blob_col"].blob.size().to_pandas()
    expected = pd.Series([338390, 43333], name="size")

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )


def test_blob_updated(images_mm_df: bpd.DataFrame):
    """blob.updated() extracts UTC last-modified timestamps."""
    actual = images_mm_df["blob_col"].blob.updated().to_pandas()
    expected = pd.Series(
        [
            pd.Timestamp("2025-07-30 20:37:31", tz="UTC"),
            pd.Timestamp("2025-07-30 20:37:31", tz="UTC"),
        ],
        name="updated",
    )

    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_index_type=False
    )
import pytest

import bigframes.pandas as bpd

# Whole module is disabled pending b/481790217.
pytest.skip("Skipping blob tests due to b/481790217", allow_module_level=True)


def test_blob_read_url(images_mm_df: bpd.DataFrame):
    """Every read URL is a signed GCS HTTPS URL."""
    urls = images_mm_df["blob_col"].blob.read_url()

    assert urls.str.startswith("https://storage.googleapis.com/").all()


def test_blob_write_url(images_mm_df: bpd.DataFrame):
    """Every write URL is a signed GCS HTTPS URL."""
    urls = images_mm_df["blob_col"].blob.write_url()

    assert urls.str.startswith("https://storage.googleapis.com/").all()