Skip to content

Rowwise subquery eval#36735

Open
frankmcsherry wants to merge 7 commits into
MaterializeInc:mainfrom
frankmcsherry:rowwise-subquery-eval
Open

Rowwise subquery eval#36735
frankmcsherry wants to merge 7 commits into
MaterializeInc:mainfrom
frankmcsherry:rowwise-subquery-eval

Conversation

@frankmcsherry
Copy link
Copy Markdown
Contributor

@frankmcsherry frankmcsherry commented May 26, 2026

WIP for a lowering strategy for subqueries that only reference the current row, converting to a stateless eval_relation table func rather than the stateful MRE operators. Some examples of the improvements in the .slt file edits.

Move the per-operator constant-evaluation helpers (fold_reduce_constant,
fold_topk_constant, fold_flat_map_constant, fold_filter_constant) from the
FoldConstants transform (mz-transform) into a new mz_expr::relation::eval
module, and have FoldConstants delegate to them. No behavior change; the
helper bodies are moved verbatim. This relocates the evaluation primitives to
mz-expr so they can be shared by code that cannot depend on mz-transform.
Add eval_relation, which interprets a MirRelationExpr whose leaves are all
Constants (or Gets of locals bound by an enclosing Let) to its consolidated
rows, building on the per-operator helpers. It threads an environment so it
handles Let bindings and local Gets, and rejects anything it cannot evaluate
row-locally (Gets of stored collections, LetRec, ArrangeBy, a TopK with a
non-literal limit). eval_relation_with_input pre-binds a placeholder local to
supplied input rows, the entry point a row-local table function will use.

This is the shared evaluator that both FoldConstants and a forthcoming
row-local table function can call; it lives in mz-expr alongside the per-row
MirScalarExpr/AggregateFunc/TableFunc evaluators it composes.
EvalRelation { relation, input_id, output_type } houses a closed
MirRelationExpr whose only leaves are Constants and a distinguished
Get(Id::Local(input_id)) standing for the single input row. Its eval binds the
input row (packed from the FlatMap argument expressions) to input_id and
interprets the relation with eval_relation_with_input, yielding the table
function's output rows.

This is the vehicle for lowering a row-local subquery to a stateless FlatMap.
TableFunc is serde-only, so no proto wiring is needed; the housed types derive
the required Ord/Hash/Eq/MzReflect. empty_on_null_input and
preserves_monotonicity are false (the relation may use nulls and may contain
Reduce/TopK); WITH ORDINALITY is disallowed. The housed relation is opaque to
MIR traversals (visitors recurse into FlatMap's input/exprs, not its func), so
no dependent match arms change.
Add a third path to branch() in HIR->MIR lowering. When a correlated subquery
is a pure function of the outer row, decorrelate it against a single-row
placeholder Get; if the result is closed (is_rowwise_closed), house it in a
TableFunc::EvalRelation driven by a FlatMap over the outer relation, replacing
the stateful keyed Reduce (and its distinct/lookup/guard scaffolding) with a
stateless per-row evaluation.

is_rowwise_closed requires: every leaf a Constant or a Get of a bound local /
the placeholder; no stored collections, recursion, or arrangements; no TopK
with a non-evaluable limit; and no unmaterializable functions (now(), mz_now(),
current_database(), ...) -- those are resolved to literals by a later optimizer
pass that cannot see into the housed relation, so they must fall back to the
keyed path rather than error at runtime.

The gate is two-layer and default-deny: a cheap is_rowwise_candidate HIR
allowlist, then is_rowwise_closed on the decorrelated MIR; anything failing
either falls through to the existing keyed path unchanged. branch's apply
closure relaxes from FnOnce to Fn so the trial can run without consuming the
fallback.

WHERE/LATERAL/EXISTS/IN correlated subqueries collapse to a fully stateless
FlatMap; select-list scalar subqueries lose the keyed Reduce but retain
lower_subqueries' distinct+join scaffold (a redundant semijoin a later
optimizer pass can remove). No result regressions across the
subquery/join/aggregate/topk/list/window suites; one decorrelated-plan
snapshot in outer_join_lowering.slt updated (a strict simplification).
Add an slt that pins, via EXPLAIN OPTIMIZED PLAN and result checks, which
correlated-subquery idioms lower to a stateless FlatMap eval_relation
(WHERE/LATERAL/EXISTS/IN), which retain the redundant distinct+join scaffold
(select-list scalar subqueries, incl. the array_agg(DISTINCT ...) example),
and which correctly fall back to the keyed path (subqueries over a stored
table; subqueries using unmaterializable functions).
@frankmcsherry frankmcsherry force-pushed the rowwise-subquery-eval branch from 6285890 to 8acd9e9 Compare May 26, 2026 22:49
@frankmcsherry frankmcsherry force-pushed the rowwise-subquery-eval branch from 8acd9e9 to b80c690 Compare May 27, 2026 15:56
@frankmcsherry frankmcsherry marked this pull request as ready for review May 27, 2026 16:17
@frankmcsherry frankmcsherry requested a review from a team as a code owner May 27, 2026 16:17
Copy link
Copy Markdown
Contributor

@def- def- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query:

SELECT (SELECT count(*) FROM generate_series(1, n)) AS c FROM (VALUES (10000000000)) AS t(n);

used to run on clusterd with fixed memory, with this PR it runs on environmentd and OoMs it. Even after a timeout and getting cancelled it just keeps running:

materialize=> SELECT (SELECT count(*) FROM generate_series(1, n)) AS c FROM (VALUES (10000000000)) AS t(n);
ERROR:  canceling statement due to statement timeout
HINT:  Consider increasing the maximum allowed statement duration for this session by setting the statement_timeout session variable. For example, `SET statement_timeout = '120s'`.

@frankmcsherry
Copy link
Copy Markdown
Contributor Author

frankmcsherry commented May 28, 2026

Makes sense. The dataflow evaluator is better optimized than the scalar evaluator (which is mostly programmed to bail out when it sees lots of data, rather than perform the work efficiently). We were unfortunately unable to merge #22753 which is the fix for the demo query, but tbh it's mostly cosmetic (in that, no one does this outside of tests, and the PR fixes behavior that only happens in tests). If you actually want to produce 10B records from one row .. probably worth imagining that you might have 10B records from that row. If it were array_agg rather than COUNT(*) of course it wouldn't work.

I'm checking out what it would take to fix the underlying issue (at the time it was "rewrite all of our aggregation operators, because they are implemented 'in unary'"). I'm also ok with "this doesn't work great, but folks get all the memory savings on the queries they are issuing."

The tl;dr is that we have two implementations of COUNT(*) in MZ: one in fold_constants that uses the SQL implementation based on row iterators, and on in DD that just adds up numbers. The former is much worse, but we don't have DD in scalar evaluation. =/

@frankmcsherry
Copy link
Copy Markdown
Contributor Author

frankmcsherry commented May 28, 2026

Ah, and fwiw it may run in finite memory (seems to) but also has taken about 10m to run so far on a 100cc, and still going. I'm not sure either of these are good. :D. (edit: it took 20m).

@frankmcsherry
Copy link
Copy Markdown
Contributor Author

Lucky you! I am here to break all of your tests. :D

#36776

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants