Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5076,6 +5076,26 @@ private static void createVectorPTFDesc(Operator<? extends OperatorDesc> ptfOp,
vectorizedPTFMaxMemoryBufferingBatchCount);
}

/**
* Whether a partition expression refers to the same input column as an output column slot.
*/
private static boolean partitionExprMatchesInputColumn(ExprNodeDesc partitionExpr,
List<ColumnInfo> outputSignature, int[] outputColumnProjectionMap, int evaluatorCount,
int inputColumnNum) {
ExprNodeDescEqualityWrapper partitionWrapper =
new ExprNodeDescEqualityWrapper(partitionExpr);
for (int i = evaluatorCount; i < outputColumnProjectionMap.length; i++) {
if (outputColumnProjectionMap[i] != inputColumnNum) {
continue;
}
ExprNodeColumnDesc outputColExpr = new ExprNodeColumnDesc(outputSignature.get(i));
if (partitionWrapper.equals(new ExprNodeDescEqualityWrapper(outputColExpr))) {
return true;
}
}
return false;
}

private static void determineKeyAndNonKeyInputColumnMap(int[] outputColumnProjectionMap,
boolean isPartitionOrderBy, int[] orderColumnMap, int[] partitionColumnMap,
int evaluatorCount, ArrayList<Integer> keyInputColumns,
Expand Down Expand Up @@ -5161,13 +5181,17 @@ private static VectorPTFInfo createVectorPTFInfo(Operator<? extends OperatorDesc
partitionColumnVectorTypes = new Type[partitionKeyCount];
partitionExpressions = new VectorExpression[partitionKeyCount];

int[] planPartitionColumnMap = new int[partitionKeyCount];
Type[] planPartitionColumnVectorTypes = new Type[partitionKeyCount];
VectorExpression[] planPartitionExpressions = new VectorExpression[partitionKeyCount];
for (int i = 0; i < partitionKeyCount; i++) {
VectorExpression partitionExpression = vContext.getVectorExpression(partitionExprNodeDescs[i]);
VectorExpression partitionExpression =
vContext.getVectorExpression(partitionExprNodeDescs[i]);
TypeInfo typeInfo = partitionExpression.getOutputTypeInfo();
Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
partitionColumnVectorTypes[i] = columnVectorType;
partitionColumnMap[i] = partitionExpression.getOutputColumnNum();
partitionExpressions[i] = partitionExpression;
planPartitionColumnVectorTypes[i] =
VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
planPartitionColumnMap[i] = partitionExpression.getOutputColumnNum();
planPartitionExpressions[i] = partitionExpression;
}

final int orderKeyCount = orderExprNodeDescs.length;
Expand All @@ -5186,10 +5210,49 @@ private static VectorPTFInfo createVectorPTFInfo(Operator<? extends OperatorDesc
ArrayList<Integer> keyInputColumns = new ArrayList<Integer>();
ArrayList<Integer> nonKeyInputColumns = new ArrayList<Integer>();
determineKeyAndNonKeyInputColumnMap(outputColumnProjectionMap, isPartitionOrderBy, orderColumnMap,
partitionColumnMap, evaluatorCount, keyInputColumns, nonKeyInputColumns);
planPartitionColumnMap, evaluatorCount, keyInputColumns, nonKeyInputColumns);
int[] keyInputColumnMap = ArrayUtils.toPrimitive(keyInputColumns.toArray(new Integer[0]));
int[] nonKeyInputColumnMap = ArrayUtils.toPrimitive(nonKeyInputColumns.toArray(new Integer[0]));

boolean[] partitionUsed = new boolean[partitionKeyCount];
int partitionIndex = 0;
for (int keyCol : keyInputColumnMap) {
for (int i = 0; i < partitionKeyCount; i++) {
if (!partitionUsed[i] && planPartitionColumnMap[i] == keyCol) {
partitionColumnVectorTypes[partitionIndex] = planPartitionColumnVectorTypes[i];
partitionColumnMap[partitionIndex] = keyCol;
partitionExpressions[partitionIndex] = planPartitionExpressions[i];
partitionUsed[i] = true;
partitionIndex++;
break;
}
}
}
for (int keyCol : keyInputColumnMap) {
for (int i = 0; i < partitionKeyCount; i++) {
if (partitionUsed[i]) {
continue;
}
if (partitionExprMatchesInputColumn(partitionExprNodeDescs[i], outputSignature,
outputColumnProjectionMap, evaluatorCount, keyCol)) {
partitionColumnVectorTypes[partitionIndex] = planPartitionColumnVectorTypes[i];
partitionColumnMap[partitionIndex] = keyCol;
partitionExpressions[partitionIndex] = planPartitionExpressions[i];
partitionUsed[i] = true;
partitionIndex++;
break;
}
}
}
for (int i = 0; i < partitionKeyCount; i++) {
if (!partitionUsed[i]) {
partitionColumnVectorTypes[partitionIndex] = planPartitionColumnVectorTypes[i];
partitionColumnMap[partitionIndex] = planPartitionColumnMap[i];
partitionExpressions[partitionIndex] = planPartitionExpressions[i];
partitionIndex++;
}
}

VectorExpression[][] evaluatorInputExpressions = new VectorExpression[evaluatorCount][];
Type[][] evaluatorInputColumnVectorTypes = new Type[evaluatorCount][];
for (int i = 0; i < evaluatorCount; i++) {
Expand Down
149 changes: 149 additions & 0 deletions ql/src/test/queries/clientpositive/lead_vec.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
SET hive.vectorized.execution.enabled=true;
create table web_sales_txt
(
ws_sold_date_sk int,
ws_sold_time_sk int,
ws_ship_date_sk int,
ws_item_sk int,
ws_bill_customer_sk int,
ws_bill_cdemo_sk int,
ws_bill_hdemo_sk int,
ws_bill_addr_sk int,
ws_ship_customer_sk int,
ws_ship_cdemo_sk int,
ws_ship_hdemo_sk int,
ws_ship_addr_sk int,
ws_web_page_sk int,
ws_web_site_sk int,
ws_ship_mode_sk int,
ws_warehouse_sk int,
ws_promo_sk int,
ws_order_number int,
ws_quantity int,
ws_wholesale_cost decimal(7,2),
ws_list_price decimal(7,2),
ws_sales_price decimal(7,2),
ws_ext_discount_amt decimal(7,2),
ws_ext_sales_price decimal(7,2),
ws_ext_wholesale_cost decimal(7,2),
ws_ext_list_price decimal(7,2),
ws_ext_tax decimal(7,2),
ws_coupon_amt decimal(7,2),
ws_ext_ship_cost decimal(7,2),
ws_net_paid decimal(7,2),
ws_net_paid_inc_tax decimal(7,2),
ws_net_paid_inc_ship decimal(7,2),
ws_net_paid_inc_ship_tax decimal(7,2),
ws_net_profit decimal(7,2)
)
row format delimited fields terminated by '|'
stored as textfile;

LOAD DATA LOCAL INPATH '../../data/files/web_sales_2k' OVERWRITE INTO TABLE web_sales_txt;
select ws_bill_customer_sk,ws_item_sk from web_sales_txt;

SET hive.vectorized.execution.enabled;
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LAG(ws_sales_price) OVER (
PARTITION BY ws_item_sk,ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS prev_sales_price,
ws_sales_price - LAG(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) AS sales_price_diff
FROM
web_sales_txt;

SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS next_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) - ws_sales_price AS sales_price_diff
FROM
web_sales_txt;



SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
FIRST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_item_sk,ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS first_price,
LAST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS last_price
FROM
web_sales_txt;

SET hive.vectorized.execution.enabled=false;

SET hive.vectorized.execution.enabled;
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LAG(ws_sales_price) OVER (
PARTITION BY ws_item_sk,ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS prev_sales_price,
ws_sales_price - LAG(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) AS sales_price_diff
FROM
web_sales_txt;

SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS next_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) - ws_sales_price AS sales_price_diff
FROM
web_sales_txt;



SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
FIRST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_item_sk,ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS first_price,
LAST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS last_price
FROM
web_sales_txt;
Loading
Loading