[Enhancement] Push eventstats down by rewriting RexOver to Join + Aggregate (#5483) #5495
RyanL1997 wants to merge 8 commits
opensearch-project#5483)
PPL eventstats lowers to LogicalProject(RexOver(...)) above the scan. No rule in OpenSearchIndexRules.OPEN_SEARCH_PUSHDOWN_RULES matches that shape: every AggregateIndexScanRule config requires LogicalAggregate at the operand root, and RareTopPushdownRule requires a ROW_NUMBER window with a LESS_THAN_OR_EQUAL filter above it. The plan therefore reaches Volcano with RexOver intact, gets converted to EnumerableWindow, and the scan beneath it stays in _source-includes + requestedTotalSize=MAX_INT mode, streaming every matching document to the coordinator just to count it. On 47B-doc indices this times out.
This change rewrites Window AST nodes in CalciteRelNodeVisitor.visitWindow into a Join + Aggregate plan: the right side is an Aggregate over a re-pushed copy of the input, which matches AggregateIndexScanRule and pushes down to OpenSearch as size:0 + track_total_hits (no-BY) or a terms aggregation (BY). The left side returns rows as before. The join broadcasts the aggregate value(s) onto each row, preserving the row type [original cols, agg cols] that the legacy lowering produced so downstream consumers see no shape change.
NULL-bucket semantics:
- bucketNullable=true: INNER join with IS NOT DISTINCT FROM on each partition key, so the NULL bucket on each side matches and NULL-keyed left rows still receive the NULL-bucket aggregate value.
- bucketNullable=false: LEFT join with simple equality, IS NOT NULL filter pushed below the right aggregate to match the BUCKET_NON_NULL_AGG pushdown shape stats already uses. NULL-keyed left rows survive with a NULL aggregate value, matching the previous CASE-wrapped behavior.
The rewriteability predicate (canRewriteWindowAsAggregateJoin) rejects non-aggregate window functions (ROW_NUMBER / LAG / etc.), non-empty sort lists, non-default frames, and non-bare-field partition keys. Anything outside the eventstats shape falls through to visitWindowAsRexOver, preserving existing behavior for any future Window producer.
Follows the precedent in buildStreamWindowSelfJoinPlan: uses Join (not LogicalCorrelate, which causes NPE in RelDecorrelator per the comment at CalciteRelNodeVisitor.java:2348-2352) and mirrors the canonical NULL bucket handling at lines 2442-2449. Reuses aggregateWithTrimming for the right-side aggregate construction so agg-resolution semantics are identical to stats and streamstats.
CalcitePPLEventstatsTest verifyLogical expectations are updated to the new lowered shape. verifyPPLToSparkSQL assertions are temporarily removed pending observation of the SparkSqlDialect output for the join+aggregate form; the previous window-form expectations no longer apply.
Draft: existing CalciteExplainIT eventstats expected-output files and new NULL-bucket BY integration tests in CalcitePPLEventstatsIT will be added in follow-up commits once CI confirms the lowered shape is exact.
Resolves opensearch-project#5483
Signed-off-by: Jialiang Liang <ryanleeang@gmail.com>
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
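The two NULL-bucket join shapes described in this commit message can be modeled outside the planner. The sketch below is a plain-Python illustration over dicts, not the PR's Java code; the function name `eventstats_count`, the `cnt` output column, and the sample rows are all hypothetical.

```python
from collections import defaultdict


def eventstats_count(rows, key, bucket_nullable):
    """Model `eventstats count() by <key>` as Aggregate + Join over dicts."""
    # Right side: the aggregate. With bucket_nullable=False an IS NOT NULL
    # filter runs below the aggregate, so no NULL bucket is produced.
    counts = defaultdict(int)
    for row in rows:
        if row[key] is None and not bucket_nullable:
            continue
        counts[row[key]] += 1

    out = []
    for row in rows:
        k = row[key]
        if bucket_nullable:
            # INNER join on IS NOT DISTINCT FROM: None matches None, and
            # every left key has a bucket, so no left row is dropped.
            out.append({**row, "cnt": counts[k]})
        else:
            # LEFT join on plain equality: a NULL key never matches, so the
            # row survives with a NULL aggregate value.
            out.append({**row, "cnt": counts.get(k) if k is not None else None})
    return out


rows = [
    {"name": "a", "dept": 10},
    {"name": "b", "dept": 10},
    {"name": "c", "dept": None},
]
nullable = eventstats_count(rows, "dept", bucket_nullable=True)
non_null = eventstats_count(rows, "dept", bucket_nullable=False)
```

In both modes every left row is kept; only the aggregate value attached to NULL-keyed rows differs.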
…tions
PPL AstExpressionBuilder.visitWindowFunction wraps the parsed function in a WindowFunction whose inner is a Function, not an AggregateFunction (SQL emits AggregateFunction). The original predicate required AggregateFunction, so it returned false for every eventstats case and the rewrite never fired. Use BuiltinFunctionName.ofAggregation(funcName) so the predicate accepts both inner types, and convert Function to AggregateFunction in stripWindowFunctionForAggregate so aggVisitor resolves it the same way stats does.
Test expectation adjustments observed from actual planner output:
- IS NOT DISTINCT FROM: Calcite canonicalizes OR(=, AND(IS NULL, IS NULL)) to IS NOT DISTINCT FROM on nullable partition keys (DEPTNO in EMP).
- Plain =: on non-nullable partition keys (server in POST.LOGS), RexSimplify drops the IS NULL conjuncts and leaves equality.
- Outer Project folded for no-BY cases: the final passthrough projection is a no-op identity in the no-BY case and Calcite folds it; the BY case keeps the project because it drops the right-side group-key column.
verifyPPLToSparkSQL calls in CalcitePPLEventstatsEarliestLatestTest are removed pending stabilization of SparkSqlDialect emission for the join+aggregate form.
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
Persistent review updated to latest commit a444b56
…Y keys
CI integration failures revealed two cases the rewrite shouldn't fire on:
1. testUnsupportedWindowFunctions: percentile / percentile_approx are in AGGREGATION_FUNC_MAPPING but not WINDOW_FUNC_MAPPING. The legacy rex visitor throws "Unexpected window function: ..." for them, and the test pins that error. My predicate used only ofAggregation so it accepted percentile and the rewrite ran instead of throwing. Now require presence in both maps: percentile (and take/first/last/median, all aggregation-only) fall through to the legacy throw; dc/distinct_count/row_number (all window-only) also fall through unchanged.
2. testEventstatsOnMapPath: `eventstats count() by doc.user.city`. The join condition uses relBuilder.field(2, side, name), which doesn't resolve nested paths; my predicate accepted any QualifiedName so the rewrite produced a plan that failed at field lookup. Now isSimpleQualifiedName requires parts.size() == 1; dotted paths fall through to the legacy RexOver lowering, which handles nested fields via the existing rexVisitor.
Plus a spotless reformat to the earliest/latest test that wasn't picked up before push.
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
Persistent review updated to latest commit 1a043bc
The eventstats rewrite changes the lowered plan from `Project(RexOver)`
to `Project + Join + Aggregate`. The four affected EXPLAIN tests run in
both modes:
- CalciteExplainIT (pushdown enabled)
- CalciteNoPushdownIT (pushdown disabled)
Both modes share the same logical plan (the rewrite is AST-level, not
pushdown-gated) but the physical plan diverges:
- Pushdown ON: right-side LogicalAggregate gets pushed into the inner
scan as `PushDownContext=[[AGGREGATION->...]]` with `size:0` +
`composite`+`terms` (BY) or `track_total_hits` (no-BY) source builder.
- Pushdown OFF: right side stays as a coordinator-side EnumerableAggregate
over a raw CalciteEnumerableIndexScan, no AGGREGATION in PushDownContext.
The existing convention (loadExpectedPlan in PPLIntegTestCase) loads from
`expectedOutput/calcite/` when pushdown is on and `expectedOutput/calcite_no_pushdown/`
when it's off, so each test keeps its NoPushdownIT coverage — both variants
of the same expected file get regenerated against a local node running
the rewrite.
Files regenerated (4 per mode, 8 total):
- explain_eventstats_earliest_latest.{json}
- explain_eventstats_earliest_latest_custom_time.{json}
- explain_eventstats_earliest_latest_no_group.{json}
- explain_eventstats_null_bucket.{yaml}
Captured against:
- logs_index_mapping.json + logs.json fixtures (5 docs)
- account_index_mapping.json + accounts.json fixtures (1000 docs)
- same fixtures CI uses; rel#N / RelSubset#N IDs are normalized at
compare time by assertJsonEqualsIgnoreId / assertYamlEqualsIgnoreId.
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
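The commit message above notes that `rel#N` / `RelSubset#N` IDs are normalized at compare time, which is why expected files captured on a local node still match CI output. A minimal sketch of that idea follows; the regex and the function name `normalize_plan_ids` are assumptions for illustration and are not taken from assertJsonEqualsIgnoreId or assertYamlEqualsIgnoreId.

```python
import re

# Hypothetical pattern: planner-assigned IDs such as rel#123 or RelSubset#45.
_PLAN_ID = re.compile(r"\b(rel|RelSubset)#\d+")


def normalize_plan_ids(plan: str) -> str:
    """Strip the run-specific number, keeping the `rel#` / `RelSubset#` prefix."""
    return _PLAN_ID.sub(lambda m: m.group(1) + "#", plan)


# Two captures of the same plan shape with different planner-assigned IDs.
captured = "rel#1201:RelSubset#77.ENUMERABLE.[]"
expected = "rel#9:RelSubset#3.ENUMERABLE.[]"
same_shape = normalize_plan_ids(captured) == normalize_plan_ids(expected)
```

Only the numeric suffix is dropped, so a genuine difference in operators or traits would still fail the comparison.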
Persistent review updated to latest commit 15c52f7
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
Persistent review updated to latest commit f19e1d9
My local OS captured composite size as 10000; CI's integ-test cluster
sets it to 1000 (matches every other expected file under
expectedOutput/calcite/agg_composite_*.{json,yaml}). The number doesn't
affect correctness — it's just the per-bucket page size for the composite
aggregation request. Align with the test-framework convention.
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
Persistent review updated to latest commit 515f35e
PPL eventstats accepts three aliases for the cardinality aggregation (dc, distinct_count, distinct_count_approx), all resolving to BuiltinFunctionName.DISTINCT_COUNT_APPROX. The stats command only accepts distinct_count_approx, so AGGREGATION_FUNC_MAPPING registers only that name; the other two are window-only aliases in WINDOW_FUNC_MAPPING.
The previous predicate required intersection of both maps, which rejected dc and distinct_count. They fell through to the legacy RexOver lowering, which is the exact buggy "EnumerableWindow over a row-fetching scan" shape opensearch-project#5483 was filed against. The fix was incomplete.
Replace the intersection check with: name is in ofWindowFunction AND its canonical aggregation name (BuiltinFunctionName.name().toLowerCase, e.g. "distinct_count_approx") is in ofAggregation. Translate the same way in stripWindowFunctionForAggregate so aggVisitor sees the registered name.
For names already in both maps (count/sum/avg/etc.) the canonical name equals the user-typed name, so the lookup is a no-op: no behavior change for the cases that already worked. ROW_NUMBER still falls through because its canonical name "row_number" isn't in the aggregation map. Same for percentile / take / first / last / median / list / values, all rejected by the canonical-name lookup.
Verified locally:
- eventstats dc(state) → cardinality agg, size:0
- eventstats distinct_count(state) by gender → composite over gender + nested cardinality on state, size:0
Regenerated explain_eventstats_dc.json and explain_eventstats_distinct_count.json with the new shape (composite size 1000 to match CI). Both tests are pushdown-only (enabledOnlyWhenPushdownIsEnabled() + loadFromFile hardcoded to calcite/), so no calcite_no_pushdown/ variants needed.
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
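The difference between the intersection check and the canonical-name check can be shown with a small model. This is a sketch only: `WINDOW_FUNCS` and `AGG_FUNCS` are hypothetical subsets built from names this commit message mentions, standing in for WINDOW_FUNC_MAPPING and AGGREGATION_FUNC_MAPPING, and the function names are invented for illustration.

```python
# Hypothetical subsets of the two registries; the real maps hold more entries.
# Window map: user-typed name -> canonical enum name.
WINDOW_FUNCS = {
    "count": "COUNT",
    "sum": "SUM",
    "avg": "AVG",
    "dc": "DISTINCT_COUNT_APPROX",
    "distinct_count": "DISTINCT_COUNT_APPROX",
    "distinct_count_approx": "DISTINCT_COUNT_APPROX",
    "row_number": "ROW_NUMBER",
}
# Aggregation map: names the stats command registers.
AGG_FUNCS = {"count", "sum", "avg", "distinct_count_approx", "percentile", "median"}


def intersection_predicate(name):
    """Earlier check: the user-typed name must appear in both maps."""
    return name in WINDOW_FUNCS and name in AGG_FUNCS


def canonical_predicate(name):
    """Later check: window name whose canonical name is a registered aggregation."""
    canonical = WINDOW_FUNCS.get(name)
    return canonical is not None and canonical.lower() in AGG_FUNCS


def canonical_agg_name(name):
    """Name handed to the aggregate visitor once the predicate has passed."""
    return WINDOW_FUNCS[name].lower()
```

Under these assumed maps, `dc` fails the intersection check but passes the canonical-name check, while `row_number` and `percentile` are rejected by both.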
Persistent review updated to latest commit bb8d9b2
…owup
A perf A/B on a local 20k-doc index uncovered a real problem in the
no-BY rewrite that was hidden by tests with bounded result sets.
Before this fix, the no-BY case emitted:
EnumerableNestedLoopJoin(condition=[true], joinType=[inner])
leftScan (returns N rows)
rightScan (returns 1 row — the COUNT() scalar)
Calcite's NestedLoopJoin contract calls Enumerable.enumerator() on the
right side once per left tuple. Each enumerator open on a
CalciteEnumerableIndexScan triggers a fresh OpenSearch _search request.
For a 10k-row left side that means 10k OpenSearch calls. On a remote
cluster (1-10ms RTT per call), the head-less query would take tens of
seconds.
Measured on the local node, 20k docs, single shard, no head:
before: 10,004 OS calls per PPL query, ~1055ms wall
after: 4 OS calls per PPL query, ~174ms wall
That's ~6x faster wall and ~2500x fewer OS round-trips, with no
correctness change (results identical).
Fix: in the no-BY branch of rewriteWindowAsAggregateJoin, project a
literal-0 key column onto both sides (left: append after orig cols;
right: append after agg outputs) and join on equality. The equi-join
condition makes the planner pick EnumerableHashJoin, which drains the
single-row right side once into a hash table and probes per left row
in O(1).
Pushdown still fires on the right side — verified via EXPLAIN that the
right scan still carries `AGGREGATION->...COUNT()` and `size:0` in
PushDownContext; the literal-0 projection is a top-level wrapper that
doesn't disrupt the Aggregate→Scan operand chain
AggregateIndexScanRule.AGGREGATE_SCAN matches.
The BY case is unchanged — it already has an equi-join condition (or
IS NOT DISTINCT FROM for bucketNullable=true) which Calcite handles
correctly via EnumerableMergeJoin.
Test expectation updates:
- CalcitePPLEventstatsTest.testEventstatsCount / testEventstatsAvg
- CalcitePPLEventstatsEarliestLatestTest no-BY variants (4 tests)
- explain_eventstats_dc.json (no-BY, pushdown)
- explain_eventstats_earliest_latest_no_group.json (no-BY, both modes)
Outer LogicalProject now appears in the no-BY case because we must
strip the literal-0 key columns from the join output — it's no longer
a no-op passthrough that Calcite folds.
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
Perf finding & fix on the no-BY rewrite
An internal review flagged a concern about the cost of the two-scan + coordinator-join shape vs the original window form. Ran a local A/B against a 20k-doc accounts index to put numbers on it.
Initial measurement uncovered a real problem
For the unbounded result-set case, the no-BY rewrite was triggering one OpenSearch search request per left-side row.
Root cause
The no-BY case emitted:
EnumerableNestedLoopJoin(condition=[true], joinType=[inner])
leftScan (returns N rows)
rightScan (returns 1 row: the COUNT() scalar)
The BY case never hit this: it already has an equi-join condition (or …
Fix (commit …
| Scenario | After fix |
|---|---|
| `... \| head 10` | … |
| no head (10k-row result) | ~174ms warm, 4 OS calls across 5 runs |
That's ~6× faster wall and ~2,500× fewer OS round-trips for the unbounded case. Latency for the head-bounded case ticks up slightly (~30ms) because of the extra projection + hash-join overhead at small scale, but the absolute numbers are well under one round-trip.
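The request-count difference comes from how often each join strategy opens the right-side input. The sketch below models that with plain Python iterables; `CountingScan` is a hypothetical stand-in for a scan whose every open issues one remote request, and neither function is Calcite code.

```python
class CountingScan:
    """Stand-in for a scan where each open would issue one remote request."""

    def __init__(self, rows):
        self.rows = rows
        self.opens = 0

    def __iter__(self):
        self.opens += 1
        return iter(self.rows)


def nested_loop_join(left, right):
    # condition=[true]: the right side is re-opened for every left row.
    return [lrow + rrow for lrow in left for rrow in right]


def hash_join_on_constant_key(left, right):
    # Both sides carry a literal-0 key: drain the right side once into a
    # table, then probe that table for each left row.
    table = {}
    for rrow in right:
        table.setdefault(0, []).append(rrow)
    return [lrow + rrow for lrow in left for rrow in table.get(0, [])]


left = [(i,) for i in range(10_000)]
right_nl = CountingScan([(10_000,)])  # single-row aggregate result
right_hj = CountingScan([(10_000,)])
nl_result = nested_loop_join(left, right_nl)
hj_result = hash_join_on_constant_key(left, right_hj)
```

Both joins return the same rows; in this model the nested-loop form opens the right side once per left row and the hash form opens it once, which is the shape of the difference reported above (the measured figures also include other requests the model does not cover).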
Pushdown verification
Confirmed via EXPLAIN that the right-side aggregate still pushes down with the new shape:
EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
EnumerableCalc(... LITERAL(0) as join_key)
CalciteEnumerableIndexScan(
PushDownContext=[[AGGREGATION->...COUNT()]],
sourceBuilder={"size":0,"track_total_hits":2147483647})
EnumerableCalc(... LITERAL(0) as join_key)
CalciteEnumerableIndexScan(... full _source path ...)
The literal-0 projection wraps the aggregate but doesn't disrupt the Aggregate → Scan operand chain that AggregateIndexScanRule.AGGREGATE_SCAN matches — pushdown fires normally and the right side still issues exactly one size:0 + track_total_hits (or cardinality for dc(), or composite + top_hits for earliest/latest) request.
Test impact
Unit-test expectations updated for 2 tests in CalcitePPLEventstatsTest and 4 in CalcitePPLEventstatsEarliestLatestTest (the no-BY variants now show an outer LogicalProject because the literal-0 key columns must be stripped). 3 EXPLAIN expected files regenerated (explain_eventstats_dc.json pushdown, explain_eventstats_earliest_latest_no_group.json in both modes).
BY-case tests are unchanged — they already use equi-join conditions.
Persistent review updated to latest commit 6f333fc
Description
PPL `eventstats` lowers to `LogicalProject(RexOver(...))` directly above the scan. No rule in `OpenSearchIndexRules.OPEN_SEARCH_PUSHDOWN_RULES` matches that shape: every `AggregateIndexScanRule` config requires `LogicalAggregate` at the operand root, and `RareTopPushdownRule` requires a `ROW_NUMBER` window with a `LESS_THAN_OR_EQUAL` filter above. The plan therefore reaches Volcano with `RexOver` intact, becomes `EnumerableWindow`, and the scan beneath it stays in `_source` + `requestedTotalSize=MAX_INT` mode. On 47B-doc indices the coordinator times out streaming every matching document just to count it. Same pathological behavior with `BY` (the production query in the issue).
This PR rewrites `Window` AST nodes in `CalciteRelNodeVisitor.visitWindow` into `Project → Join → (input, Aggregate(input))`. The right-side `Aggregate` sits directly over a re-pushed copy of the input, matching `AggregateIndexScanRule.AGGREGATE_SCAN` (no-BY) or `DEFAULT` / `BUCKET_NON_NULL_AGG` (BY). OpenSearch sees the same shape as `stats count()`, that is `size:0 + track_total_hits` or a `terms` aggregation, instead of an unsized row fetch. The left side returns rows as before; the join broadcasts the aggregate value(s) onto each row, preserving the `[original cols, agg cols]` row type so downstream consumers (`limit`, `head`, `fields`) see no change.
Design
Follows the precedent in `buildStreamWindowSelfJoinPlan` (`CalciteRelNodeVisitor.java:2348-2362`):
- Uses `Join`, not `LogicalCorrelate`, because `LogicalCorrelate` causes NPE in `RelDecorrelator` per the existing comment.
- NULL-bucket semantics: `bucketNullable=true` → `INNER` join with `IS NOT DISTINCT FROM` (`(left.k = right.k) OR (both NULL)`); `bucketNullable=false` → `LEFT` join with simple equality, `IS NOT NULL` filter pushed below the right aggregate to match `BUCKET_NON_NULL_AGG`. NULL-keyed left rows are preserved with `NULL` aggregate values, matching the previous CASE-wrapped behavior.
- `aggregateWithTrimming` is reused for the right-side aggregate construction, so agg-resolution semantics are identical to `stats` / `streamstats`.
Rewriteability predicate (`canRewriteWindowAsAggregateJoin`) rejects: non-aggregate window functions (`ROW_NUMBER` / `LAG` / etc.), non-empty `sortList`, non-default frame, non-bare-field partition keys. Anything outside the eventstats shape falls through to `visitWindowAsRexOver`, preserving existing behavior for any future `Window` producer.
Coverage against the issue's 7-requirement comment
Comment:
- No-`BY` + `BY` both addressed in a single rewrite; `hasGroup` toggles cross-join vs equi-join. ✓
- Reuses `aggregateWithTrimming` (same path `stats` uses), supports COUNT/SUM/AVG/MIN/MAX/STDDEV/VAR/etc. uniformly. ✓
- `Aggregate` with multiple agg calls, one broadcast join. ✓
- `BY` join-back: `IS NOT DISTINCT FROM` for `bucketNullable=true`; `LEFT` join + right-side `IS NOT NULL` filter for `bucketNullable=false`. Equivalent to the existing Window+CASE behavior. ✓
- … `AggregateIndexScanRule` expects. ✓
- `Predicate.not(LogicalProject::containsOver)` guards in `AggregateIndexScanRule` are untouched. Real Window→Aggregate rewrite at the AST level. ✓
Verification: status
Draft because the following needs CI to finalize, blocked locally by a pre-existing `core:compileJava` failure on `AnalyticsExecutionEngine.java:92` (unrelated `analytics-api:3.7.0-SNAPSHOT` symbol issue that also reproduces on `main` with no edits):
- `CalcitePPLEventstatsTest` `verifyLogical` expectations have been updated to my pattern-derived prediction of the new lowered shape. May need 1-2 small adjustments (trim-project corner cases) based on actual output. `verifyPPLToSparkSQL` calls are temporarily removed pending observation of the `SparkSqlDialect` output for the join+aggregate form.
- Existing `CalciteExplainIT` eventstats expected-output files (`explain_eventstats_avg.json`, `_dc.json`, `_distinct_count.json`, `_earliest_latest.json`, `_earliest_latest_custom_time.json`, `_earliest_latest_no_group.json`, `_null_bucket.yaml`) will diff against the new plan shape. Regeneration from CI logs incoming as a follow-up commit.
- New test (`testEventstatsPushdownExplain` mirroring `stats count()`, asserting `AGGREGATION->...COUNT()` in `PushDownContext` and `track_total_hits` in `sourceBuilder`) incoming.
- New NULL-bucket `BY` integration tests in `CalcitePPLEventstatsIT` against `TEST_INDEX_STATE_COUNTRY_WITH_NULL` (both `bucketNullable=true` and `=false`) incoming.
Adjacent unit suites (`CalcitePPLAggregationTest`, `CalcitePPLStreamstatsTest`, `CalcitePPLRareTest`, `CalcitePPLTopTest`) should all be unchanged: different visitor paths or `StreamWindow` / `RareTopN` AST nodes.
Related Issues
Resolves #5483
Check List