Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ public void insertWrongTimeSqlTest()
session.executeNonQueryStatement("insert into wrong_time values(1+1,'bb','cc','dd')");
fail("No exception thrown");
} catch (StatementExecutionException e) {
assertEquals("701: Unsupported expression: (1 + 1)", e.getMessage());
assertEquals(
"701: Insert expression must be constant after constant folding: (1 + 1) (folded to (1 + 1))",
e.getMessage());
}
try {
session.executeNonQueryStatement("insert into wrong_time values(1.0,'bb','cc','dd')");
Expand Down Expand Up @@ -333,6 +335,14 @@ public void insertRelationalSqlTest()
row, "tag:" + row, "attr:" + row, row));
}

session.executeNonQueryStatement(
"INSERT INTO table1 (time, tag1, attr1, m1) "
+ "VALUES (if(true, 50, 0), if(true, 'tag:50', 'x'), "
+ "if(false, 'x', 'attr:50'), if(true, 50.0, 0.0))");
session.executeNonQueryStatement(
"INSERT INTO table1 VALUES (if(true, 51, 0), if(true, 'tag:51', 'x'), "
+ "if(true, 'attr:51', 'x'), if(false, 0, 51))");

SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time");
int cnt = 0;
while (dataSet.hasNext()) {
Expand All @@ -343,7 +353,7 @@ public void insertRelationalSqlTest()
assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001);
cnt++;
}
assertEquals(50, cnt);
assertEquals(52, cnt);

// sql cannot create column
assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,17 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
List<WritePlanNode> writePlanNodeList = new ArrayList<>();
Map<TRegionReplicaSet, RelationalInsertRowsNode> splitMap = new HashMap<>();
List<TEndPoint> redirectInfo = new ArrayList<>();
String databaseName = getDatabaseName(analysis);
for (int i = 0; i < getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode = getInsertRowNodeList().get(i);
// Data region for insert row node
// each row may belong to different database, pass null for auto-detection
TRegionReplicaSet dataRegionReplicaSet =
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
insertRowNode.getDeviceID(),
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()),
analysis.getDatabaseName());
databaseName);

// Collect redirectInfo
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
Expand All @@ -195,6 +195,14 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
return writePlanNodeList;
}

private String getDatabaseName(IAnalysis analysis) {
if (analysis.getDataPartitionInfo() != null
&& analysis.getDataPartitionInfo().getDataPartitionMap().size() == 1) {
return analysis.getDataPartitionInfo().getDataPartitionMap().keySet().iterator().next();
}
return analysis.getDatabaseName();
}

public RelationalInsertRowsNode emptyClone() {
return new RelationalInsertRowsNode(this.getPlanNodeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRows;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertValues;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property;
Expand Down Expand Up @@ -791,6 +792,11 @@
}

private Scope visitInsert(WrappedInsertStatement insert, Optional<Scope> scope) {
if (insert instanceof InsertValues) {
((InsertValues) insert)
.materialize(new PlannerContext(metadata, typeManager), sessionContext);
}

final Scope ret = Scope.create();

final MPPQueryContext context = insert.getContext();
Expand Down Expand Up @@ -2680,7 +2686,7 @@
}
}

private Analysis.GroupingSetAnalysis analyzeGroupBy(

Check warning on line 2689 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 111 to 64, Complexity from 22 to 14, Nesting Level from 6 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCa796qf42bbbA62&open=AZ6WiCa796qf42bbbA62&pullRequest=17850
QuerySpecification node, Scope scope, List<Expression> outputExpressions) {
if (node.getGroupBy().isPresent()) {
ImmutableList.Builder<List<Set<FieldId>>> cubes = ImmutableList.builder();
Expand Down Expand Up @@ -3081,7 +3087,7 @@
}

@Override
public Scope visitSetOperation(SetOperation node, Optional<Scope> scope) {

Check warning on line 3090 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 82 to 64, Complexity from 16 to 14, Nesting Level from 3 to 2, Number of Variables from 25 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCa796qf42bbbA63&open=AZ6WiCa796qf42bbbA63&pullRequest=17850
checkState(node.getRelations().size() >= 2);

List<RelationType> childrenTypes =
Expand Down Expand Up @@ -3372,7 +3378,7 @@
// accessControlScope, filter));
// }

public Scope visitPatternRecognitionRelation(

Check warning on line 3381 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 121 to 64, Complexity from 18 to 14, Nesting Level from 3 to 2, Number of Variables from 35 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCa796qf42bbbA64&open=AZ6WiCa796qf42bbbA64&pullRequest=17850
PatternRecognitionRelation relation, Optional<Scope> scope) {
Scope inputScope = process(relation.getInput(), scope);

Expand Down Expand Up @@ -3731,7 +3737,7 @@
}

@Override
public Scope visitJoin(Join node, Optional<Scope> scope) {

Check warning on line 3740 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 81 to 64, Complexity from 15 to 14, Nesting Level from 3 to 2, Number of Variables from 18 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCa796qf42bbbA65&open=AZ6WiCa796qf42bbbA65&pullRequest=17850
JoinCriteria criteria = node.getCriteria().orElse(null);

joinConditionCheck(criteria);
Expand Down Expand Up @@ -5019,7 +5025,7 @@
return null;
}

private ArgumentsAnalysis analyzeArguments(

Check warning on line 5028 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 80 to 64, Complexity from 15 to 14, Nesting Level from 3 to 2, Number of Variables from 25 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCa796qf42bbbA66&open=AZ6WiCa796qf42bbbA66&pullRequest=17850
List<ParameterSpecification> parameterSpecifications,
List<TableFunctionArgument> arguments,
Optional<Scope> scope,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;

import org.apache.iotdb.calc.exception.QueryProcessException;
import org.apache.iotdb.commons.exception.SemanticException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.common.SessionInfo;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Row;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Values;
import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
import org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.schema.MeasurementSchema;

import javax.annotation.Nullable;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;

public class InsertValues extends InsertRows {

Check warning on line 56 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertValues.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Override the "equals" method in this class.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCR396qf42bbbA6z&open=AZ6WiCR396qf42bbbA6z&pullRequest=17850

Check warning on line 56 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertValues.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This class has 6 parents which is greater than 5 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6WiCR396qf42bbbA6y&open=AZ6WiCR396qf42bbbA6y&pullRequest=17850

private final String databaseName;
private final String tableName;
@Nullable private final List<String> columnNames;
private final List<List<Expression>> rows;
private final ZoneId zoneId;
private boolean materialized;

public InsertValues(
final String databaseName,
final String tableName,
@Nullable final List<Identifier> columnIdentifiers,
final Values values,
final ZoneId zoneId) {
super(createPlaceholderStatement(databaseName), null);
this.databaseName = requireNonNull(databaseName, "databaseName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.columnNames =
columnIdentifiers == null
? null
: columnIdentifiers.stream().map(Identifier::getValue).collect(Collectors.toList());
this.rows =
requireNonNull(values, "values is null").getRows().stream()
.map(InsertValues::normalizeRow)
.collect(Collectors.toList());
this.zoneId = requireNonNull(zoneId, "zoneId is null");
}

public void materialize(PlannerContext plannerContext, SessionInfo session) {
if (materialized) {
return;
}

InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
insertRowsStatement.setInsertRowStatementList(
columnNames == null
? toInsertRowStatementsWithTableSchema(plannerContext, session)
: toInsertRowStatementsWithColumns(plannerContext, session));
insertRowsStatement.setWriteToTable(true);
insertRowsStatement.setDatabaseName(databaseName);
setInnerTreeStatement(insertRowsStatement);
materialized = true;
}

private static InsertRowsStatement createPlaceholderStatement(String databaseName) {
InsertRowsStatement statement = new InsertRowsStatement();
statement.setInsertRowStatementList(Collections.emptyList());
statement.setWriteToTable(true);
statement.setDatabaseName(databaseName);
return statement;
}

@Override
public String getDatabase() {
return databaseName;
}

private static List<Expression> normalizeRow(Expression row) {
if (row instanceof Row) {
return ((Row) row).getItems();
}
return Collections.singletonList(row);
}

private List<InsertRowStatement> toInsertRowStatementsWithTableSchema(
PlannerContext plannerContext, SessionInfo session) {
final TsTable table = getTable();
return rows.stream()
.map(row -> toInsertRowStatement(row, table, plannerContext, session))
.collect(Collectors.toList());
}

private List<InsertRowStatement> toInsertRowStatementsWithColumns(
PlannerContext plannerContext, SessionInfo session) {
final List<String> nonTimeColumnNames = new ArrayList<>(columnNames);
final int timeColumnIndex = findTimeColumnIndex(nonTimeColumnNames);
if (timeColumnIndex != -1) {
nonTimeColumnNames.remove(timeColumnIndex);
}
if (timeColumnIndex == -1 && rows.size() > 1) {
throw new SemanticException(DataNodeQueryMessages.NEED_TIMESTAMPS_WHEN_INSERT_MULTI_ROWS);
}

final String[] nonTimeColumnArray = nonTimeColumnNames.toArray(new String[0]);
return rows.stream()
.map(
row ->
toInsertRowStatement(
row, timeColumnIndex, nonTimeColumnArray.clone(), plannerContext, session))
.collect(Collectors.toList());
}

private TsTable getTable() {
TsTable table = DataNodeTableCache.getInstance().getTable(databaseName, tableName);
if (table == null) {
throw new SemanticException(
"Insert target table does not exist: " + databaseName + "." + tableName);
}
return table;
}

private int findTimeColumnIndex(final List<String> columnNames) {
List<TsTableColumnSchema> timeColumnCandidates =
getTable().getColumnList().stream()
.filter(column -> column.getColumnCategory() == TIME)
.collect(Collectors.toList());
if (timeColumnCandidates.size() != 1) {
throw new SemanticException(
DataNodeQueryMessages.THE_TABLE_SHOULD_ONLY_HAVE_ONE_COLUMN_FOUND);
}

int timeColumnIndex = -1;
String timeColumnName = timeColumnCandidates.get(0).getColumnName();
for (int i = 0; i < columnNames.size(); i++) {
if (timeColumnName.equalsIgnoreCase(columnNames.get(i))) {
if (timeColumnIndex == -1) {
timeColumnIndex = i;
} else {
throw new SemanticException(
DataNodeQueryMessages.ONE_ROW_SHOULD_ONLY_HAVE_ONE_TIME_VALUE);
}
}
}
return timeColumnIndex;
}

private InsertRowStatement toInsertRowStatement(
List<Expression> expressions,
TsTable table,
PlannerContext plannerContext,
SessionInfo session) {
InsertRowStatement insertRowStatement = new InsertRowStatement();
insertRowStatement.setWriteToTable(true);
insertRowStatement.setDevicePath(new PartialPath(new String[] {table.getTableName()}));

List<TsTableColumnSchema> columnList = table.getColumnList();
if (expressions.size() != columnList.size()) {
throw new SemanticException(
"expressions and columns do not match, expressions size: "
+ expressions.size()
+ ", columns size: "
+ columnList.size());
}

String[] nonTimeColumnNames = new String[columnList.size() - 1];
Object[] nonTimeValues = new Object[columnList.size() - 1];
TsTableColumnCategory[] nonTimeColumnCategories =
new TsTableColumnCategory[columnList.size() - 1];
MeasurementSchema[] columnSchemas = new MeasurementSchema[columnList.size() - 1];
TSDataType[] dataTypes = new TSDataType[columnList.size() - 1];
int nonTimeColumnIndex = 0;
long timestamp = -1;
for (int i = 0; i < columnList.size(); i++) {
TsTableColumnSchema columnSchema = columnList.get(i);
Expression expression = expressions.get(i);

if (columnSchema.getColumnCategory().equals(TIME)) {
timestamp = AstUtil.expressionToTimestamp(expression, zoneId, plannerContext, session);
} else {
Object value = AstUtil.expressionToTsValue(expression, plannerContext, session);
nonTimeValues[nonTimeColumnIndex] = value;
nonTimeColumnNames[nonTimeColumnIndex] = columnSchema.getColumnName();
dataTypes[nonTimeColumnIndex] = columnSchema.getDataType();
nonTimeColumnCategories[nonTimeColumnIndex] = columnSchema.getColumnCategory();
columnSchemas[nonTimeColumnIndex] =
new MeasurementSchema(columnSchema.getColumnName(), columnSchema.getDataType());
nonTimeColumnIndex++;
}
}

TimestampPrecisionUtils.checkTimestampPrecision(timestamp);
insertRowStatement.setTime(timestamp);
insertRowStatement.setMeasurements(nonTimeColumnNames);
insertRowStatement.setDataTypes(dataTypes);
insertRowStatement.setMeasurementSchemas(columnSchemas);
insertRowStatement.setValues(nonTimeValues);
insertRowStatement.setColumnCategories(nonTimeColumnCategories);
insertRowStatement.setNeedInferType(false);
insertRowStatement.setDatabaseName(databaseName);

try {
insertRowStatement.transferType(zoneId);
} catch (QueryProcessException e) {
throw new SemanticException(e);
}
return insertRowStatement;
}

private InsertRowStatement toInsertRowStatement(
List<Expression> expressions,
int timeColumnIndex,
String[] nonTimeColumnNames,
PlannerContext plannerContext,
SessionInfo session) {
InsertRowStatement insertRowStatement = new InsertRowStatement();
insertRowStatement.setWriteToTable(true);
insertRowStatement.setDevicePath(new PartialPath(new String[] {tableName}));
long timestamp;
int nonTimeColumnCount;
if (timeColumnIndex == -1) {
timestamp = CommonDateTimeUtils.currentTime();
nonTimeColumnCount = expressions.size();
} else {
if (timeColumnIndex >= expressions.size()) {
throw new SemanticException(
String.format(
"TimeColumnIndex out of bound: %d-%d", timeColumnIndex, expressions.size()));
}

timestamp =
AstUtil.expressionToTimestamp(
expressions.get(timeColumnIndex), zoneId, plannerContext, session);
nonTimeColumnCount = expressions.size() - 1;
}

if (nonTimeColumnCount != nonTimeColumnNames.length) {
throw new SemanticException(
String.format(
"Inconsistent numbers of non-time column names and values: %d-%d",
nonTimeColumnNames.length, nonTimeColumnCount));
}

TimestampPrecisionUtils.checkTimestampPrecision(timestamp);
insertRowStatement.setTime(timestamp);
insertRowStatement.setMeasurements(nonTimeColumnNames);

Object[] values = new Object[nonTimeColumnNames.length];
int valuePosition = 0;
for (int i = 0; i < expressions.size(); i++) {
if (i != timeColumnIndex) {
values[valuePosition++] =
AstUtil.expressionToTsValue(expressions.get(i), plannerContext, session);
}
}

insertRowStatement.setValues(values);
insertRowStatement.setNeedInferType(true);
insertRowStatement.setDatabaseName(databaseName);
return insertRowStatement;
}
}
Loading
Loading