[spark] support persist source data to avoid loading data repeatedly#8081
[spark] support persist source data to avoid loading data repeatedly#8081Stefanietry wants to merge 1 commit into
Conversation
4e99876 to
cdf7d4c
Compare
| + "outweighs the benefit of pruning untouched files."); | ||
|
|
||
| public static final ConfigOption<Boolean> DATA_EVOLUTION_DATA_SOURCE_PERSIST_ENABLED = | ||
| key("data-evolution.data.source.persist.enabled") |
There was a problem hiding this comment.
data-evolution.merge-into.source-persist
There was a problem hiding this comment.
Done, the conf has been modified as suggested.
a3b196c to
abfcebd
Compare
abfcebd to
95e9f9b
Compare
| + " 'manifest.compression' = 'snappy',\n" | ||
| + " 'row-tracking.enabled' = 'true',\n" | ||
| + " 'data-evolution.enabled' = 'true',\n" | ||
| + " 'data-evolution.data.source.persist.enabled' = 'true',\n" |
There was a problem hiding this comment.
This still uses the old option name. The PR adds data-evolution.merge-into.source-persist, so this table keeps the new option at its default false and the test never exercises the persist path. Please switch this to the new key.
There was a problem hiding this comment.
Thanks for your reminding, it has been corrected.
| val sourceTableProjExprs = | ||
| allReadFieldsOnSource.toSeq :+ Alias(TrueLiteral, ROW_FROM_SOURCE)() | ||
| val sourceTableProj = Project(sourceTableProjExprs, sourceTable) | ||
| val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) |
There was a problem hiding this comment.
This only wires the cached source into the matched/update path. For a MERGE that has both matched and not-matched clauses, insertActionInvoke still builds its left-anti join from sourceTable, so the source is scanned again after the update path. Could you pass the persisted source into the insert path too, so the new option avoids repeated source loading for the whole merge action?
There was a problem hiding this comment.
Done, the persisted source has been successfully passed to the insert path as per the suggestion, thereby avoiding redundant calculations.
|
The Spark 4.0 implementation also needs the same change. |
95e9f9b to
287eab9
Compare
Done, the optimization have been implemented in Spark 4.0. |
| if (plan.snapshotId() != null) { | ||
| writer.rowIdCheckConflict(plan.snapshotId()) | ||
| val persistSourceDss: Option[Dataset[Row]] = | ||
| if (table.coreOptions().dataEvolutionMergeIntoSourcePersist() && matchedActions.nonEmpty) { |
There was a problem hiding this comment.
This guard means the new option has no effect for insert-only MERGE statements (WHEN NOT MATCHED without any matched update). That path can still load the source twice when file pruning is enabled: targetRelatedSplits builds sourceDss to find touched splits, and insertActionInvoke then builds the left-anti join from sourceTable again. Since this option is described as persisting the source for merge-into, could we enable it whenever the source may be reused, e.g. table.coreOptions().dataEvolutionMergeIntoSourcePersist() && (matchedActions.nonEmpty || notMatchedActions.nonEmpty) or otherwise document that it is intentionally update-only?
287eab9 to
58f4ecd
Compare
Purpose
Purpose: In the UpdateAction mode, it avoids redundant calculations during the process of computing dataSplits and performing join concatenation by persisting the source data.
Linked issue: #8080
Tests
Add SparkDataEvolutionITCase