[python][ray] Support multi-clause fall-through in merge_into#8115
[python][ray] Support multi-clause fall-through in merge_into#8115XiaoHongbo-Hope wants to merge 12 commits into
Conversation
70c7dab to
2275b47
Compare
Multiple WhenMatched/WhenNotMatched clauses evaluated in order; first matching condition wins, unmatched rows fall through. - Remove single-clause NotImplementedError and assert guards - Matched path: filter per clause, track remaining by _ROW_ID using vectorized pc.is_in - Not-matched path: filter per clause, use NOT(condition) for remaining rows - Lazy import filter_batch only when condition is present - Add tests: fall-through, no-match-skipped, first-wins - Update docs with multi-clause example
2275b47 to
3682611
Compare
- Use COALESCE(NOT(cond), TRUE) in not-matched path to preserve rows where condition evaluates to NULL - test_multi_clause_first_wins now uses distinct partial updates per clause so the winning clause is verifiable - Hoist filter_batch import outside _transform to avoid per-batch import overhead
Check for duplicate t._ROW_ID at the start of _transform before clause fall-through. Prevents silent row loss when multiple source rows match the same target — raises ValueError instead of letting the _ROW_ID filter silently drop the second source row.
Check duplicate _ROW_ID after condition filtering, not before. This preserves the single-clause behavior where condition filters duplicate sources down to one (test_duplicate_source_filtered_by_condition). Also add test_multi_not_matched_null_falls_through to verify NULL condition rows correctly fall through to the next clause.
When multiple clauses are present, check for duplicate t._ROW_ID on the entire batch before clause iteration. This prevents the _ROW_ID filter from silently dropping the second source row across clauses. Single-clause path is unaffected (still allows condition to filter duplicates down to one). Add test_multi_clause_duplicate_source_raises: two source rows matching the same target with multi-clause should raise.
The batch-level check was too strict: it rejected duplicate source rows even when only one was actionable after condition filtering. It also depended on Ray batch boundaries (inconsistent behavior). Per-clause duplicate check remains: if a single clause's matched rows contain duplicate _ROW_ID, that's a real error. Replace test with: two source rows match same target, only one satisfies any clause condition → succeeds, target updated once.
Replace _ROW_ID based filtering with COALESCE(NOT(condition), TRUE) in the matched path. This ensures duplicate source rows that satisfy different clauses both produce output and get caught by the downstream distributed_update_apply duplicate check, regardless of Ray batch boundaries. Add test: two source rows both actionable by different clauses should raise "multiple source rows" from the global check.
| captured_schema, | ||
| )) | ||
| if rewritten is not None and matched.num_rows < remaining.num_rows: | ||
| not_cond = f"COALESCE(NOT ({rewritten}), TRUE)" |
There was a problem hiding this comment.
Non-blocking test gap: the not-matched path covers NULL condition fall-through, but the matched path relies on the same COALESCE(NOT (...), TRUE) behavior here. Consider adding a matched-path NULL fall-through test to lock this down.
|
Thanks for adding multi-clause fall-through support. The overall evaluation order looks aligned with One SQL-compatibility gap still needs to be handled before merging: a non-last clause without a condition should be rejected. Spark SQL rejects This PR currently accepts the equivalent Python API shape, and the implementation will silently make later clauses unreachable because the unconditional clause consumes all remaining rows. For example: when_matched=[
WhenMatched(update="*"),
WhenMatched(update={"age": "s.age"}, condition="s.age > 10"),
]The second clause can never run, while the corresponding SQL form would fail during parsing/analysis. Could we add validation in |
Spark SQL requires that only the last WHEN MATCHED / WHEN NOT MATCHED clause may omit its condition. Add the same validation in _prepare so the Python API rejects unreachable clauses early.
Thanks, updated |
Purpose
Tests