diff --git a/packages/data-designer-config/src/data_designer/config/run_config.py b/packages/data-designer-config/src/data_designer/config/run_config.py index ea3393b26..502410ab1 100644 --- a/packages/data-designer-config/src/data_designer/config/run_config.py +++ b/packages/data-designer-config/src/data_designer/config/run_config.py @@ -147,6 +147,9 @@ class RunConfig(ConfigBase): Default is False. progress_interval: How often (in seconds) the async progress reporter emits a consolidated log block. Must be > 0. Default is 5.0. + preserve_dropped_columns: If True, write columns removed by drop processors to + separate dropped-column parquet files. Set to False to omit those artifacts + while still removing dropped columns from the final dataset. Default is True. jinja_rendering_engine: Template renderer used for engine-side Jinja evaluation. ``native`` uses Jinja2's built-in sandbox with the standard filter set and fewer Data Designer-specific restrictions. ``secure`` uses Data Designer's @@ -171,6 +174,12 @@ class RunConfig(ConfigBase): async_trace: bool = False progress_bar: bool = False progress_interval: float = Field(default=5.0, gt=0.0) + preserve_dropped_columns: bool = Field( + default=True, + description=( + "Whether columns removed by drop processors are preserved in separate dropped-column parquet files." + ), + ) jinja_rendering_engine: JinjaRenderingEngine = Field( default=JinjaRenderingEngine.SECURE, description=( diff --git a/packages/data-designer-config/tests/config/test_run_config.py b/packages/data-designer-config/tests/config/test_run_config.py index 9d216025c..1d6efd9c1 100644 --- a/packages/data-designer-config/tests/config/test_run_config.py +++ b/packages/data-designer-config/tests/config/test_run_config.py @@ -24,6 +24,15 @@ def test_run_config_accepts_native_renderer() -> None: assert JinjaRenderingEngine(run_config.jinja_rendering_engine) == JinjaRenderingEngine.NATIVE +def test_run_config_preserves_dropped_columns_by_default() -> None: + assert RunConfig().preserve_dropped_columns is True + + +def test_run_config_accepts_disabled_dropped_column_preservation() -> None: + run_config = RunConfig(preserve_dropped_columns=False) + assert run_config.preserve_dropped_columns is False + + def test_run_config_throttle_shim_rejects_unknown_legacy_fields() -> None: with pytest.raises(ValidationError, match="max_concurrent_requests"): RunConfig(throttle={"max_concurrent_requests": 1}) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 8ce6c0cde..3abed6136 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -97,6 +97,7 @@ _CLIENT_VERSION: str = get_library_version() +PRESERVE_DROPPED_COLUMNS_METADATA_KEY = "preserve_dropped_columns" def _is_async_trace_enabled(settings: RunConfig) -> bool: @@ -272,7 +273,8 @@ def build( compat = self._check_resume_config_compatibility() if resume == ResumeMode.ALWAYS and compat == _ConfigCompatibility.INCOMPATIBLE: raise DatasetGenerationError( - "🛑 Cannot resume: the current config does not match the config used in the interrupted run. " + "🛑 Cannot resume: the current config or dropped-column artifact policy does not match the " + "config used in the interrupted run. " "Use resume=ResumeMode.IF_POSSIBLE to start fresh automatically, or " "resume=ResumeMode.NEVER to force a new run." ) @@ -374,7 +376,12 @@ def build( def _set_metadata_defaults(self) -> None: """Attach config identity fields to every metadata write in this build.""" - self.artifact_storage.set_metadata_defaults(self._data_designer_config.fingerprint()) + self.artifact_storage.set_metadata_defaults( + { + **self._data_designer_config.fingerprint(), + PRESERVE_DROPPED_COLUMNS_METADATA_KEY: self._resource_provider.run_config.preserve_dropped_columns, + } + ) def _has_allow_resize_columns(self) -> bool: return any(getattr(config, "allow_resize", False) for config in self.single_column_configs) @@ -504,6 +511,14 @@ def _load_resume_state(self, num_records: int, buffer_size: int) -> _ResumeState "or start a new run without resume=ResumeMode.ALWAYS." ) + if not self._dropped_column_artifact_policy_matches(metadata): + raise DatasetGenerationError( + "🛑 Cannot resume: preserve_dropped_columns=" + f"{self._resource_provider.run_config.preserve_dropped_columns} does not match the original " + "run's dropped-column artifact policy. Start a fresh run with resume=ResumeMode.NEVER, or " + "use resume=ResumeMode.IF_POSSIBLE to start fresh automatically when the policy differs." + ) + return _ResumeState( num_completed_batches=num_completed_batches, actual_num_records=actual_num_records, @@ -751,6 +766,9 @@ def _check_resume_config_compatibility(self) -> _ConfigCompatibility: ) return _ConfigCompatibility.INCOMPATIBLE + if not self._dropped_column_artifact_policy_matches(metadata): + return _ConfigCompatibility.INCOMPATIBLE + stored_hash = metadata.get("config_hash") stored_version = metadata.get("config_hash_version") if stored_hash is not None: @@ -790,6 +808,24 @@ def _check_resume_config_compatibility(self) -> _ConfigCompatibility: ) return _ConfigCompatibility.COMPATIBLE + def _dropped_column_artifact_policy_matches(self, metadata: dict[str, Any]) -> bool: + """Return whether stored dropped-column artifact behavior matches this run. + + Metadata written before this RunConfig option existed implicitly used the + historical behavior, which preserved dropped-column artifacts. + """ + stored = metadata.get(PRESERVE_DROPPED_COLUMNS_METADATA_KEY, True) + current = self._resource_provider.run_config.preserve_dropped_columns + if stored != current: + logger.warning( + "⚠️ preserve_dropped_columns changed from %s to %s; treating the existing dataset as " + "incompatible for resume because dropped-column parquet artifacts would be inconsistent.", + stored, + current, + ) + return False + return True + def _build_async( self, generators: list[ColumnGenerator], diff --git a/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py b/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py index c2b0a91a6..44b646c95 100644 --- a/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py +++ b/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py @@ -35,7 +35,7 @@ def _resolve_columns(self, available: pd.Index) -> list[str]: def process_after_batch(self, data: pd.DataFrame, *, current_batch_number: int | None) -> pd.DataFrame: logger.info(f"🙈 Dropping columns: {self.config.column_names}") resolved = self._resolve_columns(data.columns) - if current_batch_number is not None: + if current_batch_number is not None and self.resource_provider.run_config.preserve_dropped_columns: self._save_dropped_columns(data, resolved, current_batch_number) if resolved: data.drop(columns=resolved, inplace=True) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index 0a68a84db..5aea97420 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -1690,6 +1690,67 @@ def test_build_resume_raises_on_buffer_size_mismatch(stub_resource_provider, stu builder.build(num_records=4, resume=ResumeMode.ALWAYS) +def test_build_resume_always_raises_on_dropped_column_artifact_policy_mismatch( + stub_resource_provider, + stub_test_config_builder, + tmp_path, + caplog, +): + """resume=ALWAYS rejects runs that would mix dropped-column artifact policies.""" + dataset_dir = tmp_path / "dataset" + _write_metadata( + dataset_dir, + target_num_records=4, + buffer_size=2, + num_completed_batches=1, + actual_num_records=2, + preserve_dropped_columns=True, + ) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + stub_resource_provider.run_config = RunConfig(buffer_size=2, preserve_dropped_columns=False) + + with caplog.at_level(logging.WARNING): + with pytest.raises(DatasetGenerationError, match="does not match the config used"): + builder.build(num_records=4, resume=ResumeMode.ALWAYS) + + assert any("preserve_dropped_columns changed from True to False" in record.message for record in caplog.records) + + +def test_build_if_possible_starts_fresh_on_dropped_column_artifact_policy_mismatch( + stub_resource_provider, + stub_test_config_builder, + tmp_path, +): + """resume=IF_POSSIBLE starts fresh when dropped-column artifact policy differs.""" + dataset_dir = tmp_path / "dataset" + _write_metadata( + dataset_dir, + target_num_records=4, + buffer_size=2, + num_completed_batches=1, + actual_num_records=2, + preserve_dropped_columns=True, + ) + + storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) + stub_resource_provider.artifact_storage = storage + stub_resource_provider.run_config = RunConfig(buffer_size=2, preserve_dropped_columns=False) + builder = DatasetBuilder( + data_designer_config=stub_test_config_builder.build(), + resource_provider=stub_resource_provider, + ) + + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder, "_run_batch"): + with patch.object(builder.batch_manager, "finish"): + final_path = builder.build(num_records=4, resume=ResumeMode.IF_POSSIBLE) + + assert storage.resume == ResumeMode.NEVER + assert (dataset_dir / "sentinel.txt").exists() + assert final_path != dataset_dir / "parquet-files" + + def test_build_resume_raises_on_corrupt_metadata(stub_resource_provider, stub_test_config_builder, tmp_path): """resume=ALWAYS raises clearly when metadata.json was partially written.""" dataset_dir = tmp_path / "dataset" diff --git a/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py b/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py index cfa620752..220771260 100644 --- a/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py +++ b/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py @@ -9,6 +9,7 @@ import data_designer.lazy_heavy_imports as lazy from data_designer.config.processors import DropColumnsProcessorConfig +from data_designer.config.run_config import RunConfig from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor from data_designer.engine.storage.artifact_storage import BatchStage @@ -24,6 +25,7 @@ def stub_processor(stub_processor_config): mock_resource_provider.artifact_storage = Mock() mock_resource_provider.artifact_storage.create_batch_file_path = Mock() mock_resource_provider.artifact_storage.create_batch_file_path.return_value.name = "dropped.parquet" + mock_resource_provider.run_config = RunConfig() processor = DropColumnsProcessor( config=stub_processor_config, resource_provider=mock_resource_provider, @@ -181,3 +183,15 @@ def test_process_after_batch_preview_mode_does_not_save(stub_processor, stub_sam # But no file should be written stub_processor.artifact_storage.write_parquet_file.assert_not_called() + + +def test_process_after_batch_does_not_save_when_preservation_disabled(stub_processor, stub_sample_dataframe): + stub_processor.config.column_names = ["col1", "col2"] + stub_processor.resource_provider.run_config = RunConfig(preserve_dropped_columns=False) + + result = stub_processor.process_after_batch(stub_sample_dataframe.copy(), current_batch_number=0) + + assert "col1" not in result.columns + assert "col2" not in result.columns + assert "col3" in result.columns + stub_processor.artifact_storage.write_parquet_file.assert_not_called() diff --git a/packages/data-designer/tests/interface/test_data_designer.py b/packages/data-designer/tests/interface/test_data_designer.py index fe88509d6..7a0db8052 100644 --- a/packages/data-designer/tests/interface/test_data_designer.py +++ b/packages/data-designer/tests/interface/test_data_designer.py @@ -863,6 +863,89 @@ def test_create_dataset_e2e_using_only_sampler_columns( analysis.to_report() +def test_create_with_drop_true_can_skip_dropped_column_artifacts( + stub_artifact_path, + stub_model_providers, + stub_model_configs, + stub_managed_assets_path, +): + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.add_column( + SamplerColumnConfig( + name="uuid", + sampler_type="uuid", + params={"prefix": "id_", "short_form": True, "uppercase": False}, + ) + ) + config_builder.add_column( + SamplerColumnConfig( + name="hidden_category", + sampler_type="category", + params={"values": ["private"]}, + drop=True, + ) + ) + + data_designer = DataDesigner( + artifact_path=stub_artifact_path, + model_providers=stub_model_providers, + secret_resolver=PlaintextResolver(), + managed_assets_path=stub_managed_assets_path, + ) + data_designer.set_run_config(RunConfig(preserve_dropped_columns=False)) + + results = data_designer.create(config_builder, num_records=3) + + df = results.load_dataset() + assert "uuid" in df.columns + assert "hidden_category" not in df.columns + assert not results.artifact_storage.dropped_columns_dataset_path.exists() + metadata = json.loads(results.artifact_storage.metadata_file_path.read_text()) + assert metadata["preserve_dropped_columns"] is False + + +def test_create_with_drop_true_preserves_columns_only_in_dropped_artifacts( + stub_artifact_path, + stub_model_providers, + stub_model_configs, + stub_managed_assets_path, +): + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.add_column( + SamplerColumnConfig( + name="uuid", + sampler_type="uuid", + params={"prefix": "id_", "short_form": True, "uppercase": False}, + ) + ) + config_builder.add_column( + SamplerColumnConfig( + name="hidden_category", + sampler_type="category", + params={"values": ["private"]}, + drop=True, + ) + ) + + data_designer = DataDesigner( + artifact_path=stub_artifact_path, + model_providers=stub_model_providers, + secret_resolver=PlaintextResolver(), + managed_assets_path=stub_managed_assets_path, + ) + + results = data_designer.create(config_builder, num_records=3) + + main_df = results.load_dataset() + dropped_df = lazy.pd.read_parquet(results.artifact_storage.dropped_columns_dataset_path) + assert "uuid" in main_df.columns + assert "hidden_category" not in main_df.columns + assert "hidden_category" in dropped_df.columns + assert "uuid" not in dropped_df.columns + metadata = json.loads(results.artifact_storage.metadata_file_path.read_text()) + assert metadata["preserve_dropped_columns"] is True + + def test_create_raises_error_when_builder_fails( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ):