From 5078f25a971fce7ce62b76ebd8dcc312ef81378e Mon Sep 17 00:00:00 2001 From: lihangyu-x Date: Thu, 4 Jun 2026 01:34:27 +0800 Subject: [PATCH] [fix](be) Avoid mutating shared variant columns ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Queries that evaluate VARIANT expressions after local exchange can share input blocks across downstream pipeline tasks. Variant casts and Variant serialization finalized source columns in-place, so one consumer could mutate a shared input column while another consumer still expected the original column shape and row count. This made local-shuffle query results unstable and could trigger later operators to observe changed Variant column contents or sizes. This change confines the fix to Variant handling by using private finalized Variant copies for cast and serialization paths instead of mutating the source column. ### Release note None ### Check List (For Author) - Test: - PATH=/tmp/doris-clang-format-bin:$PATH build-support/clang-format.sh - git diff --check HEAD^ - ./build.sh --be - ./run-be-ut.sh --run --filter='ColumnVariantTest.serialize_does_not_finalize_source_column:ColumnVariantTest.block_serialize_does_not_finalize_source_column:FunctionVariantCast.CastFromVariantDoesNotFinalizeSourceColumn:FunctionVariantCast.CastFromVariant' - Manual test: with column_nullable.cpp and column_nullable_test.cpp reverted from this PR, local 1FE+4BE first local-shuffle repro passed 16x100 concurrent executions - Manual test: with column_nullable.cpp and column_nullable_test.cpp reverted from this PR, local 1FE+4BE second local-shuffle repro matched local-off baseline for 100 iterations - Behavior changed: No - Does this need documentation: No --- be/src/core/data_type/data_type_variant.cpp | 42 ++++++------ be/src/exprs/function/cast/cast_to_variant.h | 68 +++++++++++-------- be/test/core/column/column_variant_test.cpp | 57 ++++++++++++++++ .../cast/function_variant_cast_test.cpp | 57 ++++++++++++++++ 4 files changed, 177 insertions(+), 47 deletions(-) diff --git a/be/src/core/data_type/data_type_variant.cpp b/be/src/core/data_type/data_type_variant.cpp index 133226def49d7c..fff6ff53408cc2 100644 --- a/be/src/core/data_type/data_type_variant.cpp +++ b/be/src/core/data_type/data_type_variant.cpp @@ -61,14 +61,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const { int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - const auto& column_variant = assert_cast(column); - if (!column_variant.is_finalized()) { - // Icolumn originates from MutablePtr or block, and therefore can be modified. - // todo: We should reconsider the logic here, why are we using finalize() in this context? - const_cast(column_variant).finalize(); + const auto* column_variant = assert_cast(&column); + MutableColumnPtr finalized_column; + if (!column_variant->is_finalized()) { + // Local exchange can share the same block across downstream tasks. Serialize a private + // finalized copy so serialization never mutates shared variant columns. + finalized_column = column_variant->clone_finalized(); + column_variant = assert_cast(finalized_column.get()); } - const auto& subcolumns = column_variant.get_subcolumns(); + const auto& subcolumns = column_variant->get_subcolumns(); size_t size = 0; size += sizeof(uint32_t); @@ -95,26 +97,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column // sparse column // TODO make compability with sparse column size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes( - *column_variant.get_sparse_column(), be_exec_version); + *column_variant->get_sparse_column(), be_exec_version); size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes( - *column_variant.get_doc_value_column(), be_exec_version); + *column_variant->get_doc_value_column(), be_exec_version); return size; } char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_version) const { - const auto& column_variant = assert_cast(column); - if (!column_variant.is_finalized()) { - // Icolumn originates from block, and therefore can be modified. - // todo: We should reconsider the logic here, why are we using finalize() in this context? - const_cast(column_variant).finalize(); + const auto* column_variant = assert_cast(&column); + MutableColumnPtr finalized_column; + if (!column_variant->is_finalized()) { + // Local exchange can share the same block across downstream tasks. Serialize a private + // finalized copy so serialization never mutates shared variant columns. + finalized_column = column_variant->clone_finalized(); + column_variant = assert_cast(finalized_column.get()); } #ifndef NDEBUG // DCHECK size - column_variant.check_consistency(); + column_variant->check_consistency(); #endif - const auto& subcolumns = column_variant.get_subcolumns(); + const auto& subcolumns = column_variant->get_subcolumns(); char* size_pos = buf; buf += sizeof(uint32_t); @@ -147,15 +151,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v // Safe case unaligned_store(size_pos, static_cast(num_of_columns)); // serialize num of rows, only take effect when subcolumns empty - unaligned_store(buf, static_cast(column_variant.rows())); + unaligned_store(buf, static_cast(column_variant->rows())); buf += sizeof(uint32_t); // serialize sparse column // TODO make compability with sparse column - buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(), - buf, be_exec_version); - buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(), + buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(), buf, be_exec_version); + buf = ColumnVariant::get_binary_column_type()->serialize( + *column_variant->get_doc_value_column(), buf, be_exec_version); return buf; } diff --git a/be/src/exprs/function/cast/cast_to_variant.h b/be/src/exprs/function/cast/cast_to_variant.h index 6c6ed1743fcdf0..0efc29047b15f9 100644 --- a/be/src/exprs/function/cast/cast_to_variant.h +++ b/be/src/exprs/function/cast/cast_to_variant.h @@ -32,45 +32,52 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block, auto& col_with_type_and_name = block.get_by_position(arguments[0]); auto& col_from = col_with_type_and_name.column; const IColumn* variant_column = col_from.get(); - if (const auto* nullable = check_and_get_column(*variant_column)) { + const auto* nullable = check_and_get_column(*variant_column); + if (nullable != nullptr) { variant_column = &nullable->get_nested_column(); } + const auto* variant = assert_cast(variant_column); + ColumnPtr col_to = data_type_to->create_column(); - if (!assert_cast(*variant_column).is_finalized()) { - // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure - auto mutable_column = IColumn::mutate(std::move(col_with_type_and_name.column)); - if (auto* nullable = check_and_get_column(*mutable_column)) { - const auto& const_nullable = *nullable; - auto nested_column = IColumn::mutate(const_nullable.get_nested_column_ptr()); - assert_cast(*nested_column).finalize(); - ColumnPtr nested_column_ptr = std::move(nested_column); - nullable->change_nested_column(nested_column_ptr); + ColumnPtr finalized_input_column; + if (!variant->is_finalized()) { + // Local exchange can share the same input block across multiple downstream tasks. + // Finalize a private copy so variant casts never mutate shared input columns. + auto finalized_variant = variant->clone_finalized(); + variant = assert_cast(finalized_variant.get()); + if (nullable != nullptr) { + auto cloned_null_map = + nullable->get_null_map_column_ptr()->clone_resized(input_rows_count); + finalized_input_column = ColumnNullable::create(std::move(finalized_variant), + std::move(cloned_null_map)); } else { - assert_cast(*mutable_column).finalize(); + finalized_input_column = std::move(finalized_variant); } - col_with_type_and_name.column = std::move(mutable_column); - } - - variant_column = col_with_type_and_name.column.get(); - if (const auto* nullable = check_and_get_column(*variant_column)) { - variant_column = &nullable->get_nested_column(); } - const auto& variant = assert_cast(*variant_column); - ColumnPtr col_to = data_type_to->create_column(); + auto execute_on_finalized_input = [&](auto&& executor) -> Status { + if (!finalized_input_column) { + return executor(block); + } + Block finalized_block = block; + finalized_block.replace_by_position(arguments[0], finalized_input_column); + RETURN_IF_ERROR(executor(finalized_block)); + block.replace_by_position(result, finalized_block.get_by_position(result).column); + return Status::OK(); + }; // It's important to convert as many elements as possible in this context. For instance, // if the root of this variant column is a number column, converting it to a number column // is acceptable. However, if the destination type is a string and root is none scalar root, then // we should convert the entire tree to a string. - bool is_root_valuable = variant.is_scalar_variant() || - (!variant.is_null_root() && - variant.get_root_type()->get_primitive_type() != INVALID_TYPE && + bool is_root_valuable = variant->is_scalar_variant() || + (!variant->is_null_root() && + variant->get_root_type()->get_primitive_type() != INVALID_TYPE && !is_string_type(data_type_to->get_primitive_type()) && data_type_to->get_primitive_type() != TYPE_JSONB); if (is_root_valuable) { - ColumnPtr nested = variant.get_root(); - auto nested_from_type = variant.get_root_type(); + ColumnPtr nested = variant->get_root(); + auto nested_from_type = variant->get_root_type(); // DCHECK(nested_from_type->is_nullable()); DCHECK(!data_type_to->is_nullable()); auto new_context = context == nullptr ? nullptr : context->clone(); @@ -105,16 +112,21 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block, {0, 1}, input_rows_count); } } else { - if (variant.only_have_default_values()) { + if (variant->only_have_default_values()) { col_to->assert_mutable()->insert_many_defaults(input_rows_count); col_to = make_nullable(col_to, true); } else if (is_string_type(data_type_to->get_primitive_type())) { // serialize to string - return CastToStringFunction::execute_impl(context, block, arguments, result, - input_rows_count); + return execute_on_finalized_input([&](Block& finalized_block) { + return CastToStringFunction::execute_impl(context, finalized_block, arguments, + result, input_rows_count); + }); } else if (data_type_to->get_primitive_type() == TYPE_JSONB) { // serialize to json by parsing - return cast_from_generic_to_jsonb(context, block, arguments, result, input_rows_count); + return execute_on_finalized_input([&](Block& finalized_block) { + return cast_from_generic_to_jsonb(context, finalized_block, arguments, result, + input_rows_count); + }); } else if (!data_type_to->is_nullable() && !is_string_type(data_type_to->get_primitive_type())) { // other types diff --git a/be/test/core/column/column_variant_test.cpp b/be/test/core/column/column_variant_test.cpp index dff9e2c0ae5a77..4db42b8a3efce1 100644 --- a/be/test/core/column/column_variant_test.cpp +++ b/be/test/core/column/column_variant_test.cpp @@ -27,8 +27,11 @@ #include #include #include +#include +#include "agent/be_exec_version_manager.h" #include "common/cast_set.h" +#include "core/block/block.h" #include "core/column/column_variant.cpp" #include "core/column/common_column_test.h" #include "core/column/subcolumn_tree.h" @@ -40,9 +43,11 @@ #include "core/types.h" #include "core/value/jsonb_value.h" #include "exec/common/variant_util.h" +#include "gen_cpp/data.pb.h" #include "storage/olap_common.h" #include "testutil/test_util.h" #include "testutil/variant_util.h" +#include "util/block_compression.h" using namespace doris; namespace doris { @@ -2025,6 +2030,58 @@ TEST_F(ColumnVariantTest, clone_finalized) { test_func(std::move(cloned_object)); } +TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) { + auto source_column = VariantUtil::construct_advanced_varint_column(); + source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE); + + auto cloned = source_column->clone_finalized(); + auto* cloned_variant = assert_cast(cloned.get()); + EXPECT_TRUE(cloned_variant->is_finalized()); + + for (const auto& source_subcolumn : source_column->get_subcolumns()) { + const auto* cloned_subcolumn = + cloned_variant->get_subcolumns().find_exact(source_subcolumn->path); + ASSERT_NE(cloned_subcolumn, nullptr); + EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(), + cloned_subcolumn->data.get_finalized_column_ptr().get()) + << source_subcolumn->path.get_path(); + } + EXPECT_NE(source_column->get_sparse_column().get(), cloned_variant->get_sparse_column().get()); + EXPECT_NE(source_column->get_doc_value_column().get(), + cloned_variant->get_doc_value_column().get()); +} + +TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) { + auto source_column = VariantUtil::construct_advanced_varint_column(); + ASSERT_FALSE(source_column->is_finalized()); + + const int be_exec_version = BeExecVersionManager::get_newest_version(); + const auto size = + dt_variant->get_uncompressed_serialized_bytes(*source_column, be_exec_version); + EXPECT_FALSE(source_column->is_finalized()); + + auto buffer = std::make_unique(size); + dt_variant->serialize(*source_column, buffer.get(), be_exec_version); + EXPECT_FALSE(source_column->is_finalized()); +} + +TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) { + auto source_column = VariantUtil::construct_advanced_varint_column(); + ASSERT_FALSE(source_column->is_finalized()); + + Block block({{source_column->get_ptr(), dt_variant, "variant_col"}}); + PBlock pblock; + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + int64_t compress_time = 0; + auto status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, &compress_time, + segment_v2::NO_COMPRESSION); + ASSERT_TRUE(status.ok()) << status; + EXPECT_FALSE(source_column->is_finalized()); + EXPECT_GT(pblock.column_values().size(), 0); +} + TEST_F(ColumnVariantTest, sanitize) { auto test_func = [](const auto& source_column) { auto src_size = source_column->size(); diff --git a/be/test/exprs/function/cast/function_variant_cast_test.cpp b/be/test/exprs/function/cast/function_variant_cast_test.cpp index 960637bf1507d0..51034ad6e0319e 100644 --- a/be/test/exprs/function/cast/function_variant_cast_test.cpp +++ b/be/test/exprs/function/cast/function_variant_cast_test.cpp @@ -284,6 +284,63 @@ TEST(FunctionVariantCast, CastFromVariant) { } } +TEST(FunctionVariantCast, CastFromVariantDoesNotFinalizeSourceColumn) { + auto variant_type = std::make_shared(); + auto int32_type = std::make_shared(); + auto string_type = std::make_shared(); + auto variant_col = construct_basic_varint_column(); + + ASSERT_FALSE(variant_col->is_finalized()); + + { + ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), variant_type, "variant_col"}, + {nullptr, int32_type, "int32_type"}}; + + auto function = + SimpleFunctionFactory::instance().get_function("CAST", arguments, int32_type); + ASSERT_NE(function, nullptr); + + Block block {arguments}; + size_t result_column = block.columns(); + block.insert({nullptr, int32_type, "result"}); + + RuntimeState state; + auto ctx = FunctionContext::create_context(&state, {}, {}); + ASSERT_TRUE( + function->execute(ctx.get(), block, {0}, result_column, variant_col->size()).ok()); + + EXPECT_FALSE(variant_col->is_finalized()); + + auto result_col = block.get_by_position(result_column).column; + ASSERT_NE(result_col.get(), nullptr); + ASSERT_EQ(result_col->size(), variant_col->size()); + } + + { + ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), variant_type, "variant_col"}, + {nullptr, string_type, "string_type"}}; + + auto function = + SimpleFunctionFactory::instance().get_function("CAST", arguments, string_type); + ASSERT_NE(function, nullptr); + + Block block {arguments}; + size_t result_column = block.columns(); + block.insert({nullptr, string_type, "result"}); + + RuntimeState state; + auto ctx = FunctionContext::create_context(&state, {}, {}); + ASSERT_TRUE( + function->execute(ctx.get(), block, {0}, result_column, variant_col->size()).ok()); + + EXPECT_FALSE(variant_col->is_finalized()); + + auto result_col = block.get_by_position(result_column).column; + ASSERT_NE(result_col.get(), nullptr); + ASSERT_EQ(result_col->size(), variant_col->size()); + } +} + TEST(FunctionVariantCast, CastVariantWithNull) { auto variant_type = std::make_shared(); auto int32_type = std::make_shared();