From 89fd1cf058c02b2fea2ec6869c3a9d039387b7ce Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Tue, 21 Apr 2026 20:14:53 +0200 Subject: [PATCH] [FLINK-39293][table] `MATCH_RECOGNIZE` fails with `SqlParserException` in views --- .../src/main/codegen/templates/Parser.jj | 23 +- .../sql/validate/SqlValidatorImpl.java | 81 ++- .../batch/sql/MatchRecognizeITCase.java | 673 +++++++++--------- 3 files changed, 414 insertions(+), 363 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj index 1bb939a46fa67..a9802fd631bab 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj +++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj @@ -2202,7 +2202,10 @@ SqlNode TableRef3(ExprContext exprContext, boolean lateral) : [ tableRef = ExtendTable(tableRef) ] tableRef = Over(tableRef) [ tableRef = Snapshot(tableRef) ] - [ tableRef = MatchRecognize(tableRef) ] + [ + LOOKAHEAD(3) + tableRef = MatchRecognize(tableRef) + ] ) | LOOKAHEAD(2) @@ -2210,7 +2213,10 @@ SqlNode TableRef3(ExprContext exprContext, boolean lateral) : tableRef = ParenthesizedExpression(exprContext) tableRef = Over(tableRef) tableRef = addLateral(tableRef, lateral) - [ tableRef = MatchRecognize(tableRef) ] + [ + LOOKAHEAD(3) + tableRef = MatchRecognize(tableRef) + ] | LOOKAHEAD(2) [ ] // "LATERAL" is implicit with "UNNEST", so ignore @@ -3059,6 +3065,7 @@ void AddUnpivotValue(List list) : SqlMatchRecognize MatchRecognize(SqlNode tableRef) : { final Span s, s0, s1, s2; + final SqlIdentifier aliasBeforeMatch; final SqlNodeList measureList; final SqlNodeList partitionList; final SqlNodeList orderList; @@ -3073,6 +3080,12 @@ SqlMatchRecognize MatchRecognize(SqlNode tableRef) : final SqlLiteral isStrictEnds; } { + [ + aliasBeforeMatch = SimpleIdentifier() { + tableRef = SqlStdOperatorTable.AS.createCall( + Span.of(tableRef).end(this), tableRef, aliasBeforeMatch); + } + ] { s = span(); checkNotJoin(tableRef); } ( { s2 = span(); } @@ -7210,7 +7223,7 @@ SqlCall MatchRecognizeCallWithModifier() : { final Span s; final SqlOperator runningOp; - final SqlNode func; + final SqlNode e; } { ( @@ -7219,8 +7232,8 @@ SqlCall MatchRecognizeCallWithModifier() : { runningOp = SqlStdOperatorTable.FINAL; } ) { s = span(); } - func = NamedFunctionCall() { - return runningOp.createCall(s.end(func), func); + e = Expression3(ExprContext.ACCEPT_NON_QUERY) { + return runningOp.createCall(s.end(e), e); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index 6635637888aed..6ef58067ced2f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -169,23 +169,35 @@ * Default implementation of {@link SqlValidator}, the class was copied over because of * CALCITE-4554. * - *

Lines 207 ~ 210, Flink improves error message for functions without appropriate arguments in + *

Lines 219 ~ 222, Flink improves error message for functions without appropriate arguments in * handleUnresolvedFunction. * - *

Lines 1275 ~ 1277, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0. + *

Lines 1287 ~ 1289, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0. * - *

Lines 2036 ~ 2050, Flink improves error message for functions without appropriate arguments in + *

Lines 2048 ~ 2062, Flink improves error message for functions without appropriate arguments in * handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}. * - *

Lines 2576 ~ 2595, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to + *

Lines 2475 ~ 2477, CALCITE-7471 should be removed after upgrading Calcite to 1.42.0. + * + *

Lines 2590 ~ 2609, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to * 1.42.0. * - *

Line 2626 ~2644, set the correct scope for VECTOR_SEARCH. + *

Line 2640 ~2658, set the correct scope for VECTOR_SEARCH. * - *

Lines 3923 ~ 3927, 6602 ~ 6608 Flink improves Optimize the retrieval of sub-operands in + *

Lines 3937 ~ 3941, 6612 ~ 6618 Flink improves Optimize the retrieval of sub-operands in * SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}. * - *

Lines 5343 ~ 5349, FLINK-24352 Add null check for temporal table check on SqlSnapshot. + *

Lines 5357 ~ 5363, FLINK-24352 Add null check for temporal table check on SqlSnapshot. + * + *

Lines 5784-5786, CALCITE-7466 should be removed after upgrading Calcite to 1.42.0. + * + *

Lines 5840-5842, CALCITE-7470 should be removed after upgrading Calcite to 1.42.0. + * + *

Lines 7267-7290, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. + * + *

Lines 7337-7354, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. + * + *

Lines 7399-7407, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. */ public class SqlValidatorImpl implements SqlValidatorWithHints { // ~ Static fields/initializers --------------------------------------------- @@ -2460,7 +2472,9 @@ private SqlNode registerFrom( enclosingNode, alias, forceNullable); - return node; + // ----- FLINK MODIFICATION BEGIN ----- + return newNode; + // ----- FLINK MODIFICATION END ----- case PIVOT: registerPivot( @@ -5767,11 +5781,9 @@ private PairList validateMeasure( setValidatedNodeType(measure, type); fields.add(alias, type); - sqlNodes.add( - SqlStdOperatorTable.AS.createCall( - SqlParserPos.ZERO, - expand, - new SqlIdentifier(alias, SqlParserPos.ZERO))); + // ----- FLINK MODIFICATION BEGIN ----- + sqlNodes.add(expand); + // ----- FLINK MODIFICATION END ----- } SqlNodeList list = new SqlNodeList(sqlNodes, measures.getParserPosition()); @@ -5825,11 +5837,9 @@ private void validateDefinitions(SqlMatchRecognize mr, MatchRecognizeScope scope // Some extra work need required here. // In PREV, NEXT, FINAL and LAST, only one pattern variable is allowed. - sqlNodes.add( - SqlStdOperatorTable.AS.createCall( - SqlParserPos.ZERO, - expand, - new SqlIdentifier(alias, SqlParserPos.ZERO))); + // ----- FLINK MODIFICATION BEGIN ----- + sqlNodes.add(expand); + // ----- FLINK MODIFICATION END ----- final RelDataType type = deriveType(scope, expand); if (!SqlTypeUtil.inBooleanFamily(type)) { @@ -7254,19 +7264,31 @@ private class PatternValidator extends SqlBasicVisitor<@Nullable Set> { int firstLastCount; int prevNextCount; int aggregateCount; + // ----- FLINK MODIFICATION BEGIN ----- + int index; + int argCount; PatternValidator(boolean isMeasure) { - this(isMeasure, 0, 0, 0); + this(isMeasure, 0, 0, 0, 0, 0); } PatternValidator( - boolean isMeasure, int firstLastCount, int prevNextCount, int aggregateCount) { + boolean isMeasure, + int firstLastCount, + int prevNextCount, + int aggregateCount, + int index, + int argCount) { this.isMeasure = isMeasure; this.firstLastCount = firstLastCount; this.prevNextCount = prevNextCount; this.aggregateCount = aggregateCount; + this.index = index; + this.argCount = argCount; } + // ----- FLINK MODIFICATION END ----- + @Override public Set visit(SqlCall call) { boolean isSingle = false; @@ -7312,7 +7334,9 @@ public Set visit(SqlCall call) { call, Static.RESOURCE.patternRunningFunctionInDefine(call.toString())); } - for (SqlNode node : operands) { + // ----- FLINK MODIFICATION BEGIN ----- + for (int i = 0; i < operands.size(); i++) { + SqlNode node = operands.get(i); if (node != null) { vars.addAll( requireNonNull( @@ -7321,10 +7345,13 @@ public Set visit(SqlCall call) { isMeasure, firstLastCount, prevNextCount, - aggregateCount)), + aggregateCount, + i, + operands.size())), () -> "node.accept(PatternValidator) for node " + node)); } } + // ----- FLINK MODIFICATION END ----- if (isSingle) { switch (kind) { @@ -7369,7 +7396,15 @@ public Set visit(SqlIdentifier identifier) { @Override public Set visit(SqlLiteral literal) { - return ImmutableSet.of(); + // ----- FLINK MODIFICATION BEGIN ----- + if ((this.argCount == 1 || this.index < this.argCount - 1) + && (this.firstLastCount > 0 || this.prevNextCount > 0) + && !SqlUtil.isNull(literal)) { + return ImmutableSet.of(literal.toValue()); + } else { + return ImmutableSet.of(); + } + // ----- FLINK MODIFICATION END ----- } @Override diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java index 5aae60f527904..db4d7a3705e6d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java @@ -41,8 +41,6 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.Comparator; -import java.util.List; import static org.apache.flink.api.common.typeinfo.Types.DOUBLE; import static org.apache.flink.api.common.typeinfo.Types.INSTANT; @@ -90,24 +88,23 @@ void testSimplePatternInProcTime() { .column("name", DataTypes.STRING()) .columnByExpression("proctime", "PROCTIME()") .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT T.aid, T.bid, T.cid\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY proctime\n" - + " MEASURES\n" - + " `A\"`.id AS aid,\n" - + " \u006C.id AS bid,\n" - + " C.id AS cid\n" - + " PATTERN (`A\"` \u006C C)\n" - + " DEFINE\n" - + " `A\"` AS name = 'a',\n" - + " \u006C AS name = 'b',\n" - + " C AS name = 'c'\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(6, 7, 8)); + final String sql = + "SELECT T.aid, T.bid, T.cid\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " `A\"`.id AS aid,\n" + + " \u006C.id AS bid,\n" + + " C.id AS cid\n" + + " PATTERN (`A\"` \u006C C)\n" + + " DEFINE\n" + + " `A\"` AS name = 'a',\n" + + " \u006C AS name = 'b',\n" + + " C AS name = 'c'\n" + + ") AS T"; + + assertTableResult(sql, Row.of(6, 7, 8)); } @Test @@ -137,24 +134,23 @@ void testSimplePatternInEventTime() { .column("name", DataTypes.STRING()) .column("ts", DataTypes.TIMESTAMP_LTZ(3)) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT T.aid, T.bid, T.cid\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " `A\"`.id AS aid,\n" - + " \u006C.id AS bid,\n" - + " C.id AS cid\n" - + " PATTERN (`A\"` \u006C C)\n" - + " DEFINE\n" - + " `A\"` AS name = 'a',\n" - + " \u006C AS name = 'b',\n" - + " C AS name = 'c'\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(6, 7, 8)); + final String sql = + "SELECT T.aid, T.bid, T.cid\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " `A\"`.id AS aid,\n" + + " \u006C.id AS bid,\n" + + " C.id AS cid\n" + + " PATTERN (`A\"` \u006C C)\n" + + " DEFINE\n" + + " `A\"` AS name = 'a',\n" + + " \u006C AS name = 'b',\n" + + " C AS name = 'c'\n" + + ") AS T"; + + assertTableResult(sql, Row.of(6, 7, 8)); } @Test @@ -186,24 +182,23 @@ void testTimeConstraint() { .column("name", DataTypes.STRING()) .column("ts", DataTypes.TIMESTAMP(3)) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT T.aid, T.bid, T.cid\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " A.id AS aid,\n" - + " B.id AS bid,\n" - + " C.id AS cid\n" - + " PATTERN (A B C) WITHIN INTERVAL '1' MINUTE\n" - + " DEFINE\n" - + " A AS name = 'a',\n" - + " B AS name = 'b',\n" - + " C AS name = 'c'\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(2, 3, 4)); + + final String sql = + "SELECT T.aid, T.bid, T.cid\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " B.id AS bid,\n" + + " C.id AS cid\n" + + " PATTERN (A B C) WITHIN INTERVAL '1' MINUTE\n" + + " DEFINE\n" + + " A AS name = 'a',\n" + + " B AS name = 'b',\n" + + " C AS name = 'c'\n" + + ") AS T"; + assertTableResult(sql, Row.of(2, 3, 4)); } @Test @@ -235,25 +230,25 @@ void testSimplePatternWithNulls() { .column("nullField", DataTypes.STRING()) .column("ts", DataTypes.TIMESTAMP_LTZ(3)) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT T.aid, T.bNull, T.cid, T.aNull\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " A.id AS aid,\n" - + " A.nullField AS aNull,\n" - + " LAST(B.nullField) AS bNull,\n" - + " C.id AS cid\n" - + " PATTERN (A B C)\n" - + " DEFINE\n" - + " A AS name = 'a' AND nullField IS NULL,\n" - + " B AS name = 'b' AND LAST(A.nullField) IS NULL,\n" - + " C AS name = 'c'\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(1, null, 3, null), Row.of(6, null, 8, null)); + + final String sql = + "SELECT T.aid, T.bNull, T.cid, T.aNull\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " A.nullField AS aNull,\n" + + " LAST(B.nullField) AS bNull,\n" + + " C.id AS cid\n" + + " PATTERN (A B C)\n" + + " DEFINE\n" + + " A AS name = 'a' AND nullField IS NULL,\n" + + " B AS name = 'b' AND LAST(A.nullField) IS NULL,\n" + + " C AS name = 'c'\n" + + ") AS T"; + + assertTableResult(sql, Row.of(1, null, 3, null), Row.of(6, null, 8, null)); } @Test @@ -288,31 +283,29 @@ void testCodeSplitsAreProperlyGenerated() { .column("key2", DataTypes.STRING()) .column("ts", DataTypes.TIMESTAMP_LTZ(3)) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " PARTITION BY key1, key2\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " A.id AS aid,\n" - + " A.key1 AS akey1,\n" - + " LAST(B.id) AS bid,\n" - + " C.id AS cid,\n" - + " C.key2 AS ckey2\n" - + " PATTERN (A B C)\n" - + " DEFINE\n" - + " A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,\n" - + " B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n" - + " C AS name = 'c' AND LAST(A.name) = 'a'\n" - + ") AS T"); - List actual = CollectionUtil.iteratorToList(tableResult.collect()); - actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0)))); - assertThat(actual) - .containsExactly( - Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"), - Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4")); + final String sql = + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY key1, key2\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " A.key1 AS akey1,\n" + + " LAST(B.id) AS bid,\n" + + " C.id AS cid,\n" + + " C.key2 AS ckey2\n" + + " PATTERN (A B C)\n" + + " DEFINE\n" + + " A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,\n" + + " B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n" + + " C AS name = 'c' AND LAST(A.name) = 'a'\n" + + ") AS T"; + + assertTableResult( + sql, + Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"), + Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4")); } @Test @@ -413,31 +406,30 @@ void testMatchRecognizeAppliedToWindowedGrouping() { .column("tax", DataTypes.INT()) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM (\n" - + " SELECT\n" - + " symbol,\n" - + " SUM(price) as price,\n" - + " TUMBLE_ROWTIME(ts, interval '3' second) as rowTime,\n" - + " TUMBLE_START(ts, interval '3' second) as startTime\n" - + " FROM Ticker\n" - + " GROUP BY symbol, TUMBLE(ts, interval '3' second)\n" - + ")\n" - + "MATCH_RECOGNIZE (\n" - + " PARTITION BY symbol\n" - + " ORDER BY rowTime\n" - + " MEASURES\n" - + " B.price as dPrice,\n" - + " B.startTime as dTime\n" - + " ONE ROW PER MATCH\n" - + " PATTERN (A B)\n" - + " DEFINE\n" - + " B AS B.price < A.price\n" - + ")"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of("ACME", 2, now.plusSeconds(3))); + final String sql = + "SELECT *\n" + + "FROM (\n" + + " SELECT\n" + + " symbol,\n" + + " SUM(price) as price,\n" + + " TUMBLE_ROWTIME(ts, interval '3' second) as rowTime,\n" + + " TUMBLE_START(ts, interval '3' second) as startTime\n" + + " FROM Ticker\n" + + " GROUP BY symbol, TUMBLE(ts, interval '3' second)\n" + + ")\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol\n" + + " ORDER BY rowTime\n" + + " MEASURES\n" + + " B.price as dPrice,\n" + + " B.startTime as dTime\n" + + " ONE ROW PER MATCH\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " B AS B.price < A.price\n" + + ")"; + + assertTableResult(sql, Row.of("ACME", 2, now.plusSeconds(3))); } @Test @@ -467,39 +459,39 @@ void testWindowedGroupingAppliedToMatchRecognize() { .column("tax", DataTypes.INT()) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT\n" - + " symbol,\n" - + " SUM(price) as price,\n" - + " TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime,\n" - + " TUMBLE_START(matchRowtime, interval '3' second) as startTime\n" - + "FROM Ticker\n" - + "MATCH_RECOGNIZE (\n" - + " PARTITION BY symbol\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " A.price as price,\n" - + " A.tax as tax,\n" - + " MATCH_ROWTIME() as matchRowtime\n" - + " ONE ROW PER MATCH\n" - + " PATTERN (A)\n" - + " DEFINE\n" - + " A AS A.price > 0\n" - + ") AS T\n" - + "GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second)"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly( - Row.of( - "ACME", - 3, - LocalDateTime.parse("1970-01-01T00:00:02.999"), - LocalDateTime.parse("1970-01-01T00:00")), - Row.of( - "ACME", - 2, - LocalDateTime.parse("1970-01-01T00:00:05.999"), - LocalDateTime.parse("1970-01-01T00:00:03"))); + final String sql = + "SELECT\n" + + " symbol,\n" + + " SUM(price) as price,\n" + + " TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime,\n" + + " TUMBLE_START(matchRowtime, interval '3' second) as startTime\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " A.price as price,\n" + + " A.tax as tax,\n" + + " MATCH_ROWTIME() as matchRowtime\n" + + " ONE ROW PER MATCH\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS A.price > 0\n" + + ") AS T\n" + + "GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second)"; + + assertTableResult( + sql, + Row.of( + "ACME", + 3, + LocalDateTime.parse("1970-01-01T00:00:02.999"), + LocalDateTime.parse("1970-01-01T00:00")), + Row.of( + "ACME", + 2, + LocalDateTime.parse("1970-01-01T00:00:05.999"), + LocalDateTime.parse("1970-01-01T00:00:03"))); } @Test @@ -530,27 +522,27 @@ void testLogicalOffsets() { .column("tax", DataTypes.INT()) .columnByExpression("ts", "TO_TIMESTAMP_LTZ(tstamp, 3)") .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM Ticker\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " FIRST(DOWN.tstamp) AS start_tstamp,\n" - + " LAST(DOWN.tstamp) AS bottom_tstamp,\n" - + " UP.tstamp AS end_tstamp,\n" - + " FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,\n" - + " UP.price + UP.tax AS end_total\n" - + " ONE ROW PER MATCH\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (DOWN{2,} UP)\n" - + " DEFINE\n" - + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" - + " UP AS price < FIRST(DOWN.price)\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(6L, 7L, 8L, 33, 33)); + + final String sql = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " FIRST(DOWN.tstamp) AS start_tstamp,\n" + + " LAST(DOWN.tstamp) AS bottom_tstamp,\n" + + " UP.tstamp AS end_tstamp,\n" + + " FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,\n" + + " UP.price + UP.tax AS end_total\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN{2,} UP)\n" + + " DEFINE\n" + + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" + + " UP AS price < FIRST(DOWN.price)\n" + + ") AS T"; + + assertTableResult(sql, Row.of(6L, 7L, 8L, 33, 33)); } @Test @@ -579,25 +571,24 @@ void testPartitionByWithParallelSource() { .column("tax", DataTypes.INT()) .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM Ticker\n" - + "MATCH_RECOGNIZE (\n" - + " PARTITION BY symbol\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " DOWN.tax AS bottom_tax,\n" - + " UP.tax AS end_tax\n" - + " ONE ROW PER MATCH\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (DOWN UP)\n" - + " DEFINE\n" - + " DOWN AS DOWN.price = 13,\n" - + " UP AS UP.price = 20\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of("ACME", 3, 4)); + final String sql = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " DOWN.tax AS bottom_tax,\n" + + " UP.tax AS end_tax\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN UP)\n" + + " DEFINE\n" + + " DOWN AS DOWN.price = 13,\n" + + " UP AS UP.price = 20\n" + + ") AS T"; + + assertTableResult(sql, Row.of("ACME", 3, 4)); } @Test @@ -628,38 +619,37 @@ void testLogicalOffsetsWithStarVariable() { .column("price", DataTypes.INT()) .columnByExpression("ts", "TO_TIMESTAMP_LTZ(tstamp, 3)") .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM Ticker\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " FIRST(id, 0) as id0,\n" - + " FIRST(id, 1) as id1,\n" - + " FIRST(id, 2) as id2,\n" - + " FIRST(id, 3) as id3,\n" - + " FIRST(id, 4) as id4,\n" - + " FIRST(id, 5) as id5,\n" - + " FIRST(id, 6) as id6,\n" - + " FIRST(id, 7) as id7,\n" - + " LAST(id, 0) as id8,\n" - + " LAST(id, 1) as id9,\n" - + " LAST(id, 2) as id10,\n" - + " LAST(id, 3) as id11,\n" - + " LAST(id, 4) as id12,\n" - + " LAST(id, 5) as id13,\n" - + " LAST(id, 6) as id14,\n" - + " LAST(id, 7) as id15\n" - + " ONE ROW PER MATCH\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (`DOWN\"`{2,} UP)\n" - + " DEFINE\n" - + " `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n" - + " UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1)); + final String sql = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " FIRST(id, 0) as id0,\n" + + " FIRST(id, 1) as id1,\n" + + " FIRST(id, 2) as id2,\n" + + " FIRST(id, 3) as id3,\n" + + " FIRST(id, 4) as id4,\n" + + " FIRST(id, 5) as id5,\n" + + " FIRST(id, 6) as id6,\n" + + " FIRST(id, 7) as id7,\n" + + " LAST(id, 0) as id8,\n" + + " LAST(id, 1) as id9,\n" + + " LAST(id, 2) as id10,\n" + + " LAST(id, 3) as id11,\n" + + " LAST(id, 4) as id12,\n" + + " LAST(id, 5) as id13,\n" + + " LAST(id, 6) as id14,\n" + + " LAST(id, 7) as id15\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (`DOWN\"`{2,} UP)\n" + + " DEFINE\n" + + " `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n" + + " UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n" + + ") AS T"; + + assertTableResult(sql, Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1)); } @Test @@ -686,25 +676,25 @@ void testLogicalOffsetOutsideOfRangeInMeasures() { .column("tax", DataTypes.INT()) .columnByExpression("ts", "TO_TIMESTAMP_LTZ(tstamp, 3)") .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM Ticker\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " FIRST(DOWN.price) as first,\n" - + " LAST(DOWN.price) as last,\n" - + " FIRST(DOWN.price, 5) as nullPrice\n" - + " ONE ROW PER MATCH\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (DOWN{2,} UP)\n" - + " DEFINE\n" - + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" - + " UP AS price > LAST(DOWN.price)\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(19, 13, null)); + + final String sql = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " FIRST(DOWN.price) as first,\n" + + " LAST(DOWN.price) as last,\n" + + " FIRST(DOWN.price, 5) as nullPrice\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN{2,} UP)\n" + + " DEFINE\n" + + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" + + " UP AS price > LAST(DOWN.price)\n" + + ") AS T"; + + assertTableResult(sql, Row.of(19, 13, null)); } /** @@ -754,35 +744,36 @@ void testAggregates() { .column("ts", DataTypes.TIMESTAMP_LTZ(3)) .build())); tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg()); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " FIRST(id) as startId,\n" - + " SUM(A.price) AS sumA,\n" - + " COUNT(D.price) AS countD,\n" - + " SUM(D.price) as sumD,\n" - + " weightedAvg(price, weight) as wAvg,\n" - + " AVG(B.price) AS avgB,\n" - + " SUM(B.price * B.rate) as sumExprB,\n" - + " LAST(id) as endId\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (A+ B+ C D? E)\n" - + " DEFINE\n" - + " A AS SUM(A.price) < 6,\n" - + " B AS SUM(B.price * B.rate) < SUM(A.price) AND\n" - + " SUM(B.price * B.rate) > 0.2 AND\n" - + " SUM(B.price) >= 1 AND\n" - + " AVG(B.price) >= 1 AND\n" - + " weightedAvg(price, weight) > 1\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly( - Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8), - Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12)); + + final String sql = + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " FIRST(id) as startId,\n" + + " SUM(A.price) AS sumA,\n" + + " COUNT(D.price) AS countD,\n" + + " SUM(D.price) as sumD,\n" + + " weightedAvg(price, weight) as wAvg,\n" + + " AVG(B.price) AS avgB,\n" + + " SUM(B.price * B.rate) as sumExprB,\n" + + " LAST(id) as endId\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (A+ B+ C D? E)\n" + + " DEFINE\n" + + " A AS SUM(A.price) < 6,\n" + + " B AS SUM(B.price * B.rate) < SUM(A.price) AND\n" + + " SUM(B.price * B.rate) > 0.2 AND\n" + + " SUM(B.price) >= 1 AND\n" + + " AVG(B.price) >= 1 AND\n" + + " weightedAvg(price, weight) > 1\n" + + ") AS T"; + + assertTableResult( + sql, + Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8), + Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12)); } @Test @@ -816,27 +807,27 @@ void testAggregatesWithNullInputs() { .column("ts", DataTypes.TIMESTAMP_LTZ(3)) .build())); tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg()); - TableResult tableResult = - tEnv.executeSql( - "SELECT *\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " SUM(A.price) as sumA,\n" - + " COUNT(A.id) as countAId,\n" - + " COUNT(A.price) as countAPrice,\n" - + " COUNT(*) as countAll,\n" - + " COUNT(price) as countAllPrice,\n" - + " LAST(id) as endId\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (A+ C)\n" - + " DEFINE\n" - + " A AS SUM(A.price) < 30,\n" - + " C AS C.name = 'c'\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(29, 7L, 5L, 8L, 6L, 8)); + + final String sql = + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " SUM(A.price) as sumA,\n" + + " COUNT(A.id) as countAId,\n" + + " COUNT(A.price) as countAPrice,\n" + + " COUNT(*) as countAll,\n" + + " COUNT(price) as countAllPrice,\n" + + " LAST(id) as endId\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (A+ C)\n" + + " DEFINE\n" + + " A AS SUM(A.price) < 30,\n" + + " C AS C.name = 'c'\n" + + ") AS T"; + + assertTableResult(sql, Row.of(29, 7L, 5L, 8L, 6L, 8)); } @Test @@ -851,21 +842,21 @@ void testAccessingCurrentTime() { .column("name", DataTypes.STRING()) .columnByExpression("proctime", "PROCTIME()") .build())); - TableResult tableResult = - tEnv.executeSql( - "SELECT T.aid\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY proctime\n" - + " MEASURES\n" - + " A.id AS aid,\n" - + " A.proctime AS aProctime,\n" - + " LAST(A.proctime + INTERVAL '1' second) as calculatedField\n" - + " PATTERN (A)\n" - + " DEFINE\n" - + " A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n" - + ") AS T"); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())).containsExactly(Row.of(1)); + + final String sql = + "SELECT T.aid\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " A.proctime AS aProctime,\n" + + " LAST(A.proctime + INTERVAL '1' second) as calculatedField\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n" + + ") AS T"; + assertTableResult(sql, Row.of(1)); } @Test @@ -906,29 +897,28 @@ void testUserDefinedFunctions() { jobParameters.setString("prefix", prefix); jobParameters.setString("start", Integer.toString(startFrom)); env.getConfig().setGlobalJobParameters(jobParameters); - TableResult tableResult = - tEnv.executeSql( - String.format( - "SELECT *\n" - + "FROM MyTable\n" - + "MATCH_RECOGNIZE (\n" - + " ORDER BY ts\n" - + " MEASURES\n" - + " FIRST(id) as firstId,\n" - + " prefix(A.name) as prefixedNameA,\n" - + " countFrom(A.price) as countFromA,\n" - + " LAST(id) as lastId\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (A+ C)\n" - + " DEFINE\n" - + " A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n" - + ") AS T", - prefix, 4 + 4)); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())) - .containsExactly(Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9)); + String sql = + String.format( + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY ts\n" + + " MEASURES\n" + + " FIRST(id) as firstId,\n" + + " prefix(A.name) as prefixedNameA,\n" + + " countFrom(A.price) as countFromA,\n" + + " LAST(id) as lastId\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (A+ C)\n" + + " DEFINE\n" + + " A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n" + + ") AS T", + prefix, 4 + 4); + + assertTableResult(sql, Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9)); } - /** Test prefixing function.. */ + /** Test prefixing function... */ public static class PrefixingScalarFunc extends ScalarFunction { private String prefix = "ERROR_VALUE"; @@ -981,4 +971,17 @@ public void accumulate(CountAcc countAcc, Integer value) { countAcc.count += value; } } + + private void assertTableResult(String sql, Row... expected) { + TableResult tableResult = tEnv.executeSql(sql); + assertThat(CollectionUtil.iteratorToList(tableResult.collect())).containsExactly(expected); + + // Also check that same query is able to compile and return same result if it is used in + // view + // test cases for FLINK-39293 + tEnv.executeSql("CREATE VIEW test_view AS \n" + sql); + TableResult tableResultWithView = tEnv.executeSql("SELECT * FROM test_view"); + assertThat(CollectionUtil.iteratorToList(tableResultWithView.collect())) + .containsExactly(expected); + } }