diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java index 9a68324a9119..ff4f2a5f638b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java @@ -69,6 +69,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -122,7 +123,7 @@ public void testAggregationWithoutTimeFilter() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(500, resultTsBlock.getColumn(i).getLong(0)); } @@ -153,7 +154,7 @@ public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, false, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(500, resultTsBlock.getColumn(i).getLong(0)); } @@ -189,7 +190,7 @@ public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001); count++; @@ -227,7 +228,7 @@ public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertTrue(resultTsBlock.getColumn(0).getBoolean(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(20199, resultTsBlock.getColumn(2).getLong(0)); @@ -269,7 +270,7 @@ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Ex initAlignedSeriesAggregationScanOperator(aggregators, null, false, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertTrue(resultTsBlock.getColumn(0).getBoolean(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(20199, resultTsBlock.getColumn(2).getLong(0)); @@ -304,7 +305,7 @@ public void testAggregationWithTimeFilter1() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(resultTsBlock.getColumn(i).getLong(0), 380); } @@ -337,7 +338,7 @@ public void testAggregationWithTimeFilter2() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(resultTsBlock.getColumn(i).getLong(0), 380); } @@ -370,7 +371,7 @@ public void testAggregationWithTimeFilter3() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); for (int i = 0; i < measurementSchemas.size(); i++) { assertEquals(resultTsBlock.getColumn(i).getLong(0), 300); } @@ -410,7 +411,7 @@ public void testMultiAggregationWithTimeFilter() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertTrue(resultTsBlock.getColumn(0).getBoolean(0)); assertEquals(399, resultTsBlock.getColumn(1).getInt(0)); assertEquals(20199, resultTsBlock.getColumn(2).getLong(0)); @@ -448,7 +449,7 @@ public void testGroupByWithoutGlobalTimeFilter() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -489,7 +490,7 @@ public void testGroupByWithGlobalTimeFilter() throws Exception { aggregators, timeFilter, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -534,7 +535,7 @@ public void testGroupByWithMultiFunction() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -580,7 +581,7 @@ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, false, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(pos)); @@ -616,7 +617,7 @@ public void testGroupBySlidingTimeWindow() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -649,7 +650,7 @@ public void testGroupBySlidingTimeWindow2() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); @@ -693,7 +694,7 @@ public void testGroupBySlidingWindowWithMultiFunction() throws Exception { initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java index 7252d2341bd3..3262bf97f135 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java @@ -72,6 +72,7 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.lastNonNullOrEmpty; import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -258,10 +259,7 @@ public void testSingleDeviceSmallData() throws Exception { prepareDeviceData("device0", 2); operator = createAndInitOperatorForSingleDevices(2); - TsBlock result = null; - while (operator.isBlocked().isDone() && operator.hasNext()) { - result = operator.next(); - } + TsBlock result = lastNonNullOrEmpty(operator); assertNotNull(result); assertEquals(2, result.getPositionCount()); @@ -276,10 +274,7 @@ public void testSingleDeviceExceedsMaxTsBlockSize() throws Exception { prepareDeviceData("device0", 10); operator = createAndInitOperatorForSingleDevices(10); - TsBlock result = null; - while (operator.isBlocked().isDone() && operator.hasNext()) { - result = operator.next(); - } + TsBlock result = lastNonNullOrEmpty(operator); assertNotNull(result); assertEquals(10, result.getPositionCount()); @@ -295,10 +290,7 @@ public void testMultipleDeviceSmallData() throws Exception { prepareDeviceData("device1", 1); operator = createAndInitOperatorForMultipleDevices(2, 1); - TsBlock result = null; - while (operator.isBlocked().isDone() && operator.hasNext()) { - result = operator.next(); - } + TsBlock result = lastNonNullOrEmpty(operator); assertNotNull(result); assertEquals(2, result.getPositionCount()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java index 24a13ea0d7f3..a0d3d57a5823 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -210,7 +211,7 @@ public long ramBytesUsed() { } }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + 1 + count * 10000L; assertEquals(expectedTime, block.getTimeByIndex(i)); @@ -383,7 +384,7 @@ public long ramBytesUsed() { } }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + 1 + count * 10000L; assertEquals(expectedTime, block.getTimeByIndex(i)); @@ -560,7 +561,7 @@ public long ramBytesUsed() { } }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + 1 + count * 10000L; assertEquals(expectedTime, block.getTimeByIndex(i)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java index 135bb7ff83f9..d479f145317a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java @@ -68,6 +68,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -203,7 +204,7 @@ public void batchTest1() throws Exception { int count = 0; while (horizontallyConcatOperator.isBlocked().isDone() && horizontallyConcatOperator.hasNext()) { - TsBlock tsBlock = horizontallyConcatOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(horizontallyConcatOperator); assertEquals(6, tsBlock.getValueColumnCount()); for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { assertEquals(count, tsBlock.getTimeByIndex(i)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java index b6cdefd41944..cfbcf0e26306 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java @@ -93,6 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1794,7 +1795,7 @@ public void mergeSortTest() throws Exception { int index = 0; while (treeMergeSortOperator.isBlocked().isDone() && treeMergeSortOperator.hasNext()) { - TsBlock result = treeMergeSortOperator.next(); + TsBlock result = nextNonNullOrEmpty(treeMergeSortOperator); for (int i = 0; i < result.getPositionCount(); i++) { long time = result.getTimeByIndex(i); assertEquals(time, ans[index++]); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java index 526577d61bb2..12f8555d0ec9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.queryengine.execution.operator; +import org.apache.iotdb.calc.execution.operator.Operator; import org.apache.iotdb.calc.execution.operator.process.LimitOperator; import org.apache.iotdb.calc.execution.operator.process.OffsetOperator; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; @@ -65,6 +66,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -168,7 +170,7 @@ public void batchTest1() throws Exception { new LimitOperator(driverContext.getOperatorContexts().get(4), 250, offsetOperator); int count = 100; while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) { - TsBlock tsBlock = limitOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(limitOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -276,7 +278,7 @@ public void batchTest2() throws Exception { int count = 0; while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) { - TsBlock tsBlock = offsetOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(offsetOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -382,7 +384,7 @@ public void batchTest3() throws Exception { new OffsetOperator(driverContext.getOperatorContexts().get(3), 500, timeJoinOperator); while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) { - TsBlock tsBlock = offsetOperator.next(); + TsBlock tsBlock = nextNonNull(offsetOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -474,7 +476,7 @@ public void batchTest4() throws Exception { driverContext.getOperatorContexts().get(3), 98_784_247_808L, timeJoinOperator); while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) { - TsBlock tsBlock = offsetOperator.next(); + TsBlock tsBlock = nextNonNull(offsetOperator); assertEquals(2, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); assertTrue(tsBlock.getColumn(1) instanceof IntColumn); @@ -487,4 +489,14 @@ public void batchTest4() throws Exception { instanceNotificationExecutor.shutdown(); } } + + private static TsBlock nextNonNull(Operator operator) throws Exception { + while (operator.hasNext()) { + TsBlock result = operator.next(); + if (result != null) { + return result; + } + } + throw new AssertionError("Expected a non-null TsBlock from operator"); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java new file mode 100644 index 000000000000..77d1eddb4c0c --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.calc.execution.operator.Operator; + +import org.apache.tsfile.read.common.block.TsBlock; + +public final class OperatorTestUtils { + + private OperatorTestUtils() { + // Utility class. + } + + public static TsBlock nextNonNullOrEmpty(Operator operator) throws Exception { + while (operator.hasNext()) { + TsBlock result = operator.next(); + if (!isNullOrEmpty(result)) { + return result; + } + } + throw new AssertionError("Expected a non-null and non-empty TsBlock from operator"); + } + + public static TsBlock lastNonNullOrEmpty(Operator operator) throws Exception { + TsBlock result = null; + while (operator.isBlocked().isDone() && operator.hasNext()) { + TsBlock nextResult = operator.next(); + if (!isNullOrEmpty(nextResult)) { + result = nextResult; + } + } + return result; + } + + private static boolean isNullOrEmpty(TsBlock tsBlock) { + return tsBlock == null || tsBlock.getPositionCount() == 0; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java index edf22b93a4a1..43efba320e9a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java @@ -68,6 +68,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; public class SeriesAggregationScanOperatorTest { @@ -113,7 +114,7 @@ public void testAggregationWithoutTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); count++; } @@ -137,7 +138,7 @@ public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); count++; } @@ -163,7 +164,7 @@ public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(500, resultTsBlock.getColumn(0).getLong(0)); assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001); count++; @@ -194,7 +195,7 @@ public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(20000, resultTsBlock.getColumn(0).getInt(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(0, resultTsBlock.getColumn(2).getLong(0)); @@ -230,7 +231,7 @@ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Ex int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(20000, resultTsBlock.getColumn(0).getInt(0)); assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); assertEquals(0, resultTsBlock.getColumn(2).getLong(0)); @@ -261,7 +262,7 @@ public void testAggregationWithTimeFilter1() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(resultTsBlock.getColumn(0).getLong(0), 380); count++; } @@ -287,7 +288,7 @@ public void testAggregationWithTimeFilter2() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(resultTsBlock.getColumn(0).getLong(0), 380); count++; } @@ -313,7 +314,7 @@ public void testAggregationWithTimeFilter3() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(resultTsBlock.getColumn(0).getLong(0), 300); count++; } @@ -345,7 +346,7 @@ public void testMultiAggregationWithTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); assertEquals(20100, resultTsBlock.getColumn(0).getInt(0)); assertEquals(399, resultTsBlock.getColumn(1).getInt(0)); assertEquals(100, resultTsBlock.getColumn(2).getLong(0)); @@ -377,7 +378,7 @@ public void testGroupByWithoutGlobalTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -410,7 +411,7 @@ public void testGroupByWithGlobalTimeFilter() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -452,7 +453,7 @@ public void testGroupByWithMultiFunction() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -497,7 +498,7 @@ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(pos)); @@ -532,7 +533,7 @@ public void testGroupBySlidingTimeWindow() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos)); @@ -565,7 +566,7 @@ public void testGroupBySlidingTimeWindow2() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); @@ -608,7 +609,7 @@ public void testGroupBySlidingWindowWithMultiFunction() throws Exception { int count = 0; while (seriesAggregationScanOperator.hasNext()) { - TsBlock resultTsBlock = seriesAggregationScanOperator.next(); + TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator); int positionCount = resultTsBlock.getPositionCount(); for (int pos = 0; pos < positionCount; pos++) { assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java index 92cb35f515af..23dda52f2bdd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -119,7 +120,7 @@ public void batchTest() throws Exception { int count = 0; while (seriesScanOperator.hasNext()) { - TsBlock tsBlock = seriesScanOperator.next(); + TsBlock tsBlock = nextNonNullOrEmpty(seriesScanOperator); assertEquals(1, tsBlock.getValueColumnCount()); assertTrue(tsBlock.getColumn(0) instanceof IntColumn); for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java index 690da2ffd193..81cc4c6654ac 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java @@ -69,6 +69,7 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.lastNonNullOrEmpty; import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -198,10 +199,7 @@ public void testAllResultsInSingleTsBlock() throws Exception { prepareSourceTargetPairs(2); operator = createAndInitOperator(2); - TsBlock result = null; - while (operator.isBlocked().isDone() && operator.hasNext()) { - result = operator.next(); - } + TsBlock result = lastNonNullOrEmpty(operator); assertNotNull(result); assertEquals(2, result.getPositionCount()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java index 393a23dc2a70..7d1962fb3f46 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java @@ -42,8 +42,8 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class TreeLinearFillOperatorTest { @@ -1309,8 +1309,7 @@ public long ramBytesUsed() { }; while (fillOperator.hasNext()) { - TsBlock block = fillOperator.next(); - assertNotNull(block); + TsBlock block = nextNonNullOrEmpty(fillOperator); for (int i = 0; i < block.getPositionCount(); i++) { long expectedTime = i + count; assertEquals(expectedTime, block.getTimeByIndex(i)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java index c9688bbc2af3..2454ffb4fa87 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java @@ -63,6 +63,7 @@ import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -104,7 +105,7 @@ public void testUpdateLastCacheOperatorTestWithoutTimeFilter() { assertTrue(updateLastCacheOperator.isBlocked().isDone()); assertTrue(updateLastCacheOperator.hasNext()); - TsBlock result = updateLastCacheOperator.next(); + TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator); assertEquals(1, result.getPositionCount()); assertEquals(3, result.getValueColumnCount()); @@ -134,7 +135,7 @@ public void testUpdateLastCacheOperatorTestWithTimeFilter1() { assertTrue(updateLastCacheOperator.isBlocked().isDone()); assertTrue(updateLastCacheOperator.hasNext()); - TsBlock result = updateLastCacheOperator.next(); + TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator); assertEquals(1, result.getPositionCount()); assertEquals(3, result.getValueColumnCount()); @@ -164,7 +165,7 @@ public void testUpdateLastCacheOperatorTestWithTimeFilter2() { assertTrue(updateLastCacheOperator.isBlocked().isDone()); assertTrue(updateLastCacheOperator.hasNext()); - TsBlock result = updateLastCacheOperator.next(); + TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator); assertEquals(1, result.getPositionCount()); assertEquals(3, result.getValueColumnCount());