fix(dataset): resolve Blob v2 external URIs and robustly clean failed writes in add_columns#7152
Open
yyzhao2025 wants to merge 2 commits into
df5072c to 89bd69a
Description
This PR addresses two critical functional gaps in the `add_columns` pipeline when working with Blob v2 datasets. It brings the schema evolution path to feature parity with the main write path for external URI resolution, and it introduces a robust, leak-free cleanup mechanism for failed mutations.

1. Blob v2 External URI Resolution

Context:
In Blob v2, external blobs (where data resides outside the dataset's native storage) rely on an `ExternalBaseResolver` to construct the correct absolute URIs from relative paths stored in the dataset manifest.

Previously, `add_columns` used a bare `open_writer` via the `Updater`, which lacked the context to initialize this resolver. As a result, appending or mutating Blob v2 columns of the `external` kind would fail to resolve URIs.

Fix:
- `open_update_writer` in `write.rs`: when the `Updater` creates a new writer, it now checks whether `storage_version >= V2_2` and whether the schema contains `blob_v2` fields.
- If so, it builds an `ExternalBaseResolver` using the dataset's registered base paths and passes it to the writer alongside the `source_store_registry`.
- This allows `BatchUDF` or `RecordBatchReader` to write `external` Blob v2 data during schema evolution, achieving strict parity with `write_fragments_internal`.

2. Comprehensive Cleanup for Failed Writes
Context:
The `add_columns` operation is complex and can fail mid-flight for several reasons: UDF execution panics, stream ingestion errors, checkpoint lookup/insert failures, or schema merge conflicts. Previously, these failures propagated immediately via the `?` operator, leaving behind orphaned `.lance` data files and their corresponding Blob v2 sidecar directories.

Fix & Architectural Guarantees:
This PR overhauls the error handling and cleanup lifecycle in `add_columns_impl` and `add_columns_from_stream`.

Multi-stage Cleanup:
We replaced concurrent `try_collect` streams with sequential processing augmented with scoped error handling. When a failure occurs, the pipeline now executes a two-stage cleanup:
- `updater.cleanup_unfinished_writer()`: Cleans up the currently active, unfinished data file that hasn't been finalized into a `Fragment` yet.
- `cleanup_new_column_data_files()`: Cleans up any fully written but uncommitted fragments generated in the current run.

Strict Safety Constraint 1: Preservation of External Data
The underlying `cleanup_data_fragments` logic now strictly checks `base_id.is_none()`. This guarantees that cleanup operations never attempt to delete files that belong to an external base, preventing catastrophic deletion of user-managed source data.

Strict Safety Constraint 2: Checkpoint Ownership
For long-running UDFs, fragments are incrementally saved to a `UDFCheckpointStore`. Once a fragment is successfully inserted into the checkpoint, it is explicitly `pop()`-ed from the local `fragments_to_cleanup` list. This ensures that if a subsequent step fails, we do not physically delete data files that the checkpoint relies on for resumption.

Related Issues
Closes #7075
Scope & Follow-up
This PR tightly scopes the fixes to the `add_columns` pipeline.

Note on `alter_columns`: While `alter_columns` shares the newly improved `add_columns_impl` machinery (and thus benefits from the unfinished writer cleanup), its final `apply_commit` stage currently drops the `fragments_to_cleanup` vector. A commit failure there could still leave orphaned files. To keep this PR focused and reviewable, the commit-failure cleanup for `alter_columns` will be addressed in a separate, dedicated follow-up PR.

Type of change
Testing Performed
- Covered `inline`, `packed`, `dedicated`, and `external` cleanup scenarios. Validated stream errors, UDF panics, and checkpoint lookup/insert anomalies to ensure zero orphan files and zero deleted external files.
- Exercised `add_columns` with `RecordBatchReader` and `BatchUDF` across all Blob v2 kinds. Asserted bit-for-bit data integrity upon reading back the mutated dataset.
- Ran formatting (`cargo fmt`), linting (`cargo clippy`), and the existing test suites.
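
For reviewers who want the two safety constraints in concrete form, here is a minimal, self-contained sketch of the invariant the cleanup tests are meant to check. It is not the code in this PR and does not use Lance's real types: `DataFile`, `Fragment`, `Outcome`, `files_to_delete`, `run`, and the `fail_at` parameter are hypothetical stand-ins. Only the `base_id.is_none()` check and the `pop()` from `fragments_to_cleanup` after a successful checkpoint insert mirror the description above, and the sketch models the checkpointed path only.

```rust
/// Hypothetical stand-in for a data file reference (not Lance's real type).
#[derive(Debug, Clone, PartialEq)]
struct DataFile {
    path: String,
    /// `None`: the file lives in the dataset's own storage.
    /// `Some(id)`: the file belongs to an external base.
    base_id: Option<u32>,
}

/// Hypothetical stand-in for a fully written but uncommitted fragment.
#[derive(Debug, Clone, PartialEq)]
struct Fragment {
    id: u64,
    files: Vec<DataFile>,
}

/// Result of one simulated run: which fragments the checkpoint owns,
/// and which paths cleanup would delete.
#[derive(Debug, PartialEq)]
struct Outcome {
    checkpointed: Vec<u64>,
    deleted: Vec<String>,
}

/// Constraint 1: only files without a `base_id` are eligible for deletion,
/// so files under an external base are never touched.
fn files_to_delete(fragments: &[Fragment]) -> Vec<String> {
    fragments
        .iter()
        .flat_map(|fragment| fragment.files.iter())
        .filter(|file| file.base_id.is_none())
        .map(|file| file.path.clone())
        .collect()
}

/// Processes fragments sequentially. `fail_at` simulates a failure that
/// happens after the fragment with that id was written but before it was
/// inserted into the checkpoint.
fn run(fragments: Vec<Fragment>, fail_at: Option<u64>) -> Outcome {
    let mut fragments_to_cleanup: Vec<Fragment> = Vec::new();
    let mut checkpointed = Vec::new();

    for fragment in fragments {
        let id = fragment.id;
        // The fragment is ours to clean up until the checkpoint owns it.
        fragments_to_cleanup.push(fragment);

        if fail_at == Some(id) {
            // Failure path: delete only what is still tracked locally.
            return Outcome {
                checkpointed,
                deleted: files_to_delete(&fragments_to_cleanup),
            };
        }

        // Constraint 2: after a successful checkpoint insert, stop tracking
        // the fragment so a later failure cannot delete its files.
        checkpointed.push(id);
        fragments_to_cleanup.pop();
    }

    Outcome { checkpointed, deleted: Vec::new() }
}
```

In this sketch, a failure on the second of two fragments reports only that fragment's native file for deletion: the first fragment is already owned by the checkpoint, and any file with a `base_id` is skipped.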