Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/docs/pypaimon/ray-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,13 @@ columns (`s.*`). Requires the `datafusion` package: `pip install pypaimon[sql]`.
Use `lit()` for literals starting with `s.` or `t.`.
- `condition`: an optional SQL-style boolean expression. Use `s.<col>` and
`t.<col>` to reference source and target columns.
- Multiple clauses are evaluated in order; the first matching condition wins:
```python
when_matched=[
WhenMatched(update="*", condition="s.ts > t.ts"),
WhenMatched(update="*"), # fallback for unmatched rows
]
```

**Parameters:**
- `source`: a `ray.data.Dataset`, `pyarrow.Table`, `pandas.DataFrame`, or a
Expand Down
15 changes: 9 additions & 6 deletions paimon-python/pypaimon/ray/data_evolution_merge_into.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,15 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
raise ValueError(
"At least one of when_matched or when_not_matched must be non-empty."
)
if len(when_matched) > 1 or len(when_not_matched) > 1:
raise NotImplementedError(
"merge_into currently supports a single WhenMatched and a single "
"WhenNotMatched clause; multi-clause fall-through will be added "
"in a follow-up PR."
)
for label, clauses in [("when_matched", when_matched),
("when_not_matched", when_not_matched)]:
for i, clause in enumerate(clauses[:-1]):
if clause.condition is None:
raise ValueError(
f"Only the last {label} clause may omit its condition. "
f"Clause at index {i} has no condition, making subsequent "
f"clauses unreachable."
)
target_on_cols, source_on_cols = _normalize_on(on)

from pypaimon.catalog.catalog_factory import CatalogFactory
Expand Down
137 changes: 83 additions & 54 deletions paimon-python/pypaimon/ray/data_evolution_merge_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,43 +87,57 @@ def build_matched_update_ds(
right_on=tuple(f"s.{c}" for c in source_on),
)

# MVP supports a single matched clause; future fan-out (conditions, multi-
# clause fall-through) must thread every clause's spec through the
# transform — guard so silent first-only behaviour can't sneak in.
assert len(clauses) == 1, (
f"build_matched_update_ds expected 1 clause, got {len(clauses)}"
)
spec = clauses[0].spec
condition = clauses[0].condition
captured_update_cols = list(update_cols)
captured_row_id_name = row_id_name
captured_on_pairs = list(zip(source_on, target_on))
captured_schema = update_schema

captured_apply = None
captured_rewritten = None
if condition is not None:
from pypaimon.ray.merge_condition import (
apply_condition, remap_source_on_keys, rewrite_condition,
)
on_map = dict(zip(source_on, target_on))
captured_rewritten = remap_source_on_keys(
rewrite_condition(condition), on_map,
)
captured_apply = apply_condition
on_map = dict(zip(source_on, target_on))
prepared_clauses = []
for clause in clauses:
rewritten = None
if clause.condition is not None:
from pypaimon.ray.merge_condition import (
remap_source_on_keys, rewrite_condition,
)
rewritten = remap_source_on_keys(
rewrite_condition(clause.condition), on_map,
)
prepared_clauses.append((clause.spec, rewritten))

_filter_batch = None
if any(r is not None for _, r in prepared_clauses):
from pypaimon.ray.merge_condition import filter_batch as _filter_batch

def _transform(batch: pa.Table) -> pa.Table:
if captured_apply is not None:
batch = captured_apply(
batch, captured_rewritten, captured_schema,
)
if batch.num_rows == 0:
return batch
return vectorized_matched_transform(
batch, spec, captured_on_pairs,
captured_update_cols, captured_row_id_name,
captured_schema,
)
remaining = batch
parts = []
for spec, rewritten in prepared_clauses:
if remaining.num_rows == 0:
break
if rewritten is not None:
matched = _filter_batch(
remaining, rewritten, _pre_rewritten=True,
)
else:
matched = remaining
if matched.num_rows == 0:
continue
parts.append(vectorized_matched_transform(
matched, spec, captured_on_pairs,
captured_update_cols, captured_row_id_name,
captured_schema,
))
if rewritten is not None and matched.num_rows < remaining.num_rows:
not_cond = f"COALESCE(NOT ({rewritten}), TRUE)"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking test gap: the not-matched path covers NULL condition fall-through, but the matched path relies on the same COALESCE(NOT (...), TRUE) behavior here. Consider adding a matched-path NULL fall-through test to lock this down.

remaining = _filter_batch(
remaining, not_cond, _pre_rewritten=True,
)
else:
remaining = remaining.slice(0, 0)
if not parts:
return captured_schema.empty_table()
return pa.concat_tables(parts)

return joined.map_batches(_transform, **_map_kwargs(ray_remote_args))

Expand Down Expand Up @@ -324,32 +338,47 @@ def build_not_matched_insert_ds(
right_on=tuple(f"t.{c}" for c in target_on),
)

# MVP supports a single not-matched clause; see build_matched_update_ds
# for why we assert instead of silently dropping the rest.
assert len(clauses) == 1, (
f"build_not_matched_insert_ds expected 1 clause, got {len(clauses)}"
)
spec = clauses[0].spec
condition = clauses[0].condition
captured_apply = None
captured_rewritten = None
if condition is not None:
from pypaimon.ray.merge_condition import apply_condition, rewrite_condition
captured_rewritten = rewrite_condition(condition)
captured_apply = apply_condition
prepared_clauses = []
for clause in clauses:
rewritten = None
if clause.condition is not None:
from pypaimon.ray.merge_condition import rewrite_condition
rewritten = rewrite_condition(clause.condition)
prepared_clauses.append((clause.spec, rewritten))

_filter_batch_nm = None
if any(r is not None for _, r in prepared_clauses):
from pypaimon.ray.merge_condition import filter_batch as _filter_batch_nm

def _transform(batch: pa.Table) -> pa.Table:
if captured_apply is not None:
batch = captured_apply(
batch, captured_rewritten, out_schema,
)
if batch.num_rows == 0:
return _coerce_large_string_types(batch)
return _coerce_large_string_types(
vectorized_insert_transform(
batch, spec, captured_field_names, out_schema
)
)
remaining = batch
parts = []
for spec, rewritten in prepared_clauses:
if remaining.num_rows == 0:
break
if rewritten is not None:
matched = _filter_batch_nm(
remaining, rewritten, _pre_rewritten=True,
)
if matched.num_rows > 0:
parts.append(vectorized_insert_transform(
matched, spec, captured_field_names, out_schema
))
if matched.num_rows < remaining.num_rows:
not_cond = f"COALESCE(NOT ({rewritten}), TRUE)"
remaining = _filter_batch_nm(
remaining, not_cond, _pre_rewritten=True,
)
else:
remaining = remaining.slice(0, 0)
else:
parts.append(vectorized_insert_transform(
remaining, spec, captured_field_names, out_schema
))
remaining = remaining.slice(0, 0)
if not parts:
return _coerce_large_string_types(out_schema.empty_table())
return _coerce_large_string_types(pa.concat_tables(parts))

return unmatched.map_batches(
_transform, **_map_kwargs(ray_remote_args)
Expand Down
Loading
Loading