Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@
namespace doris {
namespace {

// Returns a copy of `column` whose nested subcolumns are themselves copied recursively.
//
// The input is first passed through convert_to_full_column_if_const(), then duplicated with
// clone_resized() at its current size. Every subcolumn reported by for_each_subcolumn() is
// replaced by its own recursive copy.
IColumn::WrappedPtr clone_column_deep(const IColumn::WrappedPtr& column) {
    auto materialized = column->convert_to_full_column_if_const();
    auto copy = materialized->clone_resized(materialized->size());

    // Swap each child slot for a recursively cloned child.
    auto replace_with_clone = [](IColumn::WrappedPtr& child) {
        child = clone_column_deep(child);
    };
    copy->for_each_subcolumn(replace_with_clone);
    return copy;
}

DataTypePtr create_array_of_type(PrimitiveType type, size_t num_dimensions, bool is_nullable,
int precision = -1, int scale = -1) {
DataTypePtr result = type == PrimitiveType::INVALID_TYPE
Expand Down Expand Up @@ -2796,4 +2804,33 @@ MutableColumnPtr ColumnVariant::clone() const {
return res;
}

// Builds a new ColumnVariant from recursively cloned copies of this column's data and
// finalizes the copy in READ_MODE.
//
// Each subcolumn's parts, the serialized sparse column and the serialized doc-value column
// are cloned through clone_column_deep() before being installed in the result.
//
// Throws doris::Exception(INTERNAL_ERROR) when a non-root path cannot be added to the copied
// subcolumn tree, or when the copied tree ends up without a root.
MutableColumnPtr ColumnVariant::clone_finalized() const {
    auto res = ColumnVariant::create(_max_subcolumns_count, _enable_doc_mode);

    Subcolumns copied_subcolumns;
    for (const auto& entry : subcolumns) {
        // Copy the subcolumn descriptor, then replace every part with a recursive clone.
        auto copied_data = entry->data;
        for (auto& part : copied_data.data) {
            part = clone_column_deep(part);
        }

        if (entry->data.is_root) {
            copied_subcolumns.create_root(std::move(copied_data));
            continue;
        }

        const bool added = copied_subcolumns.add(entry->path, std::move(copied_data));
        if (!added) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                   "add path {} is error in clone_finalized()",
                                   entry->path.get_path());
        }
    }

    if (!copied_subcolumns.get_root()) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "root is nullptr in clone_finalized()");
    }

    res->subcolumns = std::move(copied_subcolumns);
    res->serialized_sparse_column = clone_column_deep(serialized_sparse_column);
    res->serialized_doc_value_column = clone_column_deep(serialized_doc_value_column);
    res->set_num_rows(num_rows);

    // Check the assembled copy before finalizing it.
    ENABLE_CHECK_CONSISTENCY(res.get());
    res->finalize(FinalizeMode::READ_MODE);
    return res;
}

} // namespace doris
6 changes: 1 addition & 5 deletions be/src/core/column/column_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,7 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {

bool is_finalized() const;

// Returns a finalized mutable column derived from this one.
// The mutable column is obtained with IColumn::mutate(get_ptr()) and then finalized in
// READ_MODE.
// NOTE(review): whether mutate() copies or reuses the underlying storage is not visible
// here -- confirm before relying on the result being independent of this column.
MutableColumnPtr clone_finalized() const {
auto finalized = IColumn::mutate(get_ptr());
static_cast<ColumnVariant*>(finalized.get())->finalize(FinalizeMode::READ_MODE);
return finalized;
}
MutableColumnPtr clone_finalized() const;

MutableColumnPtr clone() const override;

Expand Down
42 changes: 23 additions & 19 deletions be/src/core/data_type/data_type_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {

int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const {
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
if (!column_variant.is_finalized()) {
// Icolumn originates from MutablePtr or block, and therefore can be modified.
// todo: We should reconsider the logic here, why are we using finalize() in this context?
const_cast<ColumnVariant&>(column_variant).finalize();
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
MutableColumnPtr finalized_column;
if (!column_variant->is_finalized()) {
// Local exchange can share the same block across downstream tasks. Serialize a private
// finalized copy so serialization never mutates shared variant columns.
finalized_column = column_variant->clone_finalized();
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
}

const auto& subcolumns = column_variant.get_subcolumns();
const auto& subcolumns = column_variant->get_subcolumns();
size_t size = 0;

size += sizeof(uint32_t);
Expand All @@ -95,26 +97,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
// sparse column
// TODO: make this compatible with the sparse column
size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
*column_variant.get_sparse_column(), be_exec_version);
*column_variant->get_sparse_column(), be_exec_version);

size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
*column_variant.get_doc_value_column(), be_exec_version);
*column_variant->get_doc_value_column(), be_exec_version);
return size;
}

char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_version) const {
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
if (!column_variant.is_finalized()) {
// Icolumn originates from block, and therefore can be modified.
// todo: We should reconsider the logic here, why are we using finalize() in this context?
const_cast<ColumnVariant&>(column_variant).finalize();
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
MutableColumnPtr finalized_column;
if (!column_variant->is_finalized()) {
// Local exchange can share the same block across downstream tasks. Serialize a private
// finalized copy so serialization never mutates shared variant columns.
finalized_column = column_variant->clone_finalized();
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
}
#ifndef NDEBUG
// DCHECK size
column_variant.check_consistency();
column_variant->check_consistency();
#endif

const auto& subcolumns = column_variant.get_subcolumns();
const auto& subcolumns = column_variant->get_subcolumns();

char* size_pos = buf;
buf += sizeof(uint32_t);
Expand Down Expand Up @@ -147,15 +151,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v
// Safe case
unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
// serialize num of rows, only take effect when subcolumns empty
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant->rows()));
buf += sizeof(uint32_t);

// serialize sparse column
// TODO: make this compatible with the sparse column
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
buf, be_exec_version);
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
buf, be_exec_version);
buf = ColumnVariant::get_binary_column_type()->serialize(
*column_variant->get_doc_value_column(), buf, be_exec_version);
return buf;
}

Expand Down
69 changes: 46 additions & 23 deletions be/src/exec/exchange/local_exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,30 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
return deps;
}

Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
const int num_buckets, const bool use_global_hash_shuffle,
Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType::type type,
const int num_buckets,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
DCHECK(!_planned_by_fe);
_name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
_shuffle_idx_to_instance_idx.clear();
_use_global_shuffle = use_global_hash_shuffle;
if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
if (use_global_hash_shuffle) {
_shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
_shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
if (state->query_options().__isset.enable_new_shuffle_hash_method &&
state->query_options().enable_new_shuffle_hash_method) {
_partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions);
} else {
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx[i] = i;
}
_partitioner =
std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
}
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) {
_shuffle_idx_to_instance_idx.clear();
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx[i] = i;
}
if (state->query_options().__isset.enable_new_shuffle_hash_method &&
state->query_options().enable_new_shuffle_hash_method) {
Expand All @@ -64,17 +70,41 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
}
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
} else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
DCHECK_GT(num_buckets, 0);
_partitioner = std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(num_buckets);
RETURN_IF_ERROR(_partitioner->init(_texprs));
}
return Status::OK();
}

// Creates and initializes the partitioner for an FE-planned local exchange sink.
//
// Also rewrites _name so that it carries the exchange type. For the two execution hash
// shuffle types the partitioner implementation depends on the
// enable_new_shuffle_hash_method query option; bucket hash shuffle always uses
// Crc32HashPartitioner<ShuffleChannelIds>. Any other exchange type gets no partitioner.
//
// Returns the error from the partitioner's init(), or Status::OK().
Status LocalExchangeSinkOperatorX::init_partitioner(RuntimeState* state) {
    DCHECK(_planned_by_fe);
    // Set operator name to include exchange type (base class init(tnode) only sets generic name).
    _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(_type) + ")";

    switch (_type) {
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: {
        const auto& options = state->query_options();
        const bool use_new_hash_method = options.__isset.enable_new_shuffle_hash_method &&
                                         options.enable_new_shuffle_hash_method;
        if (use_new_hash_method) {
            _partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions);
        } else {
            _partitioner =
                    std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
        }
        break;
    }
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE: {
        DCHECK_GT(_num_partitions, 0);
        _partitioner = std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
        break;
    }
    default:
        // Non-shuffling exchange types need no partitioner.
        return Status::OK();
    }

    RETURN_IF_ERROR(_partitioner->init(_texprs));
    return Status::OK();
}

Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state));
if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
}
Expand All @@ -88,11 +118,6 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(custom_profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(custom_profile(), "DistributeDataTime");
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
custom_profile()->add_info_string(
"UseGlobalShuffle",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
}
custom_profile()->add_info_string(
"PartitionExprsSize",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num));
Expand All @@ -108,8 +133,7 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);

if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
_exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
if (is_shuffled_exchange(_exchanger->get_type())) {
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
}
Expand All @@ -132,12 +156,11 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
"{}, _channel_id: {}, _num_partitions: {}, "
"_num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}",
Base::debug_string(indentation_level),
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
_exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators);
return fmt::to_string(debug_string_buffer);
}
Expand Down
28 changes: 20 additions & 8 deletions be/src/exec/exchange/local_exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
_texprs(texprs),
_partitioned_exprs_num(texprs.size()),
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}

// Constructor for a local exchange sink described by a plan node (sets _planned_by_fe).
// The exchange type and the distribute expressions are read from
// tnode.local_exchange_node; num_partitions and shuffle_id_to_instance_idx are stored as
// given. No partitioner is created here.
// NOTE(review): _texprs is a reference member bound to
// tnode.local_exchange_node.distribute_expr_lists, so `tnode` presumably must outlive this
// operator -- confirm at the call sites.
LocalExchangeSinkOperatorX(int operator_id, int dest_id, const TPlanNode& tnode,
int num_partitions,
const std::map<int, int>& shuffle_id_to_instance_idx)
: Base(operator_id, tnode, dest_id),
_type(tnode.local_exchange_node.partition_type),
_num_partitions(num_partitions),
_texprs(tnode.local_exchange_node.distribute_expr_lists),
_partitioned_exprs_num(tnode.local_exchange_node.distribute_expr_lists.size()),
_shuffle_idx_to_instance_idx(shuffle_id_to_instance_idx),
_planned_by_fe(true) {}
#ifdef BE_TEST
LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs,
const std::map<int, int>& bucket_seq_to_instance_idx)
Expand All @@ -89,18 +100,19 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
#endif

// Always fails: this overload unconditionally returns an InternalError naming the operator.
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}

// Always fails: this overload unconditionally returns an InternalError naming the operator.
// The message names TDataSink (the argument type of this overload) so that the failure can be
// told apart from the TPlanNode overload's error.
Status init(const TDataSink& tsink) override {
    return Status::InternalError("{} should not init with TDataSink", Base::_name);
}

Status init(RuntimeState* state, ExchangeType type, const int num_buckets,
const bool use_global_hash_shuffle,
Status init(RuntimeState* state, TLocalPartitionType::type type, const int num_buckets,
const std::map<int, int>& shuffle_idx_to_instance_idx) override;

// Initialize partitioner for FE-planned local exchange nodes. The FE-planned constructor
// already sets _type, _num_partitions, _texprs, and _shuffle_idx_to_instance_idx from the
// TPlanNode, but does not create the partitioner. This method creates the partitioner so
// that prepare() can call _partitioner->prepare() without null dereference.
Status init_partitioner(RuntimeState* state);

Status prepare(RuntimeState* state) override;

Status sink(RuntimeState* state, Block* in_block, bool eos) override;
Expand All @@ -115,13 +127,13 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
private:
friend class LocalExchangeSinkLocalState;
friend class ShuffleExchanger;
ExchangeType _type;
TLocalPartitionType::type _type;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
const size_t _partitioned_exprs_num;
std::unique_ptr<PartitionerBase> _partitioner;
std::map<int, int> _shuffle_idx_to_instance_idx;
bool _use_global_shuffle = false;
const bool _planned_by_fe = false;
};

} // namespace doris
5 changes: 2 additions & 3 deletions be/src/exec/exchange/local_exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
_exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
if (is_shuffled_exchange(_exchanger->get_type())) {
_copy_data_timer = ADD_TIMER(custom_profile(), "CopyDataTime");
}

Expand Down Expand Up @@ -60,7 +59,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
}

std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
if ((_exchanger->get_type() == ExchangeType::PASS_TO_ONE) && _channel_id != 0) {
if ((_exchanger->get_type() == TLocalPartitionType::PASS_TO_ONE) && _channel_id != 0) {
// If this is a PASS_TO_ONE exchange and is not the first task, source operators always
// return empty result so no dependencies here.
return {};
Expand Down
Loading
Loading