Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 21 additions & 156 deletions extensions/lmdb/LmdbContentRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include "minifi-cpp/utils/gsl.h"
#include "utils/Locations.h"

namespace org::apache::nifi::minifi::core::repository {
namespace org::apache::nifi::minifi::extensions::lmdb {

LmdbContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {}

Expand All @@ -46,7 +46,7 @@ void LmdbContentRepository::Session::commit() {
if (outStream->write(stream->getBuffer()) != size) {
throw Exception(REPOSITORY_EXCEPTION, "Failed to write " + std::string(is_append ? "appended" : "new") + " resource: " + resource_claim->getContentFullPath());
}
auto lmdb_out_stream = std::dynamic_pointer_cast<io::LmdbStream>(outStream);
auto lmdb_out_stream = std::dynamic_pointer_cast<LmdbStream>(outStream);
if (lmdb_out_stream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't cast output stream to LmdbStream for commit: " + resource_claim->getContentFullPath()); }
if (!lmdb_out_stream->commit()) { throw Exception(REPOSITORY_EXCEPTION, "Failed to commit " + std::string(is_append ? "appended" : "new") + " resource: " + resource_claim->getContentFullPath()); }
};
Expand All @@ -64,11 +64,6 @@ void LmdbContentRepository::Session::commit() {
}

bool LmdbContentRepository::initialize(const std::shared_ptr<minifi::Configure>& configuration) {
if (const int rc = mdb_env_create(&lmdb_env_)) {
logger_->log_error("Failed to create LMDB environment: {}", mdb_strerror(rc));
return false;
}

// Reserve virtual address space for the DB file (max size it can grow to)
const auto max_db_size = configuration->get(Configure::nifi_content_repository_lmdb_max_db_size) | utils::andThen([](auto max_db_size_str) -> std::optional<uint64_t> {
if (max_db_size_str.empty()) { return std::nullopt; }
Expand All @@ -80,16 +75,6 @@ bool LmdbContentRepository::initialize(const std::shared_ptr<minifi::Configure>&

if (!max_db_size) {
logger_->log_error("Invalid max DB size configuration for LMDB Content Repository");
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}

logger_->log_info("Setting LMDB max DB size to {} bytes", *max_db_size);
if (const auto rc = mdb_env_set_mapsize(lmdb_env_, gsl::narrow<size_t>(*max_db_size)); rc != MDB_SUCCESS) {
logger_->log_error("Failed to set LMDB map size: {}", mdb_strerror(rc));
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}

Expand All @@ -101,151 +86,45 @@ bool LmdbContentRepository::initialize(const std::shared_ptr<minifi::Configure>&
} else {
directory_ = (working_dir / "lmdbcontentrepository").string();
}
logger_->log_info("Using LMDB Content Repository directory '{}'", directory_);

if (std::filesystem::exists(directory_)) {
logger_->log_info("Using existing LMDB Content Repository directory at {}", directory_);
} else {
logger_->log_info("Creating LMDB Content Repository directory at {}", directory_);
if (!std::filesystem::create_directories(directory_)) {
logger_->log_error("Failed to create LMDB Content Repository directory at {}", directory_);
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}
}

if (const int rc = mdb_env_open(lmdb_env_, directory_.c_str(), MDB_NOTLS, 0664)) {
logger_->log_error("Failed to open LMDB environment: {}", mdb_strerror(rc));
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}

MDB_txn* init_txn = nullptr;
if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to begin LMDB transaction during initialize: {}", mdb_strerror(rc));
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}
if (const int rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) {
logger_->log_error("Failed to open LMDB database: {}", mdb_strerror(rc));
mdb_txn_abort(init_txn);
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}

if (const int rc = mdb_txn_commit(init_txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to commit LMDB transaction during initialize: {}", mdb_strerror(rc));
mdb_env_close(lmdb_env_);
lmdb_env_ = nullptr;
return false;
}

return true;
return lmdb_wrapper_.initialize(directory_, *max_db_size);
}

void LmdbContentRepository::start() {}
void LmdbContentRepository::stop() {}

std::shared_ptr<ContentSession> LmdbContentRepository::createSession() {
return std::make_shared<Session>(sharedFromThis<ContentRepository>());
std::shared_ptr<core::ContentSession> LmdbContentRepository::createSession() {
return std::make_shared<Session>(sharedFromThis<core::ContentRepository>());
}

std::shared_ptr<io::BaseStream> LmdbContentRepository::write(const minifi::ResourceClaim& claim, bool) {
return std::make_shared<io::LmdbStream>(claim.getContentFullPath(), lmdb_env_, &lmdb_handle_, true);
return std::make_shared<LmdbStream>(claim.getContentFullPath(), lmdb_wrapper_, true);
}

std::shared_ptr<io::BaseStream> LmdbContentRepository::read(const minifi::ResourceClaim& claim) {
return std::make_shared<io::LmdbStream>(claim.getContentFullPath(), lmdb_env_, &lmdb_handle_, false);
return std::make_shared<LmdbStream>(claim.getContentFullPath(), lmdb_wrapper_, false);
}

bool LmdbContentRepository::exists(const minifi::ResourceClaim& streamId) {
const auto path = streamId.getContentFullPath();
MDB_val key{path.size(), const_cast<char*>(path.data())};
MDB_val value{};

MDB_txn* txn = nullptr;
if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to begin LMDB read transaction in exists: {}", mdb_strerror(rc));
return false;
}
auto guard = gsl::finally([txn] { mdb_txn_abort(txn); });

const auto rc = mdb_get(txn, lmdb_handle_, &key, &value);
if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) {
logger_->log_error("Failed to get value from LMDB database: {}", mdb_strerror(rc));
}
return rc == MDB_SUCCESS;
return lmdb_wrapper_.exists(path);
}

bool LmdbContentRepository::removeKey(const std::string& content_path) {
MDB_val key{content_path.size(), const_cast<char*>(content_path.data())};

MDB_txn* txn = nullptr;
if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to begin LMDB write transaction in removeKey: {}", mdb_strerror(rc));
return false;
}
int rc = mdb_del(txn, lmdb_handle_, &key, nullptr);

if (rc == MDB_SUCCESS) {
if (const int rc = mdb_txn_commit(txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to commit LMDB transaction during delete: {}", mdb_strerror(rc));
return false;
}
return true;
} else if (rc == MDB_NOTFOUND) {
logger_->log_debug("Key {} not found in LMDB database during delete", content_path);
mdb_txn_abort(txn);
return true;
} else {
logger_->log_error("Failed to delete key '{}' from LMDB database: {}", content_path, mdb_strerror(rc));
mdb_txn_abort(txn);
return false;
}
return lmdb_wrapper_.removeKey(content_path);
}

void LmdbContentRepository::clearOrphans() {
std::vector<std::string> keys_to_be_deleted;

{
MDB_txn* txn = nullptr;
if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to begin LMDB read transaction in clearOrphans: {}", mdb_strerror(rc));
return;
}
auto txn_guard = gsl::finally([txn] { mdb_txn_abort(txn); });

MDB_cursor* cursor = nullptr;
if (const int rc = mdb_cursor_open(txn, lmdb_handle_, &cursor); rc != MDB_SUCCESS) {
logger_->log_error("Failed to open LMDB cursor in clearOrphans: {}", mdb_strerror(rc));
return;
}
auto cursor_guard = gsl::finally([cursor] { mdb_cursor_close(cursor); });

MDB_val key{};
MDB_val val{};
int rc = mdb_cursor_get(cursor, &key, &val, MDB_FIRST);

while (rc == MDB_SUCCESS) {
std::string key_string = std::string(static_cast<char*>(key.mv_data), key.mv_size);

std::lock_guard<std::mutex> lock(count_map_mutex_);
auto claim_it = count_map_.find(key_string);
if (claim_it == count_map_.end() || claim_it->second == 0) {
logger_->log_debug("Deleting orphan resource {}", key_string);
keys_to_be_deleted.push_back(key_string);
}
rc = mdb_cursor_get(cursor, &key, &val, MDB_NEXT);
}
lmdb_wrapper_.forEach([this, &keys_to_be_deleted](const MDB_val& key, const MDB_val&) {
std::string key_string = std::string(static_cast<char*>(key.mv_data), key.mv_size);

if (rc != MDB_NOTFOUND) {
logger_->log_error("Failed to iterate over LMDB database: {}", mdb_strerror(rc));
return;
std::lock_guard<std::mutex> lock(count_map_mutex_);
auto claim_it = count_map_.find(key_string);
if (claim_it == count_map_.end() || claim_it->second == 0) {
logger_->log_debug("Deleting orphan resource {}", key_string);
keys_to_be_deleted.push_back(key_string);
}
}
});

std::vector<std::string> failed_deletions;
for (const auto& key : keys_to_be_deleted) {
Expand All @@ -260,29 +139,15 @@ void LmdbContentRepository::clearOrphans() {
purge_list_.insert(purge_list_.end(), std::make_move_iterator(failed_deletions.begin()), std::make_move_iterator(failed_deletions.end()));
}

MDB_stat LmdbContentRepository::getDbStat() const {
MDB_stat stat{};
MDB_txn* txn = nullptr;
if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
logger_->log_error("Failed to begin LMDB read transaction in getDbStat: {}", mdb_strerror(rc));
return stat;
}
if (const int rc = mdb_stat(txn, lmdb_handle_, &stat); rc != MDB_SUCCESS) {
logger_->log_error("Failed to read LMDB database stats: {}", mdb_strerror(rc));
}
mdb_txn_abort(txn);
return stat;
}

uint64_t LmdbContentRepository::getRepositorySize() const {
const auto stat = getDbStat();
const auto stat = lmdb_wrapper_.getDbStat();
return stat.ms_psize * (stat.ms_branch_pages + stat.ms_leaf_pages + stat.ms_overflow_pages);
}

uint64_t LmdbContentRepository::getRepositoryEntryCount() const {
return getDbStat().ms_entries;
return lmdb_wrapper_.getDbStat().ms_entries;
}

REGISTER_RESOURCE_AS(LmdbContentRepository, InternalResource, ("LmdbContentRepository", "lmdbcontentrepository"));

} // namespace org::apache::nifi::minifi::core::repository
} // namespace org::apache::nifi::minifi::extensions::lmdb
29 changes: 11 additions & 18 deletions extensions/lmdb/LmdbContentRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,31 @@
#include "minifi-cpp/core/PropertyDefinition.h"
#include "minifi-cpp/utils/Export.h"
#include "minifi-cpp/utils/Id.h"
#include "lmdb.h"
#include "LmdbWrapper.h"

namespace org::apache::nifi::minifi::core::repository {
namespace org::apache::nifi::minifi::extensions::lmdb {

class LmdbContentRepository : public core::ContentRepositoryImpl {
public:
explicit LmdbContentRepository(std::string_view name = className<LmdbContentRepository>(), const utils::Identifier& uuid = {})
explicit LmdbContentRepository(std::string_view name = core::className<LmdbContentRepository>(), const utils::Identifier& uuid = {})
: core::ContentRepositoryImpl(name, uuid) {}

~LmdbContentRepository() override {
stop();
if (lmdb_env_) {
mdb_dbi_close(lmdb_env_, lmdb_handle_);
mdb_env_close(lmdb_env_);
}
}

EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;

class Session : public BufferedContentSession {
class Session : public core::BufferedContentSession {
public:
explicit Session(std::shared_ptr<ContentRepository> repository);
explicit Session(std::shared_ptr<core::ContentRepository> repository);

void commit() override;
};

std::shared_ptr<ContentSession> createSession() override;
std::shared_ptr<core::ContentSession> createSession() override;
bool initialize(const std::shared_ptr<minifi::Configure>& configuration) override;
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false) override;
std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim) override;
Expand All @@ -67,8 +63,8 @@ class LmdbContentRepository : public core::ContentRepositoryImpl {

void clearOrphans() override;

void start() override;
void stop() override;
void start() override {}
void stop() override {}

uint64_t getRepositorySize() const override;
uint64_t getRepositoryEntryCount() const override;
Expand All @@ -77,11 +73,8 @@ class LmdbContentRepository : public core::ContentRepositoryImpl {
bool removeKey(const std::string& content_path) override;

private:
MDB_stat getDbStat() const;

MDB_env* lmdb_env_{nullptr};
MDB_dbi lmdb_handle_{};
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<LmdbContentRepository>::getLogger()};
LmdbWrapper lmdb_wrapper_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<LmdbContentRepository>::getLogger()};
};

} // namespace org::apache::nifi::minifi::core::repository
} // namespace org::apache::nifi::minifi::extensions::lmdb
Loading
Loading