Skip to content
Open
23 changes: 22 additions & 1 deletion docs/en/sql-reference/table-functions/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ y: 993

### Schema evolution {#iceberg-writes-schema-evolution}

ClickHouse allows you to add, drop, or modify columns with simple types (non-tuple, non-array, non-map).
ClickHouse allows you to add, drop, modify, or rename columns with simple types (non-tuple, non-array, non-map).

### Example {#example-iceberg-writes-evolution}

Expand Down Expand Up @@ -479,6 +479,27 @@ Row 1:
──────
x: Ivanov
y: 993

ALTER TABLE iceberg_writes_example RENAME COLUMN y TO value;
SHOW CREATE TABLE iceberg_writes_example;

┌─statement─────────────────────────────────────────────────┐
1. │ CREATE TABLE default.iceberg_writes_example ↴│
│↳( ↴│
│↳ `x` Nullable(String), ↴│
│↳ `value` Nullable(Int64) ↴│
│↳) ↴│
│↳ENGINE = IcebergLocal('/home/scanhex12/iceberg_example/') │
└───────────────────────────────────────────────────────────┘

SELECT *
FROM iceberg_writes_example
FORMAT VERTICAL;

Row 1:
──────
x: Ivanov
value: 993
```

### Compaction {#iceberg-writes-compaction}
Expand Down
238 changes: 188 additions & 50 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <IO/Operators.h>
#include <Interpreters/Context.h>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
Expand All @@ -42,6 +43,8 @@
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>

#include <sstream>


namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -116,6 +119,185 @@ String encodeNamespaceForURI(const String & namespace_name)

}

namespace
{

/// Produces a deep copy of a Poco JSON object.
/// Poco::JSON objects are reference-counted handles, so plain assignment would
/// alias the same underlying object; round-tripping through text guarantees an
/// independent copy that can be mutated safely.
Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
{
    std::ostringstream serialized; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    obj->stringify(serialized);

    Poco::JSON::Parser json_parser;
    return json_parser.parse(serialized.str()).extract<Poco::JSON::Object::Ptr>();
}

}

/// Builds the JSON body for an Iceberg REST catalog commit
/// (POST .../namespaces/{ns}/tables/{table}).
///
/// Two mutually exclusive kinds of commit are produced, depending on the
/// content of `new_snapshot`:
///  - if it carries a "schemas" array, a schema-evolution commit is built
///    (add-schema + set-current-schema);
///  - otherwise a data commit is built (add-snapshot + set-snapshot-ref on
///    the "main" branch).
///
/// @param namespace_name  Iceberg namespace the table lives in.
/// @param table_name      Name of the table being committed to.
/// @param new_snapshot    Metadata JSON to commit; may be null, which yields Status::Skip.
/// @return Skip (nothing to send), Error (inconsistent metadata), or Ok with `request_body` populated.
UpdateMetadataRequestBodyResult buildUpdateMetadataRequestBody(
    const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot)
{
    UpdateMetadataRequestBodyResult result;

    /// No snapshot means there is nothing to commit; the caller treats Skip as success.
    if (!new_snapshot)
    {
        result.status = UpdateMetadataRequestBodyResult::Status::Skip;
        return result;
    }

    /// Common part of every commit body: the table identifier
    /// ("identifier": { "name": ..., "namespace": [...] }).
    Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
    {
        Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
        identifier->set("name", table_name);
        Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
        namespaces->add(namespace_name);
        identifier->set("namespace", namespaces);

        request_body->set("identifier", identifier);
    }

    // If the metadata has a schemas field, this is a schema-evolution commit.
    if (new_snapshot->has(DB::Iceberg::f_schemas))
    {
        /// A schema change must also declare which schema id is now current.
        if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
        {
            result.status = UpdateMetadataRequestBodyResult::Status::Error;
            return result;
        }

        // Extract the new schema id and derive the old one.
        // NOTE(review): assumes schema ids are assigned consecutively, so the
        // previous schema id is exactly new_schema_id - 1 — confirm this holds
        // for metadata produced by other Iceberg writers.
        const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
        const Int32 old_schema_id = new_schema_id - 1;

        // "schemas" is a JSON array of schema objects; we need the one whose
        // "schema-id" equals new_schema_id. Example:
        // "schemas" : [
        //     {
        //       "fields" : [
        //         {
        //           "name" : "id",
        //           "type" : "int",
        //           "id" : 1
        //         }
        //       ],
        //       "schema-id" : 0,
        //       "type" : "struct"
        //     },
        //     ...
        //     {
        //       "fields" : [
        //         {
        //           "name" : "id2",   // column renamed from id to id2
        //           "type" : "int",
        //           "id" : 1
        //         }
        //       ],
        //       "schema-id" : 1,     // new_schema_id
        //       "type" : "struct"
        //     },

        // Find the schema object with the new schema id.
        Poco::JSON::Object::Ptr new_schema_obj;
        auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
        for (UInt32 i = 0; i < schemas->size(); ++i)
        {
            auto s = schemas->getObject(i);
            if (s->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
            {
                new_schema_obj = s;
                break;
            }
        }
        // The declared current schema id is absent from "schemas": the metadata
        // is inconsistent, so report an error instead of sending a bad commit.
        if (!new_schema_obj)
        {
            result.status = UpdateMetadataRequestBodyResult::Status::Error;
            return result;
        }

        /// Clone before mutating: the schema object is shared with `new_snapshot`
        /// (Poco JSON objects are reference-counted handles), and the REST payload
        /// needs an "identifier-field-ids" field the stored schema may lack.
        Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
        if (!schema_for_rest->has("identifier-field-ids"))
            schema_for_rest->set("identifier-field-ids", new Poco::JSON::Array);

        /// Optimistic-concurrency guard: require that the catalog's current
        /// schema is still the previous one, so concurrent schema changes fail
        /// instead of silently overwriting each other. Skipped for the very
        /// first schema (old_schema_id < 0).
        if (old_schema_id >= 0)
        {
            Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
            requirement->set("type", "assert-current-schema-id");
            requirement->set("current-schema-id", old_schema_id);

            Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
            requirements->add(requirement);
            request_body->set("requirements", requirements);
        }

        /// Updates: first register the new schema, then make it current.
        Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
        {
            Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
            add_schema->set("action", "add-schema");
            add_schema->set("schema", schema_for_rest);
            if (new_snapshot->has(DB::Iceberg::f_last_column_id))
                add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
            updates->add(add_schema);
        }
        {
            Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
            set_current_schema->set("action", "set-current-schema");
            set_current_schema->set("schema-id", new_schema_id);
            updates->add(set_current_schema);
        }
        request_body->set("updates", updates);
    }
    else
    {
        /// Data commit: `new_snapshot` is a snapshot object, e.g.
        /// {
        ///   "parent-snapshot-id" : 1,
        ///   "snapshot-id" : 2,
        ///   "timestamp-ms" : 1717334400000,
        ///   "schema-id" : 1,
        ///   "operation" : "replace",
        ///   "summary" : "replace snapshot 1 with snapshot 2"
        /// }
        /// Optimistic-concurrency guard: require the "main" branch to still
        /// point at the parent snapshot. Skipped for the first snapshot
        /// (field missing or -1).
        if (new_snapshot->has("parent-snapshot-id"))
        {
            auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
            if (parent_snapshot_id != -1)
            {
                Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
                requirement->set("type", "assert-ref-snapshot-id");
                requirement->set("ref", "main");
                requirement->set("snapshot-id", parent_snapshot_id);

                Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
                requirements->add(requirement);

                request_body->set("requirements", requirements);
            }
        }

        /// Updates: append the snapshot, then move the "main" branch ref onto it.
        {
            Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

            {
                Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
                add_snapshot->set("action", "add-snapshot");
                add_snapshot->set("snapshot", new_snapshot);
                updates->add(add_snapshot);
            }

            {
                Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
                set_snapshot->set("action", "set-snapshot-ref");
                set_snapshot->set("ref-name", "main");
                set_snapshot->set("type", "branch");
                set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));

                updates->add(set_snapshot);
            }
            request_body->set("updates", updates);
        }
    }

    result.status = UpdateMetadataRequestBodyResult::Status::Ok;
    result.request_body = request_body;
    return result;
}

std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
Expand Down Expand Up @@ -1294,59 +1476,15 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));

updates->add(set_snapshot);
}
request_body->set("updates", updates);
}
const auto built = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot);
if (built.status == UpdateMetadataRequestBodyResult::Status::Skip)
return true;
if (built.status == UpdateMetadataRequestBodyResult::Status::Error)
return false;

try
{
sendRequest(endpoint, request_body);
sendRequest(endpoint, built.request_body);
}
catch (const DB::HTTPException &)
{
Expand Down
22 changes: 22 additions & 0 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ struct AccessToken
}
};

/// Result of building the Iceberg REST catalog request body for `RestCatalog::updateMetadata`.
struct UpdateMetadataRequestBodyResult
{
    enum class Status
    {
        /// `new_snapshot` is null; caller should not send HTTP and return true.
        Skip,
        /// `request_body` is valid; caller should `sendRequest`.
        Ok,
        /// Validation failed; caller should return false.
        Error,
    };
    /// Defaults to Error so a partially-initialized result is never mistaken for success.
    Status status = Status::Error;
    /// Populated only when `status == Status::Ok`; null otherwise.
    Poco::JSON::Object::Ptr request_body;
};

/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update).
/// @param namespace_name  Iceberg namespace the table belongs to.
/// @param table_name      Name of the table being updated.
/// @param new_snapshot    Metadata JSON to commit (schema change or snapshot); may be null,
///                        in which case the result status is Skip.
UpdateMetadataRequestBodyResult buildUpdateMetadataRequestBody(
    const String & namespace_name,
    const String & table_name,
    Poco::JSON::Object::Ptr new_snapshot);

class RestCatalog : public ICatalog, public DB::WithContext
{
public:
Expand Down
Loading
Loading