From eeeeba726c2227d7b7f3d0c8c90fee85218e5bed Mon Sep 17 00:00:00 2001 From: DaZuiZui Date: Mon, 1 Jun 2026 16:44:34 +0800 Subject: [PATCH 1/4] Implement table NEXT fill --- .../it/query/recent/IoTDBFillTableIT.java | 155 +++++++++++ .../src/main/codegen/templates/nextFill.ftl | 101 +++++++ .../process/TableNextFillOperator.java | 76 ++++++ .../TableNextFillWithGroupOperator.java | 167 ++++++++++++ .../operator/process/fill/next/NextFill.java | 182 +++++++++++++ .../plan/planner/CommonOperatorUtils.java | 105 +++++--- .../plan/planner/TableOperatorGenerator.java | 44 +++ .../planner/plan/node/PlanGraphPrinter.java | 15 ++ .../plan/relational/analyzer/Analysis.java | 26 ++ .../analyzer/StatementAnalyzer.java | 22 ++ .../plan/relational/planner/QueryPlanner.java | 22 ++ .../iterative/rule/PruneFillColumns.java | 5 + .../PushLimitOffsetIntoTableScan.java | 7 + .../UnaliasSymbolReferences.java | 32 +++ .../relational/sql/parser/AstBuilder.java | 38 +++ .../operator/process/fill/NextFillTest.java | 253 ++++++++++++++++++ .../planner/NextFillNodeSerdeTest.java | 52 ++++ .../sql/parser/FillStatementTest.java | 68 +++++ .../plan/node/CommonPlanNodeDeserializer.java | 3 + .../plan/node/ICoreQueryPlanVisitor.java | 5 + .../plan/planner/plan/node/PlanNodeType.java | 1 + .../relational/planner/node/NextFillNode.java | 195 ++++++++++++++ .../plan/relational/sql/ast/Fill.java | 21 +- .../sql/util/CommonQuerySqlFormatter.java | 3 +- .../plan/statement/component/FillPolicy.java | 1 + .../relational/grammar/sql/RelationalSql.g4 | 1 + 26 files changed, 1561 insertions(+), 39 deletions(-) create mode 100644 iotdb-core/calc-commons/src/main/codegen/templates/nextFill.ftl create mode 100644 iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java create mode 100644 iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java create mode 100644 iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/next/NextFill.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/FillStatementTest.java create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/NextFillNode.java diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java index edd66bdee2dd7..18d2d3c30786b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java @@ -653,6 +653,142 @@ public void normalFillTest() { } } + @Test + public void nextFillTest() { + String[] expectedHeader = + new String[] { + "time", "device_id", "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10" + }; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,1,11,1.1,11.1,true,text1,string1,0xcafebabe01,1970-01-01T00:00:00.001Z,2024-10-01,", + "1970-01-01T00:00:00.002Z,d1,2,22,2.2,22.2,false,text3,string3,0xcafebabe03,1970-01-01T00:00:00.003Z,2024-10-03,", + "1970-01-01T00:00:00.003Z,d1,5,55,5.5,55.5,false,text3,string3,0xcafebabe03,1970-01-01T00:00:00.003Z,2024-10-03,", + "1970-01-01T00:00:00.004Z,d1,5,55,5.5,55.5,false,text4,string4,0xcafebabe04,1970-01-01T00:00:00.004Z,2024-10-04,", + "1970-01-01T00:00:00.005Z,d1,5,55,5.5,55.5,false,text8,string8,0xcafebabe08,1970-01-01T00:00:00.008Z,2024-10-08,", + "1970-01-01T00:00:00.007Z,d1,7,77,7.7,77.7,true,text8,string8,0xcafebabe08,1970-01-01T00:00:00.008Z,2024-10-08,", + "1970-01-01T00:00:00.008Z,d1,null,null,null,null,null,text8,string8,0xcafebabe08,1970-01-01T00:00:00.008Z,2024-10-08,", + }; + tableResultSetEqualTest( + "select * from table1 FILL METHOD NEXT", expectedHeader, retArray, DATABASE_NAME); + + retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,1,11,1.1,11.1,true,text1,string1,0xcafebabe01,1970-01-01T00:00:00.001Z,2024-10-01,", + "1970-01-01T00:00:00.002Z,d1,2,22,2.2,22.2,false,text3,string3,0xcafebabe03,1970-01-01T00:00:00.003Z,2024-10-03,", + "1970-01-01T00:00:00.003Z,d1,5,55,5.5,55.5,false,text3,string3,0xcafebabe03,1970-01-01T00:00:00.003Z,2024-10-03,", + "1970-01-01T00:00:00.004Z,d1,5,55,5.5,55.5,false,text4,string4,0xcafebabe04,1970-01-01T00:00:00.004Z,2024-10-04,", + "1970-01-01T00:00:00.005Z,d1,5,55,5.5,55.5,false,null,null,null,null,null,", + "1970-01-01T00:00:00.007Z,d1,7,77,7.7,77.7,true,text8,string8,0xcafebabe08,1970-01-01T00:00:00.008Z,2024-10-08,", + "1970-01-01T00:00:00.008Z,d1,null,null,null,null,null,text8,string8,0xcafebabe08,1970-01-01T00:00:00.008Z,2024-10-08,", + }; + tableResultSetEqualTest( + "select * from table1 FILL METHOD NEXT TIME_BOUND 2ms TIME_COLUMN 1", + expectedHeader, + retArray, + DATABASE_NAME); + + retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,1,11,1.1,11.1,true,text1,string1,0xcafebabe01,1970-01-01T00:00:00.001Z,2024-10-01,", + "1970-01-01T00:00:00.002Z,d1,2,22,2.2,22.2,false,null,null,null,null,null,", + "1970-01-01T00:00:00.003Z,d1,null,null,null,null,null,text3,string3,0xcafebabe03,1970-01-01T00:00:00.003Z,2024-10-03,", + "1970-01-01T00:00:00.004Z,d1,null,null,null,null,null,text4,string4,0xcafebabe04,1970-01-01T00:00:00.004Z,2024-10-04,", + "1970-01-01T00:00:00.005Z,d1,5,55,5.5,55.5,false,null,null,null,null,null,", + "1970-01-01T00:00:00.007Z,d1,7,77,7.7,77.7,true,null,null,null,null,null,", + "1970-01-01T00:00:00.008Z,d1,null,null,null,null,null,text8,string8,0xcafebabe08,1970-01-01T00:00:00.008Z,2024-10-08,", + }; + tableResultSetEqualTest( + "select * from table1 FILL METHOD NEXT TIME_BOUND 2ms TIME_COLUMN 11", + expectedHeader, + retArray, + DATABASE_NAME); + + expectedHeader = new String[] {"s1", "s6"}; + retArray = + new String[] { + "1,text1,", "2,text3,", "5,text3,", "5,text4,", "5,text8,", "7,text8,", "null,text8,", + }; + tableResultSetEqualTest( + "select s1,s6 from table1 FILL METHOD NEXT", expectedHeader, retArray, DATABASE_NAME); + + expectedHeader = new String[] {"time", "s1", "s6"}; + retArray = + new String[] { + "1970-01-01T00:00:00.008Z,null,text8,", + "1970-01-01T00:00:00.007Z,7,text8,", + "1970-01-01T00:00:00.005Z,5,text8,", + "1970-01-01T00:00:00.004Z,5,text4,", + "1970-01-01T00:00:00.003Z,5,text3,", + "1970-01-01T00:00:00.002Z,2,text3,", + "1970-01-01T00:00:00.001Z,1,text1,", + }; + tableResultSetEqualTest( + "select time,s1,s6 from table1 FILL METHOD NEXT order by time desc", + expectedHeader, + retArray, + DATABASE_NAME); + + retArray = + new String[] { + "1970-01-01T00:00:00.001Z,1,text1,", + "1970-01-01T00:00:00.002Z,2,text1,", + "1970-01-01T00:00:00.003Z,2,text3,", + "1970-01-01T00:00:00.004Z,2,text4,", + "1970-01-01T00:00:00.005Z,5,text4,", + "1970-01-01T00:00:00.007Z,7,text4,", + "1970-01-01T00:00:00.008Z,7,text8,", + }; + tableResultSetEqualTest( + "select * from (select time,s1,s6 from table1 order by time desc) FILL METHOD NEXT order by time", + expectedHeader, + retArray, + DATABASE_NAME); + + expectedHeader = new String[] {"time", "s1"}; + retArray = + new String[] { + "1970-01-01T00:00:00.001Z,1,", + "1970-01-01T00:00:00.002Z,2,", + "1970-01-01T00:00:00.003Z,5,", + }; + tableResultSetEqualTest( + "select time,s1 from table1 FILL METHOD NEXT limit 3", + expectedHeader, + retArray, + DATABASE_NAME); + + expectedHeader = new String[] {"time", "city", "device_id", "s1", "s2"}; + retArray = + new String[] { + "1970-01-01T00:00:00.001Z,beijing,d1,102,1011,", + "1970-01-01T00:00:00.002Z,beijing,d1,102,null,", + "1970-01-01T00:00:00.003Z,beijing,d1,103,null,", + "1970-01-01T00:00:00.004Z,beijing,d1,104,null,", + "1970-01-01T00:00:00.001Z,beijing,d2,101,1022,", + "1970-01-01T00:00:00.002Z,beijing,d2,null,1022,", + "1970-01-01T00:00:00.003Z,beijing,d2,null,1033,", + "1970-01-01T00:00:00.004Z,beijing,d2,null,1044,", + "1970-01-01T00:00:00.001Z,shanghai,d1,212,2111,", + "1970-01-01T00:00:00.002Z,shanghai,d1,212,null,", + }; + tableResultSetEqualTest( + "select time,city,device_id,s1,s2 from table2 FILL METHOD NEXT FILL_GROUP 2,3", + expectedHeader, + retArray, + DATABASE_NAME); + tableResultSetEqualTest( + "select time,city,device_id,s1,s2 from table2 FILL METHOD NEXT TIME_COLUMN 1 FILL_GROUP 2,3", + expectedHeader, + retArray, + DATABASE_NAME); + tableResultSetEqualTest( + "select time,city,device_id,s1,s2 from table2 FILL METHOD NEXT TIME_BOUND 2ms TIME_COLUMN 1 FILL_GROUP 2,3", + expectedHeader, + retArray, + DATABASE_NAME); + } + @Test public void abNormalFillTest() { @@ -705,6 +841,25 @@ public void abNormalFillTest() { + ": PREVIOUS FILL FILL_GROUP position 3 is not in select list", DATABASE_NAME); + // --------------------------------- NEXT FILL --------------------------------- + tableAssertTestFail( + "select s1 from table1 FILL METHOD NEXT TIME_COLUMN 1", + TSStatusCode.SEMANTIC_ERROR.getStatusCode() + + ": Don't need to specify TIME_COLUMN while either TIME_BOUND or FILL_GROUP parameter is not specified", + DATABASE_NAME); + + tableAssertTestFail( + "select s1 from table1 FILL METHOD NEXT TIME_BOUND 2ms", + TSStatusCode.SEMANTIC_ERROR.getStatusCode() + + ": Cannot infer TIME_COLUMN for NEXT FILL, there exists no column whose type is TIMESTAMP", + DATABASE_NAME); + + tableAssertTestFail( + "select s1 from table1 FILL METHOD NEXT FILL_GROUP 1", + TSStatusCode.SEMANTIC_ERROR.getStatusCode() + + ": Cannot infer TIME_COLUMN for NEXT FILL, there exists no column whose type is TIMESTAMP", + DATABASE_NAME); + // --------------------------------- LINEAR FILL --------------------------------- tableAssertTestFail( "select s1 from table1 FILL METHOD LINEAR", diff --git a/iotdb-core/calc-commons/src/main/codegen/templates/nextFill.ftl b/iotdb-core/calc-commons/src/main/codegen/templates/nextFill.ftl new file mode 100644 index 0000000000000..e3353141eed0d --- /dev/null +++ b/iotdb-core/calc-commons/src/main/codegen/templates/nextFill.ftl @@ -0,0 +1,101 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +<@pp.dropOutputFile /> + +<#list allDataTypes.types as type> + + <#assign className = "${type.dataType?cap_first}NextFill"> + <@pp.changeOutputFile name="/org/apache/iotdb/calc/execution/operator/process/fill/next/${className}.java" /> +package org.apache.iotdb.calc.execution.operator.process.fill.next; + +import org.apache.iotdb.calc.execution.operator.process.fill.IFillFilter; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.block.column.${type.column}; +import org.apache.tsfile.read.common.block.column.${type.column}Builder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +<#if type.dataType == "Binary"> +import org.apache.tsfile.utils.Binary; + + +import java.util.Optional; + +/* +* This class is generated using freemarker and the ${.template_name} template. +*/ +@SuppressWarnings("unused") +public class ${className} extends NextFill { + + private ${type.dataType} nextValue; + private ${type.dataType} nextValueInCurrentColumn; + + public ${className}(IFillFilter filter) { + super(filter); + } + + @Override + void fillValue(Column column, int index, Object array) { + ((${type.dataType}[]) array)[index] = column.get${type.dataType?cap_first}(index); + } + + @Override + void fillNextValueInCurrentColumn(Object array, int index) { + ((${type.dataType}[]) array)[index] = nextValueInCurrentColumn; + } + + @Override + Object createValueArray(int size) { + return new ${type.dataType}[size]; + } + + @Override + Column createNullValueColumn() { + return ${type.column}Builder.NULL_VALUE_BLOCK; + } + + @Override + Column createRunLengthEncodedFilledValueColumn(int size) { + return new RunLengthEncodedColumn( + new ${type.column}(1, Optional.empty(), new ${type.dataType}[] {nextValueInCurrentColumn}), size); + } + + @Override + Column createFilledValueColumn(Object array, boolean[] isNull, boolean hasNullValue, int size) { + if (hasNullValue) { + return new ${type.column}(size, Optional.of(isNull), (${type.dataType}[]) array); + } else { + return new ${type.column}(size, Optional.empty(), (${type.dataType}[]) array); + } + } + + @Override + void updateNextValue(Column nextValueColumn, int index) { + this.nextValue = nextValueColumn.get${type.dataType?cap_first}(index); + } + + @Override + void updateNextValueInCurrentColumn(Column nextValueColumn, int index) { + this.nextValueInCurrentColumn = nextValueColumn.get${type.dataType?cap_first}(index); + } + + @Override + void updateNextValueInCurrentColumn() { + this.nextValueInCurrentColumn = this.nextValue; + } +} + diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java new file mode 100644 index 0000000000000..83c413229448d --- /dev/null +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.calc.execution.operator.process; + +import org.apache.iotdb.calc.execution.operator.CommonOperatorContext; +import org.apache.iotdb.calc.execution.operator.Operator; +import org.apache.iotdb.calc.execution.operator.process.fill.ILinearFill; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.block.TsBlock; + +public class TableNextFillOperator extends AbstractLinearFillOperator { + + // start from 0; -1 means plain NEXT has no helper column. + private final int helperColumnIndex; + + private final boolean hasTimeBound; + + public TableNextFillOperator( + CommonOperatorContext operatorContext, + ILinearFill[] fillArray, + Operator child, + int helperColumnIndex, + boolean hasTimeBound) { + super(operatorContext, fillArray, child); + this.helperColumnIndex = helperColumnIndex; + this.hasTimeBound = hasTimeBound; + } + + @Override + protected Column getHelperColumn(TsBlock tsBlock) { + return helperColumnIndex == -1 ? tsBlock.getTimeColumn() : tsBlock.getColumn(helperColumnIndex); + } + + @Override + protected Integer getLastRowIndexForNonNullHelperColumn(TsBlock tsBlock) { + int size = tsBlock.getPositionCount(); + if (!hasTimeBound) { + return size - 1; + } + Column helperColumn = getHelperColumn(tsBlock); + if (!helperColumn.mayHaveNull()) { + return size - 1; + } else { + int i = size - 1; + for (; i >= 0; i--) { + if (!helperColumn.isNull(i)) { + break; + } + } + return i; + } + } + + @Override + public long ramBytesUsed() { + return super.ramBytesUsed() + Integer.BYTES; + } +} diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java new file mode 100644 index 0000000000000..7e3b5f52918bc --- /dev/null +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.calc.execution.operator.process; + +import org.apache.iotdb.calc.execution.operator.CommonOperatorContext; +import org.apache.iotdb.calc.execution.operator.Operator; +import org.apache.iotdb.calc.execution.operator.process.fill.ILinearFill; +import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; +import org.apache.iotdb.calc.utils.datastructure.SortKey; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class TableNextFillWithGroupOperator extends TableNextFillOperator { + + private final List groupSplitter; + + private final List noMoreTsBlockForCurrentGroup; + + private final Comparator groupKeyComparator; + + private final TsBlockBuilder resultBuilder; + + private SortKey lastRow = null; + + public TableNextFillWithGroupOperator( + CommonOperatorContext operatorContext, + ILinearFill[] fillArray, + Operator child, + int helperColumnIndex, + boolean hasTimeBound, + Comparator groupKeyComparator, + List dataTypes) { + super(operatorContext, fillArray, child, helperColumnIndex, hasTimeBound); + this.groupSplitter = new ArrayList<>(); + this.noMoreTsBlockForCurrentGroup = new ArrayList<>(); + this.groupKeyComparator = groupKeyComparator; + this.resultBuilder = new TsBlockBuilder(dataTypes); + } + + @Override + // we won't build timeColumn in this method + TsBlock append(int length, Column timeColumn, Column[] valueColumns) { + for (int i = 0; i < outputColumnCount; i++) { + Column column = valueColumns[i]; + ColumnBuilder builder = resultBuilder.getColumnBuilder(i); + for (int rowIndex = 0; rowIndex < length; rowIndex++) { + if (column.isNull(rowIndex)) { + builder.appendNull(); + } else { + builder.write(column, rowIndex); + } + } + } + resultBuilder.declarePositions(length); + return null; + } + + @Override + TsBlock buildFinalResult(TsBlock tempResult) { + TsBlock result = null; + if (!resultBuilder.isEmpty()) { + Column timeColumn = + new RunLengthEncodedColumn( + CommonOperatorUtils.TIME_COLUMN_TEMPLATE, resultBuilder.getPositionCount()); + result = resultBuilder.build(timeColumn); + resultBuilder.reset(); + } + return result; + } + + @Override + boolean noMoreTsBlockForCurrentGroup() { + return noMoreTsBlock || noMoreTsBlockForCurrentGroup.get(0); + } + + @Override + void resetFill() { + boolean isNewGroup = Boolean.TRUE.equals(groupSplitter.remove(0)); + boolean isNoMoreTsBlockForCurrentGroup = + Boolean.TRUE.equals(noMoreTsBlockForCurrentGroup.remove(0)); + if (isNewGroup || isNoMoreTsBlockForCurrentGroup) { + for (ILinearFill fill : fillArray) { + fill.reset(); + } + } + } + + @Override + public void close() throws Exception { + super.close(); + lastRow = null; + } + + @Override + void updateCachedData(TsBlock tsBlock) { + + boolean isFirstGroupOfCurrentTsBlock = true; + SortKey currentGroupKey = new SortKey(tsBlock, 0); + int size = tsBlock.getPositionCount(); + for (int i = 1; i < size; i++) { + SortKey nextGroupKey = new SortKey(tsBlock, i); + + if (groupKeyComparator.compare(currentGroupKey, nextGroupKey) != 0) { + int length = i - currentGroupKey.rowIndex; + TsBlock currentGroup = tsBlock.getRegion(currentGroupKey.rowIndex, length); + super.updateCachedData(currentGroup); + if (isFirstGroupOfCurrentTsBlock) { + isFirstGroupOfCurrentTsBlock = false; + boolean isNewGroup = isNewGroup(currentGroupKey); + if (isNewGroup && !noMoreTsBlockForCurrentGroup.isEmpty()) { + noMoreTsBlockForCurrentGroup.set(noMoreTsBlockForCurrentGroup.size() - 1, true); + } + groupSplitter.add(isNewGroup); + } else { + groupSplitter.add(true); + } + noMoreTsBlockForCurrentGroup.add(true); + currentGroupKey = nextGroupKey; + } + } + + int length = size - currentGroupKey.rowIndex; + TsBlock currentGroup = tsBlock.getRegion(currentGroupKey.rowIndex, length); + super.updateCachedData(currentGroup); + if (isFirstGroupOfCurrentTsBlock) { + boolean isNewGroup = isNewGroup(currentGroupKey); + if (isNewGroup && !noMoreTsBlockForCurrentGroup.isEmpty()) { + noMoreTsBlockForCurrentGroup.set(noMoreTsBlockForCurrentGroup.size() - 1, true); + } + groupSplitter.add(isNewGroup); + } else { + groupSplitter.add(true); + } + noMoreTsBlockForCurrentGroup.add(false); + lastRow = currentGroupKey; + } + + private boolean isNewGroup(SortKey currentGroupKey) { + return lastRow == null || groupKeyComparator.compare(lastRow, currentGroupKey) != 0; + } +} diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/next/NextFill.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/next/NextFill.java new file mode 100644 index 0000000000000..84ecb7245dcb2 --- /dev/null +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/next/NextFill.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.calc.execution.operator.process.fill.next; + +import org.apache.iotdb.calc.execution.operator.process.fill.IFillFilter; +import org.apache.iotdb.calc.execution.operator.process.fill.ILinearFill; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; + +/** + * NEXT fill copies the nearest non-null value that appears later in the current input order. The + * optional filter is only used for TIME_BOUND checks. + */ +public abstract class NextFill implements ILinearFill { + + private final IFillFilter filter; + + private long nextRowIndex = -1; + private long nextTime = -1; + private boolean nextIsNull = true; + + private long nextTimeInCurrentColumn = -1; + private boolean nextInCurrentColumnIsNull = true; + + protected NextFill(IFillFilter filter) { + this.filter = filter; + } + + @Override + public Column fill(Column timeColumn, Column valueColumn, long startRowIndex) { + int size = valueColumn.getPositionCount(); + if (size == 0 || !valueColumn.mayHaveNull()) { + return valueColumn; + } + + prepareNextValueInCurrentColumn(startRowIndex, size); + if (valueColumn instanceof RunLengthEncodedColumn && filter == null) { + return nextInCurrentColumnIsNull + ? new RunLengthEncodedColumn(createNullValueColumn(), size) + : createRunLengthEncodedFilledValueColumn(size); + } + + Object array = createValueArray(size); + boolean[] isNull = new boolean[size]; + boolean hasNullValue = false; + for (int i = size - 1; i >= 0; i--) { + if (valueColumn.isNull(i)) { + if (nextInCurrentColumnIsNull || cannotFillByTimeBound(timeColumn, i)) { + isNull[i] = true; + hasNullValue = true; + } else { + fillNextValueInCurrentColumn(array, i); + } + } else { + fillValue(valueColumn, i, array); + if (canBeNextCandidate(timeColumn, i)) { + nextInCurrentColumnIsNull = false; + nextTimeInCurrentColumn = filter == null ? -1 : timeColumn.getLong(i); + updateNextValueInCurrentColumn(valueColumn, i); + } + } + } + return createFilledValueColumn(array, isNull, hasNullValue, size); + } + + @Override + public boolean needPrepareForNext( + long rowIndex, Column valueColumn, int lastRowIndexForNonNullHelperColumn) { + if (valueColumn.getPositionCount() == 0 || !valueColumn.mayHaveNull()) { + return false; + } + if (filter != null && lastRowIndexForNonNullHelperColumn < 0) { + return false; + } + if (!nextIsNull && nextRowIndex > rowIndex) { + return false; + } + + int lastIndex = + filter == null ? valueColumn.getPositionCount() - 1 : lastRowIndexForNonNullHelperColumn; + if (lastIndex < 0) { + return false; + } + + boolean hasTrailingNull = false; + for (int i = lastIndex; i >= 0; i--) { + if (valueColumn.isNull(i)) { + hasTrailingNull = true; + } else { + return hasTrailingNull; + } + } + return hasTrailingNull; + } + + @Override + public boolean prepareForNext( + long startRowIndex, long endRowIndex, Column nextTimeColumn, Column nextValueColumn) { + if (!nextIsNull && endRowIndex < nextRowIndex) { + return true; + } + + for (int i = 0, size = nextValueColumn.getPositionCount(); i < size; i++) { + if (!nextValueColumn.isNull(i) && canBeNextCandidate(nextTimeColumn, i)) { + updateNextValue(nextValueColumn, i); + nextTime = filter == null ? -1 : nextTimeColumn.getLong(i); + nextRowIndex = startRowIndex + i; + nextIsNull = false; + return true; + } + } + return false; + } + + @Override + public void reset() { + nextRowIndex = -1; + nextTime = -1; + nextIsNull = true; + nextTimeInCurrentColumn = -1; + nextInCurrentColumnIsNull = true; + } + + private void prepareNextValueInCurrentColumn(long startRowIndex, int size) { + if (!nextIsNull && nextRowIndex >= startRowIndex + size) { + nextInCurrentColumnIsNull = false; + nextTimeInCurrentColumn = nextTime; + updateNextValueInCurrentColumn(); + } else { + nextInCurrentColumnIsNull = true; + nextTimeInCurrentColumn = -1; + } + } + + private boolean canBeNextCandidate(Column timeColumn, int index) { + return filter == null || !timeColumn.isNull(index); + } + + private boolean cannotFillByTimeBound(Column timeColumn, int index) { + return filter != null + && (timeColumn.isNull(index) + || !filter.needFill(timeColumn.getLong(index), nextTimeInCurrentColumn)); + } + + abstract void fillValue(Column column, int index, Object array); + + abstract void fillNextValueInCurrentColumn(Object array, int index); + + abstract Object createValueArray(int size); + + abstract Column createNullValueColumn(); + + abstract Column createRunLengthEncodedFilledValueColumn(int size); + + abstract Column createFilledValueColumn( + Object array, boolean[] isNull, boolean hasNullValue, int size); + + abstract void updateNextValue(Column nextValueColumn, int index); + + abstract void updateNextValueInCurrentColumn(Column nextValueColumn, int index); + + /** update nextValueInCurrentColumn using value of next Column. */ + abstract void updateNextValueInCurrentColumn(); +} diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/CommonOperatorUtils.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/CommonOperatorUtils.java index 8928e2a37da89..a332122b2574e 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/CommonOperatorUtils.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/CommonOperatorUtils.java @@ -32,6 +32,12 @@ import org.apache.iotdb.calc.execution.operator.process.fill.linear.FloatLinearFill; import org.apache.iotdb.calc.execution.operator.process.fill.linear.IntLinearFill; import org.apache.iotdb.calc.execution.operator.process.fill.linear.LongLinearFill; +import org.apache.iotdb.calc.execution.operator.process.fill.next.BinaryNextFill; +import org.apache.iotdb.calc.execution.operator.process.fill.next.BooleanNextFill; +import org.apache.iotdb.calc.execution.operator.process.fill.next.DoubleNextFill; +import org.apache.iotdb.calc.execution.operator.process.fill.next.FloatNextFill; +import org.apache.iotdb.calc.execution.operator.process.fill.next.IntNextFill; +import org.apache.iotdb.calc.execution.operator.process.fill.next.LongNextFill; import org.apache.iotdb.calc.execution.operator.process.fill.previous.BinaryPreviousFill; import org.apache.iotdb.calc.execution.operator.process.fill.previous.BinaryPreviousFillWithTimeDuration; import org.apache.iotdb.calc.execution.operator.process.fill.previous.BooleanPreviousFill; @@ -105,40 +111,7 @@ public static IFill[] getPreviousFill( List inputDataTypes, TimeDuration timeDurationThreshold, ZoneId zoneId) { - IFillFilter filter; - if (timeDurationThreshold == null) { - filter = null; - } else if (!timeDurationThreshold.containsMonth()) { - filter = new FixedIntervalFillFilter(timeDurationThreshold.nonMonthDuration); - } else { - switch (TIMESTAMP_PRECISION) { - case "ms": - filter = - new MonthIntervalMSFillFilter( - timeDurationThreshold.monthDuration, - timeDurationThreshold.nonMonthDuration, - zoneId); - break; - case "us": - filter = - new MonthIntervalUSFillFilter( - timeDurationThreshold.monthDuration, - timeDurationThreshold.nonMonthDuration, - zoneId); - break; - case "ns": - filter = - new MonthIntervalNSFillFilter( - timeDurationThreshold.monthDuration, - timeDurationThreshold.nonMonthDuration, - zoneId); - break; - default: - // this case will never reach - throw new UnsupportedOperationException( - String.format(QueryMessages.UNSUPPORTED_TIME_PRECISION, TIMESTAMP_PRECISION)); - } - } + IFillFilter filter = createFillFilter(timeDurationThreshold, zoneId); IFill[] previousFill = new IFill[inputColumns]; for (int i = 0; i < inputColumns; i++) { @@ -188,4 +161,68 @@ public static IFill[] getPreviousFill( } return previousFill; } + + public static ILinearFill[] getNextFill( + int inputColumns, + List inputDataTypes, + TimeDuration timeDurationThreshold, + ZoneId zoneId) { + IFillFilter filter = createFillFilter(timeDurationThreshold, zoneId); + + ILinearFill[] nextFill = new ILinearFill[inputColumns]; + for (int i = 0; i < inputColumns; i++) { + switch (inputDataTypes.get(i)) { + case BOOLEAN: + nextFill[i] = new BooleanNextFill(filter); + break; + case TEXT: + case STRING: + case BLOB: + case OBJECT: + nextFill[i] = new BinaryNextFill(filter); + break; + case INT32: + case DATE: + nextFill[i] = new IntNextFill(filter); + break; + case INT64: + case TIMESTAMP: + nextFill[i] = new LongNextFill(filter); + break; + case FLOAT: + nextFill[i] = new FloatNextFill(filter); + break; + case DOUBLE: + nextFill[i] = new DoubleNextFill(filter); + break; + default: + throw new IllegalArgumentException(UNKNOWN_DATATYPE + inputDataTypes.get(i)); + } + } + return nextFill; + } + + private static IFillFilter createFillFilter(TimeDuration timeDurationThreshold, ZoneId zoneId) { + if (timeDurationThreshold == null) { + return null; + } + if (!timeDurationThreshold.containsMonth()) { + return new FixedIntervalFillFilter(timeDurationThreshold.nonMonthDuration); + } + switch (TIMESTAMP_PRECISION) { + case "ms": + return new MonthIntervalMSFillFilter( + timeDurationThreshold.monthDuration, timeDurationThreshold.nonMonthDuration, zoneId); + case "us": + return new MonthIntervalUSFillFilter( + timeDurationThreshold.monthDuration, timeDurationThreshold.nonMonthDuration, zoneId); + case "ns": + return new MonthIntervalNSFillFilter( + timeDurationThreshold.monthDuration, timeDurationThreshold.nonMonthDuration, zoneId); + default: + // this case will never reach + throw new UnsupportedOperationException( + String.format(QueryMessages.UNSUPPORTED_TIME_PRECISION, TIMESTAMP_PRECISION)); + } + } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java index ed4fc101b1b05..ea593b8e6ca72 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java @@ -34,6 +34,8 @@ import org.apache.iotdb.calc.execution.operator.process.TableLinearFillOperator; import org.apache.iotdb.calc.execution.operator.process.TableLinearFillWithGroupOperator; import org.apache.iotdb.calc.execution.operator.process.TableMergeSortOperator; +import org.apache.iotdb.calc.execution.operator.process.TableNextFillOperator; +import org.apache.iotdb.calc.execution.operator.process.TableNextFillWithGroupOperator; import org.apache.iotdb.calc.execution.operator.process.TableSortOperator; import org.apache.iotdb.calc.execution.operator.process.TableStreamSortOperator; import org.apache.iotdb.calc.execution.operator.process.TableTopKOperator; @@ -134,6 +136,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.Measure; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -215,6 +218,7 @@ import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.IDENTITY_FILL; import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.UNKNOWN_DATATYPE; import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.getLinearFill; +import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.getNextFill; import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.getPreviousFill; import static org.apache.iotdb.calc.utils.constant.SqlConstant.FIRST_AGGREGATION; import static org.apache.iotdb.calc.utils.constant.SqlConstant.FIRST_BY_AGGREGATION; @@ -604,6 +608,46 @@ public Operator visitLinearFill(LinearFillNode node, C context) { } } + @Override + public Operator visitNextFill(NextFillNode node, C context) { + Operator child = node.getChild().accept(this, context); + + List inputDataTypes = + getOutputColumnTypes(node.getChild(), context.getTableTypeProvider()); + int inputColumnCount = inputDataTypes.size(); + int helperColumnIndex = -1; + if (node.getHelperColumn().isPresent()) { + helperColumnIndex = getColumnIndex(node.getHelperColumn().get(), node.getChild()); + } + ILinearFill[] fillArray = + getNextFill( + inputColumnCount, + inputDataTypes, + node.getTimeBound().orElse(null), + context.getZoneId()); + + if (node.getGroupingKeys().isPresent()) { + CommonOperatorContext operatorContext = + addOperatorContext( + context, node.getPlanNodeId(), TableNextFillWithGroupOperator.class.getSimpleName()); + return new TableNextFillWithGroupOperator( + operatorContext, + fillArray, + child, + helperColumnIndex, + node.getTimeBound().isPresent(), + genFillGroupKeyComparator( + node.getGroupingKeys().get(), node, inputDataTypes, new HashSet<>()), + inputDataTypes); + } else { + CommonOperatorContext operatorContext = + addOperatorContext( + context, node.getPlanNodeId(), TableNextFillOperator.class.getSimpleName()); + return new TableNextFillOperator( + operatorContext, fillArray, child, helperColumnIndex, node.getTimeBound().isPresent()); + } + } + @Override public Operator visitValueFill(ValueFillNode node, C context) { Operator child = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index e38ce40d48bb3..f747c18503573 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -37,6 +37,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MarkDistinctNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PreviousFillNode; @@ -914,6 +915,20 @@ public List visitLinearFill(LinearFillNode node, GraphContext context) { return render(node, boxValue, context); } + @Override + public List visitNextFill(NextFillNode node, GraphContext context) { + List boxValue = new ArrayList<>(); + boxValue.add(String.format("NextFill-%s", node.getPlanNodeId().getId())); + node.getTimeBound() + .ifPresent(timeBound -> boxValue.add(String.format("TIME_BOUND: %s", timeBound))); + node.getHelperColumn() + .ifPresent(timeColumn -> boxValue.add(String.format("TIME_COLUMN: %s", timeColumn))); + node.getGroupingKeys() + .ifPresent(groupingKeys -> boxValue.add(String.format("FILL_GROUP: %s", groupingKeys))); + + return render(node, boxValue, context); + } + @Override public List visitValueFill(ValueFillNode node, GraphContext context) { List boxValue = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index a10cf6254bb2b..34ed87bc08f8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -1529,6 +1529,32 @@ public Optional> getGroupingKeys() { } } + public static class NextFillAnalysis extends FillAnalysis { + @Nullable private final TimeDuration timeBound; + @Nullable private final FieldReference fieldReference; + @Nullable private final List groupingKeys; + + public NextFillAnalysis( + TimeDuration timeBound, FieldReference fieldReference, List groupingKeys) { + super(FillPolicy.NEXT); + this.timeBound = timeBound; + this.fieldReference = fieldReference; + this.groupingKeys = groupingKeys; + } + + public Optional getTimeBound() { + return Optional.ofNullable(timeBound); + } + + public Optional getFieldReference() { + return Optional.ofNullable(fieldReference); + } + + public Optional> getGroupingKeys() { + return Optional.ofNullable(groupingKeys); + } + } + public static class LinearFillAnalysis extends FillAnalysis { private final FieldReference fieldReference; @Nullable private final List groupingKeys; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index d0779ee24a5b8..8912ac37264a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -3972,6 +3972,28 @@ private void analyzeFill(Fill node, Scope scope) { fillAnalysis = new Analysis.PreviousFillAnalysis( node.getTimeBound().orElse(null), timeColumn, groupingKeys); + } else if (node.getFillMethod() == FillPolicy.NEXT) { + FieldReference timeColumn = null; + List groupingKeys = null; + if (node.getTimeBound().isPresent() || node.getFillGroupingElements().isPresent()) { + timeColumn = getHelperColumn(node, scope, FillPolicy.NEXT); + ExpressionAnalyzer.analyzeExpression( + metadata, + queryContext, + sessionContext, + statementAnalyzerFactory, + accessControl, + scope, + analysis, + timeColumn, + WarningCollector.NOOP, + correlationSupport); + + groupingKeys = analyzeFillGroup(node, scope, FillPolicy.NEXT); + } + fillAnalysis = + new Analysis.NextFillAnalysis( + node.getTimeBound().orElse(null), timeColumn, groupingKeys); } else if (node.getFillMethod() == FillPolicy.CONSTANT) { Literal literal = node.getFillValue().get(); ExpressionAnalyzer.analyzeExpression( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index 10ea1c1e25ad3..73f0cb347c2a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -35,6 +35,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.ProjectNode; @@ -1286,6 +1287,27 @@ private PlanBuilder fill(PlanBuilder subPlan, Optional fill) { previousFillAnalysis.getTimeBound().orElse(null), previousFillHelperColumn, groupingKeys)); + case NEXT: + Analysis.NextFillAnalysis nextFillAnalysis = + (Analysis.NextFillAnalysis) analysis.getFill(fill.get()); + Symbol nextFillHelperColumn = null; + if (nextFillAnalysis.getFieldReference().isPresent()) { + nextFillHelperColumn = subPlan.translate(nextFillAnalysis.getFieldReference().get()); + } + + if (nextFillAnalysis.getGroupingKeys().isPresent()) { + List fieldReferenceList = nextFillAnalysis.getGroupingKeys().get(); + groupingKeys = new ArrayList<>(fieldReferenceList.size()); + subPlan = fillGroup(subPlan, fieldReferenceList, groupingKeys, nextFillHelperColumn); + } + + return subPlan.withNewRoot( + new NextFillNode( + queryIdAllocator.genPlanNodeId(), + subPlan.getRoot(), + nextFillAnalysis.getTimeBound().orElse(null), + nextFillHelperColumn, + groupingKeys)); case LINEAR: Analysis.LinearFillAnalysis linearFillAnalysis = (Analysis.LinearFillAnalysis) analysis.getFill(fill.get()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneFillColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneFillColumns.java index d93953c695bd5..d1a061ea90e8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneFillColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneFillColumns.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.FillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PreviousFillNode; import com.google.common.collect.ImmutableSet; @@ -50,6 +51,10 @@ protected Optional pushDownProjectOff( PreviousFillNode previousFillNode = (PreviousFillNode) fillNode; previousFillNode.getHelperColumn().ifPresent(referencedInputs::add); previousFillNode.getGroupingKeys().ifPresent(keys -> referencedInputs.addAll(keys)); + } else if (fillNode instanceof NextFillNode) { + NextFillNode nextFillNode = (NextFillNode) fillNode; + nextFillNode.getHelperColumn().ifPresent(referencedInputs::add); + nextFillNode.getGroupingKeys().ifPresent(keys -> referencedInputs.addAll(keys)); } else if (fillNode instanceof LinearFillNode) { LinearFillNode linearFillNode = (LinearFillNode) fillNode; referencedInputs.add(linearFillNode.getHelperColumn()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java index d3155343a6a15..298e0616f3178 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.SortNode; @@ -141,6 +142,12 @@ public PlanNode visitLinearFill(LinearFillNode node, Context context) { return node; } + @Override + public PlanNode visitNextFill(NextFillNode node, Context context) { + context.enablePushDown = false; + return node; + } + @Override public PlanNode visitPreviousFill(PreviousFillNode node, Context context) { if (node.getGroupingKeys().isPresent()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 6dda2d17503d0..1960252a8ffdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -39,6 +39,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MarkDistinctNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -331,6 +332,37 @@ public PlanAndMappings visitPreviousFill(PreviousFillNode node, UnaliasContext c } } + @Override + public PlanAndMappings visitNextFill(NextFillNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + + if (node.getHelperColumn().isPresent() || node.getGroupingKeys().isPresent()) { + Map mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + Symbol helperColumn = null; + if (node.getHelperColumn().isPresent()) { + helperColumn = mapper.map(node.getHelperColumn().get()); + } + List groupingKeys = null; + if (node.getGroupingKeys().isPresent()) { + groupingKeys = mapper.mapAndDistinct(node.getGroupingKeys().get()); + } + return new PlanAndMappings( + new NextFillNode( + node.getPlanNodeId(), + rewrittenSource.getRoot(), + node.getTimeBound().orElse(null), + helperColumn, + groupingKeys), + mapping); + } else { + return new PlanAndMappings( + node.replaceChildren(ImmutableList.of(rewrittenSource.getRoot())), + rewrittenSource.getMappings()); + } + } + @Override public PlanAndMappings visitLinearFill(LinearFillNode node, UnaliasContext context) { PlanAndMappings rewrittenSource = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 25d68eba6beb5..a19168b50f42f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -142,6 +142,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ZeroOrMoreQuantifier; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ZeroOrOneQuantifier; import org.apache.iotdb.commons.queryengine.plan.relational.sql.parser.ParsingException; +import org.apache.iotdb.commons.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; import org.apache.iotdb.commons.schema.table.InformationSchema; @@ -2398,6 +2399,43 @@ public Node visitPreviousFill(RelationalSqlParser.PreviousFillContext ctx) { return new Fill(getLocation(ctx), timeBound, timeColumn, fillGroupingElements); } + @Override + public Node visitNextFill(RelationalSqlParser.NextFillContext ctx) { + TimeDuration timeBound = null; + LongLiteral timeColumn = null; + List fillGroupingElements = null; + if (ctx.timeBoundClause() != null) { + timeBound = + DataNodeDateTimeUtils.constructTimeDuration( + ctx.timeBoundClause().timeDuration().getText()); + + if (timeBound.monthDuration != 0 && timeBound.nonMonthDuration != 0) { + throw new SemanticException( + "Simultaneous setting of monthly and non-monthly intervals is not supported."); + } + } + + if (ctx.timeColumnClause() != null) { + timeColumn = + new LongLiteral( + getLocation(ctx.timeColumnClause().INTEGER_VALUE()), + ctx.timeColumnClause().INTEGER_VALUE().getText()); + } + + if (ctx.fillGroupClause() != null) { + fillGroupingElements = + ctx.fillGroupClause().INTEGER_VALUE().stream() + .map(index -> new LongLiteral(getLocation(index), index.getText())) + .collect(toList()); + } + + if (timeColumn != null && (timeBound == null && fillGroupingElements == null)) { + throw new SemanticException( + "Don't need to specify TIME_COLUMN while either TIME_BOUND or FILL_GROUP parameter is not specified"); + } + return new Fill(getLocation(ctx), FillPolicy.NEXT, timeBound, timeColumn, fillGroupingElements); + } + @Override public Node visitLinearFill(RelationalSqlParser.LinearFillContext ctx) { LongLiteral timeColumn = null; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java new file mode 100644 index 0000000000000..4f393a799b968 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.fill; + +import org.apache.iotdb.calc.execution.operator.CommonOperatorContext; +import org.apache.iotdb.calc.execution.operator.Operator; +import org.apache.iotdb.calc.execution.operator.process.TableNextFillWithGroupOperator; +import org.apache.iotdb.calc.execution.operator.process.fill.ILinearFill; +import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; +import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.BinaryColumn; +import org.apache.tsfile.read.common.block.column.IntColumn; +import org.apache.tsfile.read.common.block.column.LongColumn; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.TimeDuration; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NextFillTest { + + @Test + public void testNextFillAcrossTsBlocks() { + ILinearFill fill = + CommonOperatorUtils.getNextFill( + 1, ImmutableList.of(TSDataType.INT32), null, ZoneId.systemDefault())[0]; + IntColumn valueColumn = + new IntColumn(3, Optional.of(new boolean[] {true, true, true}), new int[3]); + LongColumn timeColumn = new LongColumn(3, Optional.empty(), new long[] {1, 2, 3}); + IntColumn nextValueColumn = + new IntColumn(2, Optional.of(new boolean[] {true, false}), new int[] {0, 9}); + LongColumn nextTimeColumn = new LongColumn(2, Optional.empty(), new long[] {4, 5}); + + assertTrue(fill.needPrepareForNext(2, valueColumn, 2)); + assertTrue(fill.prepareForNext(3, 2, nextTimeColumn, nextValueColumn)); + + IntColumn result = (IntColumn) fill.fill(timeColumn, valueColumn, 0); + for (int i = 0; i < result.getPositionCount(); i++) { + assertFalse(result.isNull(i)); + assertEquals(9, result.getInt(i)); + } + } + + @Test + public void testNextFillTimeBoundAndHelperNull() { + ILinearFill fill = + CommonOperatorUtils.getNextFill( + 1, ImmutableList.of(TSDataType.INT32), new TimeDuration(0, 2), ZoneId.systemDefault())[ + 0]; + IntColumn valueColumn = + new IntColumn(3, Optional.of(new boolean[] {true, true, true}), new int[3]); + LongColumn timeColumn = + new LongColumn(3, Optional.of(new boolean[] {false, true, false}), new long[] {1, 0, 3}); + IntColumn nextValueColumn = new IntColumn(1, Optional.empty(), new int[] {40}); + LongColumn nextTimeColumn = new LongColumn(1, Optional.empty(), new long[] {4}); + + assertTrue(fill.needPrepareForNext(2, valueColumn, 2)); + assertTrue(fill.prepareForNext(3, 2, nextTimeColumn, nextValueColumn)); + + IntColumn result = (IntColumn) fill.fill(timeColumn, valueColumn, 0); + assertTrue(result.isNull(0)); + assertTrue(result.isNull(1)); + assertFalse(result.isNull(2)); + assertEquals(40, result.getInt(2)); + } + + @Test + public void testObjectNextFillUsesBinaryFill() { + ILinearFill fill = + CommonOperatorUtils.getNextFill( + 1, ImmutableList.of(TSDataType.OBJECT), null, ZoneId.systemDefault())[0]; + Binary objectValue = new Binary("object-value", TSFileConfig.STRING_CHARSET); + BinaryColumn valueColumn = + new BinaryColumn( + 2, + Optional.of(new boolean[] {true, false}), + new Binary[] {Binary.EMPTY_VALUE, objectValue}); + LongColumn timeColumn = new LongColumn(2, Optional.empty(), new long[] {1, 2}); + + BinaryColumn result = (BinaryColumn) fill.fill(timeColumn, valueColumn, 0); + assertFalse(result.isNull(0)); + assertEquals(objectValue, result.getBinary(0)); + assertEquals(objectValue, result.getBinary(1)); + } + + @Test + public void testNextFillWithGroupDoesNotUseNextGroupAfterContinuedGroup() throws Exception { + List dataTypes = ImmutableList.of(TSDataType.TEXT, TSDataType.INT32); + CommonOperatorContext operatorContext = new TestOperatorContext(); + TableNextFillWithGroupOperator operator = + new TableNextFillWithGroupOperator( + operatorContext, + CommonOperatorUtils.getNextFill(2, dataTypes, null, ZoneId.systemDefault()), + new TsBlockSourceOperator( + operatorContext, + ImmutableList.of( + buildBlock(new String[] {"a"}, new Integer[] {1}), + buildBlock(new String[] {"a", "b"}, new Integer[] {null, 9}))), + -1, + false, + (left, right) -> + left.tsBlock + .getColumn(0) + .getBinary(left.rowIndex) + .toString() + .compareTo(right.tsBlock.getColumn(0).getBinary(right.rowIndex).toString()), + dataTypes); + + assertTrue(operator.hasNext()); + TsBlock firstBlock = operator.next(); + assertEquals(1, firstBlock.getPositionCount()); + assertEquals(1, firstBlock.getColumn(1).getInt(0)); + + assertTrue(operator.hasNext()); + TsBlock secondBlock = operator.next(); + assertEquals(2, secondBlock.getPositionCount()); + assertEquals("a", secondBlock.getColumn(0).getBinary(0).toString()); + assertTrue(secondBlock.getColumn(1).isNull(0)); + assertEquals("b", secondBlock.getColumn(0).getBinary(1).toString()); + assertEquals(9, secondBlock.getColumn(1).getInt(1)); + } + + private static TsBlock buildBlock(String[] groups, Integer[] values) { + TsBlockBuilder builder = + new TsBlockBuilder(groups.length, ImmutableList.of(TSDataType.TEXT, TSDataType.INT32)); + for (int i = 0; i < groups.length; i++) { + builder.getColumnBuilder(0).writeBinary(new Binary(groups[i], TSFileConfig.STRING_CHARSET)); + if (values[i] == null) { + builder.getColumnBuilder(1).appendNull(); + } else { + builder.getColumnBuilder(1).writeInt(values[i]); + } + } + builder.declarePositions(groups.length); + return builder.build( + new RunLengthEncodedColumn(CommonOperatorUtils.TIME_COLUMN_TEMPLATE, groups.length)); + } + + private static class TsBlockSourceOperator implements Operator { + + private final CommonOperatorContext operatorContext; + private final List tsBlocks; + private int index; + + private TsBlockSourceOperator(CommonOperatorContext operatorContext, List tsBlocks) { + this.operatorContext = operatorContext; + this.tsBlocks = tsBlocks; + } + + @Override + public CommonOperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + return index < tsBlocks.size() ? tsBlocks.get(index++) : null; + } + + @Override + public boolean hasNext() { + return index < tsBlocks.size(); + } + + @Override + public void close() { + // No resources. + } + + @Override + public boolean isFinished() { + return index >= tsBlocks.size(); + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } + + private static class TestOperatorContext extends CommonOperatorContext { + + private TestOperatorContext() { + super(0, new PlanNodeId("test"), "test"); + } + + @Override + public MemoryReservationManager getMemoryReservationContext() { + return null; + } + + @Override + public int getFragmentId() { + return 0; + } + + @Override + public int getPipelineId() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java new file mode 100644 index 0000000000000..438511c5bb522 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner; + +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; +import org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelper; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.TimeDuration; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +public class NextFillNodeSerdeTest { + + @Test + public void testNextFillNodeSerde() throws Exception { + NextFillNode node = + new NextFillNode( + new PlanNodeId("nextFill"), + null, + new TimeDuration(0, 2), + new Symbol("time"), + ImmutableList.of(new Symbol("city"))); + + ByteBuffer byteBuffer = node.serializeToByteBuffer(); + PlanNode deserialized = PlanNodeDeserializeHelper.deserialize(byteBuffer); + assertEquals(node, deserialized); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/FillStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/FillStatementTest.java new file mode 100644 index 0000000000000..59d26aedb12c2 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/FillStatementTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.parser; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Fill; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Query; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.QuerySpecification; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.commons.queryengine.plan.statement.component.FillPolicy; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; + +import org.junit.Before; +import org.junit.Test; + +import java.time.ZoneId; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class FillStatementTest { + + private SqlParser sqlParser; + private IClientSession clientSession; + + @Before + public void setUp() { + sqlParser = new SqlParser(); + clientSession = new InternalClientSession("testClient"); + } + + @Test + public void testNextFillStatement() { + Statement statement = + sqlParser.createStatement( + "select time, device_id, s1 from table1 FILL METHOD NEXT TIME_BOUND 2ms TIME_COLUMN 1 FILL_GROUP 2,3", + ZoneId.systemDefault(), + clientSession); + + assertTrue(statement instanceof Query); + QuerySpecification querySpecification = (QuerySpecification) ((Query) statement).getQueryBody(); + assertTrue(querySpecification.getFill().isPresent()); + Fill fill = querySpecification.getFill().get(); + assertEquals(FillPolicy.NEXT, fill.getFillMethod()); + assertTrue(fill.getTimeBound().isPresent()); + assertEquals(1, fill.getTimeColumnIndex().get().getParsedValue()); + assertEquals(2, fill.getFillGroupingElements().get().size()); + assertEquals(2, fill.getFillGroupingElements().get().get(0).getParsedValue()); + assertEquals(3, fill.getFillGroupingElements().get().get(1).getParsedValue()); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/CommonPlanNodeDeserializer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/CommonPlanNodeDeserializer.java index 8f14053cd4c9d..6219a053417e7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/CommonPlanNodeDeserializer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/CommonPlanNodeDeserializer.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PreviousFillNode; @@ -140,6 +141,8 @@ public PlanNode deserialize(ByteBuffer buffer, short nodeType) { return RowNumberNode.deserialize(buffer); case 1039: return ValuesNode.deserialize(buffer); + case 1043: + return NextFillNode.deserialize(buffer); default: throw new IllegalArgumentException(QueryMessages.INVALID_NODE_TYPE + nodeType); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/ICoreQueryPlanVisitor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/ICoreQueryPlanVisitor.java index c1b884088deb7..ed429b227a9dd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/ICoreQueryPlanVisitor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/ICoreQueryPlanVisitor.java @@ -41,6 +41,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -154,6 +155,10 @@ default R visitLinearFill(LinearFillNode node, C context) { return visitFill(node, context); } + default R visitNextFill(NextFillNode node, C context) { + return visitFill(node, context); + } + default R visitValueFill(ValueFillNode node, C context) { return visitFill(node, context); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java index 71eb2238f15f0..76a3b2f5e280f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -200,6 +200,7 @@ public enum PlanNodeType { TABLE_DISK_USAGE_INFORMATION_SCHEMA_TABLE_SCAN_NODE((short) 1040), ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1041), NON_ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1042), + TABLE_NEXT_FILL_NODE((short) 1043), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/NextFillNode.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/NextFillNode.java new file mode 100644 index 0000000000000..11676253b3334 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/NextFillNode.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.ICoreQueryPlanVisitor; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; + +import com.google.common.collect.Iterables; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.utils.TimeDuration; + +import javax.annotation.Nullable; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +public class NextFillNode extends FillNode { + + @Nullable private final TimeDuration timeBound; + + @Nullable private final Symbol helperColumn; + + @Nullable private final List groupingKeys; + + public NextFillNode( + PlanNodeId id, + PlanNode child, + TimeDuration timeBound, + Symbol helperColumn, + List groupingKeys) { + super(id, child); + this.timeBound = timeBound; + this.helperColumn = helperColumn; + this.groupingKeys = groupingKeys; + } + + public Optional getTimeBound() { + return Optional.ofNullable(timeBound); + } + + public Optional getHelperColumn() { + return Optional.ofNullable(helperColumn); + } + + public Optional> getGroupingKeys() { + return Optional.ofNullable(groupingKeys); + } + + @Override + public PlanNode clone() { + return new NextFillNode(id, null, timeBound, helperColumn, groupingKeys); + } + + @Override + public R accept(IPlanVisitor visitor, C context) { + return ((ICoreQueryPlanVisitor) visitor).visitNextFill(this, context); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_NEXT_FILL_NODE.serialize(byteBuffer); + if (timeBound == null) { + ReadWriteIOUtils.write(false, byteBuffer); + } else { + ReadWriteIOUtils.write(true, byteBuffer); + timeBound.serialize(byteBuffer); + } + + if (helperColumn == null) { + ReadWriteIOUtils.write(false, byteBuffer); + } else { + ReadWriteIOUtils.write(true, byteBuffer); + Symbol.serialize(helperColumn, byteBuffer); + } + + if (groupingKeys == null) { + ReadWriteIOUtils.write(false, byteBuffer); + } else { + ReadWriteIOUtils.write(true, byteBuffer); + ReadWriteIOUtils.write(groupingKeys.size(), byteBuffer); + for (Symbol symbol : groupingKeys) { + Symbol.serialize(symbol, byteBuffer); + } + } + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_NEXT_FILL_NODE.serialize(stream); + if (timeBound == null) { + ReadWriteIOUtils.write(false, stream); + } else { + ReadWriteIOUtils.write(true, stream); + timeBound.serialize(stream); + } + if (helperColumn == null) { + ReadWriteIOUtils.write(false, stream); + } else { + ReadWriteIOUtils.write(true, stream); + Symbol.serialize(helperColumn, stream); + } + if (groupingKeys == null) { + ReadWriteIOUtils.write(false, stream); + } else { + ReadWriteIOUtils.write(true, stream); + ReadWriteIOUtils.write(groupingKeys.size(), stream); + for (Symbol symbol : groupingKeys) { + Symbol.serialize(symbol, stream); + } + } + } + + public static NextFillNode deserialize(ByteBuffer byteBuffer) { + boolean hasValue = ReadWriteIOUtils.readBool(byteBuffer); + TimeDuration timeDuration = null; + if (hasValue) { + timeDuration = TimeDuration.deserialize(byteBuffer); + } + hasValue = ReadWriteIOUtils.readBool(byteBuffer); + Symbol helperColumn = null; + if (hasValue) { + helperColumn = Symbol.deserialize(byteBuffer); + } + hasValue = ReadWriteIOUtils.readBool(byteBuffer); + List groupingKeys = null; + if (hasValue) { + int size = ReadWriteIOUtils.readInt(byteBuffer); + groupingKeys = new ArrayList<>(size); + while (size-- > 0) { + groupingKeys.add(Symbol.deserialize(byteBuffer)); + } + } + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new NextFillNode(planNodeId, null, timeDuration, helperColumn, groupingKeys); + } + + @Override + public PlanNode replaceChildren(List newChildren) { + return new NextFillNode( + id, Iterables.getOnlyElement(newChildren), timeBound, helperColumn, groupingKeys); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + NextFillNode that = (NextFillNode) o; + return Objects.equals(timeBound, that.timeBound) + && Objects.equals(helperColumn, that.helperColumn) + && Objects.equals(groupingKeys, that.groupingKeys); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), timeBound, helperColumn, groupingKeys); + } + + @Override + public String toString() { + return "NextFillNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java index c544b2eac7e8d..c1d3c2de0af44 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java @@ -41,13 +41,13 @@ public class Fill extends Node { // used for constant fill private final Literal fillValue; - // used for previous fill + // used for previous fill or next fill private final TimeDuration timeBound; - // used for linear fill or previous fill + // used for linear fill, previous fill, or next fill private final LongLiteral timeColumnIndex; - // used for linear fill or previous fill + // used for linear fill, previous fill, or next fill private final List fillGroupingElements; // used for constant fill @@ -66,11 +66,24 @@ public Fill( TimeDuration timeBound, LongLiteral timeColumnIndex, List fillGroupingElements) { + this(location, FillPolicy.PREVIOUS, timeBound, timeColumnIndex, fillGroupingElements); + } + + // used for previous fill or next fill + public Fill( + NodeLocation location, + FillPolicy fillMethod, + TimeDuration timeBound, + LongLiteral timeColumnIndex, + List fillGroupingElements) { super(requireNonNull(location, "location is null")); + if (fillMethod != FillPolicy.PREVIOUS && fillMethod != FillPolicy.NEXT) { + throw new IllegalArgumentException("Unsupported fill method: " + fillMethod); + } this.fillValue = null; this.timeBound = timeBound; this.timeColumnIndex = timeColumnIndex; - this.fillMethod = FillPolicy.PREVIOUS; + this.fillMethod = fillMethod; this.fillGroupingElements = fillGroupingElements; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/util/CommonQuerySqlFormatter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/util/CommonQuerySqlFormatter.java index c9ee3a682d852..7bb325fbc63f3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/util/CommonQuerySqlFormatter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/util/CommonQuerySqlFormatter.java @@ -158,7 +158,8 @@ public Void visitFill(Fill node, Integer indent) { elements.stream() .map(CommonQuerySqlFormatter::formatExpression) .collect(joining(", ")))); - } else if (node.getFillMethod() == FillPolicy.PREVIOUS) { + } else if (node.getFillMethod() == FillPolicy.PREVIOUS + || node.getFillMethod() == FillPolicy.NEXT) { node.getTimeBound() .ifPresent(timeBound -> builder.append(" TIME_BOUND ").append(timeBound.toString())); node.getTimeColumnIndex() diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/statement/component/FillPolicy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/statement/component/FillPolicy.java index 507eda6d1b8e5..46ba4f308d06b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/statement/component/FillPolicy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/statement/component/FillPolicy.java @@ -23,6 +23,7 @@ public enum FillPolicy { PREVIOUS((byte) 0), LINEAR((byte) 1), CONSTANT((byte) 2), + NEXT((byte) 3), ; FillPolicy(byte fillMethod) { diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index f984e825fc7ad..93dfa8bb1f4f1 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -981,6 +981,7 @@ fillClause fillMethod : LINEAR timeColumnClause? fillGroupClause? #linearFill | PREVIOUS timeBoundClause? timeColumnClause? fillGroupClause? #previousFill + | NEXT timeBoundClause? timeColumnClause? fillGroupClause? #nextFill | CONSTANT literalExpression #valueFill ; From 6c6d2258618ac5b2a2fef3636ab263e6210599f9 Mon Sep 17 00:00:00 2001 From: DaZuiZui Date: Tue, 2 Jun 2026 12:56:44 +0800 Subject: [PATCH 2/4] Fix grouped NEXT fill review comments --- .../process/TableNextFillOperator.java | 5 ++ .../TableNextFillWithGroupOperator.java | 18 ++++-- .../operator/process/fill/NextFillTest.java | 46 ++++++++++++++ .../planner/NextFillNodeSerdeTest.java | 60 +++++++++++++++++++ .../planner/node/PreviousFillNode.java | 5 +- 5 files changed, 127 insertions(+), 7 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java index 83c413229448d..1630336b26765 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillOperator.java @@ -26,6 +26,8 @@ import org.apache.tsfile.block.column.Column; import org.apache.tsfile.read.common.block.TsBlock; +import static com.google.common.base.Preconditions.checkArgument; + public class TableNextFillOperator extends AbstractLinearFillOperator { // start from 0; -1 means plain NEXT has no helper column. @@ -40,6 +42,9 @@ public TableNextFillOperator( int helperColumnIndex, boolean hasTimeBound) { super(operatorContext, fillArray, child); + checkArgument( + !hasTimeBound || helperColumnIndex != -1, + "helperColumnIndex should be resolved when timeBound exists"); this.helperColumnIndex = helperColumnIndex; this.hasTimeBound = hasTimeBound; } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java index 7e3b5f52918bc..da72a96f44299 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java @@ -101,13 +101,11 @@ boolean noMoreTsBlockForCurrentGroup() { @Override void resetFill() { - boolean isNewGroup = Boolean.TRUE.equals(groupSplitter.remove(0)); + groupSplitter.remove(0); boolean isNoMoreTsBlockForCurrentGroup = Boolean.TRUE.equals(noMoreTsBlockForCurrentGroup.remove(0)); - if (isNewGroup || isNoMoreTsBlockForCurrentGroup) { - for (ILinearFill fill : fillArray) { - fill.reset(); - } + if (isNoMoreTsBlockForCurrentGroup) { + resetFillState(); } } @@ -135,6 +133,8 @@ void updateCachedData(TsBlock tsBlock) { boolean isNewGroup = isNewGroup(currentGroupKey); if (isNewGroup && !noMoreTsBlockForCurrentGroup.isEmpty()) { noMoreTsBlockForCurrentGroup.set(noMoreTsBlockForCurrentGroup.size() - 1, true); + } else if (isNewGroup) { + resetFillState(); } groupSplitter.add(isNewGroup); } else { @@ -152,6 +152,8 @@ void updateCachedData(TsBlock tsBlock) { boolean isNewGroup = isNewGroup(currentGroupKey); if (isNewGroup && !noMoreTsBlockForCurrentGroup.isEmpty()) { noMoreTsBlockForCurrentGroup.set(noMoreTsBlockForCurrentGroup.size() - 1, true); + } else if (isNewGroup) { + resetFillState(); } groupSplitter.add(isNewGroup); } else { @@ -164,4 +166,10 @@ void updateCachedData(TsBlock tsBlock) { private boolean isNewGroup(SortKey currentGroupKey) { return lastRow == null || groupKeyComparator.compare(lastRow, currentGroupKey) != 0; } + + private void resetFillState() { + for (ILinearFill fill : fillArray) { + fill.reset(); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java index 4f393a799b968..497fe6efe4d2c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/NextFillTest.java @@ -46,6 +46,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class NextFillTest { @@ -151,6 +152,51 @@ public void testNextFillWithGroupDoesNotUseNextGroupAfterContinuedGroup() throws assertEquals(9, secondBlock.getColumn(1).getInt(1)); } + @Test + public void testNextFillWithGroupUsesNextSameGroupAcrossTsBlocks() throws Exception { + List dataTypes = ImmutableList.of(TSDataType.TEXT, TSDataType.INT32); + CommonOperatorContext operatorContext = new TestOperatorContext(); + TableNextFillWithGroupOperator operator = + new TableNextFillWithGroupOperator( + operatorContext, + CommonOperatorUtils.getNextFill(2, dataTypes, null, ZoneId.systemDefault()), + new TsBlockSourceOperator( + operatorContext, + ImmutableList.of( + buildBlock(new String[] {"a"}, new Integer[] {null}), + buildBlock(new String[] {"a"}, new Integer[] {7}))), + -1, + false, + (left, right) -> + left.tsBlock + .getColumn(0) + .getBinary(left.rowIndex) + .toString() + .compareTo(right.tsBlock.getColumn(0).getBinary(right.rowIndex).toString()), + dataTypes); + + TsBlock result = nextNonNull(operator); + assertNotNull(result); + assertEquals(2, result.getPositionCount()); + assertEquals("a", result.getColumn(0).getBinary(0).toString()); + assertFalse(result.getColumn(1).isNull(0)); + assertEquals(7, result.getColumn(1).getInt(0)); + assertEquals("a", result.getColumn(0).getBinary(1).toString()); + assertFalse(result.getColumn(1).isNull(1)); + assertEquals(7, result.getColumn(1).getInt(1)); + assertFalse(operator.hasNext()); + } + + private static TsBlock nextNonNull(TableNextFillWithGroupOperator operator) throws Exception { + while (operator.hasNext()) { + TsBlock result = operator.next(); + if (result != null) { + return result; + } + } + return null; + } + private static TsBlock buildBlock(String[] groups, Integer[] values) { TsBlockBuilder builder = new TsBlockBuilder(groups.length, ImmutableList.of(TSDataType.TEXT, TSDataType.INT32)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java index 438511c5bb522..96d57ca6b9bd5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/NextFillNodeSerdeTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.NextFillNode; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelper; import com.google.common.collect.ImmutableList; @@ -32,6 +33,9 @@ import java.nio.ByteBuffer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; public class NextFillNodeSerdeTest { @@ -45,8 +49,64 @@ public void testNextFillNodeSerde() throws Exception { new Symbol("time"), ImmutableList.of(new Symbol("city"))); + NextFillNode deserialized = assertNextFillNodeSerde(node); + assertTrue(deserialized.getTimeBound().isPresent()); + assertEquals(new TimeDuration(0, 2), deserialized.getTimeBound().get()); + assertTrue(deserialized.getHelperColumn().isPresent()); + assertEquals(new Symbol("time"), deserialized.getHelperColumn().get()); + assertTrue(deserialized.getGroupingKeys().isPresent()); + assertEquals(ImmutableList.of(new Symbol("city")), deserialized.getGroupingKeys().get()); + } + + @Test + public void testPlainNextFillNodeSerde() throws Exception { + NextFillNode node = new NextFillNode(new PlanNodeId("plainNextFill"), null, null, null, null); + + NextFillNode deserialized = assertNextFillNodeSerde(node); + assertFalse(deserialized.getTimeBound().isPresent()); + assertFalse(deserialized.getHelperColumn().isPresent()); + assertFalse(deserialized.getGroupingKeys().isPresent()); + } + + @Test + public void testPartialNextFillNodeSerde() throws Exception { + NextFillNode node = + new NextFillNode( + new PlanNodeId("partialNextFill"), + null, + new TimeDuration(0, 5), + new Symbol("time"), + null); + + NextFillNode deserialized = assertNextFillNodeSerde(node); + assertTrue(deserialized.getTimeBound().isPresent()); + assertEquals(new TimeDuration(0, 5), deserialized.getTimeBound().get()); + assertTrue(deserialized.getHelperColumn().isPresent()); + assertEquals(new Symbol("time"), deserialized.getHelperColumn().get()); + assertFalse(deserialized.getGroupingKeys().isPresent()); + } + + @Test + public void testPreviousFillNodeEqualsIncludesGroupingKeys() { + PreviousFillNode cityGroupNode = + new PreviousFillNode( + new PlanNodeId("previousFill"), null, null, null, ImmutableList.of(new Symbol("city"))); + PreviousFillNode deviceGroupNode = + new PreviousFillNode( + new PlanNodeId("previousFill"), + null, + null, + null, + ImmutableList.of(new Symbol("device"))); + + assertNotEquals(cityGroupNode, deviceGroupNode); + assertNotEquals(cityGroupNode.hashCode(), deviceGroupNode.hashCode()); + } + + private static NextFillNode assertNextFillNodeSerde(NextFillNode node) throws Exception { ByteBuffer byteBuffer = node.serializeToByteBuffer(); PlanNode deserialized = PlanNodeDeserializeHelper.deserialize(byteBuffer); assertEquals(node, deserialized); + return (NextFillNode) deserialized; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/PreviousFillNode.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/PreviousFillNode.java index 36b66835d2d08..950425acfeb51 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/PreviousFillNode.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/planner/node/PreviousFillNode.java @@ -179,12 +179,13 @@ public boolean equals(Object o) { } PreviousFillNode that = (PreviousFillNode) o; return Objects.equals(timeBound, that.timeBound) - && Objects.equals(helperColumn, that.helperColumn); + && Objects.equals(helperColumn, that.helperColumn) + && Objects.equals(groupingKeys, that.groupingKeys); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), timeBound, helperColumn); + return Objects.hash(super.hashCode(), timeBound, helperColumn, groupingKeys); } @Override From 9e5d87b48837771102e1f4882a30c54244a4f9e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=93=87=E5=A1=9E=E5=A4=A7=E5=98=B4=E5=A5=BD=E5=B8=A5?= <66861267+DaZuiZui@users.noreply.github.com> Date: Fri, 5 Jun 2026 00:17:07 +0800 Subject: [PATCH 3/4] Remove redundant NEXT fill group splitter --- .../process/TableNextFillWithGroupOperator.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java index da72a96f44299..71ddc5fb30b82 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/TableNextFillWithGroupOperator.java @@ -38,8 +38,6 @@ public class TableNextFillWithGroupOperator extends TableNextFillOperator { - private final List groupSplitter; - private final List noMoreTsBlockForCurrentGroup; private final Comparator groupKeyComparator; @@ -57,7 +55,6 @@ public TableNextFillWithGroupOperator( Comparator groupKeyComparator, List dataTypes) { super(operatorContext, fillArray, child, helperColumnIndex, hasTimeBound); - this.groupSplitter = new ArrayList<>(); this.noMoreTsBlockForCurrentGroup = new ArrayList<>(); this.groupKeyComparator = groupKeyComparator; this.resultBuilder = new TsBlockBuilder(dataTypes); @@ -101,7 +98,6 @@ boolean noMoreTsBlockForCurrentGroup() { @Override void resetFill() { - groupSplitter.remove(0); boolean isNoMoreTsBlockForCurrentGroup = Boolean.TRUE.equals(noMoreTsBlockForCurrentGroup.remove(0)); if (isNoMoreTsBlockForCurrentGroup) { @@ -136,9 +132,6 @@ void updateCachedData(TsBlock tsBlock) { } else if (isNewGroup) { resetFillState(); } - groupSplitter.add(isNewGroup); - } else { - groupSplitter.add(true); } noMoreTsBlockForCurrentGroup.add(true); currentGroupKey = nextGroupKey; @@ -155,9 +148,6 @@ void updateCachedData(TsBlock tsBlock) { } else if (isNewGroup) { resetFillState(); } - groupSplitter.add(isNewGroup); - } else { - groupSplitter.add(true); } noMoreTsBlockForCurrentGroup.add(false); lastRow = currentGroupKey; From 06807373e44d10c6e27a7ea0439ead623764be24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=93=87=E5=A1=9E=E5=A4=A7=E5=98=B4=E5=A5=BD=E5=B8=A5?= <66861267+DaZuiZui@users.noreply.github.com> Date: Fri, 5 Jun 2026 00:28:05 +0800 Subject: [PATCH 4/4] Check fill method in Fill constructor --- .../iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java | 1 + 1 file changed, 1 insertion(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java index c1d3c2de0af44..303100aa74021 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/sql/ast/Fill.java @@ -77,6 +77,7 @@ public Fill( LongLiteral timeColumnIndex, List fillGroupingElements) { super(requireNonNull(location, "location is null")); + fillMethod = requireNonNull(fillMethod, "fillMethod is null"); if (fillMethod != FillPolicy.PREVIOUS && fillMethod != FillPolicy.NEXT) { throw new IllegalArgumentException("Unsupported fill method: " + fillMethod); }