diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java index 2a5c5c13d0b3..bc81047b17a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java @@ -69,6 +69,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -122,7 +123,7 @@ public void testAggregationWithoutTimeFilter() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(500, resultTsBlock.getColumn(i).getLong(0)); } @@ -153,7 +154,7 @@ public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, false, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(500, resultTsBlock.getColumn(i).getLong(0)); } @@ -189,7 +190,7 @@ public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001); count++; @@ -227,7 +228,7 @@ public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertTrue(resultTsBlock.getColumn(0).getBoolean(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(20199, resultTsBlock.getColumn(2).getLong(0)); @@ -269,7 +270,7 @@ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Ex initAlignedSeriesAggregationScanOperator(aggregators, null, false, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertTrue(resultTsBlock.getColumn(0).getBoolean(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(20199, resultTsBlock.getColumn(2).getLong(0)); @@ -304,7 +305,7 @@ public void testAggregationWithTimeFilter1() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(resultTsBlock.getColumn(i).getLong(0), 380); } @@ -337,7 +338,7 @@ public void testAggregationWithTimeFilter2() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(resultTsBlock.getColumn(i).getLong(0), 380); } @@ -370,7 +371,7 @@ public void testAggregationWithTimeFilter3() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(resultTsBlock.getColumn(i).getLong(0), 300); } @@ -410,7 +411,7 @@ public void testMultiAggregationWithTimeFilter() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertTrue(resultTsBlock.getColumn(0).getBoolean(0)); assertEquals(399, resultTsBlock.getColumn(1).getInt(0)); assertEquals(20199, resultTsBlock.getColumn(2).getLong(0)); @@ -448,7 +449,7 @@ public void testGroupByWithoutGlobalTimeFilter() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -489,7 +490,7 @@ public void testGroupByWithGlobalTimeFilter() throws Exception { aggregators, timeFilter, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -533,7 +534,7 @@ public void testGroupByWithMultiFunction() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -578,7 +579,7 @@ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, false, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(pos)); @@ -613,7 +614,7 @@ public void testGroupBySlidingTimeWindow() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -645,7 +646,7 @@ public void testGroupBySlidingTimeWindow2() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); @@ -688,7 +689,7 @@ public void testGroupBySlidingWindowWithMultiFunction() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java index e2bb5a655ec1..ac3c83a8364d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -209,7 +210,7 @@ public long ramBytesUsed() { } }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + 1 + count * 10000L; assertEquals(expectedTime, block.getTimeByIndex(i)); @@ -386,7 +387,7 @@ public long ramBytesUsed() { } }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + 1 + count * 10000L; assertEquals(expectedTime, block.getTimeByIndex(i)); @@ -563,7 +564,7 @@ public long ramBytesUsed() { } }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + 1 + count * 10000L; assertEquals(expectedTime, block.getTimeByIndex(i)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java index bad50a7e1c9c..ec258a36c0e9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java @@ -66,6 +66,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -197,7 +198,7 @@ public void batchTest1() throws Exception { int count = 0; while (horizontallyConcatOperator.isBlocked().isDone() && horizontallyConcatOperator.hasNext()) { - TsBlock tsBlock = horizontallyConcatOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(horizontallyConcatOperator); assertEquals(6, tsBlock.getValueColumnCount()); for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { assertEquals(count, tsBlock.getTimeByIndex(i)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java index 08859dc127be..8ea63f132e9a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java @@ -41,8 +41,8 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class LinearFillOperatorTest { @@ -1301,8 +1301,7 @@ public long ramBytesUsed() { }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); - assertNotNull(block); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + count; assertEquals(expectedTime, block.getTimeByIndex(i)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index 76ec370ad98b..9afc9c7c5409 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -88,6 +88,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1765,7 +1766,7 @@ public void mergeSortTest() throws Exception { int index = 0; while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) { - TsBlock result = mergeSortOperator.next(); + TsBlock result = nextNonNullOrEmpty(mergeSortOperator); for (int i = 0; i < result.getPositionCount(); i++) { long time = result.getTimeByIndex(i); assertEquals(time, ans[index++]); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java index a1188becf109..6602c65ff981 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java @@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -161,7 +162,7 @@ public void batchTest1() throws Exception { new LimitOperator(driverContext.getOperatorContexts().get(4), 250, offsetOperator); int count = 100; while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) { - TsBlock tsBlock = limitOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(limitOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -265,7 +266,7 @@ public void batchTest2() throws Exception { int count = 0; while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) { - TsBlock tsBlock = offsetOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(offsetOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -367,7 +368,7 @@ public void batchTest3() throws Exception { new OffsetOperator(driverContext.getOperatorContexts().get(3), 500, timeJoinOperator); while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) { - TsBlock tsBlock = offsetOperator.next(); + TsBlock tsBlock = nextNonNull(offsetOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -455,7 +456,7 @@ public void batchTest4() throws Exception { driverContext.getOperatorContexts().get(3), 98_784_247_808L, timeJoinOperator); while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) { - TsBlock tsBlock = offsetOperator.next(); + TsBlock tsBlock = nextNonNull(offsetOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -468,4 +469,14 @@ public void batchTest4() throws Exception { instanceNotificationExecutor.shutdown(); } } + + private static TsBlock nextNonNull(Operator operator) throws Exception { + while (operator.hasNext()) { + TsBlock result = operator.next(); + if (result != null) { + return result; + } + } + throw new AssertionError("Expected a non-null TsBlock from operator"); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java new file mode 100644 index 000000000000..37b979e87bec --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.tsfile.read.common.block.TsBlock; + +public final class OperatorTestUtils { + + private OperatorTestUtils() { + // Utility class. + } + + public static TsBlock nextNonNullOrEmpty(Operator operator) throws Exception { + while (operator.hasNext()) { + TsBlock result = operator.next(); + if (!isNullOrEmpty(result)) { + return result; + } + } + throw new AssertionError("Expected a non-null and non-empty TsBlock from operator"); + } + + public static TsBlock lastNonNullOrEmpty(Operator operator) throws Exception { + TsBlock result = null; + while (operator.isBlocked().isDone() && operator.hasNext()) { + TsBlock nextResult = operator.next(); + if (!isNullOrEmpty(nextResult)) { + result = nextResult; + } + } + return result; + } + + private static boolean isNullOrEmpty(TsBlock tsBlock) { + return tsBlock == null || tsBlock.getPositionCount() == 0; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java index bbf96e89fb4f..afa08223d646 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java @@ -66,6 +66,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; public class SeriesAggregationScanOperatorTest { @@ -111,7 +112,7 @@ public void testAggregationWithoutTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); count++; } @@ -135,7 +136,7 @@ public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); count++; } @@ -161,7 +162,7 @@ public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001); count++; @@ -192,7 +193,7 @@ public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(20000, resultTsBlock.getColumn(0).getInt(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(0, resultTsBlock.getColumn(2).getLong(0)); @@ -228,7 +229,7 @@ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Ex int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(20000, resultTsBlock.getColumn(0).getInt(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(0, resultTsBlock.getColumn(2).getLong(0)); @@ -259,7 +260,7 @@ public void testAggregationWithTimeFilter1() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(resultTsBlock.getColumn(0).getLong(0), 380); count++; } @@ -285,7 +286,7 @@ public void testAggregationWithTimeFilter2() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(resultTsBlock.getColumn(0).getLong(0), 380); count++; } @@ -311,7 +312,7 @@ public void testAggregationWithTimeFilter3() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(resultTsBlock.getColumn(0).getLong(0), 300); count++; } @@ -343,7 +344,7 @@ public void testMultiAggregationWithTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(20100, resultTsBlock.getColumn(0).getInt(0)); assertEquals(399, resultTsBlock.getColumn(1).getInt(0)); assertEquals(100, resultTsBlock.getColumn(2).getLong(0)); @@ -375,7 +376,7 @@ public void testGroupByWithoutGlobalTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -408,7 +409,7 @@ public void testGroupByWithGlobalTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -450,7 +451,7 @@ public void testGroupByWithMultiFunction() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -495,7 +496,7 @@ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(pos)); @@ -530,7 +531,7 @@ public void testGroupBySlidingTimeWindow() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -563,7 +564,7 @@ public void testGroupBySlidingTimeWindow2() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); @@ -606,7 +607,7 @@ public void testGroupBySlidingWindowWithMultiFunction() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java index 11daae6f1c54..39455ab42089 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -114,7 +115,7 @@ public void batchTest() throws Exception { int count = 0; while (seriesScanOperator.hasNext()) { - TsBlock tsBlock = seriesScanOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(seriesScanOperator); assertEquals(1, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java index 0c9c47815aeb..30604831e268 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java @@ -62,6 +62,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -103,7 +104,7 @@ public void testUpdateLastCacheOperatorTestWithoutTimeFilter() { assertTrue(updateLastCacheOperator.isBlocked().isDone()); assertTrue(updateLastCacheOperator.hasNext()); - TsBlock result = updateLastCacheOperator.next(); + TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator); assertEquals(1, result.getPositionCount()); assertEquals(3, result.getValueColumnCount()); @@ -133,7 +134,7 @@ public void testUpdateLastCacheOperatorTestWithTimeFilter1() { assertTrue(updateLastCacheOperator.isBlocked().isDone()); assertTrue(updateLastCacheOperator.hasNext()); - TsBlock result = updateLastCacheOperator.next(); + TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator); assertEquals(1, result.getPositionCount()); assertEquals(3, result.getValueColumnCount()); @@ -163,7 +164,7 @@ public void testUpdateLastCacheOperatorTestWithTimeFilter2() { assertTrue(updateLastCacheOperator.isBlocked().isDone()); assertTrue(updateLastCacheOperator.hasNext()); - TsBlock result = updateLastCacheOperator.next(); + TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator); assertEquals(1, result.getPositionCount()); assertEquals(3, result.getValueColumnCount());