Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
be1fd1d
prior to llm build fix
arthurpassos Mar 19, 2026
b82ce39
making progress
arthurpassos Mar 25, 2026
2687a20
fix version
arthurpassos Mar 25, 2026
4175e33
export part working now
arthurpassos Mar 26, 2026
d9a27c7
progress before refactor
arthurpassos Mar 31, 2026
fdf6bf7
checkpoint, commented out partitioning
arthurpassos Mar 31, 2026
bca4aeb
compatibility check seems ok
arthurpassos Mar 31, 2026
843961c
simplify code
arthurpassos Mar 31, 2026
80753da
fix test that was not working for some super odd reason
arthurpassos Mar 31, 2026
fc93f76
fix compatibility check for year vs years
arthurpassos Apr 1, 2026
abbc77d
fix
arthurpassos Apr 1, 2026
57f3a6a
progress
arthurpassos Apr 1, 2026
1ee1b34
do not even recompute partition value, just use it from the source
arthurpassos Apr 1, 2026
52a812a
somehow fix the concurrency problem
arthurpassos Apr 1, 2026
da5b6be
make it actually transactional
arthurpassos Apr 2, 2026
3664749
Merge branch 'antalya-26.1' into export_partition_iceberg
arthurpassos Apr 2, 2026
a878a1e
add test for crash during 2phase commit
arthurpassos Apr 2, 2026
b5bd0eb
not quite good
arthurpassos Apr 3, 2026
5b0e833
put writefullpath in zk and add some comments
arthurpassos Apr 6, 2026
998a992
try to fix fast_test
arthurpassos Apr 6, 2026
1aa1b31
again
arthurpassos Apr 6, 2026
f21d66a
again
arthurpassos Apr 6, 2026
26827e2
again
arthurpassos Apr 6, 2026
6c61948
partially fix path bug
arthurpassos Apr 6, 2026
99ce30f
some more improvements
arthurpassos Apr 6, 2026
4259ec3
vibe coded fix for catalog concurrent writes
arthurpassos Apr 7, 2026
c9dd096
simplify code, calculate partition values on the fly to avoid complex…
arthurpassos Apr 7, 2026
b180a3b
vibe coded tests
arthurpassos Apr 8, 2026
1f73187
some vibe coded tests
arthurpassos Apr 8, 2026
2f5643d
interesting stuff
arthurpassos Apr 8, 2026
6a19f92
is this the culprit
arthurpassos Apr 8, 2026
569803b
remove unused method
arthurpassos Apr 9, 2026
20aa695
one more temp fix
arthurpassos Apr 10, 2026
c80182c
fix possible deadlock
arthurpassos Apr 10, 2026
763dec1
Merge branch 'antalya-26.1' into export_partition_iceberg
arthurpassos Apr 10, 2026
18eb4d0
interesting fix
arthurpassos Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ static struct InitFiu
ONCE(disk_object_storage_fail_precommit_metadata_transaction) \
REGULAR(slowdown_parallel_replicas_local_plan_read) \
ONCE(iceberg_writes_cleanup) \
ONCE(iceberg_export_after_commit_before_zk_completed) \
ONCE(export_partition_status_change_throw) \
ONCE(backup_add_empty_memory_table) \
PAUSEABLE_ONCE(backup_pause_on_start) \
PAUSEABLE_ONCE(restore_pause_on_start) \
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7396,7 +7396,7 @@ Throw an error if there are pending patch parts when exporting a merge tree part
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list.
On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, true, R"(
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
)", 0) \
Expand Down
19 changes: 18 additions & 1 deletion src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <base/types.h>
#include <Common/Exception.h>
#include <Interpreters/StorageID.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Array.h>
Expand Down Expand Up @@ -118,6 +119,8 @@ struct ExportReplicatedMergeTreePartitionManifest
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;
bool lock_inside_the_task; /// todo temporary
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;

std::string toJsonString() const
{
Expand All @@ -129,7 +132,12 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("destination_table", destination_table);
json.set("source_replica", source_replica);
json.set("number_of_parts", number_of_parts);


if (!iceberg_metadata_json.empty())
{
json.set("iceberg_metadata_json", iceberg_metadata_json);
}

Poco::JSON::Array::Ptr parts_array = new Poco::JSON::Array();
for (const auto & part : parts)
parts_array->add(part);
Expand All @@ -145,6 +153,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("lock_inside_the_task", lock_inside_the_task);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand All @@ -166,6 +175,12 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.source_replica = json->getValue<String>("source_replica");
manifest.number_of_parts = json->getValue<size_t>("number_of_parts");
manifest.max_retries = json->getValue<size_t>("max_retries");

if (json->has("iceberg_metadata_json"))
{
manifest.iceberg_metadata_json = json->getValue<String>("iceberg_metadata_json");
}

auto parts_array = json->getArray("parts");
for (size_t i = 0; i < parts_array->size(); ++i)
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));
Expand All @@ -192,6 +207,8 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

return manifest;
}
};
Expand Down
16 changes: 15 additions & 1 deletion src/Storages/IStorage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <Core/Field.h>
#include <Core/Names.h>
#include <Core/QueryProcessingStage.h>
#include <Databases/IDatabase.h>
Expand All @@ -20,6 +21,7 @@
#include <Common/RWLock.h>
#include <Common/TypePromotion.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <Poco/JSON/Object.h>

#include <expected>
#include <optional>
Expand Down Expand Up @@ -476,7 +478,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
ContextPtr /*context*/,
bool /*async_insert*/);

virtual bool supportsImport() const
virtual bool supportsImport(ContextPtr) const
{
return false;
}
Expand All @@ -493,16 +495,28 @@ It is currently only implemented in StorageObjectStorage.
bool /* overwrite_if_exists */,
std::size_t /* max_bytes_per_file */,
std::size_t /* max_rows_per_file */,
const std::optional<std::string> & /* iceberg_metadata_json_string */,
const std::optional<FormatSettings> & /* format_settings */,
ContextPtr /* context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
}

struct IcebergCommitExportPartitionArguments
{
std::string metadata_json_string;
/// Partition column values (after transforms). Callers are responsible for
/// populating this: the partition-export path parses them from the persisted
/// JSON string, while the direct EXPORT PART path reads them from the part's
/// partition key.
std::vector<Field> partition_values;
};

virtual void commitExportPartitionTransaction(
const String & /* transaction_id */,
const String & /* partition_id */,
const Strings & /* exported_paths */,
const IcebergCommitExportPartitionArguments & /* iceberg_commit_export_partition_arguments */,
ContextPtr /* local_context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "commitExportPartitionTransaction is not implemented for storage type {}", getName());
Expand Down
19 changes: 19 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Common/ProfileEventsScope.h>
#include <Databases/DatabaseReplicated.h>
#include <Storages/MergeTree/ExportList.h>
#include <Storages/IStorage.h>
#include <Formats/FormatFactory.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <Processors/Sinks/SinkToStorage.h>
Expand Down Expand Up @@ -218,6 +219,7 @@ bool ExportPartTask::executeStep()
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
manifest.settings[Setting::export_merge_tree_part_max_bytes_per_file],
manifest.settings[Setting::export_merge_tree_part_max_rows_per_file],
manifest.iceberg_metadata_json,
getFormatSettings(local_context),
local_context);

Expand Down Expand Up @@ -295,6 +297,23 @@ bool ExportPartTask::executeStep()
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Export part was cancelled");
}

/// For the direct EXPORT PART → Iceberg path there is no deferred-commit callback
/// (the partition-export path provides one that writes to ZooKeeper).
/// Commit the Iceberg metadata inline here so the rows become visible immediately.
if (destination_storage->isDataLake() && !manifest.completion_callback)
{
IStorage::IcebergCommitExportPartitionArguments iceberg_args;
iceberg_args.metadata_json_string = manifest.iceberg_metadata_json;
iceberg_args.partition_values = manifest.data_part->partition.value;

destination_storage->commitExportPartitionTransaction(
manifest.transaction_id,
manifest.data_part->info.getPartitionId(),
(*exports_list_entry)->destination_file_paths,
iceberg_args,
local_context);
}

std::lock_guard inner_lock(storage.export_manifests_mutex);
storage.writePartLog(
PartLogElement::Type::EXPORT_PART,
Expand Down
158 changes: 116 additions & 42 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ProfileEvents.h>
#include <Common/FailPoint.h>
#include <Interpreters/DatabaseCatalog.h>

namespace ProfileEvents
Expand All @@ -21,6 +22,17 @@ namespace ProfileEvents

namespace DB
{

namespace ErrorCodes
{
extern const int FAULT_INJECTED;
}

namespace FailPoints
{
extern const char export_partition_status_change_throw[];
}

namespace
{
/*
Expand All @@ -34,7 +46,8 @@ namespace
const zkutil::ZooKeeperPtr & zk,
const std::string & entry_path,
const LoggerPtr & log,
const ContextPtr & context,
const ContextPtr & storage_context,
StorageReplicatedMergeTree & storage,
const std::string & key,
const ExportReplicatedMergeTreePartitionManifest & metadata,
const time_t now,
Expand All @@ -58,6 +71,8 @@ namespace
}
else if (is_pending)
{
auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage_context, metadata);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
std::vector<std::string> parts_in_processing_or_pending;
Expand All @@ -81,7 +96,18 @@ namespace
}

/// it sounds like a replica exported the last part, but was not able to commit the export. Try to fix it
ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context);
try
{
ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context, storage);
}
catch (const Exception & e)
{
LOG_WARNING(log,
"ExportPartition Manifest Updating Task: "
"Caught exception while committing export for {}: {}",
entry_path, e.message());
return false;
}

return true;
}
Expand Down Expand Up @@ -558,6 +584,7 @@ void ExportPartitionManifestUpdatingTask::poll()
entry_path,
storage.log.load(),
storage.getContext(),
storage,
key,
metadata,
now,
Expand Down Expand Up @@ -656,62 +683,109 @@ void ExportPartitionManifestUpdatingTask::addStatusChange(const std::string & ke

void ExportPartitionManifestUpdatingTask::handleStatusChanges()
{
std::lock_guard lock(status_changes_mutex);
std::lock_guard task_entries_lock(storage.export_merge_tree_partition_mutex);
auto zk = storage.getZooKeeper();

LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status changes. Number of status changes: {}", status_changes.size());
/// copy the events to a local queue to avoid holding the status_changes_mutex while also holding export_merge_tree_partition_mutex
std::queue<std::string> local_status_changes;
{
std::lock_guard lock(status_changes_mutex);
std::swap(status_changes, local_status_changes);
}
Comment on lines +689 to +691
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve unprocessed status events on handler failure

Swapping all pending status changes into a local queue means any exception during processing drops the remaining events. exportMergeTreePartitionStatusHandlingTask() catches and returns (especially on ZooKeeper/session errors), but does not requeue these keys, so some exports can miss terminal status transitions until another watch event happens (or forever if none arrives).

Useful? React with 👍 / 👎.


while (!status_changes.empty())
try
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status change for task {}", status_changes.front());
const auto key = status_changes.front();
status_changes.pop();
std::lock_guard task_entries_lock(storage.export_merge_tree_partition_mutex);
auto zk = storage.getZooKeeper();

auto it = storage.export_merge_tree_partition_task_entries_by_key.find(key);
if (it == storage.export_merge_tree_partition_task_entries_by_key.end())
continue;
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status changes. Number of status changes: {}", local_status_changes.size());

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
/// get new status from zk
std::string new_status_string;
if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string))
while (!local_status_changes.empty())
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to get new status for task {}, skipping", key);
continue;
}
const auto & key = local_status_changes.front();
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status change for task {}", key);

const auto new_status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(new_status_string);
if (!new_status)
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", new_status_string, key);
continue;
}
fiu_do_on(FailPoints::export_partition_status_change_throw,
{
throw Exception(ErrorCodes::FAULT_INJECTED,
"Failpoint: simulating exception during status change handling for key {}", key);
});

auto it = storage.export_merge_tree_partition_task_entries_by_key.find(key);
if (it == storage.export_merge_tree_partition_task_entries_by_key.end())
{
local_status_changes.pop();
continue;
}

LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data());
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
/// get new status from zk
std::string new_status_string;
if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string))
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to get new status for task {}, skipping", key);
local_status_changes.pop();
continue;
}

/// If status changed to KILLED, cancel local export operations
if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED)
{
try
const auto new_status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(new_status_string);
if (!new_status)
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key);
storage.killExportPart(it->manifest.transaction_id);
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", new_status_string, key);
local_status_changes.pop();
continue;
}
catch (...)

LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data());

/// If status changed to KILLED, cancel local export operations
if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED)
{
tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);
try
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key);
storage.killExportPart(it->manifest.transaction_id);
}
catch (...)
{
tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);
}
}
}

it->status = *new_status;
it->status = *new_status;

if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
/// we no longer need to keep the data parts alive
it->part_references.clear();
}

local_status_changes.pop();
}
}
catch (...)
{
tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);

if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: exception thrown while handling status changes, enqueuing remaining status changes back to the status_changes queue. Number of remaining status changes: {}", local_status_changes.size());
/// It is possible that an exception is thrown while handling the status. In this scenario
/// we need to enqueue the remaining status changes back to the status_changes queue not to lose them.
/// The other solution to this problem would be to ignore it and schedule a poll - maybe it is simpler?
if (!local_status_changes.empty())
{
/// we no longer need to keep the data parts alive
it->part_references.clear();
std::lock_guard lock(status_changes_mutex);

// Prepend remaining items before any newly-arrived items
while (!status_changes.empty())
{
local_status_changes.push(std::move(status_changes.front()));
status_changes.pop();
}

std::swap(status_changes, local_status_changes);
}

LOG_INFO(storage.log, "ExportPartition Manifest Updating task: The new number of pending status after enqueueing unprocessed ones is {}", status_changes.size());

throw;
}
}

Expand Down
Loading
Loading