diff --git a/be/src/core/block/block.cpp b/be/src/core/block/block.cpp index a0598dcc8121d5..2c9e232bb070a8 100644 --- a/be/src/core/block/block.cpp +++ b/be/src/core/block/block.cpp @@ -593,6 +593,19 @@ Block Block::clone_empty() const { return res; } +std::vector Block::split_to_const_blocks() const { + std::vector blocks(rows()); + for (size_t row = 0; row < blocks.size(); ++row) { + auto& block = blocks[row]; + block.reserve(data.size()); + for (const auto& elem : data) { + DCHECK(elem.column); + block.insert({ColumnConst::create(elem.column->cut(row, 1), 1), elem.type, elem.name}); + } + } + return blocks; +} + MutableColumns Block::clone_empty_columns() const { size_t num_columns = data.size(); MutableColumns columns(num_columns); diff --git a/be/src/core/block/block.h b/be/src/core/block/block.h index 6207aa8f49da43..04e824bfdff363 100644 --- a/be/src/core/block/block.h +++ b/be/src/core/block/block.h @@ -209,6 +209,8 @@ class Block { Block clone_without_columns(const std::vector* column_offset = nullptr) const; + std::vector split_to_const_blocks() const; + /** Get empty columns with the same types as in block. */ MutableColumns clone_empty_columns() const; diff --git a/be/src/exec/operator/join/process_hash_table_probe_impl.h b/be/src/exec/operator/join/process_hash_table_probe_impl.h index 8e4e0131ffe12f..064faf85cdc319 100644 --- a/be/src/exec/operator/join/process_hash_table_probe_impl.h +++ b/be/src/exec/operator/join/process_hash_table_probe_impl.h @@ -47,6 +47,10 @@ static bool check_all_match_one(const auto& vecs) { } static void insert_with_indexs(auto& dst, const auto& src, const auto& indexs, bool all_match_one) { + if (const auto* const_src = check_and_get_column(src.get())) { + dst->insert_many_from(*const_src->get_data_column_ptr(), 0, indexs.size()); + return; + } if (all_match_one) { dst->insert_range_from(*src, indexs[0], indexs.size()); } else { diff --git a/be/src/exec/operator/operator.cpp b/be/src/exec/operator/operator.cpp index 1ce7dc8727d688..dc0dee21fed7a3 100644 --- a/be/src/exec/operator/operator.cpp +++ b/be/src/exec/operator/operator.cpp @@ -18,6 +18,7 @@ #include "exec/operator/operator.h" #include "common/status.h" +#include "core/block/materialize_block.h" #include "exec/common/util.hpp" #include "exec/exchange/local_exchange_sink_operator.h" #include "exec/exchange/local_exchange_source_operator.h" @@ -795,6 +796,7 @@ Status AsyncWriterSink::open(RuntimeState* state) { template requires(std::is_base_of_v) Status AsyncWriterSink::sink(RuntimeState* state, Block* block, bool eos) { + materialize_block_inplace(*block); return _writer->sink(block, eos); } diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h index ce5df951e22497..6f558ad7831b7d 100644 --- a/be/src/exec/operator/operator.h +++ b/be/src/exec/operator/operator.h @@ -518,6 +518,16 @@ class PipelineXSinkLocalStateBase { bool low_memory_mode() { return _state->low_memory_mode(); } +#ifndef NDEBUG + bool should_mock_sink_const_block_once() { + if (_has_mocked_sink_const_block) { + return false; + } + _has_mocked_sink_const_block = true; + return true; + } +#endif + protected: DataSinkOperatorXBase* _parent = nullptr; RuntimeState* _state = nullptr; @@ -543,6 +553,9 @@ class PipelineXSinkLocalStateBase { RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr; +#ifndef NDEBUG + bool _has_mocked_sink_const_block = false; +#endif }; template @@ -624,6 +637,10 @@ class DataSinkOperatorXBase : public OperatorBase { [[nodiscard]] virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; +#ifndef NDEBUG + [[nodiscard]] virtual bool mock_const_block_execution() const { return true; } +#endif + // Returns the memory this sink operator expects to allocate in the next // execution round (sink only — pipeline task sums all operators + sink). [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) { diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp index fe75d7b499dbf1..4c85f30fae24ea 100644 --- a/be/src/exec/pipeline/pipeline_task.cpp +++ b/be/src/exec/pipeline/pipeline_task.cpp @@ -59,6 +59,28 @@ class RuntimeState; namespace doris { +#ifndef NDEBUG +namespace { + +Status sink_with_const_block_mock(DataSinkOperatorXBase* sink, RuntimeState* state, Block* block, + bool eos) { + auto* local_state = state->get_sink_local_state(); + DCHECK(local_state); + if (block->empty() || !sink->mock_const_block_execution() || + !local_state->should_mock_sink_const_block_once()) { + return sink->sink(state, block, eos); + } + + auto const_blocks = block->split_to_const_blocks(); + for (size_t i = 0; i < const_blocks.size(); ++i) { + RETURN_IF_ERROR(sink->sink(state, &const_blocks[i], eos && i + 1 == const_blocks.size())); + } + return Status::OK(); +} + +} // namespace +#endif + PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, std::shared_ptr fragment_context, RuntimeProfile* parent_profile, @@ -719,7 +741,11 @@ Status PipelineTask::execute(bool* done) { } }); RETURN_IF_ERROR(block->check_type_and_column()); +#ifndef NDEBUG + status = sink_with_const_block_mock(_sink.get(), _state, block, _eos); +#else status = _sink->sink(_state, block, _eos); +#endif if (_eos) { if (_sink->reset_to_rerun(_state, _root)) { diff --git a/be/test/core/block/block_test.cpp b/be/test/core/block/block_test.cpp index 3b29f819e093d4..1b903d2cc4812d 100644 --- a/be/test/core/block/block_test.cpp +++ b/be/test/core/block/block_test.cpp @@ -1384,6 +1384,34 @@ TEST(BlockTest, others) { ASSERT_TRUE(dumped_names.empty()) << "Dumped names: " << dumped_names; } +TEST(BlockTest, split_to_const_blocks) { + auto block = ColumnHelper::create_block({1, 2, 3}); + block.insert(ColumnHelper::create_column_with_name({"abc", "efg", "hij"})); + block.insert( + ColumnHelper::create_nullable_column_with_name({10, 20, 30}, {0, 1, 0})); + block.insert({ColumnConst::create(ColumnInt64::create(1, 100), 3), + std::make_shared(), "const_column"}); + + auto const_blocks = block.split_to_const_blocks(); + ASSERT_EQ(const_blocks.size(), 3); + + for (size_t row = 0; row < const_blocks.size(); ++row) { + const auto& const_block = const_blocks[row]; + ASSERT_EQ(const_block.rows(), 1); + ASSERT_EQ(const_block.columns(), block.columns()); + for (size_t column = 0; column < block.columns(); ++column) { + const auto& source_column = block.get_by_position(column); + const auto& const_column = const_block.get_by_position(column); + ASSERT_EQ(const_column.name, source_column.name); + ASSERT_TRUE(const_column.type->equals(*source_column.type)); + ASSERT_TRUE(is_column_const(*const_column.column)); + auto full_const_column = const_column.column->convert_to_full_column_if_const(); + auto full_source_column = source_column.column->convert_to_full_column_if_const(); + ASSERT_EQ(full_const_column->compare_at(0, row, *full_source_column, 0), 0); + } + } +} + TEST(BlockTest, ClearSelectedColumnDataClonesSharedColumn) { auto type = std::make_shared(); auto mutable_col0 = ColumnInt32::create(); diff --git a/be/test/exec/operator/hashjoin_probe_operator_test.cpp b/be/test/exec/operator/hashjoin_probe_operator_test.cpp index 9828744a58f0dd..be6c9bef465bdf 100644 --- a/be/test/exec/operator/hashjoin_probe_operator_test.cpp +++ b/be/test/exec/operator/hashjoin_probe_operator_test.cpp @@ -34,6 +34,7 @@ #include "core/assert_cast.h" #include "core/block/block.h" +#include "core/column/column_const.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" #include "core/field.h" @@ -347,6 +348,37 @@ TEST_F(HashJoinProbeOperatorTest, InnerJoin) { {Field::create_field("c"), Field::create_field("d")}); } +TEST_F(HashJoinProbeOperatorTest, InnerJoinConstProbeBlock) { + auto sink_block = ColumnHelper::create_block({1, 2, 3}); + sink_block.insert(ColumnHelper::create_column_with_name({"a", "b", "c"})); + + auto probe_key_column = ColumnInt32::create(); + probe_key_column->insert_value(2); + auto probe_payload_column = ColumnString::create(); + probe_payload_column->insert_data("b", 1); + Block probe_block( + {ColumnWithTypeAndName(ColumnConst::create(std::move(probe_key_column), 1), + std::make_shared(), "column1"), + ColumnWithTypeAndName(ColumnConst::create(std::move(probe_payload_column), 1), + std::make_shared(), "column2")}); + + Block output_block; + std::vector build_blocks = {sink_block}; + std::vector probe_blocks = {probe_block}; + run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, {false, false}, + {false, false}, build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 1); + check_column_values(*output_block.get_by_position(0).column, + {Field::create_field(2)}); + check_column_values(*output_block.get_by_position(1).column, + {Field::create_field("b")}); + check_column_values(*output_block.get_by_position(2).column, + {Field::create_field(2)}); + check_column_values(*output_block.get_by_position(3).column, + {Field::create_field("b")}); +} + TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyBuildSide) { auto sink_block = ColumnHelper::create_block({}); sink_block.insert(ColumnHelper::create_nullable_column_with_name({}, {})); @@ -1251,4 +1283,4 @@ TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMarkOtherConjuncts) { Field::create_field(1)}); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index d6294825288949..141fc61a3e4b77 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -22,9 +22,13 @@ #include #include #include +#include #include "common/config.h" #include "common/status.h" +#include "core/column/column_const.h" +#include "core/data_type/data_type_number.h" +#include "exec/operator/mock_operator.h" #include "exec/operator/operator.h" #include "exec/operator/spill_utils.h" #include "exec/pipeline/dependency.h" @@ -35,6 +39,7 @@ #include "exec/spill/spill_file.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" +#include "testutil/column_helper.h" #include "testutil/mock/mock_runtime_state.h" #include "testutil/mock/mock_thread_mem_tracker_mgr.h" #include "testutil/mock/mock_workload_group_mgr.h" @@ -113,6 +118,21 @@ class BlockableSubmitTaskScheduler : public MockTaskScheduler { std::function on_submit; }; +class RecordingSinkOperator final : public DataSinkOperatorX { +public: + RecordingSinkOperator(int op_id, int node_id, int dest_id) + : DataSinkOperatorX(op_id, node_id, dest_id) {} + + Status sink(RuntimeState* state, Block* in_block, bool eos) override { + received_blocks.push_back(*in_block); + received_eos.push_back(eos); + return Status::OK(); + } + + std::vector received_blocks; + std::vector received_eos; +}; + TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) { auto num_instances = 1; auto pip_id = 0; @@ -397,6 +417,73 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { } } +TEST_F(PipelineTaskTest, TEST_SINK_CONST_BLOCK_MOCK_ONCE) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + { + OperatorPtr source_op; + auto* mock_source = new MockOperatorX(); + mock_source->_outout_blocks.push_back(ColumnHelper::create_block({1, 2, 3})); + mock_source->_outout_blocks.push_back(ColumnHelper::create_block({4, 5})); + source_op.reset(mock_source); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new RecordingSinkOperator(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-2); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + { + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + } + + _query_ctx->get_execution_dependency()->set_ready(); + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(done); + + const auto& sink = task->_sink->cast(); + ASSERT_EQ(sink.received_blocks.size(), 4); + ASSERT_EQ(sink.received_eos.size(), 4); + + for (size_t i = 0; i < 3; ++i) { + ASSERT_EQ(sink.received_blocks[i].rows(), 1); + ASSERT_EQ(sink.received_blocks[i].columns(), 1); + const auto& column = sink.received_blocks[i].get_by_position(0).column; + ASSERT_TRUE(is_column_const(*column)); + EXPECT_EQ(sink.received_blocks[i].get_by_position(0).type->to_string(*column, 0), + std::to_string(i + 1)); + EXPECT_FALSE(sink.received_eos[i]); + } + + ASSERT_EQ(sink.received_blocks[3].rows(), 2); + ASSERT_EQ(sink.received_blocks[3].columns(), 1); + EXPECT_FALSE(is_column_const(*sink.received_blocks[3].get_by_position(0).column)); + EXPECT_EQ(sink.received_blocks[3].get_by_position(0).type->to_string( + *sink.received_blocks[3].get_by_position(0).column, 0), + "4"); + EXPECT_EQ(sink.received_blocks[3].get_by_position(0).type->to_string( + *sink.received_blocks[3].get_by_position(0).column, 1), + "5"); + EXPECT_TRUE(sink.received_eos[3]); +} + TEST_F(PipelineTaskTest, TEST_TERMINATE) { auto num_instances = 1; auto pip_id = 0;