diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java new file mode 100644 index 0000000000000..3ece5cf865e93 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual; + +import org.apache.iotdb.rpc.RpcUtils; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BytesUtils; + +import java.nio.charset.StandardCharsets; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; + +public class TypeConversionSemanticCase { + + public static final int ROW_COUNT = 3; + + public static final List CASES = + Arrays.asList( + c( + "bool_to_int32", + TSDataType.BOOLEAN, + TSDataType.INT32, + values("true", "false", "true"), + values("1", "0", "1")), + c( + "bool_to_int64", + TSDataType.BOOLEAN, + TSDataType.INT64, + values("true", "false", "true"), + values("1", "0", "1")), + c( + "bool_to_float", + TSDataType.BOOLEAN, + TSDataType.FLOAT, + values("true", "false", "true"), + values("1.0", "0.0", "1.0")), + c( + "bool_to_double", + TSDataType.BOOLEAN, + TSDataType.DOUBLE, + values("true", "false", "true"), + values("1.0", "0.0", "1.0")), + c( + "bool_to_text", + TSDataType.BOOLEAN, + TSDataType.TEXT, + values("true", "false", "true"), + values("true", "false", "true")), + c( + "bool_to_blob", + TSDataType.BOOLEAN, + TSDataType.BLOB, + values("true", "false", "true"), + values(blobValue("true"), blobValue("false"), blobValue("true"))), + c( + "bool_to_string", + TSDataType.BOOLEAN, + TSDataType.STRING, + values("true", "false", "true"), + values("true", "false", "true")), + c( + "bool_to_date", + TSDataType.BOOLEAN, + TSDataType.DATE, + values("true", "false", "true"), + values("1970-01-02", "1970-01-01", "1970-01-02")), + c( + "bool_to_timestamp", + TSDataType.BOOLEAN, + TSDataType.TIMESTAMP, + values("true", "false", "true"), + values(timestampValue(1), timestampValue(0), timestampValue(1))), + c( + "int32_to_boolean", + TSDataType.INT32, + TSDataType.BOOLEAN, + values("0", "2", "-1"), + values("false", "true", "true")), + c( + "int32_to_timestamp", + TSDataType.INT32, + TSDataType.TIMESTAMP, + values("0", "1", "86400000"), + values(timestampValue(0), timestampValue(1), timestampValue(86400000))), + c( + "int32_to_date", + TSDataType.INT32, + TSDataType.DATE, + values("19700102", "20240229", "42"), + values("1970-01-02", "2024-02-29", "1970-01-01")), + c( + "int64_to_int32", + TSDataType.INT64, + TSDataType.INT32, + values("2147483648", "-2147483649", "42"), + values("-2147483648", "2147483647", "42")), + c( + "int64_to_boolean", + TSDataType.INT64, + TSDataType.BOOLEAN, + values("0", "2", "-1"), + values("false", "true", "true")), + c( + "int64_to_date", + TSDataType.INT64, + TSDataType.DATE, + values("19700102", "2147483648", "19700103"), + values("1970-01-02", "1970-01-01", "1970-01-03")), + c( + "float_to_int32", + TSDataType.FLOAT, + TSDataType.INT32, + values("2.9", "-2.9", "0.0"), + values("2", "-2", "0")), + c( + "float_to_boolean", + TSDataType.FLOAT, + TSDataType.BOOLEAN, + values("0.0", "0.1", "-0.1"), + values("false", "true", "true")), + c( + "float_to_date", + TSDataType.FLOAT, + TSDataType.DATE, + values("19700102.0", "19700104.0", "42.9"), + values("1970-01-02", "1970-01-04", "1970-01-01")), + c( + "double_to_int64", + TSDataType.DOUBLE, + TSDataType.INT64, + values("3.9", "-3.9", "0.0"), + values("3", "-3", "0")), + c( + "double_to_boolean", + TSDataType.DOUBLE, + TSDataType.BOOLEAN, + values("0.0", "0.1", "-0.1"), + values("false", "true", "true")), + c( + "double_to_timestamp", + TSDataType.DOUBLE, + TSDataType.TIMESTAMP, + values("1.9", "86400000.9", "0.0"), + values(timestampValue(1), timestampValue(86400000), timestampValue(0))), + c( + "text_to_int32", + TSDataType.TEXT, + TSDataType.INT32, + values("'123.9'", "'bad'", "'-123.9'"), + values("123", "0", "-123")), + c( + "string_to_int64", + TSDataType.STRING, + TSDataType.INT64, + values("'456.9'", "'bad'", "'-456.9'"), + values("456", "0", "-456")), + c( + "blob_to_float", + TSDataType.BLOB, + TSDataType.FLOAT, + values(blobSql("7.5"), blobSql("bad"), blobSql("-7.5")), + values("7.5", "0.0", "-7.5")), + c( + "text_to_double", + TSDataType.TEXT, + TSDataType.DOUBLE, + values("'8.25'", "'bad'", "'-8.25'"), + values("8.25", "0.0", "-8.25")), + c( + "text_to_boolean", + TSDataType.TEXT, + TSDataType.BOOLEAN, + values("'true'", "'1'", "'TrUe'"), + values("true", "false", "true")), + c( + "string_to_boolean", + TSDataType.STRING, + TSDataType.BOOLEAN, + values("'TRUE'", "'false'", "'yes'"), + values("true", "false", "false")), + c( + "blob_to_boolean", + TSDataType.BLOB, + TSDataType.BOOLEAN, + values(blobSql("true"), blobSql("FALSE"), blobSql("0")), + values("true", "false", "false")), + c( + "text_to_timestamp", + TSDataType.TEXT, + TSDataType.TIMESTAMP, + values("'86400000'", "'1970-01-02T00:00:00.000'", "'bad'"), + values(timestampValue(86400000), timestampValue(86400000), timestampValue(0))), + c( + "string_to_timestamp", + TSDataType.STRING, + TSDataType.TIMESTAMP, + values("'1970-01-03T00:00:00.000'", "'bad'", "'86400000'"), + values(timestampValue(172800000), timestampValue(0), timestampValue(86400000))), + c( + "blob_to_timestamp", + TSDataType.BLOB, + TSDataType.TIMESTAMP, + values(blobSql("bad"), blobSql("1"), blobSql("1970-01-02T00:00:00.000")), + values(timestampValue(0), timestampValue(1), timestampValue(86400000))), + c( + "text_to_date", + TSDataType.TEXT, + TSDataType.DATE, + values("'19700102'", "'1970-01-04'", "'bad'"), + values("1970-01-02", "1970-01-04", "1970-01-01")), + c( + "string_to_date", + TSDataType.STRING, + TSDataType.DATE, + values("'1970-01-03'", "'19700105'", "'1970-01-07'"), + values("1970-01-03", "1970-01-05", "1970-01-07")), + c( + "blob_to_date", + TSDataType.BLOB, + TSDataType.DATE, + values(blobSql("bad"), blobSql("1970-01-06"), blobSql("19700108")), + values("1970-01-01", "1970-01-06", "1970-01-08")), + c( + "timestamp_to_date", + TSDataType.TIMESTAMP, + TSDataType.DATE, + values("0", "86399999", "86400000"), + values("1970-01-01", "1970-01-01", "1970-01-02")), + c( + "date_to_timestamp", + TSDataType.DATE, + TSDataType.TIMESTAMP, + values("'1970-01-01'", "'1970-01-02'", "'1970-01-03'"), + values(timestampValue(0), timestampValue(86400000), timestampValue(172800000))), + c( + "timestamp_to_boolean", + TSDataType.TIMESTAMP, + TSDataType.BOOLEAN, + values("0", "-1", "1"), + values("false", "true", "true")), + c( + "date_to_boolean", + TSDataType.DATE, + TSDataType.BOOLEAN, + values("'1970-01-01'", "'1970-01-02'", "'1969-12-31'"), + values("false", "true", "true"))); + + public final String measurement; + public final TSDataType sourceType; + public final TSDataType targetType; + public final String[] sourceSqlValues; + public final String[] expectedValues; + + private TypeConversionSemanticCase( + final String measurement, + final TSDataType sourceType, + final TSDataType targetType, + final String[] sourceSqlValues, + final String[] expectedValues) { + this.measurement = measurement; + this.sourceType = sourceType; + this.targetType = targetType; + this.sourceSqlValues = sourceSqlValues; + this.expectedValues = expectedValues; + } + + private static TypeConversionSemanticCase c( + final String measurement, + final TSDataType sourceType, + final TSDataType targetType, + final String[] sourceSqlValues, + final String[] expectedValues) { + return new TypeConversionSemanticCase( + measurement, sourceType, targetType, sourceSqlValues, expectedValues); + } + + private static String[] values(final String... values) { + return values; + } + + public static String timestampValue(final long timestamp) { + return RpcUtils.formatDatetime("default", "ms", timestamp, ZoneOffset.UTC); + } + + private static String blobSql(final String value) { + final StringBuilder builder = new StringBuilder("X'"); + for (final byte b : value.getBytes(StandardCharsets.UTF_8)) { + builder.append(String.format("%02x", b & 0xFF)); + } + return builder.append("'").toString(); + } + + private static String blobValue(final String value) { + return BytesUtils.parseBlobByteArrayToString(value.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java new file mode 100644 index 0000000000000..6704d37b87e2e --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase; +import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTableManualEnhanced.class}) +public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeTableModelDualManualIT { + + private static final String DATABASE = "pipe_type_conversion"; + private static final String TABLE = "semantic_conversion"; + private static final String STREAM_TABLE = "semantic_stream_conversion"; + private static final List STREAM_CASES = + getCases( + "bool_to_int32", + "bool_to_int64", + "bool_to_float", + "bool_to_double", + "bool_to_blob", + "bool_to_date", + "bool_to_timestamp", + "int32_to_boolean", + "int32_to_timestamp", + "int32_to_date"); + + @Override + @Before + public void setUp() { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + setupConfig(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); + } + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + receiverEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + } + + @Test + public void testPipeReceiverTypeConversionSemantics() { + createDatabaseAndTable(senderEnv, TABLE, TypeConversionSemanticCase.CASES, true); + createDatabaseAndTable(receiverEnv, TABLE, TypeConversionSemanticCase.CASES, false); + createPipe(); + + TestUtils.executeNonQueries( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + createInsertStatements(TABLE, TypeConversionSemanticCase.CASES), + null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(TABLE, TypeConversionSemanticCase.CASES), + createExpectedHeader(TypeConversionSemanticCase.CASES), + new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)), + 60, + DATABASE, + null); + } + + @Test + public void testStreamPipeReceiverTypeConversionSemantics() { + createDatabaseAndTable(senderEnv, STREAM_TABLE, STREAM_CASES, true); + createDatabaseAndTable(receiverEnv, STREAM_TABLE, STREAM_CASES, false); + createStreamPipe(); + + TestUtils.executeNonQueries( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + createInsertStatements(STREAM_TABLE, STREAM_CASES), + null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(STREAM_TABLE, STREAM_CASES), + createExpectedHeader(STREAM_CASES), + new HashSet<>(createExpectedRows(STREAM_CASES)), + 60, + DATABASE, + null); + } + + private static void createDatabaseAndTable( + final BaseEnv env, + final String table, + final List conversionCases, + final boolean useSourceType) { + final List sqls = new ArrayList<>(); + sqls.add("create database if not exists " + DATABASE); + sqls.add("use " + DATABASE); + final List columns = new ArrayList<>(); + columns.add("tag_id string tag"); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + columns.add( + String.format( + "%s %s field", + conversionCase.measurement, + useSourceType ? conversionCase.sourceType : conversionCase.targetType)); + } + sqls.add(String.format("create table %s (%s)", table, String.join(",", columns))); + TestUtils.executeNonQueries(null, BaseEnv.TABLE_SQL_DIALECT, env, sqls, null); + } + + private void createPipe() { + TestUtils.executeNonQuery( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + String.format( + "create pipe type_conversion_semantic" + + " with source ('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='forced-log')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('node-urls'='%s','batch.enable'='false','sink.format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private void createStreamPipe() { + TestUtils.executeNonQuery( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + String.format( + "create pipe stream_type_conversion_semantic" + + " with source ('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='stream')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('sink'='iotdb-thrift-sink','sink.node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private static List createInsertStatements( + final String table, final List conversionCases) { + final List sqls = new ArrayList<>(); + final String measurements = + String.join( + ",", + conversionCases.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + values.add(conversionCase.sourceSqlValues[row]); + } + sqls.add( + String.format( + "insert into %s(time,tag_id,%s) values (%d,'d',%s)", + table, measurements, row + 1, String.join(",", values))); + } + sqls.add("flush"); + return sqls; + } + + private static String createQuerySql( + final String table, final List conversionCases) { + return String.format( + "select %s,time from %s where tag_id='d'", + String.join( + ",", + conversionCases.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)), + table); + } + + private static String createExpectedHeader( + final List conversionCases) { + final List columns = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + columns.add(conversionCase.measurement); + } + columns.add("time"); + return String.join(",", columns) + ","; + } + + private static List createExpectedRows( + final List conversionCases) { + final List rows = new ArrayList<>(); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + values.add(conversionCase.expectedValues[row]); + } + values.add(TypeConversionSemanticCase.timestampValue(row + 1)); + rows.add(String.join(",", values) + ","); + } + return rows; + } + + private static List getCases(final String... measurements) { + final List cases = new ArrayList<>(); + for (final String measurement : measurements) { + cases.add(getCase(measurement)); + } + return cases; + } + + private static TypeConversionSemanticCase getCase(final String measurement) { + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + if (conversionCase.measurement.equals(measurement)) { + return conversionCase; + } + } + throw new IllegalArgumentException("Unknown type conversion semantic case: " + measurement); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java new file mode 100644 index 0000000000000..b221800337988 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase; +import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeAutoEnhanced.class}) +public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeDualTreeModelAutoIT { + + private static final String DEVICE = "root.pipe_type_conversion.d"; + private static final String ALIGNED_DEVICE = "root.pipe_type_conversion.aligned_d"; + private static final List ALIGNED_STREAM_CASES = + getCases( + "bool_to_int32", + "bool_to_int64", + "bool_to_float", + "bool_to_double", + "bool_to_blob", + "bool_to_date", + "bool_to_timestamp", + "int32_to_boolean", + "int32_to_timestamp", + "int32_to_date"); + + @Override + @Before + public void setUp() { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + setupConfig(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); + } + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + receiverEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + } + + @Test + public void testPipeReceiverTypeConversionSemantics() { + createTimeseries(senderEnv, DEVICE, TypeConversionSemanticCase.CASES, true); + createTimeseries(receiverEnv, DEVICE, TypeConversionSemanticCase.CASES, false); + createPipe(); + + TestUtils.executeNonQueries( + senderEnv, createInsertStatements(DEVICE, TypeConversionSemanticCase.CASES, false), null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(DEVICE, TypeConversionSemanticCase.CASES), + createExpectedHeader(DEVICE, TypeConversionSemanticCase.CASES), + new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)), + 60); + } + + @Test + public void testAlignedStreamPipeReceiverTypeConversionSemantics() { + createAlignedTimeseries(senderEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES, true); + createAlignedTimeseries(receiverEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES, false); + createStreamPipe(ALIGNED_DEVICE); + + TestUtils.executeNonQueries( + senderEnv, createInsertStatements(ALIGNED_DEVICE, ALIGNED_STREAM_CASES, true), null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(ALIGNED_DEVICE, ALIGNED_STREAM_CASES), + createExpectedHeader(ALIGNED_DEVICE, ALIGNED_STREAM_CASES), + new HashSet<>(createExpectedRows(ALIGNED_STREAM_CASES)), + 60); + } + + private static void createTimeseries( + final BaseEnv env, + final String device, + final List conversionCases, + final boolean useSourceType) { + final List sqls = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + sqls.add( + String.format( + "create timeseries %s.%s with datatype=%s,encoding=PLAIN", + device, + conversionCase.measurement, + useSourceType ? conversionCase.sourceType : conversionCase.targetType)); + } + TestUtils.executeNonQueries(env, sqls, null); + } + + private static void createAlignedTimeseries( + final BaseEnv env, + final String device, + final List conversionCases, + final boolean useSourceType) { + final List measurements = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + measurements.add( + String.format( + "%s %s encoding=PLAIN", + conversionCase.measurement, + useSourceType ? conversionCase.sourceType : conversionCase.targetType)); + } + TestUtils.executeNonQuery( + env, + String.format("create aligned timeseries %s(%s)", device, String.join(",", measurements)), + null); + } + + private void createPipe() { + TestUtils.executeNonQuery( + senderEnv, + String.format( + "create pipe type_conversion_semantic" + + " with source ('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='forced-log')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('node-urls'='%s','batch.enable'='false','sink.format'='tablet')", + DEVICE, receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private void createStreamPipe(final String device) { + TestUtils.executeNonQuery( + senderEnv, + String.format( + "create pipe aligned_type_conversion_semantic" + + " with source ('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='stream')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('sink'='iotdb-thrift-sink','sink.node-urls'='%s')", + device, receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private static List createInsertStatements( + final String device, + final List conversionCases, + final boolean isAligned) { + final List sqls = new ArrayList<>(); + final String measurements = + String.join( + ",", + conversionCases.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + values.add(conversionCase.sourceSqlValues[row]); + } + sqls.add( + String.format( + "insert into %s(time,%s)%s values (%d,%s)", + device, + measurements, + isAligned ? " aligned" : "", + row + 1, + String.join(",", values))); + } + sqls.add("flush"); + return sqls; + } + + private static String createQuerySql( + final String device, final List conversionCases) { + return String.format( + "select %s from %s", + String.join( + ",", + conversionCases.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)), + device); + } + + private static String createExpectedHeader( + final String device, final List conversionCases) { + final List columns = new ArrayList<>(); + columns.add("Time"); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + columns.add(device + "." + conversionCase.measurement); + } + return String.join(",", columns) + ","; + } + + private static List createExpectedRows( + final List conversionCases) { + final List rows = new ArrayList<>(); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + values.add(Integer.toString(row + 1)); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + values.add(conversionCase.expectedValues[row]); + } + rows.add(String.join(",", values) + ","); + } + return rows; + } + + private static List getCases(final String... measurements) { + final List cases = new ArrayList<>(); + for (final String measurement : measurements) { + cases.add(getCase(measurement)); + } + return cases; + } + + private static TypeConversionSemanticCase getCase(final String measurement) { + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + if (conversionCase.measurement.equals(measurement)) { + return conversionCase; + } + } + throw new IllegalArgumentException("Unknown type conversion semantic case: " + measurement); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 731c2eb50f0f0..e6c8ddefc997e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -590,7 +590,7 @@ static Map buildLoadTsFileAttributesForAsync( dataBaseName, LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName), shouldConvertDataTypeOnTypeMismatch, - validateTsFile, + validateTsFile || shouldConvertDataTypeOnTypeMismatch, null, shouldMarkAsPipeRequest); } @@ -598,16 +598,23 @@ static Map buildLoadTsFileAttributesForAsync( private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbsolutePath) throws FileNotFoundException { return executeStatementAndClassifyExceptions( - buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, validateTsFile.get())); + buildLoadTsFileStatementForSync( + dataBaseName, + fileAbsolutePath, + validateTsFile.get(), + shouldConvertDataTypeOnTypeMismatch)); } static LoadTsFileStatement buildLoadTsFileStatementForSync( - final String dataBaseName, final String fileAbsolutePath, final boolean validateTsFile) + final String dataBaseName, + final String fileAbsolutePath, + final boolean validateTsFile, + final boolean shouldConvertDataTypeOnTypeMismatch) throws FileNotFoundException { final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(fileAbsolutePath); statement.setDeleteAfterLoad(true); - statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(validateTsFile); + statement.setConvertOnTypeMismatch(shouldConvertDataTypeOnTypeMismatch); + statement.setVerifySchema(validateTsFile || shouldConvertDataTypeOnTypeMismatch); statement.setAutoCreateDatabase( IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()); statement.setDatabase(dataBaseName); @@ -956,14 +963,26 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch( } } + // Execute insert statements through the conversion wrapper first to avoid writing a partial + // row/tablet before the type mismatch is converted. + if (shouldConvertDataTypeOnTypeMismatch && statement instanceof InsertBaseStatement) { + final Optional convertedStatus = + executeInsertStatementWithDataTypeConversion( + statement, isTableModelStatement, databaseName); + if (convertedStatus.isPresent()) { + return convertedStatus.get(); + } + } + // Real execution of the statement final TSStatus status = isTableModelStatement ? executeStatementForTableModel(statement, databaseName) : executeStatementForTreeModel(statement); - // Try to convert data type if the statement is a tree model statement - // and the status code is not success + // Try to convert data type if the status code is not success. Insert statements normally return + // above after the first converted execution. The retry path is kept for load and fallback + // cases. if (!shouldConvertDataTypeOnTypeMismatch || !((statement instanceof InsertBaseStatement && ((InsertBaseStatement) statement).hasFailedMeasurements()) @@ -986,6 +1005,17 @@ && shouldUseTableModelVisitorForLoadStatement((LoadTsFileStatement) statement)) : statement.accept(treeStatementDataTypeConvertExecutionVisitor, status).orElse(status); } + private Optional executeInsertStatementWithDataTypeConversion( + final Statement statement, final boolean isTableModelStatement, final String databaseName) { + return isTableModelStatement + ? statement.accept( + tableStatementDataTypeConvertExecutionVisitor, + new Pair<>(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), databaseName)) + : statement.accept( + treeStatementDataTypeConvertExecutionVisitor, + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + } + private boolean shouldUseTableModelVisitorForLoadStatement( final LoadTsFileStatement loadTsFileStatement) { final List isTableModel = loadTsFileStatement.getIsTableModel(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 5171ccd93c3e8..e357b86e0c736 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -619,7 +619,8 @@ private TreeSchemaAutoCreatorAndVerifier getOrCreateTreeSchemaVerifier() { private LoadTsFileTableSchemaCache getOrCreateTableSchemaCache() { if (tableSchemaCache == null) { - tableSchemaCache = new LoadTsFileTableSchemaCache(metadata, context, isAutoCreateDatabase); + tableSchemaCache = + new LoadTsFileTableSchemaCache(metadata, context, isAutoCreateDatabase, isVerifySchema); } return tableSchemaCache; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index c3c0ea73310f2..92a56d29a5522 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -98,6 +99,7 @@ public class LoadTsFileTableSchemaCache { private Map tableSchemaMap; private final Metadata metadata; private final MPPQueryContext context; + private final boolean shouldVerifyDataType; private Map> currentBatchTable2Devices; @@ -116,13 +118,17 @@ public class LoadTsFileTableSchemaCache { private final AtomicBoolean needDecode4DifferentTimeColumn = new AtomicBoolean(false); public LoadTsFileTableSchemaCache( - final Metadata metadata, final MPPQueryContext context, final boolean needToCreateDatabase) + final Metadata metadata, + final MPPQueryContext context, + final boolean needToCreateDatabase, + final boolean shouldVerifyDataType) throws LoadRuntimeOutOfMemoryException { this.block = LoadTsFileMemoryManager.getInstance() .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); this.metadata = metadata; this.context = context; + this.shouldVerifyDataType = shouldVerifyDataType; this.currentBatchTable2Devices = new HashMap<>(); this.currentModifications = PatternTreeMapFactory.getModsPatternTreeMap(); this.needToCreateDatabase = needToCreateDatabase; @@ -389,14 +395,25 @@ private void verifyTableDataTypeAndGenerateTagColumnMapper( } } else if (fileColumn.getColumnCategory() == TsTableColumnCategory.FIELD) { ColumnSchema realColumn = fieldColumnNameToSchema.get(fileColumn.getName()); - if (LOGGER.isDebugEnabled() - && (realColumn == null || !fileColumn.getType().equals(realColumn.getType()))) { + if (realColumn != null && !fileColumn.getType().equals(realColumn.getType())) { + final String message = + String.format( + "Data type mismatch for column %s in table %s, type in TsFile: %s, type in IoTDB: %s", + fileColumn.getName(), + realSchema.getTableName(), + fileColumn.getType(), + realColumn.getType()); + if (shouldVerifyDataType) { + throw new LoadAnalyzeTypeMismatchException(message); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(message); + } + } else if (LOGGER.isDebugEnabled() && realColumn == null) { LOGGER.debug( - "Data type mismatch for column {} in table {}, type in TsFile: {}, type in IoTDB: {}", + "Column {} in table {} is not found in IoTDB while loading TsFile.", fileColumn.getName(), - realSchema.getTableName(), - fileColumn.getType(), - Objects.nonNull(realColumn) ? realColumn.getType() : null); + realSchema.getTableName()); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index f41c44763f997..1e279a18febfb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -39,7 +39,7 @@ public void testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() t try { final LoadTsFileStatement statement = IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( - "root.test.sg_0", tsFile.toString(), true); + "root.test.sg_0", tsFile.toString(), true, true); Assert.assertEquals("root.test.sg_0", statement.getDatabase()); Assert.assertEquals(2, statement.getDatabaseLevel()); @@ -54,16 +54,17 @@ public void testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() try { final Map attributes = IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync( - "root.test.sg_0", true, true, true); + "root.test.sg_0", true, false, true); Assert.assertEquals( "root.test.sg_0", attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY)); Assert.assertEquals("2", attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY)); final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(tsFile.toString()); - ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, true); + ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, false); Assert.assertEquals("root.test.sg_0", statement.getDatabase()); Assert.assertEquals(2, statement.getDatabaseLevel()); + Assert.assertTrue(statement.isVerifySchema()); } finally { Files.deleteIfExists(tsFile); } @@ -75,7 +76,8 @@ public void testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseName final Path tsFile = Files.createTempFile("pipe-load-default-database-level", ".tsfile"); try { final LoadTsFileStatement statement = - IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, tsFile.toString(), true); + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + null, tsFile.toString(), true, true); Assert.assertNull(statement.getDatabase()); Assert.assertEquals( @@ -92,7 +94,7 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception { try { final LoadTsFileStatement statement = IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( - "root.test.sg_0", tsFile.toString(), true); + "root.test.sg_0", tsFile.toString(), true, true); final long receiverId = System.nanoTime(); final Exception exception = new RuntimeException("repeated receiver exception " + receiverId); @@ -107,4 +109,35 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception { Files.deleteIfExists(tsFile); } } + + @Test + public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), false, true); + + Assert.assertTrue(statement.isConvertOnTypeMismatch()); + Assert.assertTrue(statement.isVerifySchema()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType() + throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-no-convert-no-verify-schema", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), false, false); + + Assert.assertFalse(statement.isConvertOnTypeMismatch()); + Assert.assertFalse(statement.isVerifySchema()); + } finally { + Files.deleteIfExists(tsFile); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index d6bdb1e37feb0..01406b0fd063d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -19,7 +19,10 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; +import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -33,6 +36,7 @@ import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator; +import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -45,6 +49,7 @@ import java.io.File; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collections; @@ -109,6 +114,41 @@ public void testAnalyzeSingleTableFileShouldNotCountTimestampInPointCount() thro Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount()); } + @Test + public void testTableSchemaCacheShouldThrowMismatchWhenVerifyingDataType() throws Exception { + final LoadTsFileTableSchemaCache schemaCache = createTableSchemaCache(true); + try { + final InvocationTargetException exception = + Assert.assertThrows( + InvocationTargetException.class, + () -> + getVerifyTableDataTypeMethod() + .invoke( + schemaCache, + createTableSchema(TSDataType.INT64), + createTableSchema(TSDataType.DOUBLE))); + + Assert.assertTrue(exception.getCause() instanceof LoadAnalyzeTypeMismatchException); + } finally { + schemaCache.close(); + } + } + + @Test + public void testTableSchemaCacheShouldNotThrowMismatchWhenSkippingDataTypeVerification() + throws Exception { + final LoadTsFileTableSchemaCache schemaCache = createTableSchemaCache(false); + try { + getVerifyTableDataTypeMethod() + .invoke( + schemaCache, + createTableSchema(TSDataType.INT64), + createTableSchema(TSDataType.DOUBLE)); + } finally { + schemaCache.close(); + } + } + private void writeTableTsFileWithMixedDevices(final File tsFile) throws Exception { if (tsFile.exists()) { Assert.assertTrue(tsFile.delete()); @@ -163,6 +203,33 @@ private void injectTableSchemaCache( tableSchemaCacheField.set(analyzer, schemaCache); } + private LoadTsFileTableSchemaCache createTableSchemaCache(final boolean shouldVerifyDataType) + throws LoadRuntimeOutOfMemoryException { + return new LoadTsFileTableSchemaCache( + null, new MPPQueryContext(new QueryId("load_test")), false, shouldVerifyDataType); + } + + private Method getVerifyTableDataTypeMethod() throws NoSuchMethodException { + final Method method = + LoadTsFileTableSchemaCache.class.getDeclaredMethod( + "verifyTableDataTypeAndGenerateTagColumnMapper", + org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class, + org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class); + method.setAccessible(true); + return method; + } + + private org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema + createTableSchema(final TSDataType fieldType) { + return new org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema( + "table1", + Arrays.asList( + new ColumnSchema( + "tag1", TypeFactory.getType(TSDataType.STRING), false, TsTableColumnCategory.TAG), + new ColumnSchema( + "s1", TypeFactory.getType(fieldType), false, TsTableColumnCategory.FIELD))); + } + private boolean containsDevice(final Set devices, final String... expectedSegments) { return devices.stream() .anyMatch(device -> Arrays.equals(device.getSegments(), expectedSegments)); @@ -173,7 +240,7 @@ private static class TrackingLoadTsFileTableSchemaCache extends LoadTsFileTableS private final Set> verifiedDevices = new HashSet<>(); private TrackingLoadTsFileTableSchemaCache() throws LoadRuntimeOutOfMemoryException { - super(null, new MPPQueryContext(new QueryId("load_test")), false); + super(null, new MPPQueryContext(new QueryId("load_test")), false, true); } @Override