Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## 2.0.0

### Breaking Changes

- **Removed the `row_limit` parameter from `read_dlo()` and `read_dmo()`.**

These methods no longer accept a `row_limit` argument. When running locally, reads are automatically capped at 1000 rows to prevent accidentally fetching large datasets during development. When deployed to Data Cloud, no limit is applied and all records are returned.

**Why:** The `row_limit` parameter duplicated PySpark's built-in `.limit()` and created a behavioral difference between local and deployed environments. The 1000-row safety net is now handled internally via the `default_row_limit` setting in `config.yaml`, and deployed environments naturally omit it.

**Migration:** Remove any `row_limit` arguments from your `read_dlo()` and `read_dmo()` calls. If you need a specific number of rows, use PySpark's `.limit()` on the returned DataFrame. Note that when running locally, reads are still capped at 1000 rows *before* `.limit()` is applied, so to retrieve more than 1000 rows locally you must raise `default_row_limit` in `config.yaml`:

```python
# Before
df = client.read_dlo("MyObject__dll", row_limit=500)

# After
df = client.read_dlo("MyObject__dll").limit(500)
```

## 1.0.0

### Breaking Changes
Expand Down
10 changes: 4 additions & 6 deletions src/datacustomcode/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,31 +185,29 @@ def _new_function_client(cls) -> Client:
)
return cls._instance

def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
def read_dlo(self, name: str) -> PySparkDataFrame:
"""Read a DLO from Data Cloud.

Args:
name: The name of the DLO to read.
row_limit: Maximum number of rows to fetch (default: 1000).

Returns:
A PySpark DataFrame containing the DLO data.
"""
self._record_dlo_access(name)
return self._reader.read_dlo(name, row_limit=row_limit)
return self._reader.read_dlo(name)

def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
def read_dmo(self, name: str) -> PySparkDataFrame:
"""Read a DMO from Data Cloud.

Args:
name: The name of the DMO to read.
row_limit: Maximum number of rows to fetch (default: 1000).

Returns:
A PySpark DataFrame containing the DMO data.
"""
self._record_dmo_access(name)
return self._reader.read_dmo(name, row_limit=row_limit)
return self._reader.read_dmo(name)

def write_to_dlo(
self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs
Expand Down
1 change: 1 addition & 0 deletions src/datacustomcode/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ reader_config:
type_config_name: QueryAPIDataCloudReader
options:
credentials_profile: default
default_row_limit: 1000

writer_config:
type_config_name: PrintDataCloudWriter
Expand Down
2 changes: 0 additions & 2 deletions src/datacustomcode/io/reader/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ def read_dlo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame: ...

@abstractmethod
def read_dmo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame: ...
36 changes: 26 additions & 10 deletions src/datacustomcode/io/reader/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@


SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
SQL_QUERY_TEMPLATE_NO_LIMIT: Final = "SELECT * FROM {}"


def create_cdp_connection(
Expand Down Expand Up @@ -122,6 +123,7 @@ def __init__(
credentials_profile: str = "default",
dataspace: Optional[str] = None,
sf_cli_org: Optional[str] = None,
default_row_limit: Optional[int] = None,
) -> None:
"""Initialize QueryAPIDataCloudReader.

Expand All @@ -137,8 +139,12 @@ def __init__(
reader delegates to :class:`SFCLIDataCloudReader` which calls
the Data Cloud REST API directly using the token obtained from
``sf org display``, bypassing the CDP token-exchange flow.
default_row_limit: Maximum number of rows to fetch automatically.
When ``None``, no limit is applied (all rows are returned).
Set via ``default_row_limit`` in ``config.yaml`` reader options.
"""
self.spark = spark
self._default_row_limit = default_row_limit
if sf_cli_org:
logger.debug(
f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'"
Expand All @@ -147,6 +153,7 @@ def __init__(
spark=spark,
sf_cli_org=sf_cli_org,
dataspace=dataspace,
default_row_limit=default_row_limit,
)
self._conn = None
else:
Expand All @@ -158,19 +165,30 @@ def __init__(
)
self._conn = create_cdp_connection(credentials, dataspace)

def _build_query(self, name: str) -> str:
"""Build a SQL query, applying the configured default row limit.

Args:
name: Object name to query.

Returns:
SQL query string.
"""
if self._default_row_limit is not None:
return SQL_QUERY_TEMPLATE.format(name, self._default_row_limit)
return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)

def read_dlo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame:
"""
Read a Data Lake Object (DLO) from the Data Cloud, limited to a number of rows.
Read a Data Lake Object (DLO) from the Data Cloud.

Args:
name (str): The name of the DLO.
schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO.
row_limit (int): Maximum number of rows to fetch.

Returns:
PySparkDataFrame: The PySpark DataFrame.
Expand All @@ -179,9 +197,9 @@ def read_dlo(
self, "_sf_cli_reader", None
)
if sf_cli_reader is not None:
return sf_cli_reader.read_dlo(name, schema, row_limit)
return sf_cli_reader.read_dlo(name, schema)

query = SQL_QUERY_TEMPLATE.format(name, row_limit)
query = self._build_query(name)

assert self._conn is not None
pandas_df = self._conn.get_pandas_dataframe(query)
Expand All @@ -197,15 +215,13 @@ def read_dmo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame:
"""
Read a Data Model Object (DMO) from the Data Cloud, limited to a number of rows.
Read a Data Model Object (DMO) from the Data Cloud.

Args:
name (str): The name of the DMO.
schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO.
row_limit (int): Maximum number of rows to fetch.

Returns:
PySparkDataFrame: The PySpark DataFrame.
Expand All @@ -214,9 +230,9 @@ def read_dmo(
self, "_sf_cli_reader", None
)
if sf_cli_reader is not None:
return sf_cli_reader.read_dmo(name, schema, row_limit)
return sf_cli_reader.read_dmo(name, schema)

query = SQL_QUERY_TEMPLATE.format(name, row_limit)
query = self._build_query(name)

assert self._conn is not None
pandas_df = self._conn.get_pandas_dataframe(query)
Expand Down
24 changes: 15 additions & 9 deletions src/datacustomcode/io/reader/sf_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(
spark: SparkSession,
sf_cli_org: str,
dataspace: Optional[str] = None,
default_row_limit: Optional[int] = None,
) -> None:
"""Initialize SFCLIDataCloudReader.

Expand All @@ -64,9 +65,13 @@ def __init__(
(e.g. the alias given to ``sf org login web --alias dev1``).
dataspace: Optional dataspace identifier. If ``None`` or
``"default"`` the query runs against the default dataspace.
default_row_limit: Maximum number of rows to fetch automatically.
When ``None``, no limit is applied (all rows are returned).
Set via ``default_row_limit`` in ``config.yaml`` reader options.
"""
self.spark = spark
self.sf_cli_org = sf_cli_org
self._default_row_limit = default_row_limit
self.dataspace = (
dataspace if dataspace and dataspace != "default" else "default"
)
Expand Down Expand Up @@ -132,12 +137,14 @@ def _get_token(self) -> tuple[str, str]:
logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'")
return access_token, instance_url

def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
def _execute_query(self, sql: str) -> pd.DataFrame:
"""Execute *sql* against the Data Cloud REST endpoint.

The configured ``default_row_limit`` is automatically appended as a
``LIMIT`` clause when set (typically for local development).

Args:
sql: Base SQL query (no ``LIMIT`` clause).
row_limit: Maximum rows to return.

Returns:
Pandas DataFrame with query results.
Expand All @@ -150,7 +157,10 @@ def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
headers = {"Authorization": f"Bearer {access_token}"}
params = {"dataspace": self.dataspace}
body = {"sql": f"{sql} LIMIT {row_limit}"}
if self._default_row_limit is not None:
body = {"sql": f"{sql} LIMIT {self._default_row_limit}"}
else:
body = {"sql": sql}

logger.debug(f"Executing Data Cloud query: {body['sql']}")

Expand Down Expand Up @@ -190,19 +200,17 @@ def read_dlo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame:
"""Read a Data Lake Object (DLO) from Data Cloud.

Args:
name: DLO name.
schema: Optional explicit schema.
row_limit: Maximum rows to fetch.

Returns:
PySpark DataFrame.
"""
pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
pandas_df = self._execute_query(f"SELECT * FROM {name}")
if not schema:
schema = _pandas_to_spark_schema(pandas_df)
return self.spark.createDataFrame(pandas_df, schema)
Expand All @@ -211,19 +219,17 @@ def read_dmo(
self,
name: str,
schema: Union[AtomicType, StructType, str, None] = None,
row_limit: int = 1000,
) -> PySparkDataFrame:
"""Read a Data Model Object (DMO) from Data Cloud.

Args:
name: DMO name.
schema: Optional explicit schema.
row_limit: Maximum rows to fetch.

Returns:
PySpark DataFrame.
"""
pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
pandas_df = self._execute_query(f"SELECT * FROM {name}")
if not schema:
schema = _pandas_to_spark_schema(pandas_df)
return self.spark.createDataFrame(pandas_df, schema)
2 changes: 1 addition & 1 deletion src/datacustomcode/io/writer/print.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def validate_dataframe_columns_against_dlo(
schema.
"""
# Get DLO schema (no data, just schema)
dlo_df = self.reader.read_dlo(dlo_name, row_limit=0)
dlo_df = self.reader.read_dlo(dlo_name).limit(0)
dlo_columns = set(dlo_df.columns)
df_columns = set(dataframe.columns)

Expand Down
Loading
Loading