Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class RunConfig(ConfigBase):
Default is False.
progress_interval: How often (in seconds) the async progress reporter emits a
consolidated log block. Must be > 0. Default is 5.0.
preserve_dropped_columns: If True, write columns removed by drop processors to
separate dropped-column parquet files. Set to False to omit those artifacts
while still removing dropped columns from the final dataset. Default is True.
jinja_rendering_engine: Template renderer used for engine-side Jinja evaluation.
``native`` uses Jinja2's built-in sandbox with the standard filter set and
fewer Data Designer-specific restrictions. ``secure`` uses Data Designer's
Expand All @@ -171,6 +174,12 @@ class RunConfig(ConfigBase):
async_trace: bool = False
progress_bar: bool = False
progress_interval: float = Field(default=5.0, gt=0.0)
preserve_dropped_columns: bool = Field(
default=True,
description=(
"Whether columns removed by drop processors are preserved in separate dropped-column parquet files."
),
)
jinja_rendering_engine: JinjaRenderingEngine = Field(
default=JinjaRenderingEngine.SECURE,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ def test_run_config_accepts_native_renderer() -> None:
assert JinjaRenderingEngine(run_config.jinja_rendering_engine) == JinjaRenderingEngine.NATIVE


def test_run_config_preserves_dropped_columns_by_default() -> None:
    """A RunConfig built without arguments keeps dropped-column preservation enabled."""
    default_config = RunConfig()
    assert default_config.preserve_dropped_columns is True


def test_run_config_accepts_disabled_dropped_column_preservation() -> None:
    """Passing ``preserve_dropped_columns=False`` is accepted and stored on the config."""
    assert RunConfig(preserve_dropped_columns=False).preserve_dropped_columns is False


def test_run_config_throttle_shim_rejects_unknown_legacy_fields() -> None:
with pytest.raises(ValidationError, match="max_concurrent_requests"):
RunConfig(throttle={"max_concurrent_requests": 1})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@


# Library version string, as returned by get_library_version().
_CLIENT_VERSION: str = get_library_version()
# Metadata key under which a build records the run's ``preserve_dropped_columns``
# setting; the same key is read back when checking whether a run can be resumed.
PRESERVE_DROPPED_COLUMNS_METADATA_KEY = "preserve_dropped_columns"


def _is_async_trace_enabled(settings: RunConfig) -> bool:
Expand Down Expand Up @@ -272,7 +273,8 @@ def build(
compat = self._check_resume_config_compatibility()
if resume == ResumeMode.ALWAYS and compat == _ConfigCompatibility.INCOMPATIBLE:
raise DatasetGenerationError(
"πŸ›‘ Cannot resume: the current config does not match the config used in the interrupted run. "
"πŸ›‘ Cannot resume: the current config or dropped-column artifact policy does not match the "
"config used in the interrupted run. "
"Use resume=ResumeMode.IF_POSSIBLE to start fresh automatically, or "
"resume=ResumeMode.NEVER to force a new run."
)
Expand Down Expand Up @@ -374,7 +376,12 @@ def build(

def _set_metadata_defaults(self) -> None:
    """Attach config identity fields to every metadata write in this build.

    The defaults are the config fingerprint plus the run's
    ``preserve_dropped_columns`` setting, stored under
    ``PRESERVE_DROPPED_COLUMNS_METADATA_KEY`` so a later resume can compare
    the recorded dropped-column artifact policy with the current one.
    """
    # Register the defaults exactly once: the merged mapping already contains
    # every fingerprint key, so a separate fingerprint-only call is redundant.
    self.artifact_storage.set_metadata_defaults(
        {
            **self._data_designer_config.fingerprint(),
            PRESERVE_DROPPED_COLUMNS_METADATA_KEY: self._resource_provider.run_config.preserve_dropped_columns,
        }
    )

def _has_allow_resize_columns(self) -> bool:
return any(getattr(config, "allow_resize", False) for config in self.single_column_configs)
Expand Down Expand Up @@ -504,6 +511,14 @@ def _load_resume_state(self, num_records: int, buffer_size: int) -> _ResumeState
"or start a new run without resume=ResumeMode.ALWAYS."
)

if not self._dropped_column_artifact_policy_matches(metadata):
raise DatasetGenerationError(
"πŸ›‘ Cannot resume: preserve_dropped_columns="
f"{self._resource_provider.run_config.preserve_dropped_columns} does not match the original "
"run's dropped-column artifact policy. Start a fresh run with resume=ResumeMode.NEVER, or "
"use resume=ResumeMode.IF_POSSIBLE to start fresh automatically when the policy differs."
)

return _ResumeState(
num_completed_batches=num_completed_batches,
actual_num_records=actual_num_records,
Expand Down Expand Up @@ -751,6 +766,9 @@ def _check_resume_config_compatibility(self) -> _ConfigCompatibility:
)
return _ConfigCompatibility.INCOMPATIBLE

if not self._dropped_column_artifact_policy_matches(metadata):
return _ConfigCompatibility.INCOMPATIBLE

stored_hash = metadata.get("config_hash")
stored_version = metadata.get("config_hash_version")
if stored_hash is not None:
Expand Down Expand Up @@ -790,6 +808,24 @@ def _check_resume_config_compatibility(self) -> _ConfigCompatibility:
)
return _ConfigCompatibility.COMPATIBLE

def _dropped_column_artifact_policy_matches(self, metadata: dict[str, Any]) -> bool:
    """Check the recorded dropped-column artifact policy against the current run.

    Metadata that lacks the policy key is read as ``True``: it was written
    before the RunConfig option existed, when dropped-column artifacts were
    always preserved.

    Args:
        metadata: Metadata mapping loaded from the existing dataset.

    Returns:
        True when the stored policy equals the current
        ``preserve_dropped_columns`` value; otherwise False, after logging a
        warning that names both values.
    """
    recorded_policy = metadata.get(PRESERVE_DROPPED_COLUMNS_METADATA_KEY, True)
    active_policy = self._resource_provider.run_config.preserve_dropped_columns
    if recorded_policy == active_policy:
        return True

    logger.warning(
        "⚠️ preserve_dropped_columns changed from %s to %s; treating the existing dataset as "
        "incompatible for resume because dropped-column parquet artifacts would be inconsistent.",
        recorded_policy,
        active_policy,
    )
    return False

def _build_async(
self,
generators: list[ColumnGenerator],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _resolve_columns(self, available: pd.Index) -> list[str]:
def process_after_batch(self, data: pd.DataFrame, *, current_batch_number: int | None) -> pd.DataFrame:
logger.info(f"πŸ™ˆ Dropping columns: {self.config.column_names}")
resolved = self._resolve_columns(data.columns)
if current_batch_number is not None:
if current_batch_number is not None and self.resource_provider.run_config.preserve_dropped_columns:
Comment thread
nabinchha marked this conversation as resolved.
self._save_dropped_columns(data, resolved, current_batch_number)
if resolved:
data.drop(columns=resolved, inplace=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,67 @@ def test_build_resume_raises_on_buffer_size_mismatch(stub_resource_provider, stu
builder.build(num_records=4, resume=ResumeMode.ALWAYS)


def test_build_resume_always_raises_on_dropped_column_artifact_policy_mismatch(
    stub_resource_provider,
    stub_test_config_builder,
    tmp_path,
    caplog,
):
    """resume=ALWAYS refuses to continue a run recorded with a different dropped-column policy."""
    # Existing metadata records preserve_dropped_columns=True.
    _write_metadata(
        tmp_path / "dataset",
        target_num_records=4,
        buffer_size=2,
        num_completed_batches=1,
        actual_num_records=2,
        preserve_dropped_columns=True,
    )

    builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2)
    # The current run flips the policy to False, which must block the resume.
    stub_resource_provider.run_config = RunConfig(buffer_size=2, preserve_dropped_columns=False)

    expected_error = "does not match the config used"
    with caplog.at_level(logging.WARNING), pytest.raises(DatasetGenerationError, match=expected_error):
        builder.build(num_records=4, resume=ResumeMode.ALWAYS)

    expected_warning = "preserve_dropped_columns changed from True to False"
    logged_messages = [record.message for record in caplog.records]
    assert any(expected_warning in message for message in logged_messages)


def test_build_if_possible_starts_fresh_on_dropped_column_artifact_policy_mismatch(
    stub_resource_provider,
    stub_test_config_builder,
    tmp_path,
):
    """resume=IF_POSSIBLE falls back to a fresh run when the dropped-column policy differs."""
    dataset_dir = tmp_path / "dataset"
    # Existing metadata records preserve_dropped_columns=True.
    _write_metadata(
        dataset_dir,
        target_num_records=4,
        buffer_size=2,
        num_completed_batches=1,
        actual_num_records=2,
        preserve_dropped_columns=True,
    )

    storage = _ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE)
    stub_resource_provider.artifact_storage = storage
    # The current run disables preservation, so the stored policy no longer matches.
    stub_resource_provider.run_config = RunConfig(buffer_size=2, preserve_dropped_columns=False)
    builder = DatasetBuilder(
        data_designer_config=stub_test_config_builder.build(),
        resource_provider=stub_resource_provider,
    )

    # Stub out the expensive parts of the build; only the resume decision is under test.
    no_health_check = patch.object(builder, "_run_model_health_check_if_needed")
    no_batches = patch.object(builder, "_run_batch")
    no_finish = patch.object(builder.batch_manager, "finish")
    with no_health_check, no_batches, no_finish:
        final_path = builder.build(num_records=4, resume=ResumeMode.IF_POSSIBLE)

    # Storage was switched to a fresh run, the old dataset directory still has its
    # sentinel file, and the result points somewhere other than the old parquet path.
    assert storage.resume == ResumeMode.NEVER
    assert (dataset_dir / "sentinel.txt").exists()
    assert final_path != dataset_dir / "parquet-files"


def test_build_resume_raises_on_corrupt_metadata(stub_resource_provider, stub_test_config_builder, tmp_path):
"""resume=ALWAYS raises clearly when metadata.json was partially written."""
dataset_dir = tmp_path / "dataset"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.processors import DropColumnsProcessorConfig
from data_designer.config.run_config import RunConfig
from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
from data_designer.engine.storage.artifact_storage import BatchStage

Expand All @@ -24,6 +25,7 @@ def stub_processor(stub_processor_config):
mock_resource_provider.artifact_storage = Mock()
mock_resource_provider.artifact_storage.create_batch_file_path = Mock()
mock_resource_provider.artifact_storage.create_batch_file_path.return_value.name = "dropped.parquet"
mock_resource_provider.run_config = RunConfig()
processor = DropColumnsProcessor(
config=stub_processor_config,
resource_provider=mock_resource_provider,
Expand Down Expand Up @@ -181,3 +183,15 @@ def test_process_after_batch_preview_mode_does_not_save(stub_processor, stub_sam

# But no file should be written
stub_processor.artifact_storage.write_parquet_file.assert_not_called()


def test_process_after_batch_does_not_save_when_preservation_disabled(stub_processor, stub_sample_dataframe):
    """With preservation disabled, columns are still dropped but no parquet file is written."""
    stub_processor.config.column_names = ["col1", "col2"]
    stub_processor.resource_provider.run_config = RunConfig(preserve_dropped_columns=False)

    batch = stub_sample_dataframe.copy()
    result = stub_processor.process_after_batch(batch, current_batch_number=0)

    remaining_columns = result.columns
    assert "col1" not in remaining_columns
    assert "col2" not in remaining_columns
    assert "col3" in remaining_columns
    stub_processor.artifact_storage.write_parquet_file.assert_not_called()
83 changes: 83 additions & 0 deletions packages/data-designer/tests/interface/test_data_designer.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,89 @@ def test_create_dataset_e2e_using_only_sampler_columns(
analysis.to_report()


def test_create_with_drop_true_can_skip_dropped_column_artifacts(
    stub_artifact_path,
    stub_model_providers,
    stub_model_configs,
    stub_managed_assets_path,
):
    """With preservation disabled, a ``drop=True`` column leaves no dropped-column artifacts."""
    config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs)
    kept_column = SamplerColumnConfig(
        name="uuid",
        sampler_type="uuid",
        params={"prefix": "id_", "short_form": True, "uppercase": False},
    )
    dropped_column = SamplerColumnConfig(
        name="hidden_category",
        sampler_type="category",
        params={"values": ["private"]},
        drop=True,
    )
    for column in (kept_column, dropped_column):
        config_builder.add_column(column)

    data_designer = DataDesigner(
        artifact_path=stub_artifact_path,
        model_providers=stub_model_providers,
        secret_resolver=PlaintextResolver(),
        managed_assets_path=stub_managed_assets_path,
    )
    data_designer.set_run_config(RunConfig(preserve_dropped_columns=False))

    results = data_designer.create(config_builder, num_records=3)
    storage = results.artifact_storage

    final_columns = results.load_dataset().columns
    assert "uuid" in final_columns
    assert "hidden_category" not in final_columns
    # No dropped-column dataset on disk, and the policy is recorded in metadata.
    assert not storage.dropped_columns_dataset_path.exists()
    metadata = json.loads(storage.metadata_file_path.read_text())
    assert metadata["preserve_dropped_columns"] is False


def test_create_with_drop_true_preserves_columns_only_in_dropped_artifacts(
    stub_artifact_path,
    stub_model_providers,
    stub_model_configs,
    stub_managed_assets_path,
):
    """By default a ``drop=True`` column appears only in the dropped-column artifacts."""
    config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs)
    kept_column = SamplerColumnConfig(
        name="uuid",
        sampler_type="uuid",
        params={"prefix": "id_", "short_form": True, "uppercase": False},
    )
    dropped_column = SamplerColumnConfig(
        name="hidden_category",
        sampler_type="category",
        params={"values": ["private"]},
        drop=True,
    )
    for column in (kept_column, dropped_column):
        config_builder.add_column(column)

    # No set_run_config call here: the default run configuration is under test.
    data_designer = DataDesigner(
        artifact_path=stub_artifact_path,
        model_providers=stub_model_providers,
        secret_resolver=PlaintextResolver(),
        managed_assets_path=stub_managed_assets_path,
    )

    results = data_designer.create(config_builder, num_records=3)
    storage = results.artifact_storage

    main_columns = results.load_dataset().columns
    dropped_columns = lazy.pd.read_parquet(storage.dropped_columns_dataset_path).columns
    assert "uuid" in main_columns
    assert "hidden_category" not in main_columns
    assert "hidden_category" in dropped_columns
    assert "uuid" not in dropped_columns
    metadata = json.loads(storage.metadata_file_path.read_text())
    assert metadata["preserve_dropped_columns"] is True


def test_create_raises_error_when_builder_fails(
stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path
):
Expand Down
Loading