From d481ccc91fba0291d23c738deb72b843448bac35 Mon Sep 17 00:00:00 2001 From: Zach Maddox Date: Mon, 13 Apr 2026 14:52:48 -0400 Subject: [PATCH 1/4] default to no row limit inside data cloud --- src/datacustomcode/client.py | 16 +++-- src/datacustomcode/config.yaml | 1 + src/datacustomcode/io/reader/base.py | 6 +- src/datacustomcode/io/reader/query_api.py | 42 +++++++++--- src/datacustomcode/io/reader/sf_cli.py | 25 +++++-- tests/io/reader/test_query_api.py | 82 +++++++++++++++++++++++ tests/io/reader/test_sf_cli.py | 68 ++++++++++++++++++- tests/test_client.py | 12 ++-- 8 files changed, 221 insertions(+), 31 deletions(-) diff --git a/src/datacustomcode/client.py b/src/datacustomcode/client.py index 80f20a8..e1dba20 100644 --- a/src/datacustomcode/client.py +++ b/src/datacustomcode/client.py @@ -185,12 +185,16 @@ def _new_function_client(cls) -> Client: ) return cls._instance - def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame: + def read_dlo( + self, name: str, row_limit: Optional[int] = None + ) -> PySparkDataFrame: """Read a DLO from Data Cloud. Args: name: The name of the DLO to read. - row_limit: Maximum number of rows to fetch (default: 1000). + row_limit: Maximum number of rows to fetch. When ``None``, the + reader's configured ``default_row_limit`` is used (1000 for + local development, no limit when deployed). Returns: A PySpark DataFrame containing the DLO data. @@ -198,12 +202,16 @@ def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame: self._record_dlo_access(name) return self._reader.read_dlo(name, row_limit=row_limit) - def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame: + def read_dmo( + self, name: str, row_limit: Optional[int] = None + ) -> PySparkDataFrame: """Read a DMO from Data Cloud. Args: name: The name of the DMO to read. - row_limit: Maximum number of rows to fetch (default: 1000). + row_limit: Maximum number of rows to fetch. When ``None``, the + reader's configured ``default_row_limit`` is used (1000 for + local development, no limit when deployed). Returns: A PySpark DataFrame containing the DMO data. diff --git a/src/datacustomcode/config.yaml b/src/datacustomcode/config.yaml index 0267b6f..bf21209 100644 --- a/src/datacustomcode/config.yaml +++ b/src/datacustomcode/config.yaml @@ -2,6 +2,7 @@ reader_config: type_config_name: QueryAPIDataCloudReader options: credentials_profile: default + default_row_limit: 1000 writer_config: type_config_name: PrintDataCloudWriter diff --git a/src/datacustomcode/io/reader/base.py b/src/datacustomcode/io/reader/base.py index f5e69f3..9e2618c 100644 --- a/src/datacustomcode/io/reader/base.py +++ b/src/datacustomcode/io/reader/base.py @@ -15,7 +15,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING, Optional, Union from datacustomcode.io.base import BaseDataAccessLayer @@ -33,7 +33,7 @@ def read_dlo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: int = 1000, + row_limit: Optional[int] = None, ) -> PySparkDataFrame: ... @abstractmethod @@ -41,5 +41,5 @@ def read_dmo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: int = 1000, + row_limit: Optional[int] = None, ) -> PySparkDataFrame: ... diff --git a/src/datacustomcode/io/reader/query_api.py b/src/datacustomcode/io/reader/query_api.py index 98d2596..cc75e5b 100644 --- a/src/datacustomcode/io/reader/query_api.py +++ b/src/datacustomcode/io/reader/query_api.py @@ -37,6 +37,7 @@ SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}" +SQL_QUERY_TEMPLATE_NO_LIMIT: Final = "SELECT * FROM {}" def create_cdp_connection( @@ -122,6 +123,7 @@ def __init__( credentials_profile: str = "default", dataspace: Optional[str] = None, sf_cli_org: Optional[str] = None, + default_row_limit: Optional[int] = None, ) -> None: """Initialize QueryAPIDataCloudReader. @@ -137,8 +139,12 @@ def __init__( reader delegates to :class:`SFCLIDataCloudReader` which calls the Data Cloud REST API directly using the token obtained from ``sf org display``, bypassing the CDP token-exchange flow. + default_row_limit: Default maximum number of rows to fetch when + ``row_limit`` is not explicitly passed to read methods. When + ``None``, no limit is applied (all rows are returned). """ self.spark = spark + self._default_row_limit = default_row_limit if sf_cli_org: logger.debug( f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'" @@ -147,6 +153,7 @@ def __init__( spark=spark, sf_cli_org=sf_cli_org, dataspace=dataspace, + default_row_limit=default_row_limit, ) self._conn = None else: @@ -158,19 +165,37 @@ def __init__( ) self._conn = create_cdp_connection(credentials, dataspace) + def _build_query(self, name: str, row_limit: Optional[int]) -> str: + """Build a SQL query, applying the default row limit when needed. + + Args: + name: Object name to query. + row_limit: Explicit row limit, or ``None`` to use the configured default. + + Returns: + SQL query string. + """ + effective_limit = ( + row_limit if row_limit is not None else self._default_row_limit + ) + if effective_limit is not None: + return SQL_QUERY_TEMPLATE.format(name, effective_limit) + return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name) + def read_dlo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: int = 1000, + row_limit: Optional[int] = None, ) -> PySparkDataFrame: """ - Read a Data Lake Object (DLO) from the Data Cloud, limited to a number of rows. + Read a Data Lake Object (DLO) from the Data Cloud. Args: name (str): The name of the DLO. schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO. - row_limit (int): Maximum number of rows to fetch. + row_limit (Optional[int]): Maximum number of rows to fetch. + When ``None``, the configured ``default_row_limit`` is used. Returns: PySparkDataFrame: The PySpark DataFrame. @@ -181,7 +206,7 @@ def read_dlo( if sf_cli_reader is not None: return sf_cli_reader.read_dlo(name, schema, row_limit) - query = SQL_QUERY_TEMPLATE.format(name, row_limit) + query = self._build_query(name, row_limit) assert self._conn is not None pandas_df = self._conn.get_pandas_dataframe(query) @@ -197,15 +222,16 @@ def read_dmo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: int = 1000, + row_limit: Optional[int] = None, ) -> PySparkDataFrame: """ - Read a Data Model Object (DMO) from the Data Cloud, limited to a number of rows. + Read a Data Model Object (DMO) from the Data Cloud. Args: name (str): The name of the DMO. schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO. - row_limit (int): Maximum number of rows to fetch. + row_limit (Optional[int]): Maximum number of rows to fetch. + When ``None``, the configured ``default_row_limit`` is used. Returns: PySparkDataFrame: The PySpark DataFrame. @@ -216,7 +242,7 @@ def read_dmo( if sf_cli_reader is not None: return sf_cli_reader.read_dmo(name, schema, row_limit) - query = SQL_QUERY_TEMPLATE.format(name, row_limit) + query = self._build_query(name, row_limit) assert self._conn is not None pandas_df = self._conn.get_pandas_dataframe(query) diff --git a/src/datacustomcode/io/reader/sf_cli.py b/src/datacustomcode/io/reader/sf_cli.py index 49a5838..200964e 100644 --- a/src/datacustomcode/io/reader/sf_cli.py +++ b/src/datacustomcode/io/reader/sf_cli.py @@ -55,6 +55,7 @@ def __init__( spark: SparkSession, sf_cli_org: str, dataspace: Optional[str] = None, + default_row_limit: Optional[int] = None, ) -> None: """Initialize SFCLIDataCloudReader. @@ -64,9 +65,13 @@ def __init__( (e.g. the alias given to ``sf org login web --alias dev1``). dataspace: Optional dataspace identifier. If ``None`` or ``"default"`` the query runs against the default dataspace. + default_row_limit: Default maximum number of rows to fetch when + ``row_limit`` is not explicitly passed to read methods. When + ``None``, no limit is applied (all rows are returned). """ self.spark = spark self.sf_cli_org = sf_cli_org + self._default_row_limit = default_row_limit self.dataspace = ( dataspace if dataspace and dataspace != "default" else "default" ) @@ -132,12 +137,12 @@ def _get_token(self) -> tuple[str, str]: logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'") return access_token, instance_url - def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame: + def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame: """Execute *sql* against the Data Cloud REST endpoint. Args: sql: Base SQL query (no ``LIMIT`` clause). - row_limit: Maximum rows to return. + row_limit: Maximum rows to return, or ``None`` for no limit. Returns: Pandas DataFrame with query results. @@ -147,10 +152,16 @@ def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame: """ access_token, instance_url = self._get_token() + effective_limit = ( + row_limit if row_limit is not None else self._default_row_limit + ) url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql" headers = {"Authorization": f"Bearer {access_token}"} params = {"dataspace": self.dataspace} - body = {"sql": f"{sql} LIMIT {row_limit}"} + if effective_limit is not None: + body = {"sql": f"{sql} LIMIT {effective_limit}"} + else: + body = {"sql": sql} logger.debug(f"Executing Data Cloud query: {body['sql']}") @@ -190,14 +201,14 @@ def read_dlo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: int = 1000, + row_limit: Optional[int] = None, ) -> PySparkDataFrame: """Read a Data Lake Object (DLO) from Data Cloud. Args: name: DLO name. schema: Optional explicit schema. - row_limit: Maximum rows to fetch. + row_limit: Maximum rows to fetch, or ``None`` to use the configured default. Returns: PySpark DataFrame. @@ -211,14 +222,14 @@ def read_dmo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: int = 1000, + row_limit: Optional[int] = None, ) -> PySparkDataFrame: """Read a Data Model Object (DMO) from Data Cloud. Args: name: DMO name. schema: Optional explicit schema. - row_limit: Maximum rows to fetch. + row_limit: Maximum rows to fetch, or ``None`` to use the configured default. Returns: PySpark DataFrame. diff --git a/tests/io/reader/test_query_api.py b/tests/io/reader/test_query_api.py index 8e2e77a..05f8199 100644 --- a/tests/io/reader/test_query_api.py +++ b/tests/io/reader/test_query_api.py @@ -20,6 +20,7 @@ from datacustomcode.io.reader.query_api import ( SQL_QUERY_TEMPLATE, + SQL_QUERY_TEMPLATE_NO_LIMIT, QueryAPIDataCloudReader, ) from datacustomcode.io.reader.utils import _pandas_to_spark_schema @@ -188,6 +189,7 @@ def reader_without_init(self, mock_spark_session): with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None): reader = QueryAPIDataCloudReader(None) # None is ignored due to mock reader.spark = mock_spark_session + reader._default_row_limit = 1000 yield reader def test_pandas_to_spark_schema_function(self): @@ -341,3 +343,83 @@ def test_read_dmo_schema_is_lowercase( _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0] assert all(f.name == f.name.lower() for f in schema_arg.fields) + + +@pytest.mark.usefixtures("patch_all_requests") +class TestQueryAPIDataCloudReaderNoDefaultLimit: + """Tests for deployed behavior where default_row_limit is None (no limit).""" + + @pytest.fixture(scope="class", autouse=True) + def patch_all_requests(self, request): + patches = [] + for target in [ + "requests.get", + "requests.post", + "requests.session", + "requests.adapters.HTTPAdapter.send", + "urllib3.connectionpool.HTTPConnectionPool.urlopen", + ]: + patcher = patch(target) + patches.append(patcher) + patcher.start() + + def fin(): + for patcher in patches: + patcher.stop() + + request.addfinalizer(fin) + + @pytest.fixture + def mock_spark_session(self): + spark = MagicMock() + spark.createDataFrame.return_value = spark + return spark + + @pytest.fixture + def mock_pandas_dataframe(self): + return pd.DataFrame({"Col1__c": [1, 2], "Col2__c": ["a", "b"]}) + + @pytest.fixture + def mock_connection(self, mock_pandas_dataframe): + mock_conn = MagicMock() + mock_conn.get_pandas_dataframe.return_value = mock_pandas_dataframe + return mock_conn + + @pytest.fixture + def reader_no_limit(self, mock_spark_session): + """Reader with no default row limit (simulates deployed environment).""" + with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None): + reader = QueryAPIDataCloudReader(None) + reader.spark = mock_spark_session + reader._default_row_limit = None + yield reader + + def test_read_dlo_no_limit_when_deployed( + self, reader_no_limit, mock_connection, mock_pandas_dataframe + ): + """When default_row_limit is None and no explicit row_limit, omit LIMIT.""" + reader_no_limit._conn = mock_connection + reader_no_limit.read_dlo("test_dlo") + mock_connection.get_pandas_dataframe.assert_called_once_with( + SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dlo") + ) + + def test_read_dmo_no_limit_when_deployed( + self, reader_no_limit, mock_connection, mock_pandas_dataframe + ): + """When default_row_limit is None and no explicit row_limit, omit LIMIT.""" + reader_no_limit._conn = mock_connection + reader_no_limit.read_dmo("test_dmo") + mock_connection.get_pandas_dataframe.assert_called_once_with( + SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dmo") + ) + + def test_read_dlo_explicit_limit_still_applied_when_deployed( + self, reader_no_limit, mock_connection, mock_pandas_dataframe + ): + """An explicit row_limit always applies, even without a default.""" + reader_no_limit._conn = mock_connection + reader_no_limit.read_dlo("test_dlo", row_limit=500) + mock_connection.get_pandas_dataframe.assert_called_once_with( + SQL_QUERY_TEMPLATE.format("test_dlo", 500) + ) diff --git a/tests/io/reader/test_sf_cli.py b/tests/io/reader/test_sf_cli.py index a9e4dff..eafa92e 100644 --- a/tests/io/reader/test_sf_cli.py +++ b/tests/io/reader/test_sf_cli.py @@ -15,11 +15,18 @@ def _make_reader( - sf_cli_org: str = "dev1", dataspace: str | None = None + sf_cli_org: str = "dev1", + dataspace: str | None = None, + default_row_limit: int | None = 1000, ) -> SFCLIDataCloudReader: spark = MagicMock() spark.createDataFrame.return_value = MagicMock() - return SFCLIDataCloudReader(spark=spark, sf_cli_org=sf_cli_org, dataspace=dataspace) + return SFCLIDataCloudReader( + spark=spark, + sf_cli_org=sf_cli_org, + dataspace=dataspace, + default_row_limit=default_row_limit, + ) def _sf_display_output( @@ -327,7 +334,7 @@ def test_executes_select_star_query(self, reader, sample_df, method, obj_name): ) as mock_exec: getattr(reader, method)(obj_name) - mock_exec.assert_called_once_with(f"SELECT * FROM {obj_name}", 1000) + mock_exec.assert_called_once_with(f"SELECT * FROM {obj_name}", None) @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"]) def test_custom_row_limit(self, reader, sample_df, method): @@ -391,3 +398,58 @@ def test_returns_spark_dataframe(self, reader, sample_df, method): result = getattr(reader, method)("SomeObj") assert result is expected + + +# --------------------------------------------------------------------------- +# No default row limit (deployed environment) +# --------------------------------------------------------------------------- + + +class TestSFCLINoDefaultRowLimit: + """Tests for deployed behavior where default_row_limit is None.""" + + @pytest.fixture + def reader(self): + return _make_reader(default_row_limit=None) + + @pytest.fixture + def mock_token(self, reader): + with patch.object( + reader, "_get_token", return_value=("tok", "https://org.salesforce.com") + ): + yield + + def _mock_response( + self, status_code: int = 200, json_body: dict | None = None, text: str = "" + ) -> MagicMock: + response = MagicMock() + response.status_code = status_code + response.text = text + response.json.return_value = json_body or {} + return response + + def test_execute_query_omits_limit_when_no_default(self, reader, mock_token): + """When default_row_limit is None and row_limit is None, no LIMIT clause.""" + api_response = {"metadata": [{"name": "col"}], "data": [["v"]]} + with patch( + "requests.post", + return_value=self._mock_response(json_body=api_response), + ) as mock_post: + reader._execute_query("SELECT * FROM foo", None) + + body = mock_post.call_args.kwargs["json"] + assert body["sql"] == "SELECT * FROM foo" + + def test_execute_query_applies_explicit_limit_when_no_default( + self, reader, mock_token + ): + """An explicit row_limit is always applied, even without a default.""" + api_response = {"metadata": [{"name": "col"}], "data": [["v"]]} + with patch( + "requests.post", + return_value=self._mock_response(json_body=api_response), + ) as mock_post: + reader._execute_query("SELECT * FROM foo", 42) + + body = mock_post.call_args.kwargs["json"] + assert body["sql"] == "SELECT * FROM foo LIMIT 42" diff --git a/tests/test_client.py b/tests/test_client.py index 40a52bb..9400801 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -25,11 +25,11 @@ class MockDataCloudReader(BaseDataCloudReader): CONFIG_NAME = "MockDataCloudReader" - def read_dlo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame: + def read_dlo(self, name: str, schema=None, row_limit=None) -> DataFrame: df = MagicMock(spec=DataFrame) return df - def read_dmo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame: + def read_dmo(self, name: str, schema=None, row_limit=None) -> DataFrame: df = MagicMock(spec=DataFrame) return df @@ -153,7 +153,7 @@ def test_read_dlo(self, reset_client, mock_spark, mock_proxy): client = Client(reader=reader, writer=writer, proxy=mock_proxy) result = client.read_dlo("test_dlo") - reader.read_dlo.assert_called_once_with("test_dlo", row_limit=1000) + reader.read_dlo.assert_called_once_with("test_dlo", row_limit=None) assert result is mock_df assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO] @@ -166,7 +166,7 @@ def test_read_dmo(self, reset_client, mock_spark, mock_proxy): client = Client(reader=reader, writer=writer, proxy=mock_proxy) result = client.read_dmo("test_dmo") - reader.read_dmo.assert_called_once_with("test_dmo", row_limit=1000) + reader.read_dmo.assert_called_once_with("test_dmo", row_limit=None) assert result is mock_df assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO] @@ -238,7 +238,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy): df = client.read_dlo("source_dlo") client.write_to_dlo("target_dlo", df, WriteMode.APPEND) - reader.read_dlo.assert_called_once_with("source_dlo", row_limit=1000) + reader.read_dlo.assert_called_once_with("source_dlo", row_limit=None) writer.write_to_dlo.assert_called_once_with( "target_dlo", mock_df, WriteMode.APPEND ) @@ -253,7 +253,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy): df = client.read_dmo("source_dmo") client.write_to_dmo("target_dmo", df, WriteMode.MERGE) - reader.read_dmo.assert_called_once_with("source_dmo", row_limit=1000) + reader.read_dmo.assert_called_once_with("source_dmo", row_limit=None) writer.write_to_dmo.assert_called_once_with( "target_dmo", mock_df, WriteMode.MERGE ) From 11ff105be90ef26699a37b443c469dc4a10afc26 Mon Sep 17 00:00:00 2001 From: Zach Maddox Date: Thu, 16 Apr 2026 16:50:48 -0400 Subject: [PATCH 2/4] remove row_limit parameter completely --- src/datacustomcode/client.py | 18 ++----- src/datacustomcode/io/reader/base.py | 4 +- src/datacustomcode/io/reader/query_api.py | 32 +++++-------- src/datacustomcode/io/reader/sf_cli.py | 27 +++++------ src/datacustomcode/io/writer/print.py | 2 +- tests/io/reader/test_query_api.py | 33 ------------- tests/io/reader/test_sf_cli.py | 57 +++++++---------------- tests/io/writer/test_print.py | 1 + tests/test_client.py | 39 +++------------- 9 files changed, 52 insertions(+), 161 deletions(-) diff --git a/src/datacustomcode/client.py b/src/datacustomcode/client.py index e1dba20..01aed31 100644 --- a/src/datacustomcode/client.py +++ b/src/datacustomcode/client.py @@ -185,39 +185,29 @@ def _new_function_client(cls) -> Client: ) return cls._instance - def read_dlo( - self, name: str, row_limit: Optional[int] = None - ) -> PySparkDataFrame: + def read_dlo(self, name: str) -> PySparkDataFrame: """Read a DLO from Data Cloud. Args: name: The name of the DLO to read. - row_limit: Maximum number of rows to fetch. When ``None``, the - reader's configured ``default_row_limit`` is used (1000 for - local development, no limit when deployed). Returns: A PySpark DataFrame containing the DLO data. """ self._record_dlo_access(name) - return self._reader.read_dlo(name, row_limit=row_limit) + return self._reader.read_dlo(name) - def read_dmo( - self, name: str, row_limit: Optional[int] = None - ) -> PySparkDataFrame: + def read_dmo(self, name: str) -> PySparkDataFrame: """Read a DMO from Data Cloud. Args: name: The name of the DMO to read. - row_limit: Maximum number of rows to fetch. When ``None``, the - reader's configured ``default_row_limit`` is used (1000 for - local development, no limit when deployed). Returns: A PySpark DataFrame containing the DMO data. """ self._record_dmo_access(name) - return self._reader.read_dmo(name, row_limit=row_limit) + return self._reader.read_dmo(name) def write_to_dlo( self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs diff --git a/src/datacustomcode/io/reader/base.py b/src/datacustomcode/io/reader/base.py index 9e2618c..ddea31b 100644 --- a/src/datacustomcode/io/reader/base.py +++ b/src/datacustomcode/io/reader/base.py @@ -15,7 +15,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Union from datacustomcode.io.base import BaseDataAccessLayer @@ -33,7 +33,6 @@ def read_dlo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: Optional[int] = None, ) -> PySparkDataFrame: ... @abstractmethod @@ -41,5 +40,4 @@ def read_dmo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: Optional[int] = None, ) -> PySparkDataFrame: ... diff --git a/src/datacustomcode/io/reader/query_api.py b/src/datacustomcode/io/reader/query_api.py index cc75e5b..dae6ce0 100644 --- a/src/datacustomcode/io/reader/query_api.py +++ b/src/datacustomcode/io/reader/query_api.py @@ -139,9 +139,9 @@ def __init__( reader delegates to :class:`SFCLIDataCloudReader` which calls the Data Cloud REST API directly using the token obtained from ``sf org display``, bypassing the CDP token-exchange flow. - default_row_limit: Default maximum number of rows to fetch when - ``row_limit`` is not explicitly passed to read methods. When - ``None``, no limit is applied (all rows are returned). + default_row_limit: Maximum number of rows to fetch automatically. + When ``None``, no limit is applied (all rows are returned). + Set via ``default_row_limit`` in ``config.yaml`` reader options. """ self.spark = spark self._default_row_limit = default_row_limit @@ -165,28 +165,23 @@ def __init__( ) self._conn = create_cdp_connection(credentials, dataspace) - def _build_query(self, name: str, row_limit: Optional[int]) -> str: - """Build a SQL query, applying the default row limit when needed. + def _build_query(self, name: str) -> str: + """Build a SQL query, applying the configured default row limit. Args: name: Object name to query. - row_limit: Explicit row limit, or ``None`` to use the configured default. Returns: SQL query string. """ - effective_limit = ( - row_limit if row_limit is not None else self._default_row_limit - ) - if effective_limit is not None: - return SQL_QUERY_TEMPLATE.format(name, effective_limit) + if self._default_row_limit is not None: + return SQL_QUERY_TEMPLATE.format(name, self._default_row_limit) return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name) def read_dlo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: Optional[int] = None, ) -> PySparkDataFrame: """ Read a Data Lake Object (DLO) from the Data Cloud. @@ -194,8 +189,6 @@ def read_dlo( Args: name (str): The name of the DLO. schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO. - row_limit (Optional[int]): Maximum number of rows to fetch. - When ``None``, the configured ``default_row_limit`` is used. Returns: PySparkDataFrame: The PySpark DataFrame. @@ -204,9 +197,9 @@ def read_dlo( self, "_sf_cli_reader", None ) if sf_cli_reader is not None: - return sf_cli_reader.read_dlo(name, schema, row_limit) + return sf_cli_reader.read_dlo(name, schema) - query = self._build_query(name, row_limit) + query = self._build_query(name) assert self._conn is not None pandas_df = self._conn.get_pandas_dataframe(query) @@ -222,7 +215,6 @@ def read_dmo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: Optional[int] = None, ) -> PySparkDataFrame: """ Read a Data Model Object (DMO) from the Data Cloud. @@ -230,8 +222,6 @@ def read_dmo( Args: name (str): The name of the DMO. schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO. - row_limit (Optional[int]): Maximum number of rows to fetch. - When ``None``, the configured ``default_row_limit`` is used. Returns: PySparkDataFrame: The PySpark DataFrame. @@ -240,9 +230,9 @@ def read_dmo( self, "_sf_cli_reader", None ) if sf_cli_reader is not None: - return sf_cli_reader.read_dmo(name, schema, row_limit) + return sf_cli_reader.read_dmo(name, schema) - query = self._build_query(name, row_limit) + query = self._build_query(name) assert self._conn is not None pandas_df = self._conn.get_pandas_dataframe(query) diff --git a/src/datacustomcode/io/reader/sf_cli.py b/src/datacustomcode/io/reader/sf_cli.py index 200964e..adfb3ed 100644 --- a/src/datacustomcode/io/reader/sf_cli.py +++ b/src/datacustomcode/io/reader/sf_cli.py @@ -65,9 +65,9 @@ def __init__( (e.g. the alias given to ``sf org login web --alias dev1``). dataspace: Optional dataspace identifier. If ``None`` or ``"default"`` the query runs against the default dataspace. - default_row_limit: Default maximum number of rows to fetch when - ``row_limit`` is not explicitly passed to read methods. When - ``None``, no limit is applied (all rows are returned). + default_row_limit: Maximum number of rows to fetch automatically. + When ``None``, no limit is applied (all rows are returned). + Set via ``default_row_limit`` in ``config.yaml`` reader options. """ self.spark = spark self.sf_cli_org = sf_cli_org @@ -137,12 +137,14 @@ def _get_token(self) -> tuple[str, str]: logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'") return access_token, instance_url - def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame: + def _execute_query(self, sql: str) -> pd.DataFrame: """Execute *sql* against the Data Cloud REST endpoint. + The configured ``default_row_limit`` is automatically appended as a + ``LIMIT`` clause when set (typically for local development). + Args: sql: Base SQL query (no ``LIMIT`` clause). - row_limit: Maximum rows to return, or ``None`` for no limit. Returns: Pandas DataFrame with query results. @@ -152,14 +154,11 @@ def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame: """ access_token, instance_url = self._get_token() - effective_limit = ( - row_limit if row_limit is not None else self._default_row_limit - ) url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql" headers = {"Authorization": f"Bearer {access_token}"} params = {"dataspace": self.dataspace} - if effective_limit is not None: - body = {"sql": f"{sql} LIMIT {effective_limit}"} + if self._default_row_limit is not None: + body = {"sql": f"{sql} LIMIT {self._default_row_limit}"} else: body = {"sql": sql} @@ -201,19 +200,17 @@ def read_dlo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: Optional[int] = None, ) -> PySparkDataFrame: """Read a Data Lake Object (DLO) from Data Cloud. Args: name: DLO name. schema: Optional explicit schema. - row_limit: Maximum rows to fetch, or ``None`` to use the configured default. Returns: PySpark DataFrame. """ - pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit) + pandas_df = self._execute_query(f"SELECT * FROM {name}") if not schema: schema = _pandas_to_spark_schema(pandas_df) return self.spark.createDataFrame(pandas_df, schema) @@ -222,19 +219,17 @@ def read_dmo( self, name: str, schema: Union[AtomicType, StructType, str, None] = None, - row_limit: Optional[int] = None, ) -> PySparkDataFrame: """Read a Data Model Object (DMO) from Data Cloud. Args: name: DMO name. schema: Optional explicit schema. - row_limit: Maximum rows to fetch, or ``None`` to use the configured default. Returns: PySpark DataFrame. """ - pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit) + pandas_df = self._execute_query(f"SELECT * FROM {name}") if not schema: schema = _pandas_to_spark_schema(pandas_df) return self.spark.createDataFrame(pandas_df, schema) diff --git a/src/datacustomcode/io/writer/print.py b/src/datacustomcode/io/writer/print.py index 5645f7a..2fcec24 100644 --- a/src/datacustomcode/io/writer/print.py +++ b/src/datacustomcode/io/writer/print.py @@ -90,7 +90,7 @@ def validate_dataframe_columns_against_dlo( schema. """ # Get DLO schema (no data, just schema) - dlo_df = self.reader.read_dlo(dlo_name, row_limit=0) + dlo_df = self.reader.read_dlo(dlo_name).limit(0) dlo_columns = set(dlo_df.columns) df_columns = set(dataframe.columns) diff --git a/tests/io/reader/test_query_api.py b/tests/io/reader/test_query_api.py index 05f8199..dc5d49b 100644 --- a/tests/io/reader/test_query_api.py +++ b/tests/io/reader/test_query_api.py @@ -296,30 +296,6 @@ def test_read_dmo_with_schema( args, _ = reader_without_init.spark.createDataFrame.call_args assert args[1] is custom_schema - def test_read_dlo_with_custom_row_limit( - self, reader_without_init, mock_connection, mock_pandas_dataframe - ): - """Test read_dlo method with custom row_limit.""" - reader_without_init._conn = mock_connection - - reader_without_init.read_dlo("test_dlo", row_limit=50) - - mock_connection.get_pandas_dataframe.assert_called_once_with( - SQL_QUERY_TEMPLATE.format("test_dlo", 50) - ) - - def test_read_dmo_with_custom_row_limit( - self, reader_without_init, mock_connection, mock_pandas_dataframe - ): - """Test read_dmo method with custom row_limit.""" - reader_without_init._conn = mock_connection - - reader_without_init.read_dmo("test_dmo", row_limit=25) - - mock_connection.get_pandas_dataframe.assert_called_once_with( - SQL_QUERY_TEMPLATE.format("test_dmo", 25) - ) - def test_read_dlo_schema_is_lowercase( self, reader_without_init, mock_connection, mock_pandas_dataframe ): @@ -414,12 +390,3 @@ def test_read_dmo_no_limit_when_deployed( SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dmo") ) - def test_read_dlo_explicit_limit_still_applied_when_deployed( - self, reader_no_limit, mock_connection, mock_pandas_dataframe - ): - """An explicit row_limit always applies, even without a default.""" - reader_no_limit._conn = mock_connection - reader_no_limit.read_dlo("test_dlo", row_limit=500) - mock_connection.get_pandas_dataframe.assert_called_once_with( - SQL_QUERY_TEMPLATE.format("test_dlo", 500) - ) diff --git a/tests/io/reader/test_sf_cli.py b/tests/io/reader/test_sf_cli.py index eafa92e..86f62ef 100644 --- a/tests/io/reader/test_sf_cli.py +++ b/tests/io/reader/test_sf_cli.py @@ -197,7 +197,7 @@ def test_posts_to_correct_url(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 100) + reader._execute_query("SELECT * FROM foo") url = mock_post.call_args[0][0] assert ( @@ -210,7 +210,7 @@ def test_passes_bearer_token_header(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") headers = mock_post.call_args.kwargs["headers"] assert headers["Authorization"] == "Bearer mytoken" @@ -220,20 +220,21 @@ def test_passes_dataspace_param(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") params = mock_post.call_args.kwargs["params"] assert params["dataspace"] == "default" - def test_appends_limit_to_sql(self, reader, mock_token): + def test_appends_default_limit_to_sql(self, reader, mock_token): + """default_row_limit (1000) is automatically appended as LIMIT.""" api_response = {"metadata": [], "data": []} with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 42) + reader._execute_query("SELECT * FROM foo") body = mock_post.call_args.kwargs["json"] - assert body["sql"] == "SELECT * FROM foo LIMIT 42" + assert body["sql"] == "SELECT * FROM foo LIMIT 1000" def test_returns_dataframe_with_rows(self, reader, mock_token): api_response = { @@ -243,7 +244,7 @@ def test_returns_dataframe_with_rows(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ): - df = reader._execute_query("SELECT * FROM foo", 100) + df = reader._execute_query("SELECT * FROM foo") assert list(df.columns) == ["id", "name"] assert len(df) == 2 @@ -253,7 +254,7 @@ def test_returns_empty_dataframe_when_no_rows(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ): - df = reader._execute_query("SELECT * FROM foo", 100) + df = reader._execute_query("SELECT * FROM foo") assert list(df.columns) == ["id", "name"] assert len(df) == 0 @@ -264,7 +265,7 @@ def test_http_error_raises_runtime_error(self, reader, mock_token): return_value=self._mock_response(status_code=401, text="Unauthorized"), ): with pytest.raises(RuntimeError, match="HTTP 401"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_http_error_uses_json_message_when_available(self, reader, mock_token): error_body = [{"message": "insufficient privileges"}] @@ -272,14 +273,14 @@ def test_http_error_uses_json_message_when_available(self, reader, mock_token): response.json.return_value = error_body with patch("requests.post", return_value=response): with pytest.raises(RuntimeError, match="insufficient privileges"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_http_error_falls_back_to_text_when_json_not_list(self, reader, mock_token): response = self._mock_response(status_code=500, text="Internal Server Error") response.json.return_value = {"error": "oops"} # dict, not list with patch("requests.post", return_value=response): with pytest.raises(RuntimeError, match="Internal Server Error"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_request_exception_raises_runtime_error(self, reader, mock_token): import requests as req_lib @@ -288,7 +289,7 @@ def test_request_exception_raises_runtime_error(self, reader, mock_token): "requests.post", side_effect=req_lib.RequestException("connection refused") ): with pytest.raises(RuntimeError, match="Data Cloud query request failed"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_custom_dataspace_passed_as_param(self): reader = _make_reader(dataspace="myspace") @@ -300,7 +301,7 @@ def test_custom_dataspace_passed_as_param(self): "requests.post", return_value=self._mock_response(json_body=api_response), ) as mock_post: - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") params = mock_post.call_args.kwargs["params"] assert params["dataspace"] == "myspace" @@ -334,17 +335,7 @@ def test_executes_select_star_query(self, reader, sample_df, method, obj_name): ) as mock_exec: getattr(reader, method)(obj_name) - mock_exec.assert_called_once_with(f"SELECT * FROM {obj_name}", None) - - @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"]) - def test_custom_row_limit(self, reader, sample_df, method): - with patch.object( - reader, "_execute_query", return_value=sample_df - ) as mock_exec: - getattr(reader, method)("SomeObj", row_limit=50) - - _, row_limit_arg = mock_exec.call_args[0] - assert row_limit_arg == 50 + mock_exec.assert_called_once_with(f"SELECT * FROM {obj_name}") @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"]) def test_auto_infers_schema_when_none_given(self, reader, sample_df, method): @@ -429,27 +420,13 @@ def _mock_response( return response def test_execute_query_omits_limit_when_no_default(self, reader, mock_token): - """When default_row_limit is None and row_limit is None, no LIMIT clause.""" + """When default_row_limit is None, no LIMIT clause is appended.""" api_response = {"metadata": [{"name": "col"}], "data": [["v"]]} with patch( "requests.post", return_value=self._mock_response(json_body=api_response), ) as mock_post: - reader._execute_query("SELECT * FROM foo", None) + reader._execute_query("SELECT * FROM foo") body = mock_post.call_args.kwargs["json"] assert body["sql"] == "SELECT * FROM foo" - - def test_execute_query_applies_explicit_limit_when_no_default( - self, reader, mock_token - ): - """An explicit row_limit is always applied, even without a default.""" - api_response = {"metadata": [{"name": "col"}], "data": [["v"]]} - with patch( - "requests.post", - return_value=self._mock_response(json_body=api_response), - ) as mock_post: - reader._execute_query("SELECT * FROM foo", 42) - - body = mock_post.call_args.kwargs["json"] - assert body["sql"] == "SELECT * FROM foo LIMIT 42" diff --git a/tests/io/writer/test_print.py b/tests/io/writer/test_print.py index 9245daa..bd5b1f0 100644 --- a/tests/io/writer/test_print.py +++ b/tests/io/writer/test_print.py @@ -28,6 +28,7 @@ def mock_reader(self): reader = MagicMock() mock_dlo_df = MagicMock() mock_dlo_df.columns = ["col1", "col2"] + mock_dlo_df.limit.return_value = mock_dlo_df reader.read_dlo.return_value = mock_dlo_df return reader diff --git a/tests/test_client.py b/tests/test_client.py index 9400801..01a45bf 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -25,11 +25,11 @@ class MockDataCloudReader(BaseDataCloudReader): CONFIG_NAME = "MockDataCloudReader" - def read_dlo(self, name: str, schema=None, row_limit=None) -> DataFrame: + def read_dlo(self, name: str, schema=None) -> DataFrame: df = MagicMock(spec=DataFrame) return df - def read_dmo(self, name: str, schema=None, row_limit=None) -> DataFrame: + def read_dmo(self, name: str, schema=None) -> DataFrame: df = MagicMock(spec=DataFrame) return df @@ -153,7 +153,7 @@ def test_read_dlo(self, reset_client, mock_spark, mock_proxy): client = Client(reader=reader, writer=writer, proxy=mock_proxy) result = client.read_dlo("test_dlo") - reader.read_dlo.assert_called_once_with("test_dlo", row_limit=None) + reader.read_dlo.assert_called_once_with("test_dlo") assert result is mock_df assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO] @@ -166,7 +166,7 @@ def test_read_dmo(self, reset_client, mock_spark, mock_proxy): client = Client(reader=reader, writer=writer, proxy=mock_proxy) result = client.read_dmo("test_dmo") - reader.read_dmo.assert_called_once_with("test_dmo", row_limit=None) + reader.read_dmo.assert_called_once_with("test_dmo") assert result is mock_df assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO] @@ -238,7 +238,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy): df = client.read_dlo("source_dlo") client.write_to_dlo("target_dlo", df, WriteMode.APPEND) - reader.read_dlo.assert_called_once_with("source_dlo", row_limit=None) + reader.read_dlo.assert_called_once_with("source_dlo") writer.write_to_dlo.assert_called_once_with( "target_dlo", mock_df, WriteMode.APPEND ) @@ -253,40 +253,13 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy): df = client.read_dmo("source_dmo") client.write_to_dmo("target_dmo", df, WriteMode.MERGE) - reader.read_dmo.assert_called_once_with("source_dmo", row_limit=None) + reader.read_dmo.assert_called_once_with("source_dmo") writer.write_to_dmo.assert_called_once_with( "target_dmo", mock_df, WriteMode.MERGE ) assert "source_dmo" in client._data_layer_history[DataCloudObjectType.DMO] - def test_read_dlo_with_row_limit(self, reset_client, mock_spark, mock_proxy): - """Test that row_limit parameter is passed through to reader.""" - reader = MagicMock(spec=BaseDataCloudReader) - writer = MagicMock(spec=BaseDataCloudWriter) - mock_df = MagicMock(spec=DataFrame) - reader.read_dlo.return_value = mock_df - - client = Client(reader=reader, writer=writer, proxy=mock_proxy) - result = client.read_dlo("test_dlo", row_limit=500) - - reader.read_dlo.assert_called_once_with("test_dlo", row_limit=500) - assert result is mock_df - assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO] - - def test_read_dmo_with_row_limit(self, reset_client, mock_spark, mock_proxy): - """Test that row_limit parameter is passed through to reader.""" - reader = MagicMock(spec=BaseDataCloudReader) - writer = MagicMock(spec=BaseDataCloudWriter) - mock_df = MagicMock(spec=DataFrame) - reader.read_dmo.return_value = mock_df - - client = Client(reader=reader, writer=writer, proxy=mock_proxy) - result = client.read_dmo("test_dmo", row_limit=100) - - reader.read_dmo.assert_called_once_with("test_dmo", row_limit=100) - assert result is mock_df - assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO] # Add tests for DefaultSparkSessionProvider From 392456a231c28e683501c1277f08670d763cbc40 Mon Sep 17 00:00:00 2001 From: Zach Maddox Date: Thu, 16 Apr 2026 17:19:34 -0400 Subject: [PATCH 3/4] fix lint errors --- tests/io/reader/test_query_api.py | 1 - tests/test_client.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/io/reader/test_query_api.py b/tests/io/reader/test_query_api.py index dc5d49b..33cb54d 100644 --- a/tests/io/reader/test_query_api.py +++ b/tests/io/reader/test_query_api.py @@ -389,4 +389,3 @@ def test_read_dmo_no_limit_when_deployed( mock_connection.get_pandas_dataframe.assert_called_once_with( SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dmo") ) - diff --git a/tests/test_client.py b/tests/test_client.py index 01a45bf..5981ca9 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -261,7 +261,6 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy): assert "source_dmo" in client._data_layer_history[DataCloudObjectType.DMO] - # Add tests for DefaultSparkSessionProvider class TestDefaultSparkSessionProvider: From 9f1e03cddb98354d418ed48c57c9b916daf934f0 Mon Sep 17 00:00:00 2001 From: Zach Maddox Date: Fri, 17 Apr 2026 10:55:24 -0400 Subject: [PATCH 4/4] update changelog --- CHANGELOG.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1928786..badde00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ # Changelog +## 2.0.0 + +### Breaking Changes + +- **Removed the `row_limit` parameter from `read_dlo()` and `read_dmo()`.** + + These methods no longer accept a `row_limit` argument. When running locally, reads are automatically capped at 1000 rows to prevent accidentally fetching large datasets during development. When deployed to Data Cloud, no limit is applied and all records are returned. + + **Why:** The `row_limit` parameter duplicated PySpark's built-in `.limit()` and created a behavioral difference between local and deployed environments. The 1000-row safety net is now handled internally via the `default_row_limit` setting in `config.yaml`, and deployed environments naturally omit it. + + **Migration:** Remove any `row_limit` arguments from your `read_dlo()` and `read_dmo()` calls. If you need a specific number of rows, use PySpark's `.limit()` on the returned DataFrame: + + ```python + # Before + df = client.read_dlo("MyObject__dll", row_limit=500) + + # After + df = client.read_dlo("MyObject__dll").limit(500) + ``` + ## 1.0.0 ### Breaking Changes