Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions be/src/core/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,19 @@ Block Block::clone_empty() const {
return res;
}

std::vector<Block> Block::split_to_const_blocks() const {
std::vector<Block> blocks(rows());
for (size_t row = 0; row < blocks.size(); ++row) {
auto& block = blocks[row];
block.reserve(data.size());
for (const auto& elem : data) {
DCHECK(elem.column);
block.insert({ColumnConst::create(elem.column->cut(row, 1), 1), elem.type, elem.name});
}
}
return blocks;
}

MutableColumns Block::clone_empty_columns() const {
size_t num_columns = data.size();
MutableColumns columns(num_columns);
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ class Block {

Block clone_without_columns(const std::vector<int>* column_offset = nullptr) const;

/** Split this block into one block per row; in each resulting block every
  * column is a ColumnConst of size 1 holding that row's value, with the
  * original column type and name. */
std::vector<Block> split_to_const_blocks() const;

/** Get empty columns with the same types as in block. */
MutableColumns clone_empty_columns() const;

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/operator/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ static bool check_all_match_one(const auto& vecs) {
}

static void insert_with_indexs(auto& dst, const auto& src, const auto& indexs, bool all_match_one) {
if (const auto* const_src = check_and_get_column<ColumnConst>(src.get())) {
dst->insert_many_from(*const_src->get_data_column_ptr(), 0, indexs.size());
return;
}
if (all_match_one) {
dst->insert_range_from(*src, indexs[0], indexs.size());
} else {
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/operator/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "exec/operator/operator.h"

#include "common/status.h"
#include "core/block/materialize_block.h"
#include "exec/common/util.hpp"
#include "exec/exchange/local_exchange_sink_operator.h"
#include "exec/exchange/local_exchange_source_operator.h"
Expand Down Expand Up @@ -795,6 +796,7 @@ Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
// Forwards an incoming block to the async writer.
//
// The block is materialized in place first (presumably expanding any constant
// columns into full columns -- confirm against core/block/materialize_block.h)
// so that the writer is always handed the materialized form.
template <typename Writer, typename Parent>
    requires(std::is_base_of_v<AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, Block* block, bool eos) {
    Block& incoming = *block;
    materialize_block_inplace(incoming);
    return _writer->sink(&incoming, eos);
}

Expand Down
17 changes: 17 additions & 0 deletions be/src/exec/operator/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,16 @@ class PipelineXSinkLocalStateBase {

bool low_memory_mode() { return _state->low_memory_mode(); }

#ifndef NDEBUG
    // Debug-only one-shot latch for the const-block sink mock.
    // Returns true on the first call and marks the mock as done; every later
    // call returns false.
    bool should_mock_sink_const_block_once() {
        const bool first_request = !_has_mocked_sink_const_block;
        _has_mocked_sink_const_block = true;
        return first_request;
    }
#endif

protected:
DataSinkOperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
Expand All @@ -543,6 +553,9 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
#ifndef NDEBUG
// Debug-only flag: set to true once should_mock_sink_const_block_once() has
// returned true, so the const-block mock is applied at most once per local state.
bool _has_mocked_sink_const_block = false;
#endif
};

template <typename SharedStateArg = FakeSharedState>
Expand Down Expand Up @@ -624,6 +637,10 @@ class DataSinkOperatorXBase : public OperatorBase {
[[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
LocalSinkStateInfo& info) = 0;

#ifndef NDEBUG
// Debug-only hook: whether this sink may be fed per-row constant blocks by the
// const-block mock. Defaults to true; being virtual, a sink operator can
// override it to return false and opt out.
[[nodiscard]] virtual bool mock_const_block_execution() const { return true; }
#endif

// Returns the memory this sink operator expects to allocate in the next
// execution round (sink only — pipeline task sums all operators + sink).
[[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) {
Expand Down
26 changes: 26 additions & 0 deletions be/src/exec/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ class RuntimeState;

namespace doris {

#ifndef NDEBUG
namespace {

// Debug-only wrapper around DataSinkOperatorXBase::sink().
//
// The first non-empty block seen by a sink that allows the mock is split into
// single-row blocks of constant columns, and each of those is sunk separately;
// `eos` is only forwarded together with the last of them. In every other case
// (empty block, sink opted out, mock already used) the block is sunk as-is.
// Returns the first non-OK status produced by the sink, otherwise OK.
Status sink_with_const_block_mock(DataSinkOperatorXBase* sink, RuntimeState* state, Block* block,
                                  bool eos) {
    auto* sink_local_state = state->get_sink_local_state();
    DCHECK(sink_local_state);

    // Evaluation order matters: the one-shot latch is consulted last so it is
    // only consumed when the block is non-empty and the sink permits mocking.
    const bool apply_mock = !block->empty() && sink->mock_const_block_execution() &&
                            sink_local_state->should_mock_sink_const_block_once();
    if (!apply_mock) {
        return sink->sink(state, block, eos);
    }

    auto row_blocks = block->split_to_const_blocks();
    const size_t total = row_blocks.size();
    for (size_t idx = 0; idx < total; ++idx) {
        const bool is_last = (idx + 1 == total);
        RETURN_IF_ERROR(sink->sink(state, &row_blocks[idx], eos && is_last));
    }
    return Status::OK();
}

} // namespace
#endif

PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
std::shared_ptr<PipelineFragmentContext> fragment_context,
RuntimeProfile* parent_profile,
Expand Down Expand Up @@ -719,7 +741,11 @@ Status PipelineTask::execute(bool* done) {
}
});
RETURN_IF_ERROR(block->check_type_and_column());
#ifndef NDEBUG
status = sink_with_const_block_mock(_sink.get(), _state, block, _eos);
#else
status = _sink->sink(_state, block, _eos);
#endif

if (_eos) {
if (_sink->reset_to_rerun(_state, _root)) {
Expand Down
28 changes: 28 additions & 0 deletions be/test/core/block/block_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,34 @@ TEST(BlockTest, others) {
ASSERT_TRUE(dumped_names.empty()) << "Dumped names: " << dumped_names;
}

// Checks that split_to_const_blocks() returns one single-row block per source
// row, that each resulting column is constant and keeps the source name and
// type, and that its value equals the matching source row. The source block
// mixes plain, string, nullable and already-constant columns.
TEST(BlockTest, split_to_const_blocks) {
    auto source = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3});
    source.insert(ColumnHelper::create_column_with_name<DataTypeString>({"abc", "efg", "hij"}));
    source.insert(
            ColumnHelper::create_nullable_column_with_name<DataTypeUInt8>({10, 20, 30}, {0, 1, 0}));
    source.insert({ColumnConst::create(ColumnInt64::create(1, 100), 3),
                   std::make_shared<DataTypeInt64>(), "const_column"});

    auto per_row_blocks = source.split_to_const_blocks();
    ASSERT_EQ(per_row_blocks.size(), 3);

    size_t row_idx = 0;
    for (const auto& row_block : per_row_blocks) {
        ASSERT_EQ(row_block.rows(), 1);
        ASSERT_EQ(row_block.columns(), source.columns());

        for (size_t col_idx = 0; col_idx < source.columns(); ++col_idx) {
            const auto& expected = source.get_by_position(col_idx);
            const auto& actual = row_block.get_by_position(col_idx);

            ASSERT_EQ(actual.name, expected.name);
            ASSERT_TRUE(actual.type->equals(*expected.type));
            ASSERT_TRUE(is_column_const(*actual.column));

            // Expand both sides so row 0 of the split column can be compared
            // with row `row_idx` of the source column.
            auto actual_full = actual.column->convert_to_full_column_if_const();
            auto expected_full = expected.column->convert_to_full_column_if_const();
            ASSERT_EQ(actual_full->compare_at(0, row_idx, *expected_full, 0), 0);
        }
        ++row_idx;
    }
}

TEST(BlockTest, ClearSelectedColumnDataClonesSharedColumn) {
auto type = std::make_shared<DataTypeInt32>();
auto mutable_col0 = ColumnInt32::create();
Expand Down
34 changes: 33 additions & 1 deletion be/test/exec/operator/hashjoin_probe_operator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/column/column_const.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/field.h"
Expand Down Expand Up @@ -347,6 +348,37 @@ TEST_F(HashJoinProbeOperatorTest, InnerJoin) {
{Field::create_field<TYPE_STRING>("c"), Field::create_field<TYPE_STRING>("d")});
}

// Inner join where the probe side is a single-row block made entirely of
// constant columns (key 2, payload "b"). Exactly one build row matches, so the
// output must be one row whose columns read 2, "b", 2, "b".
TEST_F(HashJoinProbeOperatorTest, InnerJoinConstProbeBlock) {
    // Build side: keys {1, 2, 3} with payloads {"a", "b", "c"}.
    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3});
    build_side.insert(ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c"}));

    // Probe side: one-row nested columns wrapped as constants of size 1.
    auto key_data = ColumnInt32::create();
    key_data->insert_value(2);
    auto payload_data = ColumnString::create();
    payload_data->insert_data("b", 1);

    ColumnWithTypeAndName const_key(ColumnConst::create(std::move(key_data), 1),
                                    std::make_shared<DataTypeInt32>(), "column1");
    ColumnWithTypeAndName const_payload(ColumnConst::create(std::move(payload_data), 1),
                                        std::make_shared<DataTypeString>(), "column2");
    Block probe_side({const_key, const_payload});

    Block joined;
    std::vector<Block> build_blocks = {build_side};
    std::vector<Block> probe_blocks = {probe_side};
    run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, {false, false},
             {false, false}, build_blocks, probe_blocks, joined);

    ASSERT_EQ(joined.rows(), 1);
    check_column_values(*joined.get_by_position(0).column, {Field::create_field<TYPE_INT>(2)});
    check_column_values(*joined.get_by_position(1).column,
                        {Field::create_field<TYPE_STRING>("b")});
    check_column_values(*joined.get_by_position(2).column, {Field::create_field<TYPE_INT>(2)});
    check_column_values(*joined.get_by_position(3).column,
                        {Field::create_field<TYPE_STRING>("b")});
}

TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyBuildSide) {
auto sink_block = ColumnHelper::create_block<DataTypeInt32>({});
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {}));
Expand Down Expand Up @@ -1251,4 +1283,4 @@ TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMarkOtherConjuncts) {
Field::create_field<TYPE_BOOLEAN>(1)});
}

} // namespace doris
} // namespace doris
87 changes: 87 additions & 0 deletions be/test/exec/pipeline/pipeline_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
#include <functional>
#include <future>
#include <thread>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "core/column/column_const.h"
#include "core/data_type/data_type_number.h"
#include "exec/operator/mock_operator.h"
#include "exec/operator/operator.h"
#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
Expand All @@ -35,6 +39,7 @@
#include "exec/spill/spill_file.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
Expand Down Expand Up @@ -113,6 +118,21 @@ class BlockableSubmitTaskScheduler : public MockTaskScheduler {
std::function<void(PipelineTaskSPtr)> on_submit;
};

// Test-only sink operator that keeps a copy of every block it is handed,
// together with the eos flag of that call, so tests can inspect exactly what
// the pipeline task passed to the sink and in which order.
class RecordingSinkOperator final : public DataSinkOperatorX<DummySinkLocalState> {
public:
    RecordingSinkOperator(int op_id, int node_id, int dest_id)
            : DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id) {}

    // Records a copy of `in_block` and the `eos` flag; always succeeds.
    Status sink(RuntimeState* state, Block* in_block, bool eos) override {
        received_blocks.emplace_back(*in_block);
        received_eos.emplace_back(eos);
        return Status::OK();
    }

    // Copies of the blocks received by sink(), in call order.
    std::vector<Block> received_blocks;
    // eos flag of each sink() call, parallel to received_blocks.
    std::vector<bool> received_eos;
};

TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
Expand Down Expand Up @@ -397,6 +417,73 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) {
}
}

// The const-block sink mock in PipelineTask::execute() is compiled only when
// NDEBUG is not defined. In an NDEBUG build the sink receives the two source
// blocks untouched, so the expectations below (four sink calls, the first three
// being single-row constant blocks) cannot hold. Compile the test only where
// the behaviour it checks actually exists.
#ifndef NDEBUG
// Verifies that the debug-only const-block mock is applied exactly once:
// the first source block {1, 2, 3} reaches the sink as three single-row
// constant blocks, while the second block {4, 5} is passed through unchanged,
// and eos is reported only on the final sink call.
TEST_F(PipelineTaskTest, TEST_SINK_CONST_BLOCK_MOCK_ONCE) {
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    {
        // Source emits two blocks: three rows, then two rows.
        OperatorPtr source_op;
        auto* mock_source = new MockOperatorX();
        mock_source->_outout_blocks.push_back(ColumnHelper::create_block<DataTypeInt32>({1, 2, 3}));
        mock_source->_outout_blocks.push_back(ColumnHelper::create_block<DataTypeInt32>({4, 5}));
        source_op.reset(mock_source);
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

        int op_id = 1;
        int node_id = 2;
        int dest_id = 3;
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new RecordingSinkOperator(op_id, node_id, dest_id));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int,
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-2);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    // Large time slice so both source blocks are processed in one execute() call.
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }

    _query_ctx->get_execution_dependency()->set_ready();
    bool done = false;
    EXPECT_TRUE(task->execute(&done).ok());
    EXPECT_TRUE(done);

    const auto& sink = task->_sink->cast<RecordingSinkOperator>();
    ASSERT_EQ(sink.received_blocks.size(), 4);
    ASSERT_EQ(sink.received_eos.size(), 4);

    // First source block: split into three single-row constant blocks, no eos.
    for (size_t i = 0; i < 3; ++i) {
        ASSERT_EQ(sink.received_blocks[i].rows(), 1);
        ASSERT_EQ(sink.received_blocks[i].columns(), 1);
        const auto& column = sink.received_blocks[i].get_by_position(0).column;
        ASSERT_TRUE(is_column_const(*column));
        EXPECT_EQ(sink.received_blocks[i].get_by_position(0).type->to_string(*column, 0),
                  std::to_string(i + 1));
        EXPECT_FALSE(sink.received_eos[i]);
    }

    // Second source block: passed through as a regular two-row block with eos.
    ASSERT_EQ(sink.received_blocks[3].rows(), 2);
    ASSERT_EQ(sink.received_blocks[3].columns(), 1);
    EXPECT_FALSE(is_column_const(*sink.received_blocks[3].get_by_position(0).column));
    EXPECT_EQ(sink.received_blocks[3].get_by_position(0).type->to_string(
                      *sink.received_blocks[3].get_by_position(0).column, 0),
              "4");
    EXPECT_EQ(sink.received_blocks[3].get_by_position(0).type->to_string(
                      *sink.received_blocks[3].get_by_position(0).column, 1),
              "5");
    EXPECT_TRUE(sink.received_eos[3]);
}
#endif // NDEBUG

TEST_F(PipelineTaskTest, TEST_TERMINATE) {
auto num_instances = 1;
auto pip_id = 0;
Expand Down
Loading