diff --git a/be/src/core/data_type/data_type_variant.cpp b/be/src/core/data_type/data_type_variant.cpp index 133226def49d7c..fff6ff53408cc2 100644 --- a/be/src/core/data_type/data_type_variant.cpp +++ b/be/src/core/data_type/data_type_variant.cpp @@ -61,14 +61,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const { int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - const auto& column_variant = assert_cast(column); - if (!column_variant.is_finalized()) { - // Icolumn originates from MutablePtr or block, and therefore can be modified. - // todo: We should reconsider the logic here, why are we using finalize() in this context? - const_cast(column_variant).finalize(); + const auto* column_variant = assert_cast(&column); + MutableColumnPtr finalized_column; + if (!column_variant->is_finalized()) { + // Local exchange can share the same block across downstream tasks. Serialize a private + // finalized copy so serialization never mutates shared variant columns. + finalized_column = column_variant->clone_finalized(); + column_variant = assert_cast(finalized_column.get()); } - const auto& subcolumns = column_variant.get_subcolumns(); + const auto& subcolumns = column_variant->get_subcolumns(); size_t size = 0; size += sizeof(uint32_t); @@ -95,26 +97,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column // sparse column // TODO make compability with sparse column size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes( - *column_variant.get_sparse_column(), be_exec_version); + *column_variant->get_sparse_column(), be_exec_version); size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes( - *column_variant.get_doc_value_column(), be_exec_version); + *column_variant->get_doc_value_column(), be_exec_version); return size; } char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_version) const { - const auto& column_variant = assert_cast(column); - if (!column_variant.is_finalized()) { - // Icolumn originates from block, and therefore can be modified. - // todo: We should reconsider the logic here, why are we using finalize() in this context? - const_cast(column_variant).finalize(); + const auto* column_variant = assert_cast(&column); + MutableColumnPtr finalized_column; + if (!column_variant->is_finalized()) { + // Local exchange can share the same block across downstream tasks. Serialize a private + // finalized copy so serialization never mutates shared variant columns. + finalized_column = column_variant->clone_finalized(); + column_variant = assert_cast(finalized_column.get()); } #ifndef NDEBUG // DCHECK size - column_variant.check_consistency(); + column_variant->check_consistency(); #endif - const auto& subcolumns = column_variant.get_subcolumns(); + const auto& subcolumns = column_variant->get_subcolumns(); char* size_pos = buf; buf += sizeof(uint32_t); @@ -147,15 +151,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v // Safe case unaligned_store(size_pos, static_cast(num_of_columns)); // serialize num of rows, only take effect when subcolumns empty - unaligned_store(buf, static_cast(column_variant.rows())); + unaligned_store(buf, static_cast(column_variant->rows())); buf += sizeof(uint32_t); // serialize sparse column // TODO make compability with sparse column - buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(), - buf, be_exec_version); - buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(), + buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(), buf, be_exec_version); + buf = ColumnVariant::get_binary_column_type()->serialize( + *column_variant->get_doc_value_column(), buf, be_exec_version); return buf; } diff --git a/be/src/exprs/function/cast/cast_to_variant.h b/be/src/exprs/function/cast/cast_to_variant.h index 6c6ed1743fcdf0..0efc29047b15f9 100644 --- a/be/src/exprs/function/cast/cast_to_variant.h +++ b/be/src/exprs/function/cast/cast_to_variant.h @@ -32,45 +32,52 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block, auto& col_with_type_and_name = block.get_by_position(arguments[0]); auto& col_from = col_with_type_and_name.column; const IColumn* variant_column = col_from.get(); - if (const auto* nullable = check_and_get_column(*variant_column)) { + const auto* nullable = check_and_get_column(*variant_column); + if (nullable != nullptr) { variant_column = &nullable->get_nested_column(); } + const auto* variant = assert_cast(variant_column); + ColumnPtr col_to = data_type_to->create_column(); - if (!assert_cast(*variant_column).is_finalized()) { - // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure - auto mutable_column = IColumn::mutate(std::move(col_with_type_and_name.column)); - if (auto* nullable = check_and_get_column(*mutable_column)) { - const auto& const_nullable = *nullable; - auto nested_column = IColumn::mutate(const_nullable.get_nested_column_ptr()); - assert_cast(*nested_column).finalize(); - ColumnPtr nested_column_ptr = std::move(nested_column); - nullable->change_nested_column(nested_column_ptr); + ColumnPtr finalized_input_column; + if (!variant->is_finalized()) { + // Local exchange can share the same input block across multiple downstream tasks. + // Finalize a private copy so variant casts never mutate shared input columns. + auto finalized_variant = variant->clone_finalized(); + variant = assert_cast(finalized_variant.get()); + if (nullable != nullptr) { + auto cloned_null_map = + nullable->get_null_map_column_ptr()->clone_resized(input_rows_count); + finalized_input_column = ColumnNullable::create(std::move(finalized_variant), + std::move(cloned_null_map)); } else { - assert_cast(*mutable_column).finalize(); + finalized_input_column = std::move(finalized_variant); } - col_with_type_and_name.column = std::move(mutable_column); - } - - variant_column = col_with_type_and_name.column.get(); - if (const auto* nullable = check_and_get_column(*variant_column)) { - variant_column = &nullable->get_nested_column(); } - const auto& variant = assert_cast(*variant_column); - ColumnPtr col_to = data_type_to->create_column(); + auto execute_on_finalized_input = [&](auto&& executor) -> Status { + if (!finalized_input_column) { + return executor(block); + } + Block finalized_block = block; + finalized_block.replace_by_position(arguments[0], finalized_input_column); + RETURN_IF_ERROR(executor(finalized_block)); + block.replace_by_position(result, finalized_block.get_by_position(result).column); + return Status::OK(); + }; // It's important to convert as many elements as possible in this context. For instance, // if the root of this variant column is a number column, converting it to a number column // is acceptable. However, if the destination type is a string and root is none scalar root, then // we should convert the entire tree to a string. - bool is_root_valuable = variant.is_scalar_variant() || - (!variant.is_null_root() && - variant.get_root_type()->get_primitive_type() != INVALID_TYPE && + bool is_root_valuable = variant->is_scalar_variant() || + (!variant->is_null_root() && + variant->get_root_type()->get_primitive_type() != INVALID_TYPE && !is_string_type(data_type_to->get_primitive_type()) && data_type_to->get_primitive_type() != TYPE_JSONB); if (is_root_valuable) { - ColumnPtr nested = variant.get_root(); - auto nested_from_type = variant.get_root_type(); + ColumnPtr nested = variant->get_root(); + auto nested_from_type = variant->get_root_type(); // DCHECK(nested_from_type->is_nullable()); DCHECK(!data_type_to->is_nullable()); auto new_context = context == nullptr ? nullptr : context->clone(); @@ -105,16 +112,21 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block, {0, 1}, input_rows_count); } } else { - if (variant.only_have_default_values()) { + if (variant->only_have_default_values()) { col_to->assert_mutable()->insert_many_defaults(input_rows_count); col_to = make_nullable(col_to, true); } else if (is_string_type(data_type_to->get_primitive_type())) { // serialize to string - return CastToStringFunction::execute_impl(context, block, arguments, result, - input_rows_count); + return execute_on_finalized_input([&](Block& finalized_block) { + return CastToStringFunction::execute_impl(context, finalized_block, arguments, + result, input_rows_count); + }); } else if (data_type_to->get_primitive_type() == TYPE_JSONB) { // serialize to json by parsing - return cast_from_generic_to_jsonb(context, block, arguments, result, input_rows_count); + return execute_on_finalized_input([&](Block& finalized_block) { + return cast_from_generic_to_jsonb(context, finalized_block, arguments, result, + input_rows_count); + }); } else if (!data_type_to->is_nullable() && !is_string_type(data_type_to->get_primitive_type())) { // other types diff --git a/be/test/core/column/column_variant_test.cpp b/be/test/core/column/column_variant_test.cpp index dff9e2c0ae5a77..4db42b8a3efce1 100644 --- a/be/test/core/column/column_variant_test.cpp +++ b/be/test/core/column/column_variant_test.cpp @@ -27,8 +27,11 @@ #include #include #include +#include +#include "agent/be_exec_version_manager.h" #include "common/cast_set.h" +#include "core/block/block.h" #include "core/column/column_variant.cpp" #include "core/column/common_column_test.h" #include "core/column/subcolumn_tree.h" @@ -40,9 +43,11 @@ #include "core/types.h" #include "core/value/jsonb_value.h" #include "exec/common/variant_util.h" +#include "gen_cpp/data.pb.h" #include "storage/olap_common.h" #include "testutil/test_util.h" #include "testutil/variant_util.h" +#include "util/block_compression.h" using namespace doris; namespace doris { @@ -2025,6 +2030,58 @@ TEST_F(ColumnVariantTest, clone_finalized) { test_func(std::move(cloned_object)); } +TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) { + auto source_column = VariantUtil::construct_advanced_varint_column(); + source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE); + + auto cloned = source_column->clone_finalized(); + auto* cloned_variant = assert_cast(cloned.get()); + EXPECT_TRUE(cloned_variant->is_finalized()); + + for (const auto& source_subcolumn : source_column->get_subcolumns()) { + const auto* cloned_subcolumn = + cloned_variant->get_subcolumns().find_exact(source_subcolumn->path); + ASSERT_NE(cloned_subcolumn, nullptr); + EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(), + cloned_subcolumn->data.get_finalized_column_ptr().get()) + << source_subcolumn->path.get_path(); + } + EXPECT_NE(source_column->get_sparse_column().get(), cloned_variant->get_sparse_column().get()); + EXPECT_NE(source_column->get_doc_value_column().get(), + cloned_variant->get_doc_value_column().get()); +} + +TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) { + auto source_column = VariantUtil::construct_advanced_varint_column(); + ASSERT_FALSE(source_column->is_finalized()); + + const int be_exec_version = BeExecVersionManager::get_newest_version(); + const auto size = + dt_variant->get_uncompressed_serialized_bytes(*source_column, be_exec_version); + EXPECT_FALSE(source_column->is_finalized()); + + auto buffer = std::make_unique(size); + dt_variant->serialize(*source_column, buffer.get(), be_exec_version); + EXPECT_FALSE(source_column->is_finalized()); +} + +TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) { + auto source_column = VariantUtil::construct_advanced_varint_column(); + ASSERT_FALSE(source_column->is_finalized()); + + Block block({{source_column->get_ptr(), dt_variant, "variant_col"}}); + PBlock pblock; + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + int64_t compress_time = 0; + auto status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, &compress_time, + segment_v2::NO_COMPRESSION); + ASSERT_TRUE(status.ok()) << status; + EXPECT_FALSE(source_column->is_finalized()); + EXPECT_GT(pblock.column_values().size(), 0); +} + TEST_F(ColumnVariantTest, sanitize) { auto test_func = [](const auto& source_column) { auto src_size = source_column->size(); diff --git a/be/test/exprs/function/cast/function_variant_cast_test.cpp b/be/test/exprs/function/cast/function_variant_cast_test.cpp index 960637bf1507d0..51034ad6e0319e 100644 --- a/be/test/exprs/function/cast/function_variant_cast_test.cpp +++ b/be/test/exprs/function/cast/function_variant_cast_test.cpp @@ -284,6 +284,63 @@ TEST(FunctionVariantCast, CastFromVariant) { } } +TEST(FunctionVariantCast, CastFromVariantDoesNotFinalizeSourceColumn) { + auto variant_type = std::make_shared(); + auto int32_type = std::make_shared(); + auto string_type = std::make_shared(); + auto variant_col = construct_basic_varint_column(); + + ASSERT_FALSE(variant_col->is_finalized()); + + { + ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), variant_type, "variant_col"}, + {nullptr, int32_type, "int32_type"}}; + + auto function = + SimpleFunctionFactory::instance().get_function("CAST", arguments, int32_type); + ASSERT_NE(function, nullptr); + + Block block {arguments}; + size_t result_column = block.columns(); + block.insert({nullptr, int32_type, "result"}); + + RuntimeState state; + auto ctx = FunctionContext::create_context(&state, {}, {}); + ASSERT_TRUE( + function->execute(ctx.get(), block, {0}, result_column, variant_col->size()).ok()); + + EXPECT_FALSE(variant_col->is_finalized()); + + auto result_col = block.get_by_position(result_column).column; + ASSERT_NE(result_col.get(), nullptr); + ASSERT_EQ(result_col->size(), variant_col->size()); + } + + { + ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), variant_type, "variant_col"}, + {nullptr, string_type, "string_type"}}; + + auto function = + SimpleFunctionFactory::instance().get_function("CAST", arguments, string_type); + ASSERT_NE(function, nullptr); + + Block block {arguments}; + size_t result_column = block.columns(); + block.insert({nullptr, string_type, "result"}); + + RuntimeState state; + auto ctx = FunctionContext::create_context(&state, {}, {}); + ASSERT_TRUE( + function->execute(ctx.get(), block, {0}, result_column, variant_col->size()).ok()); + + EXPECT_FALSE(variant_col->is_finalized()); + + auto result_col = block.get_by_position(result_column).column; + ASSERT_NE(result_col.get(), nullptr); + ASSERT_EQ(result_col->size(), variant_col->size()); + } +} + TEST(FunctionVariantCast, CastVariantWithNull) { auto variant_type = std::make_shared(); auto int32_type = std::make_shared();