Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,51 @@ The server successfully detected this situation and will download merged part fr
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \
\
M(DataLakeRestCatalogLoadConfig, "Number of 'load config' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogLoadConfigMicroseconds, "Total time of 'load config' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogGetNamespaces, "Number of 'get namespaces' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogGetNamespacesMicroseconds, "Total time of 'get namespaces' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogGetTables, "Number of 'get tables' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogGetTablesMicroseconds, "Total time of 'get tables' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogGetTableMetadata, "Number of 'get table metadata' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogGetTableMetadataMicroseconds, "Total time of 'get table metadata' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogGetCredentials, "Number of 'get credentials' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogGetCredentialsMicroseconds, "Total time of 'get credentials' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogCreateNamespace, "Number of 'create namespace' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogCreateNamespaceMicroseconds, "Total time of 'create namespace' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogCreateTable, "Number of 'create table' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogCreateTableMicroseconds, "Total time of 'create table' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogUpdateTable, "Number of 'update table' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogUpdateTableMicroseconds, "Total time of 'update table' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogDropTable, "Number of 'drop table' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogDropTableMicroseconds, "Total time of 'drop table' requests to Iceberg REST catalog.", ValueType::Microseconds) \
\
M(DataLakeGlueCatalogGetDatabases, "Number of 'get databases' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogGetDatabasesMicroseconds, "Total time of 'get databases' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
M(DataLakeGlueCatalogGetTables, "Number of 'get tables' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogGetTablesMicroseconds, "Total time of 'get tables' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
M(DataLakeGlueCatalogGetTable, "Number of 'get table' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogGetTableMicroseconds, "Total time of 'get table' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
M(DataLakeGlueCatalogCreateDatabase, "Number of 'create database' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogCreateDatabaseMicroseconds, "Total time of 'create database' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
M(DataLakeGlueCatalogCreateTable, "Number of 'create table' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogCreateTableMicroseconds, "Total time of 'create table' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
M(DataLakeGlueCatalogUpdateTable, "Number of 'update table' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogUpdateTableMicroseconds, "Total time of 'update table' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
M(DataLakeGlueCatalogDropTable, "Number of 'drop table' requests to Iceberg Glue catalog.", ValueType::Number) \
M(DataLakeGlueCatalogDropTableMicroseconds, "Total time of 'drop table' requests to Iceberg Glue catalog.", ValueType::Microseconds) \
\
M(DataLakeUnityCatalogGetTables, "Number of 'get tables' requests to Iceberg Unity catalog.", ValueType::Number) \
M(DataLakeUnityCatalogGetTablesMicroseconds, "Total time of 'get tables' requests to Iceberg Unity catalog.", ValueType::Microseconds) \
M(DataLakeUnityCatalogGetTable, "Number of 'get table' requests to Iceberg Unity catalog.", ValueType::Number) \
M(DataLakeUnityCatalogGetTableMicroseconds, "Total time of 'get table' requests to Iceberg Unity catalog.", ValueType::Microseconds) \
M(DataLakeUnityCatalogGetTableMetadata, "Number of 'get table metadata' requests to Iceberg Unity catalog.", ValueType::Number) \
M(DataLakeUnityCatalogGetTableMetadataMicroseconds, "Total time of 'get table metadata' requests to Iceberg Unity catalog.", ValueType::Microseconds) \
M(DataLakeUnityCatalogGetSchemas, "Number of 'get schemas' requests to Iceberg Unity catalog.", ValueType::Number) \
M(DataLakeUnityCatalogGetSchemasMicroseconds, "Total time of 'get schemas' requests to Iceberg Unity catalog.", ValueType::Microseconds) \
M(DataLakeUnityCatalogGetCredentials, "Number of 'get credentials' requests to Iceberg Unity catalog.", ValueType::Number) \
M(DataLakeUnityCatalogGetCredentialsMicroseconds, "Total time of 'get credentials' requests to Iceberg Unity catalog.", ValueType::Microseconds) \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
Expand Down
70 changes: 64 additions & 6 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>

Expand Down Expand Up @@ -77,6 +78,24 @@ namespace DB::ServerSetting
extern const ServerSettingsUInt64 s3_retry_attempts;
}

namespace ProfileEvents
{
extern const Event DataLakeGlueCatalogGetDatabases;
extern const Event DataLakeGlueCatalogGetDatabasesMicroseconds;
extern const Event DataLakeGlueCatalogGetTables;
extern const Event DataLakeGlueCatalogGetTablesMicroseconds;
extern const Event DataLakeGlueCatalogGetTable;
extern const Event DataLakeGlueCatalogGetTableMicroseconds;
extern const Event DataLakeGlueCatalogCreateDatabase;
extern const Event DataLakeGlueCatalogCreateDatabaseMicroseconds;
extern const Event DataLakeGlueCatalogCreateTable;
extern const Event DataLakeGlueCatalogCreateTableMicroseconds;
extern const Event DataLakeGlueCatalogUpdateTable;
extern const Event DataLakeGlueCatalogUpdateTableMicroseconds;
extern const Event DataLakeGlueCatalogDropTable;
extern const Event DataLakeGlueCatalogDropTableMicroseconds;
}

namespace CurrentMetrics
{
extern const Metric MarkCacheBytes;
Expand Down Expand Up @@ -184,7 +203,14 @@ DataLake::ICatalog::Namespaces GlueCatalog::getDatabases(const std::string & pre
do
{
request.SetNextToken(next_token);
auto outcome = glue_client->GetDatabases(request);

Aws::Glue::Model::GetDatabasesOutcome outcome;
{
ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogGetDatabases);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogGetDatabasesMicroseconds);
outcome = glue_client->GetDatabases(request);
}

if (outcome.IsSuccess())
{
const auto & databases_result = outcome.GetResult();
Expand Down Expand Up @@ -229,7 +255,12 @@ DB::Names GlueCatalog::getTablesForDatabase(const std::string & db_name, size_t
do
{
request.SetNextToken(next_token);
auto outcome = glue_client->GetTables(request);
Aws::Glue::Model::GetTablesOutcome outcome;
{
ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogGetTables);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogGetTablesMicroseconds);
outcome = glue_client->GetTables(request);
}
if (outcome.IsSuccess())
{
const auto & tables_result = outcome.GetResult();
Expand Down Expand Up @@ -282,6 +313,8 @@ bool GlueCatalog::existsTable(const std::string & database_name, const std::stri
request.SetDatabaseName(database_name);
request.SetName(table_name);

ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogGetTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogGetTableMicroseconds);
auto outcome = glue_client->GetTable(request);
return outcome.IsSuccess();
}
Expand All @@ -299,7 +332,12 @@ bool GlueCatalog::tryGetTableMetadata(
request.SetDatabaseName(database_name);
request.SetName(table_name);

auto outcome = glue_client->GetTable(request);
Aws::Glue::Model::GetTableOutcome outcome;
{
ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogGetTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogGetTableMicroseconds);
outcome = glue_client->GetTable(request);
}
if (outcome.IsSuccess())
{
const auto & table_outcome = outcome.GetResult().GetTable();
Expand Down Expand Up @@ -575,6 +613,8 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons
db_input.SetName(namespace_name);
create_request.SetDatabaseInput(db_input);

ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogCreateDatabase);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogCreateDatabaseMicroseconds);
glue_client->CreateDatabase(create_request);
}

Expand Down Expand Up @@ -612,7 +652,13 @@ void GlueCatalog::createTable(const String & namespace_name, const String & tabl

request.SetTableInput(table_input);

auto response = glue_client->CreateTable(request);
Aws::Glue::Model::CreateTableOutcome response;

{
ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogCreateTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogCreateTableMicroseconds);
response = glue_client->CreateTable(request);
}

if (!response.IsSuccess())
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Can not create metadata in glue catalog: {}", response.GetError().GetMessage());
Expand Down Expand Up @@ -647,7 +693,13 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t

request.SetTableInput(table_input);

auto response = glue_client->UpdateTable(request);
Aws::Glue::Model::UpdateTableOutcome response;

{
ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogUpdateTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogUpdateTableMicroseconds);
response = glue_client->UpdateTable(request);
}

if (!response.IsSuccess())
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Can not update metadata in glue catalog {}", response.GetError().GetMessage());
Expand All @@ -666,7 +718,13 @@ void GlueCatalog::dropTable(const String & namespace_name, const String & table_
request.SetDatabaseName(namespace_name);
request.SetName(table_name);

auto response = glue_client->DeleteTable(request);
Aws::Glue::Model::DeleteTableOutcome response;

{
ProfileEvents::increment(ProfileEvents::DataLakeGlueCatalogDropTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeGlueCatalogDropTableMicroseconds);
response = glue_client->DeleteTable(request);
}

if (!response.IsSuccess())
throw DB::Exception(
Expand Down
82 changes: 66 additions & 16 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/CurrentThread.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
#include <mutex>
#include <chrono>
Expand Down Expand Up @@ -53,6 +54,28 @@ namespace DB::ErrorCodes
extern const int CATALOG_NAMESPACE_DISABLED;
}

namespace ProfileEvents
{
extern const Event DataLakeRestCatalogLoadConfig;
extern const Event DataLakeRestCatalogLoadConfigMicroseconds;
extern const Event DataLakeRestCatalogGetNamespaces;
extern const Event DataLakeRestCatalogGetNamespacesMicroseconds;
extern const Event DataLakeRestCatalogGetTables;
extern const Event DataLakeRestCatalogGetTablesMicroseconds;
extern const Event DataLakeRestCatalogGetTableMetadata;
extern const Event DataLakeRestCatalogGetTableMetadataMicroseconds;
extern const Event DataLakeRestCatalogGetCredentials;
extern const Event DataLakeRestCatalogGetCredentialsMicroseconds;
extern const Event DataLakeRestCatalogCreateNamespace;
extern const Event DataLakeRestCatalogCreateNamespaceMicroseconds;
extern const Event DataLakeRestCatalogCreateTable;
extern const Event DataLakeRestCatalogCreateTableMicroseconds;
extern const Event DataLakeRestCatalogUpdateTable;
extern const Event DataLakeRestCatalogUpdateTableMicroseconds;
extern const Event DataLakeRestCatalogDropTable;
extern const Event DataLakeRestCatalogDropTableMicroseconds;
}

namespace DataLake
{

Expand Down Expand Up @@ -185,10 +208,15 @@ RestCatalog::RestCatalog(
RestCatalog::Config RestCatalog::loadConfig()
{
Poco::URI::QueryParameters params = {{"warehouse", warehouse}};
auto buf = createReadBuffer(CONFIG_ENDPOINT, params);

std::string json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);

{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogLoadConfig);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogLoadConfigMicroseconds);
auto buf = createReadBuffer(CONFIG_ENDPOINT, params);
readJSONObjectPossiblyInvalid(json_str, *buf);
}

LOG_DEBUG(log, "Received catalog configuration settings: {}", json_str);

Expand Down Expand Up @@ -709,6 +737,8 @@ RestCatalog::Namespaces RestCatalog::getNamespaces(const std::string & base_name

try
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogGetNamespaces);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogGetNamespacesMicroseconds);
auto buf = createReadBuffer(config.prefix / NAMESPACES_ENDPOINT, params);
auto namespaces = parseNamespaces(*buf, base_namespace);
LOG_DEBUG(log, "Loaded {} namespaces in base namespace {}", namespaces.size(), base_namespace);
Expand Down Expand Up @@ -799,6 +829,8 @@ DB::Names RestCatalog::getTables(const std::string & base_namespace, size_t limi
auto encoded_namespace = encodeNamespaceForURI(base_namespace);
const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encoded_namespace / "tables";

ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogGetTables);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogGetTablesMicroseconds);
auto buf = createReadBuffer(config.prefix / endpoint);
return parseTables(*buf, base_namespace, limit);
}
Expand Down Expand Up @@ -908,16 +940,21 @@ bool RestCatalog::getTableMetadataImpl(
}

const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encodeNamespaceForURI(namespace_name) / "tables" / table_name;
auto buf = createReadBuffer(config.prefix / endpoint, /* params */{}, headers);
String json_str;

if (buf->eof())
{
LOG_DEBUG(log, "Table doesn't exist (endpoint: {})", endpoint);
return false;
}
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogGetTableMetadata);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogGetTableMetadataMicroseconds);
auto buf = createReadBuffer(config.prefix / endpoint, /* params */{}, headers);

String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (buf->eof())
{
LOG_DEBUG(log, "Table doesn't exist (endpoint: {})", endpoint);
return false;
}

readJSONObjectPossiblyInvalid(json_str, *buf);
}

#ifdef DEBUG_OR_SANITIZER_BUILD
/// This log message might contain credentials,
Expand Down Expand Up @@ -1046,6 +1083,8 @@ void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, cons

try
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogCreateNamespace);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogCreateNamespaceMicroseconds);
sendRequest(endpoint, request_body);
}
catch (...)
Expand Down Expand Up @@ -1088,6 +1127,8 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

try
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogCreateTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogCreateTableMicroseconds);
sendRequest(endpoint, request_body);
}
catch (const DB::HTTPException & ex)
Expand Down Expand Up @@ -1153,6 +1194,8 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t

try
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogUpdateTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogUpdateTableMicroseconds);
sendRequest(endpoint, request_body);
}
catch (const DB::HTTPException &)
Expand All @@ -1176,6 +1219,8 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_
Poco::JSON::Object::Ptr request_body = nullptr;
try
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogDropTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogDropTableMicroseconds);
sendRequest(endpoint, request_body, Poco::Net::HTTPRequest::HTTP_DELETE, true);
}
catch (const DB::HTTPException & ex)
Expand Down Expand Up @@ -1255,16 +1300,21 @@ ICatalog::CredentialsRefreshCallback RestCatalog::getCredentialsConfigurationCal
const auto & table = storage_id.getTableName();
auto [namespace_name, table_name] = DataLake::parseTableName(table);
const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encodeNamespaceForURI(namespace_name) / "tables" / table_name;
auto buf = createReadBuffer(config.prefix / endpoint, /* params */{}, headers);
String json_str;

if (buf->eof())
{
LOG_DEBUG(log, "Table doesn't exist (endpoint: {})", endpoint);
return nullptr;
}
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogGetCredentials);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogGetCredentialsMicroseconds);
auto buf = createReadBuffer(config.prefix / endpoint, /* params */{}, headers);

String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (buf->eof())
{
LOG_DEBUG(log, "Table doesn't exist (endpoint: {})", endpoint);
return nullptr;
}

readJSONObjectPossiblyInvalid(json_str, *buf);
}

Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
Expand Down
Loading
Loading