From 24533ce27b7fa163be637ed4737c15d776033378 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:44:14 +0800 Subject: [PATCH 1/4] Add pipe type conversion semantic ITs --- .../it/dual/TypeConversionSemanticCase.java | 315 ++++++++++++++++++ .../IoTDBPipeTypeConversionSemanticIT.java | 176 ++++++++++ .../IoTDBPipeTypeConversionSemanticIT.java | 166 +++++++++ 3 files changed, 657 insertions(+) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java new file mode 100644 index 000000000000..eebef16f1d7e --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual; + +import org.apache.iotdb.rpc.RpcUtils; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BytesUtils; + +import java.nio.charset.StandardCharsets; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; + +public class TypeConversionSemanticCase { + + public static final int ROW_COUNT = 3; + + public static final List CASES = + Arrays.asList( + c( + "bool_to_int32", + TSDataType.BOOLEAN, + TSDataType.INT32, + values("true", "false", "true"), + values("1", "0", "1")), + c( + "bool_to_int64", + TSDataType.BOOLEAN, + TSDataType.INT64, + values("true", "false", "true"), + values("1", "0", "1")), + c( + "bool_to_float", + TSDataType.BOOLEAN, + TSDataType.FLOAT, + values("true", "false", "true"), + values("1.0", "0.0", "1.0")), + c( + "bool_to_double", + TSDataType.BOOLEAN, + TSDataType.DOUBLE, + values("true", "false", "true"), + values("1.0", "0.0", "1.0")), + c( + "bool_to_text", + TSDataType.BOOLEAN, + TSDataType.TEXT, + values("true", "false", "true"), + values("true", "false", "true")), + c( + "bool_to_blob", + TSDataType.BOOLEAN, + TSDataType.BLOB, + values("true", "false", "true"), + values(blobValue("true"), blobValue("false"), blobValue("true"))), + c( + "bool_to_string", + TSDataType.BOOLEAN, + TSDataType.STRING, + values("true", "false", "true"), + values("true", "false", "true")), + c( + "bool_to_date", + TSDataType.BOOLEAN, + TSDataType.DATE, + values("true", "false", "true"), + values("1970-01-02", "1970-01-01", "1970-01-02")), + c( + "bool_to_timestamp", + TSDataType.BOOLEAN, + TSDataType.TIMESTAMP, + values("true", "false", "true"), + values(timestampValue(1), timestampValue(0), timestampValue(1))), + c( + "int32_to_boolean", + TSDataType.INT32, + TSDataType.BOOLEAN, + values("0", "2", "-1"), + values("false", "true", "true")), + c( + "int32_to_timestamp", + TSDataType.INT32, + TSDataType.TIMESTAMP, + values("0", "1", "86400000"), + values(timestampValue(0), timestampValue(1), timestampValue(86400000))), + c( + "int32_to_date", + TSDataType.INT32, + TSDataType.DATE, + values("19700102", "20240229", "42"), + values("1970-01-02", "2024-02-29", "1970-01-01")), + c( + "int64_to_int32", + TSDataType.INT64, + TSDataType.INT32, + values("2147483648", "-2147483649", "42"), + values("-2147483648", "2147483647", "42")), + c( + "int64_to_boolean", + TSDataType.INT64, + TSDataType.BOOLEAN, + values("0", "2", "-1"), + values("false", "true", "true")), + c( + "int64_to_date", + TSDataType.INT64, + TSDataType.DATE, + values("19700102", "2147483648", "19700103"), + values("1970-01-02", "1970-01-01", "1970-01-03")), + c( + "float_to_int32", + TSDataType.FLOAT, + TSDataType.INT32, + values("2.9", "-2.9", "0.0"), + values("2", "-2", "0")), + c( + "float_to_boolean", + TSDataType.FLOAT, + TSDataType.BOOLEAN, + values("0.0", "0.1", "-0.1"), + values("false", "true", "true")), + c( + "float_to_date", + TSDataType.FLOAT, + TSDataType.DATE, + values("19700102.9", "19700103.1", "42.9"), + values("1970-01-02", "1970-01-03", "1970-01-01")), + c( + "double_to_int64", + TSDataType.DOUBLE, + TSDataType.INT64, + values("3.9", "-3.9", "0.0"), + values("3", "-3", "0")), + c( + "double_to_boolean", + TSDataType.DOUBLE, + TSDataType.BOOLEAN, + values("0.0", "0.1", "-0.1"), + values("false", "true", "true")), + c( + "double_to_timestamp", + TSDataType.DOUBLE, + TSDataType.TIMESTAMP, + values("1.9", "86400000.9", "0.0"), + values(timestampValue(1), timestampValue(86400000), timestampValue(0))), + c( + "text_to_int32", + TSDataType.TEXT, + TSDataType.INT32, + values("'123.9'", "'bad'", "'-123.9'"), + values("123", "0", "-123")), + c( + "string_to_int64", + TSDataType.STRING, + TSDataType.INT64, + values("'456.9'", "'bad'", "'-456.9'"), + values("456", "0", "-456")), + c( + "blob_to_float", + TSDataType.BLOB, + TSDataType.FLOAT, + values(blobSql("7.5"), blobSql("bad"), blobSql("-7.5")), + values("7.5", "0.0", "-7.5")), + c( + "text_to_double", + TSDataType.TEXT, + TSDataType.DOUBLE, + values("'8.25'", "'bad'", "'-8.25'"), + values("8.25", "0.0", "-8.25")), + c( + "text_to_boolean", + TSDataType.TEXT, + TSDataType.BOOLEAN, + values("'true'", "'1'", "'TrUe'"), + values("true", "false", "true")), + c( + "string_to_boolean", + TSDataType.STRING, + TSDataType.BOOLEAN, + values("'TRUE'", "'false'", "'yes'"), + values("true", "false", "false")), + c( + "blob_to_boolean", + TSDataType.BLOB, + TSDataType.BOOLEAN, + values(blobSql("true"), blobSql("FALSE"), blobSql("0")), + values("true", "false", "false")), + c( + "text_to_timestamp", + TSDataType.TEXT, + TSDataType.TIMESTAMP, + values("'86400000'", "'1970-01-02T00:00:00.000'", "'bad'"), + values(timestampValue(86400000), timestampValue(86400000), timestampValue(0))), + c( + "string_to_timestamp", + TSDataType.STRING, + TSDataType.TIMESTAMP, + values("'1970-01-03T00:00:00.000'", "'bad'", "'86400000'"), + values(timestampValue(172800000), timestampValue(0), timestampValue(86400000))), + c( + "blob_to_timestamp", + TSDataType.BLOB, + TSDataType.TIMESTAMP, + values(blobSql("bad"), blobSql("1"), blobSql("1970-01-02T00:00:00.000")), + values(timestampValue(0), timestampValue(1), timestampValue(86400000))), + c( + "text_to_date", + TSDataType.TEXT, + TSDataType.DATE, + values("'19700102'", "'1970-01-04'", "'bad'"), + values("1970-01-02", "1970-01-04", "1970-01-01")), + c( + "string_to_date", + TSDataType.STRING, + TSDataType.DATE, + values("'1970-01-03'", "'19700105'", "'1970-01-07'"), + values("1970-01-03", "1970-01-05", "1970-01-07")), + c( + "blob_to_date", + TSDataType.BLOB, + TSDataType.DATE, + values(blobSql("bad"), blobSql("1970-01-06"), blobSql("19700108")), + values("1970-01-01", "1970-01-06", "1970-01-08")), + c( + "timestamp_to_date", + TSDataType.TIMESTAMP, + TSDataType.DATE, + values("0", "86399999", "86400000"), + values("1970-01-01", "1970-01-01", "1970-01-02")), + c( + "date_to_timestamp", + TSDataType.DATE, + TSDataType.TIMESTAMP, + values("'1970-01-01'", "'1970-01-02'", "'1970-01-03'"), + values(timestampValue(0), timestampValue(86400000), timestampValue(172800000))), + c( + "timestamp_to_boolean", + TSDataType.TIMESTAMP, + TSDataType.BOOLEAN, + values("0", "-1", "1"), + values("false", "true", "true")), + c( + "date_to_boolean", + TSDataType.DATE, + TSDataType.BOOLEAN, + values("'1970-01-01'", "'1970-01-02'", "'1969-12-31'"), + values("false", "true", "true"))); + + public final String measurement; + public final TSDataType sourceType; + public final TSDataType targetType; + public final String[] sourceSqlValues; + public final String[] expectedValues; + + private TypeConversionSemanticCase( + final String measurement, + final TSDataType sourceType, + final TSDataType targetType, + final String[] sourceSqlValues, + final String[] expectedValues) { + this.measurement = measurement; + this.sourceType = sourceType; + this.targetType = targetType; + this.sourceSqlValues = sourceSqlValues; + this.expectedValues = expectedValues; + } + + private static TypeConversionSemanticCase c( + final String measurement, + final TSDataType sourceType, + final TSDataType targetType, + final String[] sourceSqlValues, + final String[] expectedValues) { + return new TypeConversionSemanticCase( + measurement, sourceType, targetType, sourceSqlValues, expectedValues); + } + + private static String[] values(final String... values) { + return values; + } + + public static String timestampValue(final long timestamp) { + return RpcUtils.formatDatetime("default", "ms", timestamp, ZoneOffset.UTC); + } + + private static String blobSql(final String value) { + final StringBuilder builder = new StringBuilder("X'"); + for (final byte b : value.getBytes(StandardCharsets.UTF_8)) { + builder.append(String.format("%02x", b & 0xFF)); + } + return builder.append("'").toString(); + } + + private static String blobValue(final String value) { + return BytesUtils.parseBlobByteArrayToString(value.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java new file mode 100644 index 000000000000..9eb10de913ac --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase; +import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTableManualEnhanced.class}) +public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeTableModelDualManualIT { + + private static final String DATABASE = "pipe_type_conversion"; + private static final String TABLE = "semantic_conversion"; + + @Override + @Before + public void setUp() { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + setupConfig(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); + } + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + receiverEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + } + + @Test + public void testPipeReceiverTypeConversionSemantics() { + createDatabaseAndTable(senderEnv, true); + createDatabaseAndTable(receiverEnv, false); + createPipe(); + + TestUtils.executeNonQueries( + DATABASE, BaseEnv.TABLE_SQL_DIALECT, senderEnv, createInsertStatements(), null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(), + createExpectedHeader(), + new HashSet<>(createExpectedRows()), + 60, + DATABASE, + null); + } + + private static void createDatabaseAndTable(final BaseEnv env, final boolean useSourceType) { + final List sqls = new ArrayList<>(); + sqls.add("create database if not exists " + DATABASE); + sqls.add("use " + DATABASE); + final List columns = new ArrayList<>(); + columns.add("tag_id string tag"); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + columns.add( + String.format( + "%s %s field", + conversionCase.measurement, + useSourceType ? conversionCase.sourceType : conversionCase.targetType)); + } + sqls.add(String.format("create table %s (%s)", TABLE, String.join(",", columns))); + TestUtils.executeNonQueries(null, BaseEnv.TABLE_SQL_DIALECT, env, sqls, null); + } + + private void createPipe() { + TestUtils.executeNonQuery( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + String.format( + "create pipe type_conversion_semantic" + + " with source ('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='forced-log')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('node-urls'='%s','batch.enable'='false','sink.format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private static List createInsertStatements() { + final List sqls = new ArrayList<>(); + final String measurements = + String.join( + ",", + TypeConversionSemanticCase.CASES.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + values.add(conversionCase.sourceSqlValues[row]); + } + sqls.add( + String.format( + "insert into %s(time,tag_id,%s) values (%d,'d',%s)", + TABLE, measurements, row + 1, String.join(",", values))); + } + sqls.add("flush"); + return sqls; + } + + private static String createQuerySql() { + return String.format( + "select %s,time from %s where tag_id='d'", + String.join( + ",", + TypeConversionSemanticCase.CASES.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)), + TABLE); + } + + private static String createExpectedHeader() { + final List columns = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + columns.add(conversionCase.measurement); + } + columns.add("time"); + return String.join(",", columns) + ","; + } + + private static List createExpectedRows() { + final List rows = new ArrayList<>(); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + values.add(conversionCase.expectedValues[row]); + } + values.add(TypeConversionSemanticCase.timestampValue(row + 1)); + rows.add(String.join(",", values) + ","); + } + return rows; + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java new file mode 100644 index 000000000000..ce493023f40c --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase; +import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeAutoEnhanced.class}) +public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeDualTreeModelAutoIT { + + private static final String DEVICE = "root.pipe_type_conversion.d"; + + @Override + @Before + public void setUp() { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + setupConfig(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); + } + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + receiverEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + } + + @Test + public void testPipeReceiverTypeConversionSemantics() { + createTimeseries(senderEnv, true); + createTimeseries(receiverEnv, false); + createPipe(); + + TestUtils.executeNonQueries(senderEnv, createInsertStatements(), null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(), + createExpectedHeader(), + new HashSet<>(createExpectedRows()), + 60); + } + + private static void createTimeseries(final BaseEnv env, final boolean useSourceType) { + final List sqls = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + sqls.add( + String.format( + "create timeseries %s.%s with datatype=%s,encoding=PLAIN", + DEVICE, + conversionCase.measurement, + useSourceType ? conversionCase.sourceType : conversionCase.targetType)); + } + TestUtils.executeNonQueries(env, sqls, null); + } + + private void createPipe() { + TestUtils.executeNonQuery( + senderEnv, + String.format( + "create pipe type_conversion_semantic" + + " with source ('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='forced-log')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('node-urls'='%s','batch.enable'='false','sink.format'='tablet')", + DEVICE, receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private static List createInsertStatements() { + final List sqls = new ArrayList<>(); + final String measurements = + String.join( + ",", + TypeConversionSemanticCase.CASES.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + values.add(conversionCase.sourceSqlValues[row]); + } + sqls.add( + String.format( + "insert into %s(time,%s) values (%d,%s)", + DEVICE, measurements, row + 1, String.join(",", values))); + } + sqls.add("flush"); + return sqls; + } + + private static String createQuerySql() { + return String.format( + "select %s from %s", + String.join( + ",", + TypeConversionSemanticCase.CASES.stream() + .map(conversionCase -> conversionCase.measurement) + .toArray(String[]::new)), + DEVICE); + } + + private static String createExpectedHeader() { + final List columns = new ArrayList<>(); + columns.add("Time"); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + columns.add(DEVICE + "." + conversionCase.measurement); + } + return String.join(",", columns) + ","; + } + + private static List createExpectedRows() { + final List rows = new ArrayList<>(); + for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { + final List values = new ArrayList<>(); + values.add(Integer.toString(row + 1)); + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + values.add(conversionCase.expectedValues[row]); + } + rows.add(String.join(",", values) + ","); + } + return rows; + } +} From 17190aa6aeae4d55838af633e67bc06a596043e0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 5 Jun 2026 14:52:23 +0800 Subject: [PATCH 2/4] Fix float-to-date semantic IT expectation --- .../apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java index eebef16f1d7e..3ece5cf865e9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java @@ -141,8 +141,8 @@ public class TypeConversionSemanticCase { "float_to_date", TSDataType.FLOAT, TSDataType.DATE, - values("19700102.9", "19700103.1", "42.9"), - values("1970-01-02", "1970-01-03", "1970-01-01")), + values("19700102.0", "19700104.0", "42.9"), + values("1970-01-02", "1970-01-04", "1970-01-01")), c( "double_to_int64", TSDataType.DOUBLE, From bdeb94a298a88d5a687b52d6174551e815d4d7ec Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 5 Jun 2026 16:25:24 +0800 Subject: [PATCH 3/4] Add aligned stream pipe type conversion IT --- .../IoTDBPipeTypeConversionSemanticIT.java | 137 +++++++++++++++--- 1 file changed, 115 insertions(+), 22 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java index ce493023f40c..b22180033798 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java @@ -41,6 +41,19 @@ public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeDualTreeModelAutoIT { private static final String DEVICE = "root.pipe_type_conversion.d"; + private static final String ALIGNED_DEVICE = "root.pipe_type_conversion.aligned_d"; + private static final List ALIGNED_STREAM_CASES = + getCases( + "bool_to_int32", + "bool_to_int64", + "bool_to_float", + "bool_to_double", + "bool_to_blob", + "bool_to_date", + "bool_to_timestamp", + "int32_to_boolean", + "int32_to_timestamp", + "int32_to_date"); @Override @Before @@ -70,33 +83,74 @@ protected void setupConfig() { @Test public void testPipeReceiverTypeConversionSemantics() { - createTimeseries(senderEnv, true); - createTimeseries(receiverEnv, false); + createTimeseries(senderEnv, DEVICE, TypeConversionSemanticCase.CASES, true); + createTimeseries(receiverEnv, DEVICE, TypeConversionSemanticCase.CASES, false); createPipe(); - TestUtils.executeNonQueries(senderEnv, createInsertStatements(), null); + TestUtils.executeNonQueries( + senderEnv, createInsertStatements(DEVICE, TypeConversionSemanticCase.CASES, false), null); TestUtils.assertDataEventuallyOnEnv( receiverEnv, - createQuerySql(), - createExpectedHeader(), - new HashSet<>(createExpectedRows()), + createQuerySql(DEVICE, TypeConversionSemanticCase.CASES), + createExpectedHeader(DEVICE, TypeConversionSemanticCase.CASES), + new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)), 60); } - private static void createTimeseries(final BaseEnv env, final boolean useSourceType) { + @Test + public void testAlignedStreamPipeReceiverTypeConversionSemantics() { + createAlignedTimeseries(senderEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES, true); + createAlignedTimeseries(receiverEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES, false); + createStreamPipe(ALIGNED_DEVICE); + + TestUtils.executeNonQueries( + senderEnv, createInsertStatements(ALIGNED_DEVICE, ALIGNED_STREAM_CASES, true), null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(ALIGNED_DEVICE, ALIGNED_STREAM_CASES), + createExpectedHeader(ALIGNED_DEVICE, ALIGNED_STREAM_CASES), + new HashSet<>(createExpectedRows(ALIGNED_STREAM_CASES)), + 60); + } + + private static void createTimeseries( + final BaseEnv env, + final String device, + final List conversionCases, + final boolean useSourceType) { final List sqls = new ArrayList<>(); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { sqls.add( String.format( "create timeseries %s.%s with datatype=%s,encoding=PLAIN", - DEVICE, + device, conversionCase.measurement, useSourceType ? conversionCase.sourceType : conversionCase.targetType)); } TestUtils.executeNonQueries(env, sqls, null); } + private static void createAlignedTimeseries( + final BaseEnv env, + final String device, + final List conversionCases, + final boolean useSourceType) { + final List measurements = new ArrayList<>(); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + measurements.add( + String.format( + "%s %s encoding=PLAIN", + conversionCase.measurement, + useSourceType ? conversionCase.sourceType : conversionCase.targetType)); + } + TestUtils.executeNonQuery( + env, + String.format("create aligned timeseries %s(%s)", device, String.join(",", measurements)), + null); + } + private void createPipe() { TestUtils.executeNonQuery( senderEnv, @@ -109,58 +163,97 @@ private void createPipe() { null); } - private static List createInsertStatements() { + private void createStreamPipe(final String device) { + TestUtils.executeNonQuery( + senderEnv, + String.format( + "create pipe aligned_type_conversion_semantic" + + " with source ('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='stream')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('sink'='iotdb-thrift-sink','sink.node-urls'='%s')", + device, receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private static List createInsertStatements( + final String device, + final List conversionCases, + final boolean isAligned) { final List sqls = new ArrayList<>(); final String measurements = String.join( ",", - TypeConversionSemanticCase.CASES.stream() + conversionCases.stream() .map(conversionCase -> conversionCase.measurement) .toArray(String[]::new)); for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { final List values = new ArrayList<>(); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { values.add(conversionCase.sourceSqlValues[row]); } sqls.add( String.format( - "insert into %s(time,%s) values (%d,%s)", - DEVICE, measurements, row + 1, String.join(",", values))); + "insert into %s(time,%s)%s values (%d,%s)", + device, + measurements, + isAligned ? " aligned" : "", + row + 1, + String.join(",", values))); } sqls.add("flush"); return sqls; } - private static String createQuerySql() { + private static String createQuerySql( + final String device, final List conversionCases) { return String.format( "select %s from %s", String.join( ",", - TypeConversionSemanticCase.CASES.stream() + conversionCases.stream() .map(conversionCase -> conversionCase.measurement) .toArray(String[]::new)), - DEVICE); + device); } - private static String createExpectedHeader() { + private static String createExpectedHeader( + final String device, final List conversionCases) { final List columns = new ArrayList<>(); columns.add("Time"); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { - columns.add(DEVICE + "." + conversionCase.measurement); + for (final TypeConversionSemanticCase conversionCase : conversionCases) { + columns.add(device + "." + conversionCase.measurement); } return String.join(",", columns) + ","; } - private static List createExpectedRows() { + private static List createExpectedRows( + final List conversionCases) { final List rows = new ArrayList<>(); for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { final List values = new ArrayList<>(); values.add(Integer.toString(row + 1)); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { values.add(conversionCase.expectedValues[row]); } rows.add(String.join(",", values) + ","); } return rows; } + + private static List getCases(final String... measurements) { + final List cases = new ArrayList<>(); + for (final String measurement : measurements) { + cases.add(getCase(measurement)); + } + return cases; + } + + private static TypeConversionSemanticCase getCase(final String measurement) { + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + if (conversionCase.measurement.equals(measurement)) { + return conversionCase; + } + } + throw new IllegalArgumentException("Unknown type conversion semantic case: " + measurement); + } } From 59530d131ce8325a29af75c490601bb0e3db109a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 5 Jun 2026 17:36:41 +0800 Subject: [PATCH 4/4] Fix pipe table type conversion load path --- .../IoTDBPipeTypeConversionSemanticIT.java | 119 +++++++++++++++--- .../thrift/IoTDBDataNodeReceiver.java | 44 +++++-- .../plan/analyze/load/LoadTsFileAnalyzer.java | 3 +- .../load/LoadTsFileTableSchemaCache.java | 31 +++-- .../thrift/IoTDBDataNodeReceiverTest.java | 43 ++++++- .../analyze/load/LoadTsFileAnalyzerTest.java | 69 +++++++++- 6 files changed, 268 insertions(+), 41 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java index 9eb10de913ac..6704d37b87e2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java @@ -42,6 +42,19 @@ public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeTableModelDua private static final String DATABASE = "pipe_type_conversion"; private static final String TABLE = "semantic_conversion"; + private static final String STREAM_TABLE = "semantic_stream_conversion"; + private static final List STREAM_CASES = + getCases( + "bool_to_int32", + "bool_to_int64", + "bool_to_float", + "bool_to_double", + "bool_to_blob", + "bool_to_date", + "bool_to_timestamp", + "int32_to_boolean", + "int32_to_timestamp", + "int32_to_date"); @Override @Before @@ -71,37 +84,68 @@ protected void setupConfig() { @Test public void testPipeReceiverTypeConversionSemantics() { - createDatabaseAndTable(senderEnv, true); - createDatabaseAndTable(receiverEnv, false); + createDatabaseAndTable(senderEnv, TABLE, TypeConversionSemanticCase.CASES, true); + createDatabaseAndTable(receiverEnv, TABLE, TypeConversionSemanticCase.CASES, false); createPipe(); TestUtils.executeNonQueries( - DATABASE, BaseEnv.TABLE_SQL_DIALECT, senderEnv, createInsertStatements(), null); + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + createInsertStatements(TABLE, TypeConversionSemanticCase.CASES), + null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + createQuerySql(TABLE, TypeConversionSemanticCase.CASES), + createExpectedHeader(TypeConversionSemanticCase.CASES), + new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)), + 60, + DATABASE, + null); + } + + @Test + public void testStreamPipeReceiverTypeConversionSemantics() { + createDatabaseAndTable(senderEnv, STREAM_TABLE, STREAM_CASES, true); + createDatabaseAndTable(receiverEnv, STREAM_TABLE, STREAM_CASES, false); + createStreamPipe(); + + TestUtils.executeNonQueries( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + createInsertStatements(STREAM_TABLE, STREAM_CASES), + null); TestUtils.assertDataEventuallyOnEnv( receiverEnv, - createQuerySql(), - createExpectedHeader(), - new HashSet<>(createExpectedRows()), + createQuerySql(STREAM_TABLE, STREAM_CASES), + createExpectedHeader(STREAM_CASES), + new HashSet<>(createExpectedRows(STREAM_CASES)), 60, DATABASE, null); } - private static void createDatabaseAndTable(final BaseEnv env, final boolean useSourceType) { + private static void createDatabaseAndTable( + final BaseEnv env, + final String table, + final List conversionCases, + final boolean useSourceType) { final List sqls = new ArrayList<>(); sqls.add("create database if not exists " + DATABASE); sqls.add("use " + DATABASE); final List columns = new ArrayList<>(); columns.add("tag_id string tag"); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { columns.add( String.format( "%s %s field", conversionCase.measurement, useSourceType ? conversionCase.sourceType : conversionCase.targetType)); } - sqls.add(String.format("create table %s (%s)", TABLE, String.join(",", columns))); + sqls.add(String.format("create table %s (%s)", table, String.join(",", columns))); TestUtils.executeNonQueries(null, BaseEnv.TABLE_SQL_DIALECT, env, sqls, null); } @@ -119,53 +163,71 @@ private void createPipe() { null); } - private static List createInsertStatements() { + private void createStreamPipe() { + TestUtils.executeNonQuery( + DATABASE, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + String.format( + "create pipe stream_type_conversion_semantic" + + " with source ('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='stream')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('sink'='iotdb-thrift-sink','sink.node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()), + null); + } + + private static List createInsertStatements( + final String table, final List conversionCases) { final List sqls = new ArrayList<>(); final String measurements = String.join( ",", - TypeConversionSemanticCase.CASES.stream() + conversionCases.stream() .map(conversionCase -> conversionCase.measurement) .toArray(String[]::new)); for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { final List values = new ArrayList<>(); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { values.add(conversionCase.sourceSqlValues[row]); } sqls.add( String.format( "insert into %s(time,tag_id,%s) values (%d,'d',%s)", - TABLE, measurements, row + 1, String.join(",", values))); + table, measurements, row + 1, String.join(",", values))); } sqls.add("flush"); return sqls; } - private static String createQuerySql() { + private static String createQuerySql( + final String table, final List conversionCases) { return String.format( "select %s,time from %s where tag_id='d'", String.join( ",", - TypeConversionSemanticCase.CASES.stream() + conversionCases.stream() .map(conversionCase -> conversionCase.measurement) .toArray(String[]::new)), - TABLE); + table); } - private static String createExpectedHeader() { + private static String createExpectedHeader( + final List conversionCases) { final List columns = new ArrayList<>(); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { columns.add(conversionCase.measurement); } columns.add("time"); return String.join(",", columns) + ","; } - private static List createExpectedRows() { + private static List createExpectedRows( + final List conversionCases) { final List rows = new ArrayList<>(); for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) { final List values = new ArrayList<>(); - for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + for (final TypeConversionSemanticCase conversionCase : conversionCases) { values.add(conversionCase.expectedValues[row]); } values.add(TypeConversionSemanticCase.timestampValue(row + 1)); @@ -173,4 +235,21 @@ private static List createExpectedRows() { } return rows; } + + private static List getCases(final String... measurements) { + final List cases = new ArrayList<>(); + for (final String measurement : measurements) { + cases.add(getCase(measurement)); + } + return cases; + } + + private static TypeConversionSemanticCase getCase(final String measurement) { + for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) { + if (conversionCase.measurement.equals(measurement)) { + return conversionCase; + } + } + throw new IllegalArgumentException("Unknown type conversion semantic case: " + measurement); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 731c2eb50f0f..e6c8ddefc997 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -590,7 +590,7 @@ static Map buildLoadTsFileAttributesForAsync( dataBaseName, LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName), shouldConvertDataTypeOnTypeMismatch, - validateTsFile, + validateTsFile || shouldConvertDataTypeOnTypeMismatch, null, shouldMarkAsPipeRequest); } @@ -598,16 +598,23 @@ static Map buildLoadTsFileAttributesForAsync( private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbsolutePath) throws FileNotFoundException { return executeStatementAndClassifyExceptions( - buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, validateTsFile.get())); + buildLoadTsFileStatementForSync( + dataBaseName, + fileAbsolutePath, + validateTsFile.get(), + shouldConvertDataTypeOnTypeMismatch)); } static LoadTsFileStatement buildLoadTsFileStatementForSync( - final String dataBaseName, final String fileAbsolutePath, final boolean validateTsFile) + final String dataBaseName, + final String fileAbsolutePath, + final boolean validateTsFile, + final boolean shouldConvertDataTypeOnTypeMismatch) throws FileNotFoundException { final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(fileAbsolutePath); statement.setDeleteAfterLoad(true); - statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(validateTsFile); + statement.setConvertOnTypeMismatch(shouldConvertDataTypeOnTypeMismatch); + statement.setVerifySchema(validateTsFile || shouldConvertDataTypeOnTypeMismatch); statement.setAutoCreateDatabase( IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()); statement.setDatabase(dataBaseName); @@ -956,14 +963,26 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch( } } + // Execute insert statements through the conversion wrapper first to avoid writing a partial + // row/tablet before the type mismatch is converted. + if (shouldConvertDataTypeOnTypeMismatch && statement instanceof InsertBaseStatement) { + final Optional convertedStatus = + executeInsertStatementWithDataTypeConversion( + statement, isTableModelStatement, databaseName); + if (convertedStatus.isPresent()) { + return convertedStatus.get(); + } + } + // Real execution of the statement final TSStatus status = isTableModelStatement ? executeStatementForTableModel(statement, databaseName) : executeStatementForTreeModel(statement); - // Try to convert data type if the statement is a tree model statement - // and the status code is not success + // Try to convert data type if the status code is not success. Insert statements normally return + // above after the first converted execution. The retry path is kept for load and fallback + // cases. if (!shouldConvertDataTypeOnTypeMismatch || !((statement instanceof InsertBaseStatement && ((InsertBaseStatement) statement).hasFailedMeasurements()) @@ -986,6 +1005,17 @@ && shouldUseTableModelVisitorForLoadStatement((LoadTsFileStatement) statement)) : statement.accept(treeStatementDataTypeConvertExecutionVisitor, status).orElse(status); } + private Optional executeInsertStatementWithDataTypeConversion( + final Statement statement, final boolean isTableModelStatement, final String databaseName) { + return isTableModelStatement + ? statement.accept( + tableStatementDataTypeConvertExecutionVisitor, + new Pair<>(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), databaseName)) + : statement.accept( + treeStatementDataTypeConvertExecutionVisitor, + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + } + private boolean shouldUseTableModelVisitorForLoadStatement( final LoadTsFileStatement loadTsFileStatement) { final List isTableModel = loadTsFileStatement.getIsTableModel(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 5171ccd93c3e..e357b86e0c73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -619,7 +619,8 @@ private TreeSchemaAutoCreatorAndVerifier getOrCreateTreeSchemaVerifier() { private LoadTsFileTableSchemaCache getOrCreateTableSchemaCache() { if (tableSchemaCache == null) { - tableSchemaCache = new LoadTsFileTableSchemaCache(metadata, context, isAutoCreateDatabase); + tableSchemaCache = + new LoadTsFileTableSchemaCache(metadata, context, isAutoCreateDatabase, isVerifySchema); } return tableSchemaCache; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index c3c0ea73310f..92a56d29a552 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -98,6 +99,7 @@ public class LoadTsFileTableSchemaCache { private Map tableSchemaMap; private final Metadata metadata; private final MPPQueryContext context; + private final boolean shouldVerifyDataType; private Map> currentBatchTable2Devices; @@ -116,13 +118,17 @@ public class LoadTsFileTableSchemaCache { private final AtomicBoolean needDecode4DifferentTimeColumn = new AtomicBoolean(false); public LoadTsFileTableSchemaCache( - final Metadata metadata, final MPPQueryContext context, final boolean needToCreateDatabase) + final Metadata metadata, + final MPPQueryContext context, + final boolean needToCreateDatabase, + final boolean shouldVerifyDataType) throws LoadRuntimeOutOfMemoryException { this.block = LoadTsFileMemoryManager.getInstance() .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); this.metadata = metadata; this.context = context; + this.shouldVerifyDataType = shouldVerifyDataType; this.currentBatchTable2Devices = new HashMap<>(); this.currentModifications = PatternTreeMapFactory.getModsPatternTreeMap(); this.needToCreateDatabase = needToCreateDatabase; @@ -389,14 +395,25 @@ private void verifyTableDataTypeAndGenerateTagColumnMapper( } } else if (fileColumn.getColumnCategory() == TsTableColumnCategory.FIELD) { ColumnSchema realColumn = fieldColumnNameToSchema.get(fileColumn.getName()); - if (LOGGER.isDebugEnabled() - && (realColumn == null || !fileColumn.getType().equals(realColumn.getType()))) { + if (realColumn != null && !fileColumn.getType().equals(realColumn.getType())) { + final String message = + String.format( + "Data type mismatch for column %s in table %s, type in TsFile: %s, type in IoTDB: %s", + fileColumn.getName(), + realSchema.getTableName(), + fileColumn.getType(), + realColumn.getType()); + if (shouldVerifyDataType) { + throw new LoadAnalyzeTypeMismatchException(message); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(message); + } + } else if (LOGGER.isDebugEnabled() && realColumn == null) { LOGGER.debug( - "Data type mismatch for column {} in table {}, type in TsFile: {}, type in IoTDB: {}", + "Column {} in table {} is not found in IoTDB while loading TsFile.", fileColumn.getName(), - realSchema.getTableName(), - fileColumn.getType(), - Objects.nonNull(realColumn) ? realColumn.getType() : null); + realSchema.getTableName()); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index f41c44763f99..1e279a18febf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -39,7 +39,7 @@ public void testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() t try { final LoadTsFileStatement statement = IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( - "root.test.sg_0", tsFile.toString(), true); + "root.test.sg_0", tsFile.toString(), true, true); Assert.assertEquals("root.test.sg_0", statement.getDatabase()); Assert.assertEquals(2, statement.getDatabaseLevel()); @@ -54,16 +54,17 @@ public void testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() try { final Map attributes = IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync( - "root.test.sg_0", true, true, true); + "root.test.sg_0", true, false, true); Assert.assertEquals( "root.test.sg_0", attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY)); Assert.assertEquals("2", attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY)); final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(tsFile.toString()); - ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, true); + ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, false); Assert.assertEquals("root.test.sg_0", statement.getDatabase()); Assert.assertEquals(2, statement.getDatabaseLevel()); + Assert.assertTrue(statement.isVerifySchema()); } finally { Files.deleteIfExists(tsFile); } @@ -75,7 +76,8 @@ public void testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseName final Path tsFile = Files.createTempFile("pipe-load-default-database-level", ".tsfile"); try { final LoadTsFileStatement statement = - IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, tsFile.toString(), true); + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + null, tsFile.toString(), true, true); Assert.assertNull(statement.getDatabase()); Assert.assertEquals( @@ -92,7 +94,7 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception { try { final LoadTsFileStatement statement = IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( - "root.test.sg_0", tsFile.toString(), true); + "root.test.sg_0", tsFile.toString(), true, true); final long receiverId = System.nanoTime(); final Exception exception = new RuntimeException("repeated receiver exception " + receiverId); @@ -107,4 +109,35 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception { Files.deleteIfExists(tsFile); } } + + @Test + public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), false, true); + + Assert.assertTrue(statement.isConvertOnTypeMismatch()); + Assert.assertTrue(statement.isVerifySchema()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType() + throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-no-convert-no-verify-schema", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), false, false); + + Assert.assertFalse(statement.isConvertOnTypeMismatch()); + Assert.assertFalse(statement.isVerifySchema()); + } finally { + Files.deleteIfExists(tsFile); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index d6bdb1e37feb..01406b0fd063 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -19,7 +19,10 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; +import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -33,6 +36,7 @@ import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator; +import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -45,6 +49,7 @@ import java.io.File; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collections; @@ -109,6 +114,41 @@ public void testAnalyzeSingleTableFileShouldNotCountTimestampInPointCount() thro Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount()); } + @Test + public void testTableSchemaCacheShouldThrowMismatchWhenVerifyingDataType() throws Exception { + final LoadTsFileTableSchemaCache schemaCache = createTableSchemaCache(true); + try { + final InvocationTargetException exception = + Assert.assertThrows( + InvocationTargetException.class, + () -> + getVerifyTableDataTypeMethod() + .invoke( + schemaCache, + createTableSchema(TSDataType.INT64), + createTableSchema(TSDataType.DOUBLE))); + + Assert.assertTrue(exception.getCause() instanceof LoadAnalyzeTypeMismatchException); + } finally { + schemaCache.close(); + } + } + + @Test + public void testTableSchemaCacheShouldNotThrowMismatchWhenSkippingDataTypeVerification() + throws Exception { + final LoadTsFileTableSchemaCache schemaCache = createTableSchemaCache(false); + try { + getVerifyTableDataTypeMethod() + .invoke( + schemaCache, + createTableSchema(TSDataType.INT64), + createTableSchema(TSDataType.DOUBLE)); + } finally { + schemaCache.close(); + } + } + private void writeTableTsFileWithMixedDevices(final File tsFile) throws Exception { if (tsFile.exists()) { Assert.assertTrue(tsFile.delete()); @@ -163,6 +203,33 @@ private void injectTableSchemaCache( tableSchemaCacheField.set(analyzer, schemaCache); } + private LoadTsFileTableSchemaCache createTableSchemaCache(final boolean shouldVerifyDataType) + throws LoadRuntimeOutOfMemoryException { + return new LoadTsFileTableSchemaCache( + null, new MPPQueryContext(new QueryId("load_test")), false, shouldVerifyDataType); + } + + private Method getVerifyTableDataTypeMethod() throws NoSuchMethodException { + final Method method = + LoadTsFileTableSchemaCache.class.getDeclaredMethod( + "verifyTableDataTypeAndGenerateTagColumnMapper", + org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class, + org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class); + method.setAccessible(true); + return method; + } + + private org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema + createTableSchema(final TSDataType fieldType) { + return new org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema( + "table1", + Arrays.asList( + new ColumnSchema( + "tag1", TypeFactory.getType(TSDataType.STRING), false, TsTableColumnCategory.TAG), + new ColumnSchema( + "s1", TypeFactory.getType(fieldType), false, TsTableColumnCategory.FIELD))); + } + private boolean containsDevice(final Set devices, final String... expectedSegments) { return devices.stream() .anyMatch(device -> Arrays.equals(device.getSegments(), expectedSegments)); @@ -173,7 +240,7 @@ private static class TrackingLoadTsFileTableSchemaCache extends LoadTsFileTableS private final Set> verifiedDevices = new HashSet<>(); private TrackingLoadTsFileTableSchemaCache() throws LoadRuntimeOutOfMemoryException { - super(null, new MPPQueryContext(new QueryId("load_test")), false); + super(null, new MPPQueryContext(new QueryId("load_test")), false, true); } @Override