From 3682611361125ec3dc50bad14ca826ba3eaeeb27 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 14:51:40 +0800 Subject: [PATCH 01/12] [ray] Support multi-clause fall-through in merge_into Multiple WhenMatched/WhenNotMatched clauses evaluated in order; first matching condition wins, unmatched rows fall through. - Remove single-clause NotImplementedError and assert guards - Matched path: filter per clause, track remaining by _ROW_ID using vectorized pc.is_in - Not-matched path: filter per clause, use NOT(condition) for remaining rows - Lazy import filter_batch only when condition is present - Add tests: fall-through, no-match-skipped, first-wins - Update docs with multi-clause example --- docs/docs/pypaimon/ray-data.md | 7 + .../pypaimon/ray/data_evolution_merge_into.py | 6 - .../pypaimon/ray/data_evolution_merge_join.py | 138 +++++++++------- .../ray_data_evolution_merge_into_test.py | 149 ++++++++++++++++++ 4 files changed, 240 insertions(+), 60 deletions(-) diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md index d160b4302f65..a6d98f8fcce0 100644 --- a/docs/docs/pypaimon/ray-data.md +++ b/docs/docs/pypaimon/ray-data.md @@ -386,6 +386,13 @@ columns (`s.*`). Requires the `datafusion` package: `pip install pypaimon[sql]`. Use `lit()` for literals starting with `s.` or `t.`. - `condition`: an optional SQL-style boolean expression. Use `s.` and `t.` to reference source and target columns. +- Multiple clauses are evaluated in order; the first matching condition wins: + ```python + when_matched=[ + WhenMatched(update="*", condition="s.ts > t.ts"), + WhenMatched(update="*"), # fallback for unmatched rows + ] + ``` **Parameters:** - `source`: a `ray.data.Dataset`, `pyarrow.Table`, `pandas.DataFrame`, or a diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py index fa824b44a2f9..b067e6f20e1d 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py @@ -93,12 +93,6 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on raise ValueError( "At least one of when_matched or when_not_matched must be non-empty." ) - if len(when_matched) > 1 or len(when_not_matched) > 1: - raise NotImplementedError( - "merge_into currently supports a single WhenMatched and a single " - "WhenNotMatched clause; multi-clause fall-through will be added " - "in a follow-up PR." - ) target_on_cols, source_on_cols = _normalize_on(on) from pypaimon.catalog.catalog_factory import CatalogFactory diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index f01f9b59aab8..4c2db09f1a6e 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -87,43 +87,61 @@ def build_matched_update_ds( right_on=tuple(f"s.{c}" for c in source_on), ) - # MVP supports a single matched clause; future fan-out (conditions, multi- - # clause fall-through) must thread every clause's spec through the - # transform — guard so silent first-only behaviour can't sneak in. - assert len(clauses) == 1, ( - f"build_matched_update_ds expected 1 clause, got {len(clauses)}" - ) - spec = clauses[0].spec - condition = clauses[0].condition captured_update_cols = list(update_cols) captured_row_id_name = row_id_name captured_on_pairs = list(zip(source_on, target_on)) captured_schema = update_schema - captured_apply = None - captured_rewritten = None - if condition is not None: - from pypaimon.ray.merge_condition import ( - apply_condition, remap_source_on_keys, rewrite_condition, - ) - on_map = dict(zip(source_on, target_on)) - captured_rewritten = remap_source_on_keys( - rewrite_condition(condition), on_map, - ) - captured_apply = apply_condition + prepared_clauses = [] + for clause in clauses: + rewritten = None + if clause.condition is not None: + from pypaimon.ray.merge_condition import ( + remap_source_on_keys, rewrite_condition, + ) + on_map = dict(zip(source_on, target_on)) + rewritten = remap_source_on_keys( + rewrite_condition(clause.condition), on_map, + ) + prepared_clauses.append((clause.spec, rewritten)) def _transform(batch: pa.Table) -> pa.Table: - if captured_apply is not None: - batch = captured_apply( - batch, captured_rewritten, captured_schema, - ) - if batch.num_rows == 0: - return batch - return vectorized_matched_transform( - batch, spec, captured_on_pairs, - captured_update_cols, captured_row_id_name, - captured_schema, - ) + remaining = batch + parts = [] + for spec, rewritten in prepared_clauses: + if remaining.num_rows == 0: + break + if rewritten is not None: + from pypaimon.ray.merge_condition import filter_batch + matched = filter_batch( + remaining, rewritten, _pre_rewritten=True, + ) + if matched.num_rows > 0: + parts.append(vectorized_matched_transform( + matched, spec, captured_on_pairs, + captured_update_cols, captured_row_id_name, + captured_schema, + )) + if matched.num_rows < remaining.num_rows: + import pyarrow.compute as pc + row_id_col = f"t.{captured_row_id_name}" + mask = pc.invert(pc.is_in( + remaining.column(row_id_col), + matched.column(row_id_col), + )) + remaining = remaining.filter(mask) + else: + remaining = remaining.slice(0, 0) + else: + parts.append(vectorized_matched_transform( + remaining, spec, captured_on_pairs, + captured_update_cols, captured_row_id_name, + captured_schema, + )) + remaining = remaining.slice(0, 0) + if not parts: + return captured_schema.empty_table() + return pa.concat_tables(parts) return joined.map_batches(_transform, **_map_kwargs(ray_remote_args)) @@ -324,32 +342,44 @@ def build_not_matched_insert_ds( right_on=tuple(f"t.{c}" for c in target_on), ) - # MVP supports a single not-matched clause; see build_matched_update_ds - # for why we assert instead of silently dropping the rest. - assert len(clauses) == 1, ( - f"build_not_matched_insert_ds expected 1 clause, got {len(clauses)}" - ) - spec = clauses[0].spec - condition = clauses[0].condition - captured_apply = None - captured_rewritten = None - if condition is not None: - from pypaimon.ray.merge_condition import apply_condition, rewrite_condition - captured_rewritten = rewrite_condition(condition) - captured_apply = apply_condition + prepared_clauses = [] + for clause in clauses: + rewritten = None + if clause.condition is not None: + from pypaimon.ray.merge_condition import rewrite_condition + rewritten = rewrite_condition(clause.condition) + prepared_clauses.append((clause.spec, rewritten)) def _transform(batch: pa.Table) -> pa.Table: - if captured_apply is not None: - batch = captured_apply( - batch, captured_rewritten, out_schema, - ) - if batch.num_rows == 0: - return _coerce_large_string_types(batch) - return _coerce_large_string_types( - vectorized_insert_transform( - batch, spec, captured_field_names, out_schema - ) - ) + remaining = batch + parts = [] + for spec, rewritten in prepared_clauses: + if remaining.num_rows == 0: + break + if rewritten is not None: + from pypaimon.ray.merge_condition import filter_batch + matched = filter_batch( + remaining, rewritten, _pre_rewritten=True, + ) + if matched.num_rows > 0: + parts.append(vectorized_insert_transform( + matched, spec, captured_field_names, out_schema + )) + if matched.num_rows < remaining.num_rows: + not_cond = f"NOT ({rewritten})" + remaining = filter_batch( + remaining, not_cond, _pre_rewritten=True, + ) + else: + remaining = remaining.slice(0, 0) + else: + parts.append(vectorized_insert_transform( + remaining, spec, captured_field_names, out_schema + )) + remaining = remaining.slice(0, 0) + if not parts: + return _coerce_large_string_types(out_schema.empty_table()) + return _coerce_large_string_types(pa.concat_tables(parts)) return unmatched.map_batches( _transform, **_map_kwargs(ray_remote_args) diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index ca06d43e5341..0b0bd240bd81 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1315,6 +1315,155 @@ def test_target_col_helper(self): self.assertEqual(out['name'], ['keep']) self.assertEqual(out['age'], [99]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_matched_clause_fall_through(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1, 2, 3], type=pa.int32()), + 'name': ['a', 'b', 'c'], + 'age': pa.array([10, 20, 30], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2, 3], type=pa.int32()), + 'name': ['a2', 'b2', 'c2'], + 'age': pa.array([99, 88, 77], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 80'), + WhenMatched(update='*'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2, 3]) + self.assertEqual(out['name'], ['a2', 'b2', 'c2']) + self.assertEqual(out['age'], [99, 88, 77]) + + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_not_matched_clause_fall_through(self): + target = self._create_table() + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2, 3], type=pa.int32()), + 'name': ['a', 'b', 'c'], + 'age': pa.array([25, 15, 5], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_not_matched=[ + WhenNotMatched(insert='*', condition='s.age >= 20'), + WhenNotMatched(insert='*'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2, 3]) + + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_clause_no_match_skipped(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a', 'b'], + 'age': pa.array([10, 20], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a2', 'b2'], + 'age': pa.array([5, 5], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 50'), + WhenMatched(update='*', condition='s.age > 30'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['a', 'b']) + self.assertEqual(out['age'], [10, 20]) + + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_clause_first_wins(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['old'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['new'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 50'), + WhenMatched(update='*', condition='s.age > 10'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['new']) + self.assertEqual(out['age'], [99]) + class TargetProjectionTest(unittest.TestCase): From cfcb28d1ff029caadf7949aaf0ed98bd6d8b20c1 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 15:20:15 +0800 Subject: [PATCH 02/12] [ray] Fix NULL row loss, strengthen first-wins test, hoist imports - Use COALESCE(NOT(cond), TRUE) in not-matched path to preserve rows where condition evaluates to NULL - test_multi_clause_first_wins now uses distinct partial updates per clause so the winning clause is verifiable - Hoist filter_batch import outside _transform to avoid per-batch import overhead --- .../pypaimon/ray/data_evolution_merge_join.py | 20 ++++++++++++------- .../ray_data_evolution_merge_into_test.py | 12 ++++++----- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 4c2db09f1a6e..b16f190969f8 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -105,15 +105,19 @@ def build_matched_update_ds( ) prepared_clauses.append((clause.spec, rewritten)) + _filter_batch = None + if any(r is not None for _, r in prepared_clauses): + from pypaimon.ray.merge_condition import filter_batch as _filter_batch + def _transform(batch: pa.Table) -> pa.Table: + import pyarrow.compute as pc remaining = batch parts = [] for spec, rewritten in prepared_clauses: if remaining.num_rows == 0: break if rewritten is not None: - from pypaimon.ray.merge_condition import filter_batch - matched = filter_batch( + matched = _filter_batch( remaining, rewritten, _pre_rewritten=True, ) if matched.num_rows > 0: @@ -123,7 +127,6 @@ def _transform(batch: pa.Table) -> pa.Table: captured_schema, )) if matched.num_rows < remaining.num_rows: - import pyarrow.compute as pc row_id_col = f"t.{captured_row_id_name}" mask = pc.invert(pc.is_in( remaining.column(row_id_col), @@ -350,6 +353,10 @@ def build_not_matched_insert_ds( rewritten = rewrite_condition(clause.condition) prepared_clauses.append((clause.spec, rewritten)) + _filter_batch_nm = None + if any(r is not None for _, r in prepared_clauses): + from pypaimon.ray.merge_condition import filter_batch as _filter_batch_nm + def _transform(batch: pa.Table) -> pa.Table: remaining = batch parts = [] @@ -357,8 +364,7 @@ def _transform(batch: pa.Table) -> pa.Table: if remaining.num_rows == 0: break if rewritten is not None: - from pypaimon.ray.merge_condition import filter_batch - matched = filter_batch( + matched = _filter_batch_nm( remaining, rewritten, _pre_rewritten=True, ) if matched.num_rows > 0: @@ -366,8 +372,8 @@ def _transform(batch: pa.Table) -> pa.Table: matched, spec, captured_field_names, out_schema )) if matched.num_rows < remaining.num_rows: - not_cond = f"NOT ({rewritten})" - remaining = filter_batch( + not_cond = f"COALESCE(NOT ({rewritten}), TRUE)" + remaining = _filter_batch_nm( remaining, not_cond, _pre_rewritten=True, ) else: diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 0b0bd240bd81..3bfb7f53068f 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1442,7 +1442,7 @@ def test_multi_clause_first_wins(self): source = pa.Table.from_pydict( { 'id': pa.array([1], type=pa.int32()), - 'name': ['new'], + 'name': ['first'], 'age': pa.array([99], type=pa.int32()), }, schema=self.pa_schema, @@ -1454,15 +1454,17 @@ def test_multi_clause_first_wins(self): catalog_options=self.catalog_options, on=['id'], when_matched=[ - WhenMatched(update='*', condition='s.age > 50'), - WhenMatched(update='*', condition='s.age > 10'), + WhenMatched(update={'name': 's.name'}, + condition='s.age > 50'), + WhenMatched(update={'age': 's.age'}, + condition='s.age > 10'), ], num_partitions=_TEST_NUM_PARTITIONS, ) out = self._read_sorted(target) - self.assertEqual(out['name'], ['new']) - self.assertEqual(out['age'], [99]) + self.assertEqual(out['name'], ['first']) + self.assertEqual(out['age'], [10]) class TargetProjectionTest(unittest.TestCase): From be2def2f36de769f6549b0017b43ef5e7493335a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 15:23:54 +0800 Subject: [PATCH 03/12] [ray] Fail fast on duplicate source rows in multi-clause matched path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Check for duplicate t._ROW_ID at the start of _transform before clause fall-through. Prevents silent row loss when multiple source rows match the same target — raises ValueError instead of letting the _ROW_ID filter silently drop the second source row. --- .../pypaimon/ray/data_evolution_merge_join.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index b16f190969f8..07ecadc1f072 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -111,6 +111,16 @@ def build_matched_update_ds( def _transform(batch: pa.Table) -> pa.Table: import pyarrow.compute as pc + row_id_col = f"t.{captured_row_id_name}" + if batch.num_rows > 0: + ids = batch.column(row_id_col) + n_unique = pc.count_distinct(ids, mode="all").as_py() + if n_unique < batch.num_rows: + raise ValueError( + "merge_into matched multiple source rows to the " + "same target _ROW_ID. Deduplicate the source " + "before merging." + ) remaining = batch parts = [] for spec, rewritten in prepared_clauses: From a663c5c1f24befa134577e518e7d5739640780ab Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 15:29:47 +0800 Subject: [PATCH 04/12] [ray] Remove redundant row_id_col assignment in loop --- paimon-python/pypaimon/ray/data_evolution_merge_join.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 07ecadc1f072..1f334b483a14 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -137,7 +137,6 @@ def _transform(batch: pa.Table) -> pa.Table: captured_schema, )) if matched.num_rows < remaining.num_rows: - row_id_col = f"t.{captured_row_id_name}" mask = pc.invert(pc.is_in( remaining.column(row_id_col), matched.column(row_id_col), From 0efaebd7fc3ddc059b64238f3ac2069a1a07ef97 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 15:35:26 +0800 Subject: [PATCH 05/12] [ray] Move duplicate check to per-clause matched rows Check duplicate _ROW_ID after condition filtering, not before. This preserves the single-clause behavior where condition filters duplicate sources down to one (test_duplicate_source_filtered_by_condition). Also add test_multi_not_matched_null_falls_through to verify NULL condition rows correctly fall through to the next clause. --- .../pypaimon/ray/data_evolution_merge_join.py | 48 ++++++++----------- .../ray_data_evolution_merge_into_test.py | 29 +++++++++++ 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 1f334b483a14..72e0bcf80d0b 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -112,15 +112,6 @@ def build_matched_update_ds( def _transform(batch: pa.Table) -> pa.Table: import pyarrow.compute as pc row_id_col = f"t.{captured_row_id_name}" - if batch.num_rows > 0: - ids = batch.column(row_id_col) - n_unique = pc.count_distinct(ids, mode="all").as_py() - if n_unique < batch.num_rows: - raise ValueError( - "merge_into matched multiple source rows to the " - "same target _ROW_ID. Deduplicate the source " - "before merging." - ) remaining = batch parts = [] for spec, rewritten in prepared_clauses: @@ -130,26 +121,29 @@ def _transform(batch: pa.Table) -> pa.Table: matched = _filter_batch( remaining, rewritten, _pre_rewritten=True, ) - if matched.num_rows > 0: - parts.append(vectorized_matched_transform( - matched, spec, captured_on_pairs, - captured_update_cols, captured_row_id_name, - captured_schema, - )) - if matched.num_rows < remaining.num_rows: - mask = pc.invert(pc.is_in( - remaining.column(row_id_col), - matched.column(row_id_col), - )) - remaining = remaining.filter(mask) - else: - remaining = remaining.slice(0, 0) else: - parts.append(vectorized_matched_transform( - remaining, spec, captured_on_pairs, - captured_update_cols, captured_row_id_name, - captured_schema, + matched = remaining + if matched.num_rows == 0: + continue + ids = matched.column(row_id_col) + if pc.count_distinct(ids, mode="all").as_py() < matched.num_rows: + raise ValueError( + "merge_into matched multiple source rows to " + "the same target _ROW_ID. Deduplicate the " + "source before merging." + ) + parts.append(vectorized_matched_transform( + matched, spec, captured_on_pairs, + captured_update_cols, captured_row_id_name, + captured_schema, + )) + if matched.num_rows < remaining.num_rows: + mask = pc.invert(pc.is_in( + remaining.column(row_id_col), + matched.column(row_id_col), )) + remaining = remaining.filter(mask) + else: remaining = remaining.slice(0, 0) if not parts: return captured_schema.empty_table() diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 3bfb7f53068f..71ce1c6afaa6 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1384,6 +1384,35 @@ def test_multi_not_matched_clause_fall_through(self): out = self._read_sorted(target) self.assertEqual(out['id'], [1, 2, 3]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_not_matched_null_falls_through(self): + target = self._create_table() + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a', 'b'], + 'age': pa.array([None, 25], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_not_matched=[ + WhenNotMatched(insert='*', condition='s.age > 20'), + WhenNotMatched(insert='*'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2]) + self.assertEqual(out['age'], [None, 25]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) def test_multi_clause_no_match_skipped(self): target = self._create_table() From 7b6a6d2337c917712b3dffdf59d285223f48b478 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 16:10:49 +0800 Subject: [PATCH 06/12] [ray] Add batch-level duplicate check for multi-clause matched path When multiple clauses are present, check for duplicate t._ROW_ID on the entire batch before clause iteration. This prevents the _ROW_ID filter from silently dropping the second source row across clauses. Single-clause path is unaffected (still allows condition to filter duplicates down to one). Add test_multi_clause_duplicate_source_raises: two source rows matching the same target with multi-clause should raise. --- .../pypaimon/ray/data_evolution_merge_join.py | 10 +++++ .../ray_data_evolution_merge_into_test.py | 38 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 72e0bcf80d0b..7f4f0a526478 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -109,9 +109,19 @@ def build_matched_update_ds( if any(r is not None for _, r in prepared_clauses): from pypaimon.ray.merge_condition import filter_batch as _filter_batch + _is_multi_clause = len(prepared_clauses) > 1 + def _transform(batch: pa.Table) -> pa.Table: import pyarrow.compute as pc row_id_col = f"t.{captured_row_id_name}" + if _is_multi_clause and batch.num_rows > 0: + ids = batch.column(row_id_col) + if pc.count_distinct(ids, mode="all").as_py() < batch.num_rows: + raise ValueError( + "merge_into matched multiple source rows to " + "the same target _ROW_ID. Deduplicate the " + "source before merging." + ) remaining = batch parts = [] for spec, rewritten in prepared_clauses: diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 71ce1c6afaa6..b722a99876c6 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1495,6 +1495,44 @@ def test_multi_clause_first_wins(self): self.assertEqual(out['name'], ['first']) self.assertEqual(out['age'], [10]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_clause_duplicate_source_raises(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['a'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 1], type=pa.int32()), + 'name': ['x', 'y'], + 'age': pa.array([99, 5], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + with self.assertRaises(Exception) as ctx: + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 50'), + WhenMatched(update='*'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('multiple source rows', str(ctx.exception)) + class TargetProjectionTest(unittest.TestCase): From 48634545d9c0e8a03ad8f9d731d169f0577df89d Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 16:16:16 +0800 Subject: [PATCH 07/12] [ray] Drop mode='all' from count_distinct for consistency --- paimon-python/pypaimon/ray/data_evolution_merge_join.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 7f4f0a526478..86673505c4e5 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -116,7 +116,7 @@ def _transform(batch: pa.Table) -> pa.Table: row_id_col = f"t.{captured_row_id_name}" if _is_multi_clause and batch.num_rows > 0: ids = batch.column(row_id_col) - if pc.count_distinct(ids, mode="all").as_py() < batch.num_rows: + if pc.count_distinct(ids).as_py() < batch.num_rows: raise ValueError( "merge_into matched multiple source rows to " "the same target _ROW_ID. Deduplicate the " @@ -136,7 +136,7 @@ def _transform(batch: pa.Table) -> pa.Table: if matched.num_rows == 0: continue ids = matched.column(row_id_col) - if pc.count_distinct(ids, mode="all").as_py() < matched.num_rows: + if pc.count_distinct(ids).as_py() < matched.num_rows: raise ValueError( "merge_into matched multiple source rows to " "the same target _ROW_ID. Deduplicate the " From 0ad70555fce2e60c0bbba76aea04c7bf7a665fea Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 16:23:08 +0800 Subject: [PATCH 08/12] [ray] Remove batch-level duplicate check, keep per-clause only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The batch-level check was too strict: it rejected duplicate source rows even when only one was actionable after condition filtering. It also depended on Ray batch boundaries (inconsistent behavior). Per-clause duplicate check remains: if a single clause's matched rows contain duplicate _ROW_ID, that's a real error. Replace test with: two source rows match same target, only one satisfies any clause condition → succeeds, target updated once. --- .../pypaimon/ray/data_evolution_merge_join.py | 10 ------- .../ray_data_evolution_merge_into_test.py | 30 ++++++++++--------- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 86673505c4e5..76afb7ce0db4 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -109,19 +109,9 @@ def build_matched_update_ds( if any(r is not None for _, r in prepared_clauses): from pypaimon.ray.merge_condition import filter_batch as _filter_batch - _is_multi_clause = len(prepared_clauses) > 1 - def _transform(batch: pa.Table) -> pa.Table: import pyarrow.compute as pc row_id_col = f"t.{captured_row_id_name}" - if _is_multi_clause and batch.num_rows > 0: - ids = batch.column(row_id_col) - if pc.count_distinct(ids).as_py() < batch.num_rows: - raise ValueError( - "merge_into matched multiple source rows to " - "the same target _ROW_ID. Deduplicate the " - "source before merging." - ) remaining = batch parts = [] for spec, rewritten in prepared_clauses: diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index b722a99876c6..7aff35b4aed2 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1496,7 +1496,7 @@ def test_multi_clause_first_wins(self): self.assertEqual(out['age'], [10]) @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) - def test_multi_clause_duplicate_source_raises(self): + def test_multi_clause_duplicate_source_one_actionable(self): target = self._create_table() self._write( target, @@ -1519,19 +1519,21 @@ def test_multi_clause_duplicate_source_raises(self): schema=self.pa_schema, ) - with self.assertRaises(Exception) as ctx: - merge_into( - target=target, - source=source, - catalog_options=self.catalog_options, - on=['id'], - when_matched=[ - WhenMatched(update='*', condition='s.age > 50'), - WhenMatched(update='*'), - ], - num_partitions=_TEST_NUM_PARTITIONS, - ) - self.assertIn('multiple source rows', str(ctx.exception)) + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 50'), + WhenMatched(update='*', condition='s.age > 80'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['x']) + self.assertEqual(out['age'], [99]) class TargetProjectionTest(unittest.TestCase): From ab75099faafd44f8b086892778052e4cb6437113 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 16:32:18 +0800 Subject: [PATCH 09/12] [ray] Use condition complement for matched remaining, not _ROW_ID Replace _ROW_ID based filtering with COALESCE(NOT(condition), TRUE) in the matched path. This ensures duplicate source rows that satisfy different clauses both produce output and get caught by the downstream distributed_update_apply duplicate check, regardless of Ray batch boundaries. Add test: two source rows both actionable by different clauses should raise "multiple source rows" from the global check. --- .../pypaimon/ray/data_evolution_merge_join.py | 20 +++------- .../ray_data_evolution_merge_into_test.py | 38 +++++++++++++++++++ 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 76afb7ce0db4..323cf1995219 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -110,8 +110,6 @@ def build_matched_update_ds( from pypaimon.ray.merge_condition import filter_batch as _filter_batch def _transform(batch: pa.Table) -> pa.Table: - import pyarrow.compute as pc - row_id_col = f"t.{captured_row_id_name}" remaining = batch parts = [] for spec, rewritten in prepared_clauses: @@ -125,24 +123,16 @@ def _transform(batch: pa.Table) -> pa.Table: matched = remaining if matched.num_rows == 0: continue - ids = matched.column(row_id_col) - if pc.count_distinct(ids).as_py() < matched.num_rows: - raise ValueError( - "merge_into matched multiple source rows to " - "the same target _ROW_ID. Deduplicate the " - "source before merging." - ) parts.append(vectorized_matched_transform( matched, spec, captured_on_pairs, captured_update_cols, captured_row_id_name, captured_schema, )) - if matched.num_rows < remaining.num_rows: - mask = pc.invert(pc.is_in( - remaining.column(row_id_col), - matched.column(row_id_col), - )) - remaining = remaining.filter(mask) + if rewritten is not None and matched.num_rows < remaining.num_rows: + not_cond = f"COALESCE(NOT ({rewritten}), TRUE)" + remaining = _filter_batch( + remaining, not_cond, _pre_rewritten=True, + ) else: remaining = remaining.slice(0, 0) if not parts: diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 7aff35b4aed2..2ca7a5ffbe48 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1535,6 +1535,44 @@ def test_multi_clause_duplicate_source_one_actionable(self): self.assertEqual(out['name'], ['x']) self.assertEqual(out['age'], [99]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_clause_duplicate_both_actionable_raises(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['a'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 1], type=pa.int32()), + 'name': ['x', 'y'], + 'age': pa.array([99, 50], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + with self.assertRaises(Exception) as ctx: + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 80'), + WhenMatched(update='*', condition='s.age > 30'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('multiple source rows', str(ctx.exception)) + class TargetProjectionTest(unittest.TestCase): From b5508cf4a8da51bf00373df4430662dac898328b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 16:44:22 +0800 Subject: [PATCH 10/12] [ray] Hoist on_map construction outside preparation loop --- paimon-python/pypaimon/ray/data_evolution_merge_join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py index 323cf1995219..14088979f893 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py @@ -92,6 +92,7 @@ def build_matched_update_ds( captured_on_pairs = list(zip(source_on, target_on)) captured_schema = update_schema + on_map = dict(zip(source_on, target_on)) prepared_clauses = [] for clause in clauses: rewritten = None @@ -99,7 +100,6 @@ def build_matched_update_ds( from pypaimon.ray.merge_condition import ( remap_source_on_keys, rewrite_condition, ) - on_map = dict(zip(source_on, target_on)) rewritten = remap_source_on_keys( rewrite_condition(clause.condition), on_map, ) From 9d522a844caad6cb3eae77e6af0034d884b9f1f2 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 18:12:43 +0800 Subject: [PATCH 11/12] [ray] Add matched-path NULL fall-through test for multi-clause --- .../ray_data_evolution_merge_into_test.py | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 2ca7a5ffbe48..e64aa7a0428d 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1384,6 +1384,47 @@ def test_multi_not_matched_clause_fall_through(self): out = self._read_sorted(target) self.assertEqual(out['id'], [1, 2, 3]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) + def test_multi_matched_null_falls_through(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1, 2, 3], type=pa.int32()), + 'name': ['a', 'b', 'c'], + 'age': pa.array([10, 20, 30], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2, 3], type=pa.int32()), + 'name': ['a2', 'b2', 'c2'], + 'age': pa.array([None, 50, 60], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*', condition='s.age > 40'), + WhenMatched(update='*'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2, 3]) + self.assertEqual(out['name'], ['a2', 'b2', 'c2']) + self.assertEqual(out['age'], [None, 50, 60]) + @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON) def test_multi_not_matched_null_falls_through(self): target = self._create_table() From d35f795fbe80109f77992b225a87ee667a7a683a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 4 Jun 2026 18:24:09 +0800 Subject: [PATCH 12/12] [ray] Reject unconditional non-last clause in merge_into Spark SQL requires that only the last WHEN MATCHED / WHEN NOT MATCHED clause may omit its condition. Add the same validation in _prepare so the Python API rejects unreachable clauses early. --- .../pypaimon/ray/data_evolution_merge_into.py | 9 +++++ .../ray_data_evolution_merge_into_test.py | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py index b067e6f20e1d..cbfcef907d81 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py @@ -93,6 +93,15 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on raise ValueError( "At least one of when_matched or when_not_matched must be non-empty." ) + for label, clauses in [("when_matched", when_matched), + ("when_not_matched", when_not_matched)]: + for i, clause in enumerate(clauses[:-1]): + if clause.condition is None: + raise ValueError( + f"Only the last {label} clause may omit its condition. " + f"Clause at index {i} has no condition, making subsequent " + f"clauses unreachable." + ) target_on_cols, source_on_cols = _normalize_on(on) from pypaimon.catalog.catalog_factory import CatalogFactory diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index e64aa7a0428d..b54eeb5cf0c9 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -153,6 +153,40 @@ def test_no_clause_raises(self): num_partitions=_TEST_NUM_PARTITIONS, ) + def test_unconditional_non_last_matched_rejected(self): + target = self._create_table() + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update='*'), + WhenMatched(update={'age': 's.age'}, condition='s.age > 10'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('when_matched', str(ctx.exception)) + self.assertIn('unreachable', str(ctx.exception)) + + def test_unconditional_non_last_not_matched_rejected(self): + target = self._create_table() + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_not_matched=[ + WhenNotMatched(insert='*'), + WhenNotMatched(insert='*', condition='s.age > 10'), + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('when_not_matched', str(ctx.exception)) + self.assertIn('unreachable', str(ctx.exception)) + def test_non_de_table_rejected(self): target = self._create_table(options={'row-tracking.enabled': 'true'}) with self.assertRaises(ValueError) as ctx: