diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java index 9e83c8ac41bf8..91d7a0f5bc415 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java @@ -434,6 +434,107 @@ public void testCapacityFunction() { expectedHeader, retArray, DATABASE_NAME); + + // CAPACITY with SLIDE=2 (same as SIZE=2, should behave identically to no SLIDE) + expectedHeader = new String[] {"window_index", "time", "stock_id", "price", "s1"}; + retArray = + new String[] { + "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,", + "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,", + "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + }; + tableResultSetEqualTest( + "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 2) ORDER BY stock_id, time", + expectedHeader, + retArray, + DATABASE_NAME); + + // CAPACITY with SIZE=2, SLIDE=1 (overlapping windows) + expectedHeader = new String[] {"window_index", "time", "stock_id", "price", "s1"}; + retArray = + new String[] { + "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,", + "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "1,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "2,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,", + "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + "1,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + "2,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + }; + tableResultSetEqualTest( + "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 1) ORDER BY stock_id, window_index, time", + expectedHeader, + retArray, + DATABASE_NAME); + + // CAPACITY with SIZE=3, SLIDE=2 (overlapping windows, different params) + expectedHeader = new String[] {"window_index", "time", "stock_id", "price", "s1"}; + retArray = + new String[] { + "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,", + "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "0,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,", + "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,", + "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + "0,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,", + }; + tableResultSetEqualTest( + "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 3, SLIDE => 2) ORDER BY stock_id, window_index, time", + expectedHeader, + retArray, + DATABASE_NAME); + + // CAPACITY with SIZE=2, SLIDE=3 (gap windows, some rows discarded) + expectedHeader = new String[] {"window_index", "time", "stock_id", "price", "s1"}; + retArray = + new String[] { + "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,", + "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,", + "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,", + "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,", + }; + tableResultSetEqualTest( + "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 3) ORDER BY stock_id, window_index, time", + expectedHeader, + retArray, + DATABASE_NAME); + + // CAPACITY with SIZE=2, SLIDE=1 + GROUP BY (verify aggregation with overlapping windows) + expectedHeader = new String[] {"stock_id", "window_index", "avg"}; + retArray = + new String[] { + "AAPL,0,101.5,", + "AAPL,1,102.5,", + "AAPL,2,102.0,", + "TESL,0,201.0,", + "TESL,1,198.5,", + "TESL,2,195.0,", + }; + tableResultSetEqualTest( + "SELECT stock_id, window_index, avg(price) as avg FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 1) GROUP BY window_index, stock_id ORDER BY stock_id, window_index", + expectedHeader, + retArray, + DATABASE_NAME); + + // CAPACITY with negative SLIDE (error case) + tableAssertTestFail( + "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => -1) ORDER BY stock_id, time", + "Invalid scalar argument SLIDE, should be a positive value", + DATABASE_NAME); + + // CAPACITY with SLIDE=0 (error case) + tableAssertTestFail( + "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 0) ORDER BY stock_id, time", + "Invalid scalar argument SLIDE, should be a positive value", + DATABASE_NAME); } @Test diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java index d26ba6d837699..5d59d8cfd704b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java @@ -42,9 +42,12 @@ import java.util.List; import java.util.Map; +import static org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER; + public class CapacityTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; private static final String SIZE_PARAMETER_NAME = "SIZE"; + private static final String SLIDE_PARAMETER_NAME = "SLIDE"; @Override public List getArgumentsSpecifications() { @@ -53,7 +56,17 @@ public List getArgumentsSpecifications() { .name(DATA_PARAMETER_NAME) .passThroughColumns() .build(), - ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build()); + ScalarParameterSpecification.builder() + .name(SIZE_PARAMETER_NAME) + .addChecker(POSITIVE_LONG_CHECKER) + .type(Type.INT64) + .build(), + ScalarParameterSpecification.builder() + .name(SLIDE_PARAMETER_NAME) + .addChecker(POSITIVE_LONG_CHECKER) + .type(Type.INT64) + .defaultValue(-1L) + .build()); } @Override @@ -62,8 +75,16 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF if (size <= 0) { throw new UDFException("Size must be greater than 0"); } + long slide = (long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(); + // default SLIDE to SIZE when not specified (sentinel value -1) + if (slide == -1L) { + slide = size; + } MapTableFunctionHandle handle = - new MapTableFunctionHandle.Builder().addProperty(SIZE_PARAMETER_NAME, size).build(); + new MapTableFunctionHandle.Builder() + .addProperty(SIZE_PARAMETER_NAME, size) + .addProperty(SLIDE_PARAMETER_NAME, slide) + .build(); return TableFunctionAnalysis.builder() .properColumnSchema( new DescribedSchema.Builder().addField("window_index", Type.INT64).build()) @@ -81,12 +102,13 @@ public TableFunctionHandle createTableFunctionHandle() { @Override public TableFunctionProcessorProvider getProcessorProvider( TableFunctionHandle tableFunctionHandle) { - long sz = - (long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME); + MapTableFunctionHandle handle = (MapTableFunctionHandle) tableFunctionHandle; + long size = (long) handle.getProperty(SIZE_PARAMETER_NAME); + long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new CapacityDataProcessor(sz); + return new CapacityDataProcessor(size, slide); } }; } @@ -94,12 +116,12 @@ public TableFunctionDataProcessor getDataProcessor() { private static class CapacityDataProcessor implements TableFunctionDataProcessor { private final long size; - private long currentStartIndex = 0; + private final long slide; private long curIndex = 0; - private long windowIndex = 0; - public CapacityDataProcessor(long size) { + public CapacityDataProcessor(long size, long slide) { this.size = size; + this.slide = slide; } @Override @@ -107,26 +129,21 @@ public void process( Record input, List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - if (curIndex - currentStartIndex == size) { - outputWindow(properColumnBuilders, passThroughIndexBuilder); - currentStartIndex = curIndex; + // For each row at curIndex, find all windows k such that: + // k * slide <= curIndex < k * slide + size, and k >= 0 + // The first valid k: max(0, ceil((curIndex - size + 1) / slide)) + // The last valid k: floor(curIndex / slide) + long firstWindow = Math.max(0, (curIndex - size + slide) / slide); + long lastWindow = curIndex / slide; + for (long k = firstWindow; k <= lastWindow; k++) { + // Verify: k * slide <= curIndex < k * slide + size + long windowStart = k * slide; + if (windowStart <= curIndex && curIndex < windowStart + size) { + properColumnBuilders.get(0).writeLong(k); + passThroughIndexBuilder.writeLong(curIndex); + } } curIndex++; } - - @Override - public void finish( - List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - outputWindow(properColumnBuilders, passThroughIndexBuilder); - } - - private void outputWindow( - List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - for (long i = currentStartIndex; i < curIndex; i++) { - properColumnBuilders.get(0).writeLong(windowIndex); - passThroughIndexBuilder.writeLong(i); - } - windowIndex++; - } } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java new file mode 100644 index 0000000000000..4f40e73758621 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.builtin.relational.tvf; + +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class CapacityTableFunctionTest { + + private final CapacityTableFunction function = new CapacityTableFunction(); + + // ======================== analyze() tests ======================== + + @Test + public void testAnalyzeWithSlideDefault() throws UDFException { + Map args = new HashMap<>(); + args.put("SIZE", new ScalarArgument(Type.INT64, 5L)); + args.put("SLIDE", new ScalarArgument(Type.INT64, -1L)); + + TableFunctionAnalysis analysis = function.analyze(args); + assertNotNull(analysis); + } + + @Test + public void testAnalyzeWithExplicitSlide() throws UDFException { + Map args = new HashMap<>(); + args.put("SIZE", new ScalarArgument(Type.INT64, 4L)); + args.put("SLIDE", new ScalarArgument(Type.INT64, 2L)); + + TableFunctionAnalysis analysis = function.analyze(args); + assertNotNull(analysis); + } + + @Test(expected = UDFException.class) + public void testAnalyzeSizeZero() throws UDFException { + Map args = new HashMap<>(); + args.put("SIZE", new ScalarArgument(Type.INT64, 0L)); + args.put("SLIDE", new ScalarArgument(Type.INT64, -1L)); + + function.analyze(args); + } + + @Test(expected = UDFException.class) + public void testAnalyzeSizeNegative() throws UDFException { + Map args = new HashMap<>(); + args.put("SIZE", new ScalarArgument(Type.INT64, -3L)); + args.put("SLIDE", new ScalarArgument(Type.INT64, -1L)); + + function.analyze(args); + } + + // ======================== processor tests ======================== + + /** + * Helper: builds the processor from analyze() -> getProcessorProvider() chain, then feeds N rows + * through process(). Returns captured (windowIndex, passThroughIndex) pairs. + */ + private List runProcessor(long size, long slide, int rowCount) throws UDFException { + Map args = new HashMap<>(); + args.put("SIZE", new ScalarArgument(Type.INT64, size)); + args.put("SLIDE", new ScalarArgument(Type.INT64, slide == -1 ? -1L : slide)); + + TableFunctionAnalysis analysis = function.analyze(args); + TableFunctionHandle handle = analysis.getTableFunctionHandle(); + + TableFunctionProcessorProvider provider = function.getProcessorProvider(handle); + TableFunctionDataProcessor processor = provider.getDataProcessor(); + + Record mockRecord = Mockito.mock(Record.class); + List results = new ArrayList<>(); + + for (int i = 0; i < rowCount; i++) { + ArgumentCaptor windowCaptor = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor indexCaptor = ArgumentCaptor.forClass(Long.class); + + ColumnBuilder properBuilder = Mockito.mock(ColumnBuilder.class); + ColumnBuilder passThroughBuilder = Mockito.mock(ColumnBuilder.class); + + processor.process(mockRecord, Collections.singletonList(properBuilder), passThroughBuilder); + + Mockito.verify(properBuilder, Mockito.atLeast(0)).writeLong(windowCaptor.capture()); + Mockito.verify(passThroughBuilder, Mockito.atLeast(0)).writeLong(indexCaptor.capture()); + + List windows = windowCaptor.getAllValues(); + List indices = indexCaptor.getAllValues(); + for (int j = 0; j < windows.size(); j++) { + results.add(new long[] {windows.get(j), indices.get(j)}); + } + } + return results; + } + + @Test + public void testSlideEqualsSize() throws UDFException { + // SIZE=2, SLIDE=2 (non-overlapping), 5 rows + // window0: rows 0,1; window1: rows 2,3; window2: row 4 + List results = runProcessor(2, 2, 5); + long[][] expected = {{0, 0}, {0, 1}, {1, 2}, {1, 3}, {2, 4}}; + assertResultsEqual(expected, results); + } + + @Test + public void testSlideDefault() throws UDFException { + // SIZE=2, SLIDE=-1 (defaults to SIZE=2), 5 rows — same as above + List results = runProcessor(2, -1, 5); + long[][] expected = {{0, 0}, {0, 1}, {1, 2}, {1, 3}, {2, 4}}; + assertResultsEqual(expected, results); + } + + @Test + public void testSlideLessThanSize() throws UDFException { + // SIZE=2, SLIDE=1 (overlapping), 3 rows + // row0: window 0 + // row1: window 0, 1 + // row2: window 1, 2 + List results = runProcessor(2, 1, 3); + long[][] expected = {{0, 0}, {0, 1}, {1, 1}, {1, 2}, {2, 2}}; + assertResultsEqual(expected, results); + } + + @Test + public void testSlideGreaterThanSize() throws UDFException { + // SIZE=2, SLIDE=3 (gap), 6 rows + // window0: rows 0,1; row2: gap; window1: rows 3,4; row5: gap + List results = runProcessor(2, 3, 6); + long[][] expected = {{0, 0}, {0, 1}, {1, 3}, {1, 4}}; + assertResultsEqual(expected, results); + } + + @Test + public void testOverlappingLargeSize() throws UDFException { + // SIZE=3, SLIDE=2 (overlapping), 3 rows + // row0: window 0 + // row1: window 0 + // row2: window 0, 1 + List results = runProcessor(3, 2, 3); + long[][] expected = {{0, 0}, {0, 1}, {0, 2}, {1, 2}}; + assertResultsEqual(expected, results); + } + + @Test + public void testSingleRow() throws UDFException { + // SIZE=3, SLIDE=1, 1 row — row0 belongs to window 0 only + List results = runProcessor(3, 1, 1); + long[][] expected = {{0, 0}}; + assertResultsEqual(expected, results); + } + + @Test + public void testGetArgumentsSpecifications() { + assertEquals(3, function.getArgumentsSpecifications().size()); + } + + @Test + public void testCreateTableFunctionHandle() { + assertNotNull(function.createTableFunctionHandle()); + } + + private void assertResultsEqual(long[][] expected, List actual) { + assertEquals("Result count mismatch", expected.length, actual.size()); + for (int i = 0; i < expected.length; i++) { + assertEquals("Window index mismatch at position " + i, expected[i][0], actual.get(i)[0]); + assertEquals("PassThrough index mismatch at position " + i, expected[i][1], actual.get(i)[1]); + } + } +}