From f52a708dfd92d0353ce9b8a9a93b668f5716acaa Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 5 Apr 2026 20:06:47 +0200 Subject: [PATCH 1/5] Added support for s3 tables --- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + src/Core/SettingsEnums.cpp | 3 +- src/Core/SettingsEnums.h | 1 + src/Databases/DataLake/AWSV4Signer.cpp | 110 ++++++++++ src/Databases/DataLake/AWSV4Signer.h | 34 +++ src/Databases/DataLake/DatabaseDataLake.cpp | 45 +++- src/Databases/DataLake/RestCatalog.h | 4 +- src/Databases/DataLake/S3TablesCatalog.cpp | 196 ++++++++++++++++++ src/Databases/DataLake/S3TablesCatalog.h | 60 ++++++ .../enableAllExperimentalSettings.cpp | 1 + 11 files changed, 451 insertions(+), 7 deletions(-) create mode 100644 src/Databases/DataLake/AWSV4Signer.cpp create mode 100644 src/Databases/DataLake/AWSV4Signer.h create mode 100644 src/Databases/DataLake/S3TablesCatalog.cpp create mode 100644 src/Databases/DataLake/S3TablesCatalog.h diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index ab20ef00e452..6ba195bfa8c7 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7710,6 +7710,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'. 
)", EXPERIMENTAL) \ DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"( Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest' +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_database_s3_tables, false, R"( +Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4) )", EXPERIMENTAL) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 643b5ccbb7cf..9cb8af42703d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -63,6 +63,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"optimize_read_in_window_order", true, false, "Disable this logic by default."}, {"correlated_subqueries_use_in_memory_buffer", false, true, "Use in-memory buffer for input of correlated subqueries by default."}, {"allow_experimental_database_paimon_rest_catalog", false, false, "New setting"}, + {"allow_experimental_database_s3_tables", false, false, "New setting"}, {"allow_experimental_object_storage_queue_hive_partitioning", false, false, "New setting."}, {"type_json_use_partial_match_to_skip_paths_by_regexp", false, true, "Add new setting that allows to use partial match in regexp paths skip in JSON type parsing"}, {"max_insert_block_size_bytes", 0, 0, "New setting that allows to control the size of blocks in bytes during parsing of data in Row Input Format."}, diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 19d47df4a88d..2c9cc986e4f2 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -346,7 +346,8 @@ IMPLEMENT_SETTING_ENUM( {"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE}, {"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE}, {"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE}, - {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}}) 
+ {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}, + {"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}}) IMPLEMENT_SETTING_ENUM( FileCachePolicy, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 05c3b26340f4..8160bf4ee9df 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -439,6 +439,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t ICEBERG_ONELAKE, ICEBERG_BIGLAKE, PAIMON_REST, + S3_TABLES, }; DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType) diff --git a/src/Databases/DataLake/AWSV4Signer.cpp b/src/Databases/DataLake/AWSV4Signer.cpp new file mode 100644 index 000000000000..b063031188a5 --- /dev/null +++ b/src/Databases/DataLake/AWSV4Signer.cpp @@ -0,0 +1,110 @@ +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; +} +} + +namespace DataLake +{ +namespace +{ + +Aws::Http::HttpMethod mapPocoMethodToAws(const String & method) +{ + using Aws::Http::HttpMethod; + using Poco::Net::HTTPRequest; + + static const std::pair supported_methods[] = { + {HTTPRequest::HTTP_GET, HttpMethod::HTTP_GET}, + {HTTPRequest::HTTP_POST, HttpMethod::HTTP_POST}, + {HTTPRequest::HTTP_PUT, HttpMethod::HTTP_PUT}, + {HTTPRequest::HTTP_DELETE, HttpMethod::HTTP_DELETE}, + {HTTPRequest::HTTP_HEAD, HttpMethod::HTTP_HEAD}, + {HTTPRequest::HTTP_PATCH, HttpMethod::HTTP_PATCH}, + }; + + for (const auto & [poco_method, aws_method] : supported_methods) + if (method == poco_method) + return aws_method; + + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method); +} + +} + +void signRequestWithAWSV4( + const String & method, + const Poco::URI & uri, + const DB::HTTPHeaderEntries & extra_headers, + const String & payload, + Aws::Client::AWSAuthV4Signer & signer, + 
const String & region, + const String & service, + DB::HTTPHeaderEntries & out_headers) +{ + const Aws::Http::URI aws_uri(uri.toString().c_str()); + Aws::Http::Standard::StandardHttpRequest request(aws_uri, mapPocoMethodToAws(method)); + + for (const auto & h : extra_headers) + { + if (Poco::icompare(h.name, "authorization") == 0) + continue; + request.SetHeaderValue(Aws::String(h.name.c_str(), h.name.size()), Aws::String(h.value.c_str(), h.value.size())); + } + + if (!payload.empty()) + { + auto body_stream = Aws::MakeShared("AWSV4Signer"); + body_stream->write(payload.data(), static_cast(payload.size())); + body_stream->seekg(0); + request.AddContentBody(body_stream); + } + + static constexpr bool sign_body = true; + if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body)) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "AWS SigV4 signing failed"); + + bool has_authorization = false; + for (const auto & [key, value] : request.GetHeaders()) + { + if (Poco::icompare(key, "authorization") == 0 && !value.empty()) + has_authorization = true; + } + if (!has_authorization) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request"); + + out_headers.clear(); + for (const auto & [key, value] : request.GetHeaders()) + { + if (Poco::icompare(key, "host") == 0) + continue; + out_headers.emplace_back(String(key.c_str(), key.size()), String(value.c_str(), value.size())); + } +} + +} + +#endif diff --git a/src/Databases/DataLake/AWSV4Signer.h b/src/Databases/DataLake/AWSV4Signer.h new file mode 100644 index 000000000000..cdc42adaca5f --- /dev/null +++ b/src/Databases/DataLake/AWSV4Signer.h @@ -0,0 +1,34 @@ +#pragma once + +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include +#include + +namespace Aws::Client +{ +class AWSAuthV4Signer; +} + +namespace DataLake +{ + +/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer. 
+/// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts +/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI). +void signRequestWithAWSV4( + const String & method, + const Poco::URI & uri, + const DB::HTTPHeaderEntries & extra_headers, + const String & payload, + Aws::Client::AWSAuthV4Signer & signer, + const String & region, + const String & service, + DB::HTTPHeaderEntries & out_headers); + +} + +#endif diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 5bee6258e1c6..53b3d096cc4b 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -27,6 +27,9 @@ #include #include #include +#if USE_AWS_S3 && USE_SSL +#include +#endif #include #include @@ -90,6 +93,7 @@ namespace Setting extern const SettingsBool allow_experimental_database_glue_catalog; extern const SettingsBool allow_experimental_database_hms_catalog; extern const SettingsBool allow_experimental_database_paimon_rest_catalog; + extern const SettingsBool allow_experimental_database_s3_tables; extern const SettingsBool use_hive_partitioning; extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString cluster_for_parallel_replicas; @@ -137,11 +141,12 @@ DatabaseDataLake::DatabaseDataLake( void DatabaseDataLake::validateSettings() { - if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE) + if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE + || settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES) { if (settings[DatabaseDataLakeSetting::region].value.empty()) throw Exception( - ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. " + ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue or S3 Tables catalog. 
" "Please specify 'SETTINGS region=' in the CREATE DATABASE query"); } else if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) @@ -336,6 +341,23 @@ std::shared_ptr DatabaseDataLake::getCatalog() const } break; } + case DB::DatabaseDataLakeCatalogType::S3_TABLES: + { +#if USE_AWS_S3 && USE_SSL + catalog_impl = std::make_shared( + settings[DatabaseDataLakeSetting::warehouse].value, + url, + settings[DatabaseDataLakeSetting::region].value, + catalog_parameters, + settings[DatabaseDataLakeSetting::namespaces].value, + Context::getGlobalContextInstance()); +#else + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL"); +#endif + break; + } } return catalog_impl; } @@ -368,6 +390,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( case DatabaseDataLakeCatalogType::ICEBERG_HIVE: case DatabaseDataLakeCatalogType::ICEBERG_REST: case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: + case DatabaseDataLakeCatalogType::S3_TABLES: { switch (type) { @@ -962,9 +985,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); } - if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST) + if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST + && catalog_type != DatabaseDataLakeCatalogType::S3_TABLES) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only"); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only"); } for (auto & engine_arg : engine_args) @@ -1050,6 +1074,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory) engine_func->name = "Paimon"; break; } + case DatabaseDataLakeCatalogType::S3_TABLES: + { + if (!args.create_query.attach + && 
!args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "DatabaseDataLake with S3 Tables catalog is experimental. " + "To allow its usage, enable setting allow_experimental_database_s3_tables"); + } + + engine_func->name = "Iceberg"; + break; + } case DatabaseDataLakeCatalogType::NONE: break; } diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 17170436898d..88bb84b0776c 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -148,7 +148,7 @@ class RestCatalog : public ICatalog, public DB::WithContext Poco::Net::HTTPBasicCredentials credentials{}; - DB::ReadWriteBufferFromHTTPPtr createReadBuffer( + virtual DB::ReadWriteBufferFromHTTPPtr createReadBuffer( const std::string & endpoint, const Poco::URI::QueryParameters & params = {}, const DB::HTTPHeaderEntries & headers = {}) const; @@ -183,7 +183,7 @@ class RestCatalog : public ICatalog, public DB::WithContext AccessToken retrieveAccessTokenOAuth() const; static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result); - void sendRequest( + virtual void sendRequest( const String & endpoint, Poco::JSON::Object::Ptr request_body, const String & method = Poco::Net::HTTPRequest::HTTP_POST, diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp new file mode 100644 index 000000000000..07843af6cc7a --- /dev/null +++ b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -0,0 +1,196 @@ +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB::ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +namespace DB::Setting +{ + extern const SettingsUInt64 s3_max_connections; + extern 
const SettingsUInt64 s3_max_redirects; + extern const SettingsUInt64 s3_retry_attempts; + extern const SettingsBool s3_slow_all_threads_after_network_error; + extern const SettingsBool enable_s3_requests_logging; + extern const SettingsUInt64 s3_connect_timeout_ms; + extern const SettingsUInt64 s3_request_timeout_ms; +} + +namespace DB::ServerSetting +{ + extern const ServerSettingsUInt64 s3_max_redirects; + extern const ServerSettingsUInt64 s3_retry_attempts; +} + +namespace DataLake +{ + +S3TablesCatalog::S3TablesCatalog( + const String & warehouse_, + const String & base_url_, + const String & region_, + const CatalogSettings & catalog_settings_, + const String & namespaces_, + DB::ContextPtr context_) + : RestCatalog(warehouse_, base_url_, "", "", false, namespaces_, context_) + , region(region_) + , signing_service("s3tables") +{ + if (region.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Tables catalog requires non-empty `region` setting"); + + DB::S3::CredentialsConfiguration creds_config; + creds_config.use_environment_credentials = true; + creds_config.role_arn = catalog_settings_.aws_role_arn; + creds_config.role_session_name = catalog_settings_.aws_role_session_name; + + const auto & server_settings = getContext()->getGlobalContext()->getServerSettings(); + const DB::Settings & global_settings = getContext()->getGlobalContext()->getSettingsRef(); + + int s3_max_redirects = static_cast(server_settings[DB::ServerSetting::s3_max_redirects]); + if (global_settings.isChanged("s3_max_redirects")) + s3_max_redirects = static_cast(global_settings[DB::Setting::s3_max_redirects]); + + int s3_retry_attempts = static_cast(server_settings[DB::ServerSetting::s3_retry_attempts]); + if (global_settings.isChanged("s3_retry_attempts")) + s3_retry_attempts = static_cast(global_settings[DB::Setting::s3_retry_attempts]); + + bool s3_slow_all_threads_after_network_error = global_settings[DB::Setting::s3_slow_all_threads_after_network_error]; + bool 
s3_slow_all_threads_after_retryable_error = false; + bool enable_s3_requests_logging = global_settings[DB::Setting::enable_s3_requests_logging]; + + DB::S3::PocoHTTPClientConfiguration poco_config = DB::S3::ClientFactory::instance().createClientConfiguration( + region, + getContext()->getRemoteHostFilter(), + s3_max_redirects, + DB::S3::PocoHTTPClientConfiguration::RetryStrategy{.max_retries = static_cast(s3_retry_attempts)}, + s3_slow_all_threads_after_network_error, + s3_slow_all_threads_after_retryable_error, + enable_s3_requests_logging, + /* for_disk_s3 = */ false, + /* opt_disk_name = */ {}, + /* request_throttler = */ {}); + + Aws::Auth::AWSCredentials credentials(catalog_settings_.aws_access_key_id, catalog_settings_.aws_secret_access_key); + credentials_provider = DB::S3::getCredentialsProvider(poco_config, credentials, creds_config); + + signer = std::make_unique( + credentials_provider, + "s3tables", + Aws::String(region.data(), region.size()), + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always); + + config = loadConfig(); + + if (config.prefix.empty()) + { + String encoded_warehouse; + Poco::URI::encode(warehouse_, "", encoded_warehouse); + config.prefix = encoded_warehouse; + } +} + +DB::HTTPHeaderEntries S3TablesCatalog::getAuthHeaders(bool /* update_token */) const +{ + return {}; +} + +DB::ReadWriteBufferFromHTTPPtr S3TablesCatalog::createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params, + const DB::HTTPHeaderEntries & headers) const +{ + const auto & context = getContext(); + + Poco::URI url(base_url / endpoint, /* enable_url_encoding */ false); + if (!params.empty()) + url.setQueryParameters(params); + + DB::HTTPHeaderEntries signed_headers; + signRequestWithAWSV4(Poco::Net::HTTPRequest::HTTP_GET, url, headers, "", *signer, region, signing_service, signed_headers); + + LOG_DEBUG(log, "Requesting: {}", url.toString()); + + return DB::BuilderRWBufferFromHTTP(url) + 
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withSettings(getContext()->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&getContext()->getRemoteHostFilter()) + .withHeaders(signed_headers) + .withDelayInit(false) + .withSkipNotFound(false) + .create(credentials); +} + +void S3TablesCatalog::sendRequest( + const String & endpoint, + Poco::JSON::Object::Ptr request_body, + const String & method, + bool ignore_result) const +{ + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + if (request_body) + request_body->stringify(oss); + const std::string body_str = DB::removeEscapedSlashes(oss.str()); + + DB::HTTPHeaderEntries extra_headers; + if (!body_str.empty()) + extra_headers.emplace_back("Content-Type", "application/json"); + + const auto & context = getContext(); + + Poco::URI url(endpoint, /* enable_url_encoding */ false); + + DB::HTTPHeaderEntries signed_headers; + signRequestWithAWSV4(method, url, extra_headers, body_str, *signer, region, signing_service, signed_headers); + + DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; + if (!body_str.empty()) + { + out_stream_callback = [body_str](std::ostream & os) { os << body_str; }; + } + + auto wb = DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withMethod(method) + .withSettings(context->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&context->getRemoteHostFilter()) + .withHeaders(signed_headers) + .withOutCallback(out_stream_callback) + .withSkipNotFound(false) + .create(credentials); + + String response_str; + if (!ignore_result) + readJSONObjectPossiblyInvalid(response_str, *wb); + else + wb->ignoreAll(); +} + +} + +#endif diff --git a/src/Databases/DataLake/S3TablesCatalog.h b/src/Databases/DataLake/S3TablesCatalog.h 
new file mode 100644 index 000000000000..08a1df157929 --- /dev/null +++ b/src/Databases/DataLake/S3TablesCatalog.h @@ -0,0 +1,60 @@ +#pragma once + +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include + +#include + +#include + +namespace Aws::Auth +{ +class AWSCredentialsProvider; +} + +namespace DataLake +{ + +/// Iceberg REST catalog for Amazon S3 Tables (SigV4, signing name `s3tables`). +/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-open-source.html +class S3TablesCatalog final : public RestCatalog +{ +public: + S3TablesCatalog( + const String & warehouse_, + const String & base_url_, + const String & region_, + const DataLake::CatalogSettings & catalog_settings_, + const String & namespaces_, + DB::ContextPtr context_); + + DB::DatabaseDataLakeCatalogType getCatalogType() const override { return DB::DatabaseDataLakeCatalogType::S3_TABLES; } + + DB::ReadWriteBufferFromHTTPPtr createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params = {}, + const DB::HTTPHeaderEntries & headers = {}) const override; + + void sendRequest( + const String & endpoint, + Poco::JSON::Object::Ptr request_body, + const String & method = Poco::Net::HTTPRequest::HTTP_POST, + bool ignore_result = false) const override; + +protected: + DB::HTTPHeaderEntries getAuthHeaders(bool /* update_token */) const override; + +private: + const String region; + const String signing_service; + std::shared_ptr credentials_provider; + std::unique_ptr signer; +}; + +} + +#endif diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index 2ef37686b3d2..64bac904b3da 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -64,6 +64,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("allow_dynamic_type_in_join_keys", 1); 
context->setSetting("allow_experimental_alias_table_engine", 1); context->setSetting("allow_experimental_database_paimon_rest_catalog", 1); + context->setSetting("allow_experimental_database_s3_tables", 1); context->setSetting("allow_experimental_object_storage_queue_hive_partitioning", 1); /// clickhouse-private settings From dbcf541b8de4db6fb26db216d008af77d93f7d45 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 6 Apr 2026 23:04:59 +0200 Subject: [PATCH 2/5] Override getTables and tryGetTableMetadata. getTables(): since s3tables only supports a single namespace, for every namespace, make a call to getTables. Use ThreadPoolCallbackRunnerLocal to perform this in a thread pool. tryGetTableMetadata: inject https://s3.{region}.amazonaws.com so that requests don't go to bucket.s3.amazonaws.com. Pre-strips the s3://bucket-name/ prefix, leaving just the relative path metadata/00001-....json, so downstream code constructs the correct S3 key. Detects missing/empty credentials and injects the catalog's own IAM credentials from credentials_provider->GetAWSCredentials().
--- src/Databases/DataLake/ICatalog.cpp | 5 ++ src/Databases/DataLake/S3TablesCatalog.cpp | 89 ++++++++++++++++++++- src/Databases/DataLake/S3TablesCatalog.h | 8 ++ src/Databases/DataLake/StorageCredentials.h | 2 + 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index e2170c038e52..a135580f29b6 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -257,6 +257,11 @@ bool TableMetadata::hasStorageCredentials() const return storage_credentials != nullptr; } +bool TableMetadata::hasDataLakeSpecificProperties() const +{ + return data_lake_specific_metadata.has_value(); +} + std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const { std::string metadata_location = iceberg_metadata_file_location; diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp index 07843af6cc7a..7ae022b9a2d1 100644 --- a/src/Databases/DataLake/S3TablesCatalog.cpp +++ b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -7,6 +7,8 @@ #include #include +#include +#include #include #include #include @@ -23,6 +25,8 @@ #include #include +#include + namespace DB::ErrorCodes { extern const int BAD_ARGUMENTS; @@ -101,7 +105,8 @@ S3TablesCatalog::S3TablesCatalog( credentials_provider, "s3tables", Aws::String(region.data(), region.size()), - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always); + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always, + /* urlEscapePath = */ false); config = loadConfig(); @@ -113,6 +118,88 @@ S3TablesCatalog::S3TablesCatalog( } } +DB::Names S3TablesCatalog::getTables() const +{ + auto namespaces = getNamespaces(""); + + auto & pool = getContext()->getIcebergCatalogThreadpool(); + DB::ThreadPoolCallbackRunnerLocal runner(pool, DB::ThreadName::DATALAKE_REST_CATALOG); + + DB::Names tables; + std::mutex mutex; + for (const auto & ns : namespaces) + { + 
if (!allowed_namespaces.isNamespaceAllowed(ns, /*nested*/ false)) + continue; + runner.enqueueAndKeepTrack( + [&, ns] + { + auto tables_in_ns = RestCatalog::getTables(ns); + std::lock_guard lock(mutex); + std::move(tables_in_ns.begin(), tables_in_ns.end(), std::back_inserter(tables)); + }); + } + runner.waitForAllToFinishAndRethrowFirstError(); + return tables; +} + +bool S3TablesCatalog::tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const +{ + if (!RestCatalog::tryGetTableMetadata(namespace_name, table_name, context_, result)) + return false; + + if (!result.requiresCredentials()) + return true; + + bool need_credentials = !result.hasStorageCredentials() || !result.getStorageCredentials(); + if (!need_credentials) + { + auto creds = std::dynamic_pointer_cast(result.getStorageCredentials()); + if (creds && creds->isEmpty()) + need_credentials = true; + } + + if (need_credentials) + { + LOG_DEBUG(log, "S3 Tables: no vended credentials for {}.{}, injecting catalog IAM credentials", namespace_name, table_name); + auto aws_creds = credentials_provider->GetAWSCredentials(); + result.setStorageCredentials(std::make_shared( + String(aws_creds.GetAWSAccessKeyId().c_str(), aws_creds.GetAWSAccessKeyId().size()), + String(aws_creds.GetAWSSecretKey().c_str(), aws_creds.GetAWSSecretKey().size()), + String(aws_creds.GetSessionToken().c_str(), aws_creds.GetSessionToken().size()))); + } + + if (result.getEndpoint().empty()) + { + String regional_endpoint = "https://s3." 
+ region + ".amazonaws.com"; + LOG_DEBUG(log, "S3 Tables: no s3.endpoint for {}.{}, injecting regional endpoint: {}", namespace_name, table_name, regional_endpoint); + result.setEndpoint(regional_endpoint); + } + + if (result.hasDataLakeSpecificProperties()) + { + auto props = result.getDataLakeSpecificProperties(); + if (props.has_value() && !props->iceberg_metadata_file_location.empty()) + { + const String & loc = props->iceberg_metadata_file_location; + auto scheme_end = loc.find("://"); + if (scheme_end != String::npos) + { + auto path_start = loc.find('/', scheme_end + 3); + if (path_start != String::npos) + props->iceberg_metadata_file_location = loc.substr(path_start + 1); + } + result.setDataLakeSpecificProperties(std::move(props)); + } + } + + return true; +} + DB::HTTPHeaderEntries S3TablesCatalog::getAuthHeaders(bool /* update_token */) const { return {}; diff --git a/src/Databases/DataLake/S3TablesCatalog.h b/src/Databases/DataLake/S3TablesCatalog.h index 08a1df157929..2d6e54712e59 100644 --- a/src/Databases/DataLake/S3TablesCatalog.h +++ b/src/Databases/DataLake/S3TablesCatalog.h @@ -34,6 +34,14 @@ class S3TablesCatalog final : public RestCatalog DB::DatabaseDataLakeCatalogType getCatalogType() const override { return DB::DatabaseDataLakeCatalogType::S3_TABLES; } + DB::Names getTables() const override; + + bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const override; + DB::ReadWriteBufferFromHTTPPtr createReadBuffer( const std::string & endpoint, const Poco::URI::QueryParameters & params = {}, diff --git a/src/Databases/DataLake/StorageCredentials.h b/src/Databases/DataLake/StorageCredentials.h index ab09e3420889..5e261b9bb3a4 100644 --- a/src/Databases/DataLake/StorageCredentials.h +++ b/src/Databases/DataLake/StorageCredentials.h @@ -31,6 +31,8 @@ class S3Credentials final : public IStorageCredentials , session_token(session_token_) {} + bool 
isEmpty() const { return access_key_id.empty() || secret_access_key.empty(); } + void addCredentialsToEngineArgs(DB::ASTs & engine_args) const override { if (engine_args.size() != 1) From f035cca932dd818551d0739a55692764f538dad6 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sat, 11 Apr 2026 03:48:16 +0200 Subject: [PATCH 3/5] Add retry mechanism --- src/Databases/DataLake/S3TablesCatalog.cpp | 40 ++++++++++++++++------ 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp index 7ae022b9a2d1..de16dd53bf0e 100644 --- a/src/Databases/DataLake/S3TablesCatalog.cpp +++ b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -216,20 +216,38 @@ DB::ReadWriteBufferFromHTTPPtr S3TablesCatalog::createReadBuffer( if (!params.empty()) url.setQueryParameters(params); - DB::HTTPHeaderEntries signed_headers; - signRequestWithAWSV4(Poco::Net::HTTPRequest::HTTP_GET, url, headers, "", *signer, region, signing_service, signed_headers); + auto create_buffer = [&] + { + DB::HTTPHeaderEntries signed_headers; + signRequestWithAWSV4(Poco::Net::HTTPRequest::HTTP_GET, url, headers, "", *signer, region, signing_service, signed_headers); + + return DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withSettings(getContext()->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&getContext()->getRemoteHostFilter()) + .withHeaders(signed_headers) + .withDelayInit(false) + .withSkipNotFound(false) + .create(credentials); + }; LOG_DEBUG(log, "Requesting: {}", url.toString()); - return DB::BuilderRWBufferFromHTTP(url) - .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) - .withSettings(getContext()->getReadSettings()) - .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) - 
.withHostFilter(&getContext()->getRemoteHostFilter()) - .withHeaders(signed_headers) - .withDelayInit(false) - .withSkipNotFound(false) - .create(credentials); + try + { + return create_buffer(); + } + catch (const DB::HTTPException & e) + { + const auto status = e.getHTTPStatus(); + if (status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED + || status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN) + { + return create_buffer(); + } + throw; + } } void S3TablesCatalog::sendRequest( From 0cb0725746f9c47f8e2f3f2decad8b75cc247c01 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sat, 11 Apr 2026 21:26:19 +0200 Subject: [PATCH 4/5] Changed from LOGICAL_ERROR to S3_ERROR --- src/Databases/DataLake/AWSV4Signer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Databases/DataLake/AWSV4Signer.cpp b/src/Databases/DataLake/AWSV4Signer.cpp index b063031188a5..f5bd8c4bc6ed 100644 --- a/src/Databases/DataLake/AWSV4Signer.cpp +++ b/src/Databases/DataLake/AWSV4Signer.cpp @@ -21,7 +21,7 @@ namespace DB namespace ErrorCodes { extern const int BAD_ARGUMENTS; - extern const int LOGICAL_ERROR; + extern const int S3_ERROR; } } @@ -83,7 +83,7 @@ void signRequestWithAWSV4( static constexpr bool sign_body = true; if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body)) - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "AWS SigV4 signing failed"); + throw DB::Exception(DB::ErrorCodes::S3_ERROR, "AWS SigV4 signing failed"); bool has_authorization = false; for (const auto & [key, value] : request.GetHeaders()) From 8acaa19e9041b5fa6dc708d877ac63524401a154 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 13 Apr 2026 20:36:08 +0200 Subject: [PATCH 5/5] Added support for dropTable, moved signing logic from createReadBuffer to getAuthHeaders --- src/Databases/DataLake/DatabaseDataLake.cpp | 17 ++- src/Databases/DataLake/RestCatalog.cpp | 42 ++++++-- src/Databases/DataLake/RestCatalog.h | 21 +++- 
src/Databases/DataLake/S3TablesCatalog.cpp | 114 ++++++-------------- src/Databases/DataLake/S3TablesCatalog.h | 18 ++-- 5 files changed, 104 insertions(+), 108 deletions(-) diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 53b3d096cc4b..a48c8b529292 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -141,14 +141,25 @@ DatabaseDataLake::DatabaseDataLake( void DatabaseDataLake::validateSettings() { - if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE - || settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES) + if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE) { if (settings[DatabaseDataLakeSetting::region].value.empty()) throw Exception( - ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue or S3 Tables catalog. " + ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue catalog. " "Please specify 'SETTINGS region=' in the CREATE DATABASE query"); } + else if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES) + { + if (settings[DatabaseDataLakeSetting::region].value.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for S3 Tables catalog. " + "Please specify 'SETTINGS region=' in the CREATE DATABASE query"); + + if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty for S3 Tables catalog. 
" + "Please specify 'SETTINGS warehouse=' in the CREATE DATABASE query"); + } else if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) { throw Exception( diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index fb7031e3d052..b51a13bfeef2 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -218,7 +218,12 @@ void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Pt result.default_base_location = object->get("default-base-location").extract(); } -DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const +DB::HTTPHeaderEntries RestCatalog::getAuthHeaders( + bool update_token, + const String & /*method*/, + const Poco::URI & /*url*/, + const DB::HTTPHeaderEntries & /*extra_headers*/, + const String & /*body*/) const { /// Option 1: user specified auth header manually. /// Header has format: 'Authorization: '. @@ -348,7 +353,12 @@ OneLakeCatalog::OneLakeCatalog( config = loadConfig(); } -DB::HTTPHeaderEntries OneLakeCatalog::getAuthHeaders(bool update_token) const +DB::HTTPHeaderEntries OneLakeCatalog::getAuthHeaders( + bool update_token, + const String & /*method*/, + const Poco::URI & /*url*/, + const DB::HTTPHeaderEntries & /*extra_headers*/, + const String & /*body*/) const { /// User provided grant_type, client_id and client_secret. 
/// We would make OAuthClientCredentialsRequest @@ -477,7 +487,12 @@ BigLakeCatalog::BigLakeCatalog( config = loadConfig(); } -DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(bool update_token) const +DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders( + bool update_token, + const String & /*method*/, + const Poco::URI & /*url*/, + const DB::HTTPHeaderEntries & /*extra_headers*/, + const String & /*body*/) const { if (!google_project_id.empty() || !google_adc_client_id.empty()) { @@ -711,7 +726,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( auto create_buffer = [&](bool update_token) { - auto result_headers = getAuthHeaders(update_token); + auto result_headers = getAuthHeaders(update_token, Poco::Net::HTTPRequest::HTTP_GET, url, headers, {}); std::move(headers.begin(), headers.end(), std::back_inserter(result_headers)); return DB::BuilderRWBufferFromHTTP(url) @@ -1187,9 +1202,6 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r request_body->stringify(oss); const std::string body_str = DB::removeEscapedSlashes(oss.str()); - DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true); - headers.emplace_back("Content-Type", "application/json"); - const auto & context = getContext(); DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; @@ -1203,6 +1215,12 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r /// enable_url_encoding=false to allow use tables with encoded sequences in names like 'foo%2Fbar' Poco::URI url(endpoint, /* enable_url_encoding */ false); + + DB::HTTPHeaderEntries extra_headers; + extra_headers.emplace_back("Content-Type", "application/json"); + + DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true, method, url, extra_headers, body_str); + headers.emplace_back("Content-Type", "application/json"); auto wb = DB::BuilderRWBufferFromHTTP(url) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) 
.withMethod(method) @@ -1223,7 +1241,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, const String & location) const { - const std::string endpoint = fmt::format("{}/namespaces", base_url); + const std::string endpoint = base_url / config.prefix / "namespaces"; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; { @@ -1255,7 +1273,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl createNamespaceIfNotExists(namespace_name, metadata_content->getValue("location")); - const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name); + const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables"; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; request_body->set("name", table_name); @@ -1292,7 +1310,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const { - const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name); + const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; { @@ -1362,7 +1380,9 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_ "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", table_name, namespace_name); - const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name); + const std::string endpoint + = (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string() + + 
"?purgeRequested=False"; Poco::JSON::Object::Ptr request_body = nullptr; try diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 88bb84b0776c..3b0712b6bbe9 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -179,7 +179,12 @@ class RestCatalog : public ICatalog, public DB::WithContext TableMetadata & result) const; Config loadConfig(); - virtual DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const; + virtual DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const; AccessToken retrieveAccessTokenOAuth() const; static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result); @@ -210,7 +215,12 @@ class OneLakeCatalog : public RestCatalog return DB::DatabaseDataLakeCatalogType::ICEBERG_ONELAKE; } - DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const override; + DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const override; String getTenantId() const { return tenant_id; } @@ -241,7 +251,12 @@ class BigLakeCatalog : public RestCatalog return DB::DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE; } - DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const override; + DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const override; private: const std::string google_project_id; diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp index de16dd53bf0e..daf0e036d8b6 100644 --- a/src/Databases/DataLake/S3TablesCatalog.cpp +++ 
b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -30,6 +30,8 @@ namespace DB::ErrorCodes { extern const int BAD_ARGUMENTS; + extern const int DATALAKE_DATABASE_ERROR; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace DB::Setting @@ -200,100 +202,52 @@ bool S3TablesCatalog::tryGetTableMetadata( return true; } -DB::HTTPHeaderEntries S3TablesCatalog::getAuthHeaders(bool /* update_token */) const +void S3TablesCatalog::dropTable(const String & namespace_name, const String & table_name) const { - return {}; -} - -DB::ReadWriteBufferFromHTTPPtr S3TablesCatalog::createReadBuffer( - const std::string & endpoint, - const Poco::URI::QueryParameters & params, - const DB::HTTPHeaderEntries & headers) const -{ - const auto & context = getContext(); + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); - Poco::URI url(base_url / endpoint, /* enable_url_encoding */ false); - if (!params.empty()) - url.setQueryParameters(params); - - auto create_buffer = [&] - { - DB::HTTPHeaderEntries signed_headers; - signRequestWithAWSV4(Poco::Net::HTTPRequest::HTTP_GET, url, headers, "", *signer, region, signing_service, signed_headers); - - return DB::BuilderRWBufferFromHTTP(url) - .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) - .withSettings(getContext()->getReadSettings()) - .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) - .withHostFilter(&getContext()->getRemoteHostFilter()) - .withHeaders(signed_headers) - .withDelayInit(false) - .withSkipNotFound(false) - .create(credentials); - }; - - LOG_DEBUG(log, "Requesting: {}", url.toString()); + const std::string endpoint + = (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string() + + "?purgeRequested=True"; + 
Poco::JSON::Object::Ptr request_body = nullptr; try { - return create_buffer(); + sendRequest(endpoint, request_body, Poco::Net::HTTPRequest::HTTP_DELETE, true); } - catch (const DB::HTTPException & e) + catch (const DB::HTTPException & ex) { - const auto status = e.getHTTPStatus(); - if (status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED - || status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN) - { - return create_buffer(); - } - throw; + if (ex.getHTTPStatus() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) + // 404 is returned by the API when the table does + LOG_DEBUG(log, "S3 Tables: table {}.{} already does not exist (404 on purge-delete)", namespace_name, table_name); + else + throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Failed to drop table {}", ex.displayText()); } } -void S3TablesCatalog::sendRequest( - const String & endpoint, - Poco::JSON::Object::Ptr request_body, +DB::HTTPHeaderEntries S3TablesCatalog::getAuthHeaders( + bool /*update_token*/, const String & method, - bool ignore_result) const + const Poco::URI & url, + const DB::HTTPHeaderEntries & extra_headers, + const String & body) const { - std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM - if (request_body) - request_body->stringify(oss); - const std::string body_str = DB::removeEscapedSlashes(oss.str()); - - DB::HTTPHeaderEntries extra_headers; - if (!body_str.empty()) - extra_headers.emplace_back("Content-Type", "application/json"); - - const auto & context = getContext(); - - Poco::URI url(endpoint, /* enable_url_encoding */ false); - - DB::HTTPHeaderEntries signed_headers; - signRequestWithAWSV4(method, url, extra_headers, body_str, *signer, region, signing_service, signed_headers); - - DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; - if (!body_str.empty()) + DB::HTTPHeaderEntries all_signed; + signRequestWithAWSV4(method, url, extra_headers, body, *signer, region, signing_service, all_signed); + + // 
signRequestWithAWSV4 returns both input extra_headers and signer-added auth + // headers. Only return the auth portion (authorization, x-amz-*); the caller + // appends the original request headers separately. + DB::HTTPHeaderEntries auth_headers; + for (auto & h : all_signed) { - out_stream_callback = [body_str](std::ostream & os) { os << body_str; }; + if (h.name == "authorization" || h.name.starts_with("x-amz-")) + auth_headers.push_back(std::move(h)); } - - auto wb = DB::BuilderRWBufferFromHTTP(url) - .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) - .withMethod(method) - .withSettings(context->getReadSettings()) - .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) - .withHostFilter(&context->getRemoteHostFilter()) - .withHeaders(signed_headers) - .withOutCallback(out_stream_callback) - .withSkipNotFound(false) - .create(credentials); - - String response_str; - if (!ignore_result) - readJSONObjectPossiblyInvalid(response_str, *wb); - else - wb->ignoreAll(); + return auth_headers; } } diff --git a/src/Databases/DataLake/S3TablesCatalog.h b/src/Databases/DataLake/S3TablesCatalog.h index 2d6e54712e59..ead03b513784 100644 --- a/src/Databases/DataLake/S3TablesCatalog.h +++ b/src/Databases/DataLake/S3TablesCatalog.h @@ -42,19 +42,15 @@ class S3TablesCatalog final : public RestCatalog DB::ContextPtr context_, TableMetadata & result) const override; - DB::ReadWriteBufferFromHTTPPtr createReadBuffer( - const std::string & endpoint, - const Poco::URI::QueryParameters & params = {}, - const DB::HTTPHeaderEntries & headers = {}) const override; - - void sendRequest( - const String & endpoint, - Poco::JSON::Object::Ptr request_body, - const String & method = Poco::Net::HTTPRequest::HTTP_POST, - bool ignore_result = false) const override; + void dropTable(const String & namespace_name, const String & table_name) const override; protected: - DB::HTTPHeaderEntries getAuthHeaders(bool /* update_token */) const 
override; + DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const override; private: const String region;