diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1928786..badde00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,25 @@
 # Changelog
 
+## 2.0.0
+
+### Breaking Changes
+
+- **Removed the `row_limit` parameter from `read_dlo()` and `read_dmo()`.**
+
+  These methods no longer accept a `row_limit` argument. When running locally, reads are automatically capped at 1000 rows to prevent accidentally fetching large datasets during development. When deployed to Data Cloud, no limit is applied and all records are returned.
+
+  **Why:** The `row_limit` parameter duplicated PySpark's built-in `.limit()` and created a behavioral difference between local and deployed environments. The 1000-row safety net is now handled internally via the `default_row_limit` setting in `config.yaml`; the configuration used in deployed environments simply omits that setting.
+
+  **Migration:** Remove any `row_limit` arguments from your `read_dlo()` and `read_dmo()` calls. If you need a specific number of rows, use PySpark's `.limit()` on the returned DataFrame:
+
+  ```python
+  # Before
+  df = client.read_dlo("MyObject__dll", row_limit=500)
+
+  # After
+  df = client.read_dlo("MyObject__dll").limit(500)
+  ```
+
 ## 1.0.0
 
 ### Breaking Changes
diff --git a/src/datacustomcode/client.py b/src/datacustomcode/client.py
index 80f20a8..01aed31 100644
--- a/src/datacustomcode/client.py
+++ b/src/datacustomcode/client.py
@@ -185,31 +185,29 @@ def _new_function_client(cls) -> Client:
         )
         return cls._instance
 
-    def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
+    def read_dlo(self, name: str) -> PySparkDataFrame:
         """Read a DLO from Data Cloud.
 
         Args:
             name: The name of the DLO to read.
-            row_limit: Maximum number of rows to fetch (default: 1000).
 
         Returns:
             A PySpark DataFrame containing the DLO data.
         """
         self._record_dlo_access(name)
-        return self._reader.read_dlo(name, row_limit=row_limit)
+        return self._reader.read_dlo(name)
 
-    def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
+    def read_dmo(self, name: str) -> PySparkDataFrame:
         """Read a DMO from Data Cloud.
 
         Args:
             name: The name of the DMO to read.
-            row_limit: Maximum number of rows to fetch (default: 1000).
 
         Returns:
             A PySpark DataFrame containing the DMO data.
         """
         self._record_dmo_access(name)
-        return self._reader.read_dmo(name, row_limit=row_limit)
+        return self._reader.read_dmo(name)
 
     def write_to_dlo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs
diff --git a/src/datacustomcode/config.yaml b/src/datacustomcode/config.yaml
index 0267b6f..bf21209 100644
--- a/src/datacustomcode/config.yaml
+++ b/src/datacustomcode/config.yaml
@@ -2,6 +2,7 @@ reader_config:
   type_config_name: QueryAPIDataCloudReader
   options:
     credentials_profile: default
+    default_row_limit: 1000
 
 writer_config:
   type_config_name: PrintDataCloudWriter
diff --git a/src/datacustomcode/io/reader/base.py b/src/datacustomcode/io/reader/base.py
index f5e69f3..ddea31b 100644
--- a/src/datacustomcode/io/reader/base.py
+++ b/src/datacustomcode/io/reader/base.py
@@ -33,7 +33,6 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame: ...
 
     @abstractmethod
@@ -41,5 +40,4 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame: ...
diff --git a/src/datacustomcode/io/reader/query_api.py b/src/datacustomcode/io/reader/query_api.py
index 98d2596..dae6ce0 100644
--- a/src/datacustomcode/io/reader/query_api.py
+++ b/src/datacustomcode/io/reader/query_api.py
@@ -37,6 +37,7 @@
 
 
 SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
+SQL_QUERY_TEMPLATE_NO_LIMIT: Final = "SELECT * FROM {}"
 
 
 def create_cdp_connection(
@@ -122,6 +123,7 @@
         credentials_profile: str = "default",
         dataspace: Optional[str] = None,
         sf_cli_org: Optional[str] = None,
+        default_row_limit: Optional[int] = None,
     ) -> None:
         """Initialize QueryAPIDataCloudReader.
 
@@ -137,8 +139,12 @@
             reader delegates to :class:`SFCLIDataCloudReader` which calls the
             Data Cloud REST API directly using the token obtained from
             ``sf org display``, bypassing the CDP token-exchange flow.
+            default_row_limit: Maximum number of rows to fetch on each read.
+                When ``None``, no limit is applied (all rows are returned).
+                Set via ``default_row_limit`` in ``config.yaml`` reader options.
         """
         self.spark = spark
+        self._default_row_limit = default_row_limit
         if sf_cli_org:
             logger.debug(
                 f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'"
@@ -147,6 +153,7 @@
                 spark=spark,
                 sf_cli_org=sf_cli_org,
                 dataspace=dataspace,
+                default_row_limit=default_row_limit,
             )
             self._conn = None
         else:
@@ -158,19 +165,30 @@
             )
             self._conn = create_cdp_connection(credentials, dataspace)
 
+    def _build_query(self, name: str) -> str:
+        """Build a SQL query, applying the configured default row limit.
+
+        Args:
+            name: Object name to query.
+
+        Returns:
+            SQL query string.
+        """
+        if self._default_row_limit is not None:
+            return SQL_QUERY_TEMPLATE.format(name, self._default_row_limit)
+        return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)
+
     def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """
-        Read a Data Lake Object (DLO) from the Data Cloud, limited to a number of rows.
+        Read a Data Lake Object (DLO) from the Data Cloud.
 
         Args:
             name (str): The name of the DLO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO.
-            row_limit (int): Maximum number of rows to fetch.
 
         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -179,9 +197,9 @@
             self, "_sf_cli_reader", None
         )
         if sf_cli_reader is not None:
-            return sf_cli_reader.read_dlo(name, schema, row_limit)
+            return sf_cli_reader.read_dlo(name, schema)
 
-        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
+        query = self._build_query(name)
         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
 
@@ -197,15 +215,13 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """
-        Read a Data Model Object (DMO) from the Data Cloud, limited to a number of rows.
+        Read a Data Model Object (DMO) from the Data Cloud.
 
         Args:
             name (str): The name of the DMO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO.
-            row_limit (int): Maximum number of rows to fetch.
 
         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -214,9 +230,9 @@ def read_dmo(
             self, "_sf_cli_reader", None
         )
         if sf_cli_reader is not None:
-            return sf_cli_reader.read_dmo(name, schema, row_limit)
+            return sf_cli_reader.read_dmo(name, schema)
 
-        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
+        query = self._build_query(name)
         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
diff --git a/src/datacustomcode/io/reader/sf_cli.py b/src/datacustomcode/io/reader/sf_cli.py
index 49a5838..adfb3ed 100644
--- a/src/datacustomcode/io/reader/sf_cli.py
+++ b/src/datacustomcode/io/reader/sf_cli.py
@@ -55,6 +55,7 @@ def __init__(
         spark: SparkSession,
         sf_cli_org: str,
         dataspace: Optional[str] = None,
+        default_row_limit: Optional[int] = None,
     ) -> None:
         """Initialize SFCLIDataCloudReader.
 
@@ -64,9 +65,13 @@ def __init__(
                 (e.g. the alias given to ``sf org login web --alias dev1``).
             dataspace: Optional dataspace identifier. If ``None`` or
                 ``"default"`` the query runs against the default dataspace.
+            default_row_limit: Maximum number of rows to fetch on each read.
+                When ``None``, no limit is applied (all rows are returned).
+                Set via ``default_row_limit`` in ``config.yaml`` reader options.
         """
         self.spark = spark
         self.sf_cli_org = sf_cli_org
+        self._default_row_limit = default_row_limit
         self.dataspace = (
             dataspace if dataspace and dataspace != "default" else "default"
         )
@@ -132,12 +137,14 @@ def _get_token(self) -> tuple[str, str]:
         logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'")
         return access_token, instance_url
 
-    def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
+    def _execute_query(self, sql: str) -> pd.DataFrame:
         """Execute *sql* against the Data Cloud REST endpoint.
 
+        The configured ``default_row_limit`` is automatically appended as a
+        ``LIMIT`` clause when set (typically for local development).
+
         Args:
             sql: Base SQL query (no ``LIMIT`` clause).
-            row_limit: Maximum rows to return.
 
         Returns:
             Pandas DataFrame with query results.
@@ -150,7 +157,10 @@ def _execute_query(self, sql: str) -> pd.DataFrame:
         url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
         headers = {"Authorization": f"Bearer {access_token}"}
         params = {"dataspace": self.dataspace}
-        body = {"sql": f"{sql} LIMIT {row_limit}"}
+        if self._default_row_limit is not None:
+            body = {"sql": f"{sql} LIMIT {self._default_row_limit}"}
+        else:
+            body = {"sql": sql}
 
         logger.debug(f"Executing Data Cloud query: {body['sql']}")
 
@@ -190,19 +200,17 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """Read a Data Lake Object (DLO) from Data Cloud.
 
         Args:
             name: DLO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch.
 
         Returns:
             PySpark DataFrame.
         """
-        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        pandas_df = self._execute_query(f"SELECT * FROM {name}")
         if not schema:
             schema = _pandas_to_spark_schema(pandas_df)
         return self.spark.createDataFrame(pandas_df, schema)
 
@@ -211,19 +219,17 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """Read a Data Model Object (DMO) from Data Cloud.
 
         Args:
             name: DMO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch.
 
         Returns:
             PySpark DataFrame.
""" - pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit) + pandas_df = self._execute_query(f"SELECT * FROM {name}") if not schema: schema = _pandas_to_spark_schema(pandas_df) return self.spark.createDataFrame(pandas_df, schema) diff --git a/src/datacustomcode/io/writer/print.py b/src/datacustomcode/io/writer/print.py index 5645f7a..2fcec24 100644 --- a/src/datacustomcode/io/writer/print.py +++ b/src/datacustomcode/io/writer/print.py @@ -90,7 +90,7 @@ def validate_dataframe_columns_against_dlo( schema. """ # Get DLO schema (no data, just schema) - dlo_df = self.reader.read_dlo(dlo_name, row_limit=0) + dlo_df = self.reader.read_dlo(dlo_name).limit(0) dlo_columns = set(dlo_df.columns) df_columns = set(dataframe.columns) diff --git a/tests/io/reader/test_query_api.py b/tests/io/reader/test_query_api.py index 8e2e77a..33cb54d 100644 --- a/tests/io/reader/test_query_api.py +++ b/tests/io/reader/test_query_api.py @@ -20,6 +20,7 @@ from datacustomcode.io.reader.query_api import ( SQL_QUERY_TEMPLATE, + SQL_QUERY_TEMPLATE_NO_LIMIT, QueryAPIDataCloudReader, ) from datacustomcode.io.reader.utils import _pandas_to_spark_schema @@ -188,6 +189,7 @@ def reader_without_init(self, mock_spark_session): with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None): reader = QueryAPIDataCloudReader(None) # None is ignored due to mock reader.spark = mock_spark_session + reader._default_row_limit = 1000 yield reader def test_pandas_to_spark_schema_function(self): @@ -294,30 +296,6 @@ def test_read_dmo_with_schema( args, _ = reader_without_init.spark.createDataFrame.call_args assert args[1] is custom_schema - def test_read_dlo_with_custom_row_limit( - self, reader_without_init, mock_connection, mock_pandas_dataframe - ): - """Test read_dlo method with custom row_limit.""" - reader_without_init._conn = mock_connection - - reader_without_init.read_dlo("test_dlo", row_limit=50) - - mock_connection.get_pandas_dataframe.assert_called_once_with( - SQL_QUERY_TEMPLATE.format("test_dlo", 50) - ) - - def test_read_dmo_with_custom_row_limit( - self, reader_without_init, mock_connection, mock_pandas_dataframe - ): - """Test read_dmo method with custom row_limit.""" - reader_without_init._conn = mock_connection - - reader_without_init.read_dmo("test_dmo", row_limit=25) - - mock_connection.get_pandas_dataframe.assert_called_once_with( - SQL_QUERY_TEMPLATE.format("test_dmo", 25) - ) - def test_read_dlo_schema_is_lowercase( self, reader_without_init, mock_connection, mock_pandas_dataframe ): @@ -341,3 +319,73 @@ def test_read_dmo_schema_is_lowercase( _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0] assert all(f.name == f.name.lower() for f in schema_arg.fields) + + +@pytest.mark.usefixtures("patch_all_requests") +class TestQueryAPIDataCloudReaderNoDefaultLimit: + """Tests for deployed behavior where default_row_limit is None (no limit).""" + + @pytest.fixture(scope="class", autouse=True) + def patch_all_requests(self, request): + patches = [] + for target in [ + "requests.get", + "requests.post", + "requests.session", + "requests.adapters.HTTPAdapter.send", + "urllib3.connectionpool.HTTPConnectionPool.urlopen", + ]: + patcher = patch(target) + patches.append(patcher) + patcher.start() + + def fin(): + for patcher in patches: + patcher.stop() + + request.addfinalizer(fin) + + @pytest.fixture + def mock_spark_session(self): + spark = MagicMock() + spark.createDataFrame.return_value = spark + return spark + + @pytest.fixture + def mock_pandas_dataframe(self): + return 
pd.DataFrame({"Col1__c": [1, 2], "Col2__c": ["a", "b"]}) + + @pytest.fixture + def mock_connection(self, mock_pandas_dataframe): + mock_conn = MagicMock() + mock_conn.get_pandas_dataframe.return_value = mock_pandas_dataframe + return mock_conn + + @pytest.fixture + def reader_no_limit(self, mock_spark_session): + """Reader with no default row limit (simulates deployed environment).""" + with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None): + reader = QueryAPIDataCloudReader(None) + reader.spark = mock_spark_session + reader._default_row_limit = None + yield reader + + def test_read_dlo_no_limit_when_deployed( + self, reader_no_limit, mock_connection, mock_pandas_dataframe + ): + """When default_row_limit is None and no explicit row_limit, omit LIMIT.""" + reader_no_limit._conn = mock_connection + reader_no_limit.read_dlo("test_dlo") + mock_connection.get_pandas_dataframe.assert_called_once_with( + SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dlo") + ) + + def test_read_dmo_no_limit_when_deployed( + self, reader_no_limit, mock_connection, mock_pandas_dataframe + ): + """When default_row_limit is None and no explicit row_limit, omit LIMIT.""" + reader_no_limit._conn = mock_connection + reader_no_limit.read_dmo("test_dmo") + mock_connection.get_pandas_dataframe.assert_called_once_with( + SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dmo") + ) diff --git a/tests/io/reader/test_sf_cli.py b/tests/io/reader/test_sf_cli.py index a9e4dff..86f62ef 100644 --- a/tests/io/reader/test_sf_cli.py +++ b/tests/io/reader/test_sf_cli.py @@ -15,11 +15,18 @@ def _make_reader( - sf_cli_org: str = "dev1", dataspace: str | None = None + sf_cli_org: str = "dev1", + dataspace: str | None = None, + default_row_limit: int | None = 1000, ) -> SFCLIDataCloudReader: spark = MagicMock() spark.createDataFrame.return_value = MagicMock() - return SFCLIDataCloudReader(spark=spark, sf_cli_org=sf_cli_org, dataspace=dataspace) + return SFCLIDataCloudReader( + spark=spark, + sf_cli_org=sf_cli_org, + dataspace=dataspace, + default_row_limit=default_row_limit, + ) def _sf_display_output( @@ -190,7 +197,7 @@ def test_posts_to_correct_url(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 100) + reader._execute_query("SELECT * FROM foo") url = mock_post.call_args[0][0] assert ( @@ -203,7 +210,7 @@ def test_passes_bearer_token_header(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") headers = mock_post.call_args.kwargs["headers"] assert headers["Authorization"] == "Bearer mytoken" @@ -213,20 +220,21 @@ def test_passes_dataspace_param(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") params = mock_post.call_args.kwargs["params"] assert params["dataspace"] == "default" - def test_appends_limit_to_sql(self, reader, mock_token): + def test_appends_default_limit_to_sql(self, reader, mock_token): + """default_row_limit (1000) is automatically appended as LIMIT.""" api_response = {"metadata": [], "data": []} with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ) as mock_post: - reader._execute_query("SELECT * FROM foo", 42) + 
reader._execute_query("SELECT * FROM foo") body = mock_post.call_args.kwargs["json"] - assert body["sql"] == "SELECT * FROM foo LIMIT 42" + assert body["sql"] == "SELECT * FROM foo LIMIT 1000" def test_returns_dataframe_with_rows(self, reader, mock_token): api_response = { @@ -236,7 +244,7 @@ def test_returns_dataframe_with_rows(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ): - df = reader._execute_query("SELECT * FROM foo", 100) + df = reader._execute_query("SELECT * FROM foo") assert list(df.columns) == ["id", "name"] assert len(df) == 2 @@ -246,7 +254,7 @@ def test_returns_empty_dataframe_when_no_rows(self, reader, mock_token): with patch( "requests.post", return_value=self._mock_response(json_body=api_response) ): - df = reader._execute_query("SELECT * FROM foo", 100) + df = reader._execute_query("SELECT * FROM foo") assert list(df.columns) == ["id", "name"] assert len(df) == 0 @@ -257,7 +265,7 @@ def test_http_error_raises_runtime_error(self, reader, mock_token): return_value=self._mock_response(status_code=401, text="Unauthorized"), ): with pytest.raises(RuntimeError, match="HTTP 401"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_http_error_uses_json_message_when_available(self, reader, mock_token): error_body = [{"message": "insufficient privileges"}] @@ -265,14 +273,14 @@ def test_http_error_uses_json_message_when_available(self, reader, mock_token): response.json.return_value = error_body with patch("requests.post", return_value=response): with pytest.raises(RuntimeError, match="insufficient privileges"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_http_error_falls_back_to_text_when_json_not_list(self, reader, mock_token): response = self._mock_response(status_code=500, text="Internal Server Error") response.json.return_value = {"error": "oops"} # dict, not list with patch("requests.post", return_value=response): with pytest.raises(RuntimeError, match="Internal Server Error"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_request_exception_raises_runtime_error(self, reader, mock_token): import requests as req_lib @@ -281,7 +289,7 @@ def test_request_exception_raises_runtime_error(self, reader, mock_token): "requests.post", side_effect=req_lib.RequestException("connection refused") ): with pytest.raises(RuntimeError, match="Data Cloud query request failed"): - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") def test_custom_dataspace_passed_as_param(self): reader = _make_reader(dataspace="myspace") @@ -293,7 +301,7 @@ def test_custom_dataspace_passed_as_param(self): "requests.post", return_value=self._mock_response(json_body=api_response), ) as mock_post: - reader._execute_query("SELECT * FROM foo", 10) + reader._execute_query("SELECT * FROM foo") params = mock_post.call_args.kwargs["params"] assert params["dataspace"] == "myspace" @@ -327,17 +335,7 @@ def test_executes_select_star_query(self, reader, sample_df, method, obj_name): ) as mock_exec: getattr(reader, method)(obj_name) - mock_exec.assert_called_once_with(f"SELECT * FROM {obj_name}", 1000) - - @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"]) - def test_custom_row_limit(self, reader, sample_df, method): - with patch.object( - reader, "_execute_query", return_value=sample_df - ) as mock_exec: - getattr(reader, method)("SomeObj", 
-            getattr(reader, method)("SomeObj", row_limit=50)
-
-        _, row_limit_arg = mock_exec.call_args[0]
-        assert row_limit_arg == 50
+        mock_exec.assert_called_once_with(f"SELECT * FROM {obj_name}")
 
     @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"])
     def test_auto_infers_schema_when_none_given(self, reader, sample_df, method):
@@ -391,3 +389,44 @@ def test_returns_spark_dataframe(self, reader, sample_df, method):
             result = getattr(reader, method)("SomeObj")
 
         assert result is expected
+
+
+# ---------------------------------------------------------------------------
+# No default row limit (deployed environment)
+# ---------------------------------------------------------------------------
+
+
+class TestSFCLINoDefaultRowLimit:
+    """Tests for deployed behavior where default_row_limit is None."""
+
+    @pytest.fixture
+    def reader(self):
+        return _make_reader(default_row_limit=None)
+
+    @pytest.fixture
+    def mock_token(self, reader):
+        with patch.object(
+            reader, "_get_token", return_value=("tok", "https://org.salesforce.com")
+        ):
+            yield
+
+    def _mock_response(
+        self, status_code: int = 200, json_body: dict | None = None, text: str = ""
+    ) -> MagicMock:
+        response = MagicMock()
+        response.status_code = status_code
+        response.text = text
+        response.json.return_value = json_body or {}
+        return response
+
+    def test_execute_query_omits_limit_when_no_default(self, reader, mock_token):
+        """When default_row_limit is None, no LIMIT clause is appended."""
+        api_response = {"metadata": [{"name": "col"}], "data": [["v"]]}
+        with patch(
+            "requests.post",
+            return_value=self._mock_response(json_body=api_response),
+        ) as mock_post:
+            reader._execute_query("SELECT * FROM foo")
+
+        body = mock_post.call_args.kwargs["json"]
+        assert body["sql"] == "SELECT * FROM foo"
diff --git a/tests/io/writer/test_print.py b/tests/io/writer/test_print.py
index 9245daa..bd5b1f0 100644
--- a/tests/io/writer/test_print.py
+++ b/tests/io/writer/test_print.py
@@ -28,6 +28,7 @@ def mock_reader(self):
         reader = MagicMock()
         mock_dlo_df = MagicMock()
         mock_dlo_df.columns = ["col1", "col2"]
+        mock_dlo_df.limit.return_value = mock_dlo_df
         reader.read_dlo.return_value = mock_dlo_df
         return reader
diff --git a/tests/test_client.py b/tests/test_client.py
index 40a52bb..5981ca9 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -25,11 +25,11 @@
 class MockDataCloudReader(BaseDataCloudReader):
     CONFIG_NAME = "MockDataCloudReader"
 
-    def read_dlo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
+    def read_dlo(self, name: str, schema=None) -> DataFrame:
         df = MagicMock(spec=DataFrame)
         return df
 
-    def read_dmo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
+    def read_dmo(self, name: str, schema=None) -> DataFrame:
         df = MagicMock(spec=DataFrame)
         return df
@@ -153,7 +153,7 @@ def test_read_dlo(self, reset_client, mock_spark, mock_proxy):
         client = Client(reader=reader, writer=writer, proxy=mock_proxy)
         result = client.read_dlo("test_dlo")
 
-        reader.read_dlo.assert_called_once_with("test_dlo", row_limit=1000)
+        reader.read_dlo.assert_called_once_with("test_dlo")
         assert result is mock_df
         assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]
@@ -166,7 +166,7 @@ def test_read_dmo(self, reset_client, mock_spark, mock_proxy):
         client = Client(reader=reader, writer=writer, proxy=mock_proxy)
         result = client.read_dmo("test_dmo")
 
-        reader.read_dmo.assert_called_once_with("test_dmo", row_limit=1000)
+        reader.read_dmo.assert_called_once_with("test_dmo")
         assert result is mock_df
         assert "test_dmo" in
 client._data_layer_history[DataCloudObjectType.DMO]
@@ -238,7 +238,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
         df = client.read_dlo("source_dlo")
         client.write_to_dlo("target_dlo", df, WriteMode.APPEND)
 
-        reader.read_dlo.assert_called_once_with("source_dlo", row_limit=1000)
+        reader.read_dlo.assert_called_once_with("source_dlo")
         writer.write_to_dlo.assert_called_once_with(
             "target_dlo", mock_df, WriteMode.APPEND
         )
@@ -253,41 +253,13 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
         df = client.read_dmo("source_dmo")
         client.write_to_dmo("target_dmo", df, WriteMode.MERGE)
 
-        reader.read_dmo.assert_called_once_with("source_dmo", row_limit=1000)
+        reader.read_dmo.assert_called_once_with("source_dmo")
         writer.write_to_dmo.assert_called_once_with(
             "target_dmo", mock_df, WriteMode.MERGE
         )
         assert "source_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
 
-    def test_read_dlo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
-        """Test that row_limit parameter is passed through to reader."""
-        reader = MagicMock(spec=BaseDataCloudReader)
-        writer = MagicMock(spec=BaseDataCloudWriter)
-        mock_df = MagicMock(spec=DataFrame)
-        reader.read_dlo.return_value = mock_df
-
-        client = Client(reader=reader, writer=writer, proxy=mock_proxy)
-        result = client.read_dlo("test_dlo", row_limit=500)
-
-        reader.read_dlo.assert_called_once_with("test_dlo", row_limit=500)
-        assert result is mock_df
-        assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]
-
-    def test_read_dmo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
-        """Test that row_limit parameter is passed through to reader."""
-        reader = MagicMock(spec=BaseDataCloudReader)
-        writer = MagicMock(spec=BaseDataCloudWriter)
-        mock_df = MagicMock(spec=DataFrame)
-        reader.read_dmo.return_value = mock_df
-
-        client = Client(reader=reader, writer=writer, proxy=mock_proxy)
-        result = client.read_dmo("test_dmo", row_limit=100)
-
-        reader.read_dmo.assert_called_once_with("test_dmo", row_limit=100)
-        assert result is mock_df
-        assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
-
 
 # Add tests for DefaultSparkSessionProvider
 class TestDefaultSparkSessionProvider:
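
---

For reviewers, a standalone sketch of the query-building rule this change introduces. It mirrors the `_build_query` helper added in `query_api.py` above; the free function and assertions here are illustrative only and are not part of the library:

```python
from typing import Optional

# Same templates as in query_api.py.
SQL_QUERY_TEMPLATE = "SELECT * FROM {} LIMIT {}"
SQL_QUERY_TEMPLATE_NO_LIMIT = "SELECT * FROM {}"


def build_query(name: str, default_row_limit: Optional[int]) -> str:
    """Mirror of QueryAPIDataCloudReader._build_query: append a LIMIT
    clause only when a default row limit is configured."""
    if default_row_limit is not None:
        return SQL_QUERY_TEMPLATE.format(name, default_row_limit)
    return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)


# Local development: config.yaml sets default_row_limit: 1000.
assert build_query("MyObject__dll", 1000) == "SELECT * FROM MyObject__dll LIMIT 1000"
# Deployed: the setting is omitted, so no LIMIT clause is added.
assert build_query("MyObject__dll", None) == "SELECT * FROM MyObject__dll"
```

Callers that need a specific row count apply PySpark's `.limit()` to the returned DataFrame instead, as shown in the changelog's migration example.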