diff --git a/extensions/lmdb/LmdbContentRepository.cpp b/extensions/lmdb/LmdbContentRepository.cpp index 33680f5d15..cb6acc2e22 100644 --- a/extensions/lmdb/LmdbContentRepository.cpp +++ b/extensions/lmdb/LmdbContentRepository.cpp @@ -31,7 +31,7 @@ #include "minifi-cpp/utils/gsl.h" #include "utils/Locations.h" -namespace org::apache::nifi::minifi::core::repository { +namespace org::apache::nifi::minifi::extensions::lmdb { LmdbContentRepository::Session::Session(std::shared_ptr repository) : BufferedContentSession(std::move(repository)) {} @@ -46,7 +46,7 @@ void LmdbContentRepository::Session::commit() { if (outStream->write(stream->getBuffer()) != size) { throw Exception(REPOSITORY_EXCEPTION, "Failed to write " + std::string(is_append ? "appended" : "new") + " resource: " + resource_claim->getContentFullPath()); } - auto lmdb_out_stream = std::dynamic_pointer_cast(outStream); + auto lmdb_out_stream = std::dynamic_pointer_cast(outStream); if (lmdb_out_stream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't cast output stream to LmdbStream for commit: " + resource_claim->getContentFullPath()); } if (!lmdb_out_stream->commit()) { throw Exception(REPOSITORY_EXCEPTION, "Failed to commit " + std::string(is_append ? 
"appended" : "new") + " resource: " + resource_claim->getContentFullPath()); } }; @@ -64,11 +64,6 @@ void LmdbContentRepository::Session::commit() { } bool LmdbContentRepository::initialize(const std::shared_ptr& configuration) { - if (const int rc = mdb_env_create(&lmdb_env_)) { - logger_->log_error("Failed to create LMDB environment: {}", mdb_strerror(rc)); - return false; - } - // Reserve virtual address space for the DB file (max size it can grow to) const auto max_db_size = configuration->get(Configure::nifi_content_repository_lmdb_max_db_size) | utils::andThen([](auto max_db_size_str) -> std::optional { if (max_db_size_str.empty()) { return std::nullopt; } @@ -80,16 +75,6 @@ bool LmdbContentRepository::initialize(const std::shared_ptr& if (!max_db_size) { logger_->log_error("Invalid max DB size configuration for LMDB Content Repository"); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; - return false; - } - - logger_->log_info("Setting LMDB max DB size to {} bytes", *max_db_size); - if (const auto rc = mdb_env_set_mapsize(lmdb_env_, gsl::narrow(*max_db_size)); rc != MDB_SUCCESS) { - logger_->log_error("Failed to set LMDB map size: {}", mdb_strerror(rc)); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; return false; } @@ -101,151 +86,45 @@ bool LmdbContentRepository::initialize(const std::shared_ptr& } else { directory_ = (working_dir / "lmdbcontentrepository").string(); } + logger_->log_info("Using LMDB Content Repository directory '{}'", directory_); - if (std::filesystem::exists(directory_)) { - logger_->log_info("Using existing LMDB Content Repository directory at {}", directory_); - } else { - logger_->log_info("Creating LMDB Content Repository directory at {}", directory_); - if (!std::filesystem::create_directories(directory_)) { - logger_->log_error("Failed to create LMDB Content Repository directory at {}", directory_); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; - return false; - } - } - - if (const int rc = mdb_env_open(lmdb_env_, 
directory_.c_str(), MDB_NOTLS, 0664)) { - logger_->log_error("Failed to open LMDB environment: {}", mdb_strerror(rc)); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; - return false; - } - - MDB_txn* init_txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB transaction during initialize: {}", mdb_strerror(rc)); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; - return false; - } - if (const int rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) { - logger_->log_error("Failed to open LMDB database: {}", mdb_strerror(rc)); - mdb_txn_abort(init_txn); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; - return false; - } - - if (const int rc = mdb_txn_commit(init_txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to commit LMDB transaction during initialize: {}", mdb_strerror(rc)); - mdb_env_close(lmdb_env_); - lmdb_env_ = nullptr; - return false; - } - - return true; + return lmdb_wrapper_.initialize(directory_, *max_db_size); } -void LmdbContentRepository::start() {} -void LmdbContentRepository::stop() {} - -std::shared_ptr LmdbContentRepository::createSession() { - return std::make_shared(sharedFromThis()); +std::shared_ptr LmdbContentRepository::createSession() { + return std::make_shared(sharedFromThis()); } std::shared_ptr LmdbContentRepository::write(const minifi::ResourceClaim& claim, bool) { - return std::make_shared(claim.getContentFullPath(), lmdb_env_, &lmdb_handle_, true); + return std::make_shared(claim.getContentFullPath(), lmdb_wrapper_, true); } std::shared_ptr LmdbContentRepository::read(const minifi::ResourceClaim& claim) { - return std::make_shared(claim.getContentFullPath(), lmdb_env_, &lmdb_handle_, false); + return std::make_shared(claim.getContentFullPath(), lmdb_wrapper_, false); } bool LmdbContentRepository::exists(const minifi::ResourceClaim& streamId) { const auto path = streamId.getContentFullPath(); - MDB_val 
key{path.size(), const_cast(path.data())}; - MDB_val value{}; - - MDB_txn* txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB read transaction in exists: {}", mdb_strerror(rc)); - return false; - } - auto guard = gsl::finally([txn] { mdb_txn_abort(txn); }); - - const auto rc = mdb_get(txn, lmdb_handle_, &key, &value); - if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) { - logger_->log_error("Failed to get value from LMDB database: {}", mdb_strerror(rc)); - } - return rc == MDB_SUCCESS; + return lmdb_wrapper_.exists(path); } bool LmdbContentRepository::removeKey(const std::string& content_path) { - MDB_val key{content_path.size(), const_cast(content_path.data())}; - - MDB_txn* txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB write transaction in removeKey: {}", mdb_strerror(rc)); - return false; - } - int rc = mdb_del(txn, lmdb_handle_, &key, nullptr); - - if (rc == MDB_SUCCESS) { - if (const int rc = mdb_txn_commit(txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to commit LMDB transaction during delete: {}", mdb_strerror(rc)); - return false; - } - return true; - } else if (rc == MDB_NOTFOUND) { - logger_->log_debug("Key {} not found in LMDB database during delete", content_path); - mdb_txn_abort(txn); - return true; - } else { - logger_->log_error("Failed to delete key '{}' from LMDB database: {}", content_path, mdb_strerror(rc)); - mdb_txn_abort(txn); - return false; - } + return lmdb_wrapper_.removeKey(content_path); } void LmdbContentRepository::clearOrphans() { std::vector keys_to_be_deleted; - { - MDB_txn* txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB read transaction in clearOrphans: {}", mdb_strerror(rc)); - return; - } - auto txn_guard = 
gsl::finally([txn] { mdb_txn_abort(txn); }); - - MDB_cursor* cursor = nullptr; - if (const int rc = mdb_cursor_open(txn, lmdb_handle_, &cursor); rc != MDB_SUCCESS) { - logger_->log_error("Failed to open LMDB cursor in clearOrphans: {}", mdb_strerror(rc)); - return; - } - auto cursor_guard = gsl::finally([cursor] { mdb_cursor_close(cursor); }); - - MDB_val key{}; - MDB_val val{}; - int rc = mdb_cursor_get(cursor, &key, &val, MDB_FIRST); - - while (rc == MDB_SUCCESS) { - std::string key_string = std::string(static_cast(key.mv_data), key.mv_size); - - std::lock_guard lock(count_map_mutex_); - auto claim_it = count_map_.find(key_string); - if (claim_it == count_map_.end() || claim_it->second == 0) { - logger_->log_debug("Deleting orphan resource {}", key_string); - keys_to_be_deleted.push_back(key_string); - } - rc = mdb_cursor_get(cursor, &key, &val, MDB_NEXT); - } + // Collect every stored key that has no live claim; forEach() logs and returns false when the scan fails, in which case the cleanup is abandoned exactly as the removed code did. + if (!lmdb_wrapper_.forEach([this, &keys_to_be_deleted](const MDB_val& key, const MDB_val&) { + std::string key_string = std::string(static_cast(key.mv_data), key.mv_size); - if (rc != MDB_NOTFOUND) { - logger_->log_error("Failed to iterate over LMDB database: {}", mdb_strerror(rc)); - return; + std::lock_guard lock(count_map_mutex_); + auto claim_it = count_map_.find(key_string); + if (claim_it == count_map_.end() || claim_it->second == 0) { + logger_->log_debug("Deleting orphan resource {}", key_string); + keys_to_be_deleted.push_back(key_string); } - } + })) { return; } std::vector failed_deletions; for (const auto& key : keys_to_be_deleted) { @@ -260,29 +139,15 @@ void LmdbContentRepository::clearOrphans() { purge_list_.insert(purge_list_.end(), std::make_move_iterator(failed_deletions.begin()), std::make_move_iterator(failed_deletions.end())); } -MDB_stat LmdbContentRepository::getDbStat() const { - MDB_stat stat{}; - MDB_txn* txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB read transaction in getDbStat: {}", 
mdb_strerror(rc)); - return stat; - } - if (const int rc = mdb_stat(txn, lmdb_handle_, &stat); rc != MDB_SUCCESS) { - logger_->log_error("Failed to read LMDB database stats: {}", mdb_strerror(rc)); - } - mdb_txn_abort(txn); - return stat; -} - uint64_t LmdbContentRepository::getRepositorySize() const { - const auto stat = getDbStat(); + const auto stat = lmdb_wrapper_.getDbStat(); return stat.ms_psize * (stat.ms_branch_pages + stat.ms_leaf_pages + stat.ms_overflow_pages); } uint64_t LmdbContentRepository::getRepositoryEntryCount() const { - return getDbStat().ms_entries; + return lmdb_wrapper_.getDbStat().ms_entries; } REGISTER_RESOURCE_AS(LmdbContentRepository, InternalResource, ("LmdbContentRepository", "lmdbcontentrepository")); -} // namespace org::apache::nifi::minifi::core::repository +} // namespace org::apache::nifi::minifi::extensions::lmdb diff --git a/extensions/lmdb/LmdbContentRepository.h b/extensions/lmdb/LmdbContentRepository.h index 65d47a4dbb..76e85fd083 100644 --- a/extensions/lmdb/LmdbContentRepository.h +++ b/extensions/lmdb/LmdbContentRepository.h @@ -28,35 +28,31 @@ #include "minifi-cpp/core/PropertyDefinition.h" #include "minifi-cpp/utils/Export.h" #include "minifi-cpp/utils/Id.h" -#include "lmdb.h" +#include "LmdbWrapper.h" -namespace org::apache::nifi::minifi::core::repository { +namespace org::apache::nifi::minifi::extensions::lmdb { class LmdbContentRepository : public core::ContentRepositoryImpl { public: - explicit LmdbContentRepository(std::string_view name = className(), const utils::Identifier& uuid = {}) + explicit LmdbContentRepository(std::string_view name = core::className(), const utils::Identifier& uuid = {}) : core::ContentRepositoryImpl(name, uuid) {} ~LmdbContentRepository() override { stop(); - if (lmdb_env_) { - mdb_dbi_close(lmdb_env_, lmdb_handle_); - mdb_env_close(lmdb_env_); - } } EXTENSIONAPI static constexpr auto Properties = std::array{}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; 
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - class Session : public BufferedContentSession { + class Session : public core::BufferedContentSession { public: - explicit Session(std::shared_ptr repository); + explicit Session(std::shared_ptr repository); void commit() override; }; - std::shared_ptr createSession() override; + std::shared_ptr createSession() override; bool initialize(const std::shared_ptr& configuration) override; std::shared_ptr write(const minifi::ResourceClaim& claim, bool append = false) override; std::shared_ptr read(const minifi::ResourceClaim& claim) override; @@ -67,8 +63,8 @@ class LmdbContentRepository : public core::ContentRepositoryImpl { void clearOrphans() override; - void start() override; - void stop() override; + void start() override {} + void stop() override {} uint64_t getRepositorySize() const override; uint64_t getRepositoryEntryCount() const override; @@ -77,11 +73,8 @@ class LmdbContentRepository : public core::ContentRepositoryImpl { bool removeKey(const std::string& content_path) override; private: - MDB_stat getDbStat() const; - - MDB_env* lmdb_env_{nullptr}; - MDB_dbi lmdb_handle_{}; - std::shared_ptr logger_{logging::LoggerFactory::getLogger()}; + LmdbWrapper lmdb_wrapper_; + std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; }; -} // namespace org::apache::nifi::minifi::core::repository +} // namespace org::apache::nifi::minifi::extensions::lmdb diff --git a/extensions/lmdb/LmdbFlowFileRepository.cpp b/extensions/lmdb/LmdbFlowFileRepository.cpp new file mode 100644 index 0000000000..d01414bee9 --- /dev/null +++ b/extensions/lmdb/LmdbFlowFileRepository.cpp @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "LmdbFlowFileRepository.h"
+#include "core/Resource.h"
+#include "minifi-cpp/FlowFileRecord.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::extensions::lmdb {
+
+namespace {
+// Reads the flow file repository health-check flag; an unset or unparsable value counts as enabled.
+bool getRepositoryCheckHealth(const Configure& configure) {
+  std::string check_health_str;
+  configure.get(Configure::nifi_flow_file_repository_check_health, check_health_str);
+  return utils::string::toBool(check_health_str).value_or(true);
+}
+}  // namespace
+
+// Applies the configured directory override and health-check flag, resolves the maximum DB size
+// (falling back to MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE when the property is missing or empty)
+// and opens the LMDB environment. Returns the result of LmdbWrapper::initialize().
+bool LmdbFlowFileRepository::initialize(const std::shared_ptr &configure) {
+  std::string value;
+
+  if (configure->get(Configure::nifi_flowfile_repository_directory_default, value) && !value.empty()) {
+    directory_ = value;
+  }
+  check_flowfile_content_size_ = getRepositoryCheckHealth(*configure);
+  logger_->log_debug("NiFi LMDB FlowFile Repository Directory {}", directory_);
+
+  // Reserve virtual address space for the DB file (max size it can grow to)
+  const auto max_db_size = configure->get(Configure::nifi_flowfile_repository_lmdb_max_db_size) | utils::andThen([](auto max_db_size_str) -> std::optional {
+    if (max_db_size_str.empty()) { return std::nullopt; }
+    return parsing::parseDataSize(max_db_size_str) | utils::orThrow(fmt::format("{} was set to invalid value: '{}'", Configure::nifi_flowfile_repository_lmdb_max_db_size, max_db_size_str));
+  }) | utils::orElse([] {
+    return std::make_optional(MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE);
+  });
+
+  if (!max_db_size) {
+    logger_->log_error("Invalid max DB size configuration for LMDB FlowFile Repository");
+    return false;
+  }
+
+  logger_->log_info("Using LMDB FlowFile Repository directory '{}'", directory_);
+  return lmdb_wrapper_.initialize(directory_, *max_db_size);
+}
+
+// Queues the key for deletion; the record is actually removed by the next flush(). Always returns true.
+bool LmdbFlowFileRepository::Delete(const std::string& key) {
+  keys_to_delete_.enqueue({.key = key});
+  return true;
+}
+
+// Queues the item for deletion. When the cast succeeds the resource claim is queued as well, so
+// flush() can release the content reference without reading the record back from the database.
+bool LmdbFlowFileRepository::Delete(const std::shared_ptr& item) {
+  if (auto ff = std::dynamic_pointer_cast(item)) {
+    keys_to_delete_.enqueue({.key = item->getUUIDStr(), .content = ff->getResourceClaim()});
+  } else {
+    keys_to_delete_.enqueue({.key = item->getUUIDStr()});
+  }
+  return true;
+}
+
+// Stores a copy of the given buffer under key.
+bool LmdbFlowFileRepository::Put(const std::string& key, const uint8_t* buf, size_t bufLen) {
+  return lmdb_wrapper_.putValue(key, std::string(reinterpret_cast(buf), bufLen));
+}
+
+// Stores all entries through LmdbWrapper::putValues().
+bool LmdbFlowFileRepository::MultiPut(const std::vector>>& data) {
+  return lmdb_wrapper_.putValues(data);
+}
+
+// Copies the stored record into value; returns false (leaving value untouched) when getValue() yields nothing.
+bool LmdbFlowFileRepository::Get(const std::string& key, std::string& value) {
+  auto result = lmdb_wrapper_.getValue(key);
+  if (result) {
+    value = std::move(*result);
+    return true;
+  }
+  return false;
+}
+
+// Size estimate: page size multiplied by the number of branch, leaf and overflow pages.
+uint64_t LmdbFlowFileRepository::getRepositorySize() const {
+  const auto stat = lmdb_wrapper_.getDbStat();
+  return stat.ms_psize * (stat.ms_branch_pages + stat.ms_leaf_pages + stat.ms_overflow_pages);
+}
+
+uint64_t LmdbFlowFileRepository::getRepositoryEntryCount() const {
+  return lmdb_wrapper_.getDbStat().ms_entries;
+}
+
+// Drains the pending-delete queue and removes the drained keys through a single removeKeys() call.
+// On failure the entries are re-queued; on success the content claims of the removed records are released.
+void LmdbFlowFileRepository::flush() {
+  std::list flow_files;
+
+  while (keys_to_delete_.size_approx() > 0) {
+    ExpiredFlowFileInfo info;
+    if (keys_to_delete_.try_dequeue(info)) {
+      flow_files.push_back(std::move(info));
+    }
+  }
+
+  // run() calls flush() once per purge period; with nothing queued there is no reason to make
+  // removeKeys() open and commit an empty write transaction.
+  if (flow_files.empty()) {
+    return;
+  }
+
+  deserializeFlowFilesWithNoContentClaim(flow_files);
+
+  std::vector flow_file_keys;
+  flow_file_keys.reserve(flow_files.size());
+  for (auto& ff : flow_files) {
+    flow_file_keys.push_back(ff.key);
+    logger_->log_debug("Issuing batch delete, including {}, Content path {}", ff.key, ff.content ? ff.content->getContentFullPath() : "null");
+  }
+
+  if (!lmdb_wrapper_.removeKeys(flow_file_keys)) {
+    for (auto&& ff : flow_files) {
+      keys_to_delete_.enqueue(std::move(ff));
+    }
+    return;  // Stop here - don't delete from content repo while we have records in FF repo
+  }
+
+  if (content_repo_) {
+    for (auto& ff : flow_files) {
+      if (ff.content) {
+        ff.content->decreaseFlowFileRecordOwnedCount();
+      }
+    }
+  }
+}
+
+// For queued entries that carry no content claim, reads the serialized record back from LMDB and
+// fills in its claim. Entries whose record cannot be read are erased from flow_files.
+void LmdbFlowFileRepository::deserializeFlowFilesWithNoContentClaim(std::list& flow_files) {
+  std::vector keys;
+  std::vector::iterator> key_positions;
+  for (auto it = flow_files.begin(); it != flow_files.end(); ++it) {
+    if (!it->content) {
+      keys.push_back(it->key);
+      key_positions.push_back(it);
+    }
+  }
+  if (keys.empty()) {
+    return;
+  }
+  std::vector> values;
+  values.reserve(keys.size());
+  for (const auto& key : keys) {
+    values.push_back(lmdb_wrapper_.getValue(key));
+  }
+
+  gsl_Expects(keys.size() == values.size());
+
+  for (size_t i = 0; i < keys.size(); ++i) {
+    if (!values[i]) {
+      logger_->log_error("Failed to read key from LMDB: {}! DB is most probably in an inconsistent state!", keys[i].data());
+      flow_files.erase(key_positions.at(i));
+      continue;
+    }
+
+    utils::Identifier container_id;
+    auto flow_file = FlowFileRecord::DeSerialize(std::as_bytes(std::span(*values[i])), content_repo_, container_id);
+    if (flow_file) {
+      gsl_Expects(flow_file->getUUIDStr() == key_positions.at(i)->key);
+      key_positions.at(i)->content = flow_file->getResourceClaim();
+    } else {
+      logger_->log_error("Could not deserialize flow file {}", key_positions.at(i)->key);
+    }
+  }
+}
+
+// Worker loop: flushes pending deletes once per purge period, plus one final flush after the repository stops running.
+void LmdbFlowFileRepository::run() {
+  while (isRunning()) {
+    std::this_thread::sleep_for(purge_period_);
+    flush();
+  }
+  flush();
+}
+
+// True when the stored content is at least offset + size bytes long; a null claim counts as zero bytes of content.
+bool LmdbFlowFileRepository::contentSizeIsAmpleForFlowFile(const core::FlowFile& flow_file_record, const std::shared_ptr& resource_claim) const {
+  const auto stream_size = resource_claim ? content_repo_->size(*resource_claim) : 0;
+  const auto required_size = flow_file_record.getOffset() + flow_file_record.getSize();
+  return stream_size >= required_size;
+}
+
+// Looks the id up in containers_ first and in connection_map_ second; returns nullptr when neither knows it.
+core::Connectable* LmdbFlowFileRepository::getContainer(const std::string& container_id) {
+  auto container = containers_.find(container_id);
+  if (container != containers_.end())
+    return container->second;
+  // for backward compatibility
+  container = connection_map_.find(container_id);
+  if (container != connection_map_.end())
+    return container->second;
+  return nullptr;
+}
+
+// Restores persisted flow files into their connections. Records that cannot be deserialized, have
+// no known connection, or (when the health check is enabled) lack sufficient content are queued for deletion.
+void LmdbFlowFileRepository::initialize_repository() {
+  gsl_Expects(content_repo_);
+  logger_->log_info("Reading existing flow files from database");
+
+  const bool scan_complete = lmdb_wrapper_.forEach([this](const MDB_val& key, const MDB_val& value) {
+    utils::Identifier container_id;
+    const std::string key_str = std::string(static_cast(key.mv_data), key.mv_size);
+    const std::string data = std::string(static_cast(value.mv_data), value.mv_size);
+    auto eventRead = FlowFileRecord::DeSerialize(std::as_bytes(std::span(data.data(), data.size())), content_repo_, container_id);
+    if (!eventRead) {
+      keys_to_delete_.enqueue({.key = key_str});
+      return;
+    }
+    auto claim = eventRead->getResourceClaim();
+    if (claim) {
+      claim->increaseFlowFileRecordOwnedCount();
+    }
+    const auto container = getContainer(container_id.to_string());
+    if (!container) {
+      logger_->log_warn("Could not find connection for {}, path {}", container_id.to_string(), eventRead->getContentFullPath());
+      keys_to_delete_.enqueue({.key = key_str, .content = eventRead->getResourceClaim()});
+      return;
+    }
+    if (check_flowfile_content_size_ && !contentSizeIsAmpleForFlowFile(*eventRead, claim)) {
+      logger_->log_warn("Content is missing or too small for flowfile {}", eventRead->getContentFullPath());
+      keys_to_delete_.enqueue({.key = key_str, .content = eventRead->getResourceClaim()});
+      return;
+    }
+
+    logger_->log_debug("Found connection for {}, path {}", container_id.to_string(), eventRead->getContentFullPath());
+    eventRead->setStoredToRepository(true);
+    // we found the connection for the persistent flowFile
+    // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
+    container->restore(eventRead);
+  });
+
+  flush();
+  if (!scan_complete) {
+    // The claims of records the scan never reached were not counted above, so an orphan sweep
+    // now would treat their content as unreferenced and delete it.
+    logger_->log_error("Failed to read all flow files from the LMDB FlowFile Repository, skipping orphaned content cleanup");
+    return;
+  }
+  content_repo_->clearOrphans();
+}
+
+// Remembers the content repository and restores the persisted flow files against it.
+void LmdbFlowFileRepository::loadComponent(const std::shared_ptr &content_repo) {
+  content_repo_ = content_repo;
+  initialize_repository();
+}
+
+REGISTER_RESOURCE_AS(LmdbFlowFileRepository, InternalResource, ("LmdbFlowFileRepository", "lmdbflowfilerepository"));
+
+}  // namespace org::apache::nifi::minifi::extensions::lmdb
diff --git a/extensions/lmdb/LmdbFlowFileRepository.h b/extensions/lmdb/LmdbFlowFileRepository.h
new file mode 100644
index 0000000000..1903560123
--- /dev/null
+++ b/extensions/lmdb/LmdbFlowFileRepository.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "core/ThreadedRepository.h" +#include "utils/file/FileUtils.h" +#include "core/Core.h" +#include "core/logging/LoggerFactory.h" +#include "minifi-cpp/Connection.h" +#include "concurrentqueue.h" +#include "minifi-cpp/utils/Literals.h" +#include "LmdbWrapper.h" + +namespace org::apache::nifi::minifi { +class FlowFileRecord; +} +namespace org::apache::nifi::minifi::extensions::lmdb { + +#ifdef WIN32 +constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = ".\\flowfile_repository"; +constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = ".\\flowfile_checkpoint"; +#else +constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = "./flowfile_repository"; +constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint"; +#endif +constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 100_MiB; +constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10); +constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2); + +constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::milliseconds(500); + +class LmdbFlowFileRepository : public core::ThreadedRepositoryImpl { + public: + LmdbFlowFileRepository(std::string_view name, const utils::Identifier& /*uuid*/) + : LmdbFlowFileRepository(name) { + } + + explicit LmdbFlowFileRepository(const std::string_view repo_name = "", + 
std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, + int64_t max_partition_bytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, + std::chrono::milliseconds purge_period = FLOWFILE_REPOSITORY_PURGE_PERIOD) + : ThreadedRepositoryImpl(repo_name.length() > 0 ? repo_name : core::className(), std::move(directory), + MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, max_partition_bytes, purge_period) { + } + + ~LmdbFlowFileRepository() override { + stop(); + } + + bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override; + bool MultiPut(const std::vector>>& data) override; + bool Get(const std::string &key, std::string &value) override; + bool Delete(const std::string& key) override; + bool Delete(const std::shared_ptr& item) override; + + bool isNoop() const override { + return false; + } + + void flush() override; + bool initialize(const std::shared_ptr &configure) override; + void loadComponent(const std::shared_ptr &content_repo) override; + + uint64_t getRepositorySize() const override; + uint64_t getRepositoryEntryCount() const override; + + EXTENSIONAPI static constexpr auto Properties = std::array{}; + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + + private: + std::thread& getThread() override { + return thread_; + } + + struct ExpiredFlowFileInfo { + std::string key; + std::shared_ptr content{}; + }; + + void run() override; + void initialize_repository(); + + void deserializeFlowFilesWithNoContentClaim(std::list& flow_files); + + bool contentSizeIsAmpleForFlowFile(const core::FlowFile& flow_file_record, const std::shared_ptr& resource_claim) const; + core::Connectable* getContainer(const std::string& container_id); + + moodycamel::ConcurrentQueue keys_to_delete_; + std::shared_ptr content_repo_; + + bool check_flowfile_content_size_ = true; + + std::thread thread_; + LmdbWrapper lmdb_wrapper_; + std::shared_ptr logger_ = 
core::logging::LoggerFactory::getLogger(); +}; + +} // namespace org::apache::nifi::minifi::extensions::lmdb diff --git a/extensions/lmdb/LmdbStream.cpp b/extensions/lmdb/LmdbStream.cpp index 8162087d8e..914d61d1bf 100644 --- a/extensions/lmdb/LmdbStream.cpp +++ b/extensions/lmdb/LmdbStream.cpp @@ -25,35 +25,22 @@ #include "io/validation.h" -namespace org::apache::nifi::minifi::io { +namespace org::apache::nifi::minifi::extensions::lmdb { -LmdbStream::LmdbStream(std::string path, MDB_env* lmdb_env, MDB_dbi* lmdb_handle, bool write_enable) +LmdbStream::LmdbStream(std::string path, LmdbWrapper& lmdb_wrapper, bool write_enable) : BaseStreamImpl(), + lmdb_wrapper_(lmdb_wrapper), path_(std::move(path)), write_enable_(write_enable), - lmdb_env_(lmdb_env), - lmdb_handle_(lmdb_handle), exists_(loadValue()) {} bool LmdbStream::loadValue() { - MDB_val key{path_.size(), const_cast(path_.data())}; - MDB_val value{}; - - MDB_txn* txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB read transaction in loadValue: {}", mdb_strerror(rc)); + auto value_opt = lmdb_wrapper_.getValue(path_); + if (!value_opt) { return false; } - auto guard = gsl::finally([txn] { mdb_txn_abort(txn); }); - - const auto rc = mdb_get(txn, *lmdb_handle_, &key, &value); - if (rc == MDB_SUCCESS) { - value_ = std::string(static_cast(value.mv_data), value.mv_size); - return true; - } else if (rc != MDB_NOTFOUND) { - logger_->log_error("Failed to get value from LMDB database: {}", mdb_strerror(rc)); - } - return false; + // value_opt is a local that is not used again: move the payload into value_ instead of copying it. + value_ = std::move(*value_opt); + return true; } void LmdbStream::close() { @@ -63,28 +50,9 @@ void LmdbStream::close() { bool LmdbStream::commit() { if (!write_enable_ || !dirty_) { return false; } - MDB_txn* txn = nullptr; - auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); - if (rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB transaction in close: {}", mdb_strerror(rc)); + if 
(!lmdb_wrapper_.putValue(path_, value_)) { return false; } - - MDB_val key{path_.size(), const_cast(path_.data())}; - MDB_val val{value_.size(), const_cast(value_.data())}; - rc = mdb_put(txn, *lmdb_handle_, &key, &val, 0); - if (rc != MDB_SUCCESS) { - logger_->log_error("Failed to put value in LMDB database during close: {}", mdb_strerror(rc)); - mdb_txn_abort(txn); - return false; - } - - rc = mdb_txn_commit(txn); - if (rc != MDB_SUCCESS) { - logger_->log_error("Failed to commit LMDB transaction during close: {}", mdb_strerror(rc)); - return false; - } - dirty_ = false; return true; } @@ -98,15 +66,15 @@ size_t LmdbStream::tell() const { } size_t LmdbStream::write(const uint8_t* value, size_t size) { - if (!write_enable_) { return STREAM_ERROR; } - if (size != 0 && IsNullOrEmpty(value)) { return STREAM_ERROR; } + if (!write_enable_) { return io::STREAM_ERROR; } + if (size != 0 && IsNullOrEmpty(value)) { return io::STREAM_ERROR; } value_.append(reinterpret_cast(value), size); dirty_ = true; return size; } size_t LmdbStream::read(std::span buf) { - if (!exists_) { return STREAM_ERROR; } + if (!exists_) { return io::STREAM_ERROR; } if (buf.empty()) { return 0; } if (offset_ >= value_.size()) { return 0; } @@ -116,4 +84,4 @@ size_t LmdbStream::read(std::span buf) { return bytes_to_read; } -} // namespace org::apache::nifi::minifi::io +} // namespace org::apache::nifi::minifi::extensions::lmdb diff --git a/extensions/lmdb/LmdbStream.h b/extensions/lmdb/LmdbStream.h index 30b72ef147..261a6f0b0c 100644 --- a/extensions/lmdb/LmdbStream.h +++ b/extensions/lmdb/LmdbStream.h @@ -23,13 +23,13 @@ #include "core/logging/LoggerFactory.h" #include "io/BaseStream.h" -#include "lmdb.h" +#include "LmdbWrapper.h" -namespace org::apache::nifi::minifi::io { +namespace org::apache::nifi::minifi::extensions::lmdb { class LmdbStream : public io::BaseStreamImpl { public: - explicit LmdbStream(std::string path, MDB_env* lmdb_env, MDB_dbi* lmdb_handle, bool write_enable = false); + explicit 
LmdbStream(std::string path, LmdbWrapper& lmdb_wrapper, bool write_enable = false); ~LmdbStream() override { close(); } @@ -51,11 +51,10 @@ class LmdbStream : public io::BaseStreamImpl { private: bool loadValue(); + LmdbWrapper& lmdb_wrapper_; std::string path_; bool write_enable_; std::string value_; - MDB_env* lmdb_env_; - MDB_dbi* lmdb_handle_; bool exists_; size_t offset_ = 0; bool dirty_ = false; @@ -63,4 +62,4 @@ class LmdbStream : public io::BaseStreamImpl { std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); }; -} // namespace org::apache::nifi::minifi::io +} // namespace org::apache::nifi::minifi::extensions::lmdb diff --git a/extensions/lmdb/LmdbWrapper.cpp b/extensions/lmdb/LmdbWrapper.cpp new file mode 100644 index 0000000000..b96c5476a9 --- /dev/null +++ b/extensions/lmdb/LmdbWrapper.cpp @@ -0,0 +1,267 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "LmdbWrapper.h"
+
+#include <filesystem>
+
+#include "minifi-cpp/utils/gsl.h"
+
+namespace org::apache::nifi::minifi::extensions::lmdb {
+// Sets up the LMDB environment in `directory` (created if missing) and opens the nullptr-named database; on failure logs, closes the environment and returns false.
+bool LmdbWrapper::initialize(const std::string& directory, size_t max_db_size) {
+  if (const auto rc = mdb_env_create(&lmdb_env_)) {
+    logger_->log_error("Failed to create LMDB environment: {}", mdb_strerror(rc));
+    return false;
+  }
+
+  logger_->log_info("Setting LMDB max DB size to {} bytes", max_db_size);
+  if (const auto rc = mdb_env_set_mapsize(lmdb_env_, max_db_size); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to set LMDB map size: {}", mdb_strerror(rc));
+    mdb_env_close(lmdb_env_);
+    lmdb_env_ = nullptr;
+    return false;
+  }
+
+  if (std::filesystem::exists(directory)) {
+    logger_->log_info("Using existing LMDB Repository directory at {}", directory);
+  } else {
+    logger_->log_info("Creating LMDB Repository directory at {}", directory);
+    if (!std::filesystem::create_directories(directory)) {
+      logger_->log_error("Failed to create LMDB Repository directory at {}", directory);
+      mdb_env_close(lmdb_env_);  // release the environment here too, like every other failure path
+      lmdb_env_ = nullptr;
+      return false;
+    }
+  }
+
+  if (const auto rc = mdb_env_open(lmdb_env_, directory.c_str(), MDB_NOTLS, 0664)) {
+    logger_->log_error("Failed to open LMDB environment: {}", mdb_strerror(rc));
+    mdb_env_close(lmdb_env_);
+    lmdb_env_ = nullptr;
+    return false;
+  }
+
+  MDB_txn* init_txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB transaction during initialize: {}", mdb_strerror(rc));
+    mdb_env_close(lmdb_env_);
+    lmdb_env_ = nullptr;
+    return false;
+  }
+  if (const auto rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to open LMDB database: {}", mdb_strerror(rc));
+    mdb_txn_abort(init_txn);
+    mdb_env_close(lmdb_env_);
+    lmdb_env_ = nullptr;
+    return false;
+  }
+  if (const auto rc = mdb_txn_commit(init_txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to commit LMDB transaction during initialize: {}", mdb_strerror(rc));
+    mdb_env_close(lmdb_env_);
+    lmdb_env_ = nullptr;
+    return false;
+  }
+  return true;
+}
+// Returns true only when `path` is a stored key; read errors are logged and also reported as false.
+bool LmdbWrapper::exists(const std::string& path) const {
+  MDB_val key{path.size(), const_cast<char*>(path.data())};
+  MDB_val value{};
+
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB read transaction in exists: {}", mdb_strerror(rc));
+    return false;
+  }
+  auto guard = gsl::finally([txn] { mdb_txn_abort(txn); });
+
+  const auto rc = mdb_get(txn, lmdb_handle_, &key, &value);
+  if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) { logger_->log_error("Failed to get value from LMDB database: {}", mdb_strerror(rc)); }
+  return rc == MDB_SUCCESS;
+}
+// Deletes `path` in its own write transaction; a missing key is not an error (returns true).
+bool LmdbWrapper::removeKey(const std::string& path) {
+  MDB_val key{path.size(), const_cast<char*>(path.data())};
+
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB write transaction in removeKey: {}", mdb_strerror(rc));
+    return false;
+  }
+  const auto del_rc = mdb_del(txn, lmdb_handle_, &key, nullptr);
+
+  if (del_rc == MDB_SUCCESS) {
+    if (const auto commit_rc = mdb_txn_commit(txn); commit_rc != MDB_SUCCESS) {
+      logger_->log_error("Failed to commit LMDB transaction during delete: {}", mdb_strerror(commit_rc));
+      return false;
+    }
+    return true;
+  } else if (del_rc == MDB_NOTFOUND) {
+    logger_->log_debug("Key {} not found in LMDB database during delete", path);
+    mdb_txn_abort(txn);
+    return true;
+  } else {
+    logger_->log_error("Failed to delete key '{}' from LMDB database: {}", path, mdb_strerror(del_rc));
+    mdb_txn_abort(txn);
+    return false;
+  }
+}
+// Deletes all `paths` in one write transaction; missing keys only produce a warning, any other error aborts the whole batch.
+bool LmdbWrapper::removeKeys(const std::vector<std::string>& paths) {
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB write transaction in removeKeys: {}", mdb_strerror(rc));
+    return false;
+  }
+
+  for (const auto& key_str : paths) {
+    MDB_val key{key_str.size(), const_cast<char*>(key_str.data())};
+    auto rc = mdb_del(txn, lmdb_handle_, &key, nullptr);
+
+    if (rc == MDB_NOTFOUND) {
+      logger_->log_warn("Key {} not found in LMDB database during delete", key_str);
+    } else if (rc != MDB_SUCCESS) {
+      logger_->log_error("Failed to delete key '{}' from LMDB database: {}", key_str, mdb_strerror(rc));
+      mdb_txn_abort(txn);
+      return false;
+    }
+  }
+
+  if (const auto rc = mdb_txn_commit(txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to commit LMDB transaction during delete: {}", mdb_strerror(rc));
+    return false;
+  }
+  return true;
+}
+// Calls `func` for every key/value pair inside one read-only transaction; returns false if the transaction, cursor or iteration fails.
+bool LmdbWrapper::forEach(const std::function<void(const MDB_val&, const MDB_val&)>& func) const {
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB read transaction in forEach: {}", mdb_strerror(rc));
+    return false;
+  }
+
+  auto txn_guard = gsl::finally([txn] { mdb_txn_abort(txn); });
+
+  MDB_cursor* cursor = nullptr;
+  if (const auto rc = mdb_cursor_open(txn, lmdb_handle_, &cursor); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to open LMDB cursor in forEach: {}", mdb_strerror(rc));
+    return false;
+  }
+  auto cursor_guard = gsl::finally([cursor] { mdb_cursor_close(cursor); });  // declared after txn_guard, so the cursor is closed first
+
+  MDB_val key{};
+  MDB_val val{};
+  auto rc = mdb_cursor_get(cursor, &key, &val, MDB_FIRST);
+
+  while (rc == MDB_SUCCESS) {
+    func(key, val);
+    rc = mdb_cursor_get(cursor, &key, &val, MDB_NEXT);
+  }
+
+  if (rc != MDB_NOTFOUND) {
+    logger_->log_error("Failed to iterate over LMDB database: {}", mdb_strerror(rc));
+    return false;
+  }
+  return true;
+}
+// Returns a copy of the value stored under `path`, or std::nullopt when the key is missing or the read fails.
+std::optional<std::string> LmdbWrapper::getValue(const std::string& path) const {
+  MDB_val key{path.size(), const_cast<char*>(path.data())};
+  MDB_val value{};
+
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB read transaction in getValue: {}", mdb_strerror(rc));
+    return std::nullopt;
+  }
+  auto guard = gsl::finally([txn] { mdb_txn_abort(txn); });
+
+  const auto rc = mdb_get(txn, lmdb_handle_, &key, &value);
+  if (rc == MDB_SUCCESS) {
+    return std::string(static_cast<const char*>(value.mv_data), value.mv_size);
+  } else if (rc != MDB_NOTFOUND) {
+    logger_->log_error("Failed to get key '{}' from LMDB database: {}", path, mdb_strerror(rc));
+  }
+  return std::nullopt;
+}
+// Stores `value` under `path` in its own write transaction; returns false (after logging) on any failure.
+bool LmdbWrapper::putValue(const std::string& path, const std::string& value) {
+  MDB_txn* txn = nullptr;
+  auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn);
+  if (rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB transaction in putValue: {}", mdb_strerror(rc));
+    return false;
+  }
+
+  MDB_val key{path.size(), const_cast<char*>(path.data())};
+  MDB_val val{value.size(), const_cast<char*>(value.data())};
+  rc = mdb_put(txn, lmdb_handle_, &key, &val, 0);
+  if (rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to put value in LMDB database during putValue: {}", mdb_strerror(rc));
+    mdb_txn_abort(txn);
+    return false;
+  }
+
+  rc = mdb_txn_commit(txn);
+  if (rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to commit LMDB transaction during putValue: {}", mdb_strerror(rc));
+    return false;
+  }
+  return true;
+}
+// Stores every key/buffer pair of `data` in one write transaction; the first failing put aborts the transaction and returns false.
+bool LmdbWrapper::putValues(const std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>>& data) {
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB write transaction in putValues: {}", mdb_strerror(rc));
+    return false;
+  }
+  for (const auto& item : data) {
+    const auto buf = item.second->getBuffer();
+    MDB_val mdb_key{item.first.size(), const_cast<char*>(item.first.data())};
+    MDB_val mdb_value{buf.size(), const_cast<std::byte*>(buf.data())};
+    if (const auto rc = mdb_put(txn, lmdb_handle_, &mdb_key, &mdb_value, 0); rc != MDB_SUCCESS) {
+      logger_->log_error("Failed to put key '{}' into LMDB during putValues: {}", item.first, mdb_strerror(rc));
+      mdb_txn_abort(txn);
+      return false;
+    }
+  }
+  if (const auto rc = mdb_txn_commit(txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to commit LMDB putValues transaction: {}", mdb_strerror(rc));
+    return false;
+  }
+  return true;
+}
+// Returns the mdb_stat() result for the database; the value-initialized MDB_stat is returned when the read transaction cannot be started.
+MDB_stat LmdbWrapper::getDbStat() const {
+  MDB_stat stat{};
+  MDB_txn* txn = nullptr;
+  if (const auto rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to begin LMDB read transaction in getDbStat: {}", mdb_strerror(rc));
+    return stat;
+  }
+  if (const auto rc = mdb_stat(txn, lmdb_handle_, &stat); rc != MDB_SUCCESS) {
+    logger_->log_error("Failed to read LMDB database stats: {}", mdb_strerror(rc));
+  }
+  mdb_txn_abort(txn);
+  return stat;
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::lmdb
diff --git a/extensions/lmdb/LmdbWrapper.h b/extensions/lmdb/LmdbWrapper.h
new file mode 100644
index 0000000000..f429d0b193
--- /dev/null
+++ b/extensions/lmdb/LmdbWrapper.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "core/logging/LoggerFactory.h"
+#include "io/BufferStream.h"
+#include "lmdb.h"
+
+namespace org::apache::nifi::minifi::extensions::lmdb {
+// Owns one LMDB environment plus the database handle opened in it; neither copyable nor movable.
+class LmdbWrapper {
+ public:
+  LmdbWrapper() = default;
+  ~LmdbWrapper() {
+    if (lmdb_env_) {  // stays nullptr until initialize() has created an environment
+      mdb_dbi_close(lmdb_env_, lmdb_handle_);
+      mdb_env_close(lmdb_env_);
+    }
+  }
+
+  LmdbWrapper(const LmdbWrapper&) = delete;
+  LmdbWrapper& operator=(const LmdbWrapper&) = delete;
+  LmdbWrapper(LmdbWrapper&&) = delete;
+  LmdbWrapper& operator=(LmdbWrapper&&) = delete;
+  // Each operation below runs in its own LMDB transaction; failures are logged and reported through the return value.
+  bool initialize(const std::string& directory, size_t max_db_size);  // must succeed before any other call
+  bool exists(const std::string& path) const;  // true only if the key is present
+  MDB_stat getDbStat() const;
+  bool removeKey(const std::string& path);  // a missing key counts as success
+  bool removeKeys(const std::vector<std::string>& paths);  // one transaction for the whole batch
+  bool forEach(const std::function<void(const MDB_val&, const MDB_val&)>& func) const;  // visits every key/value pair
+  std::optional<std::string> getValue(const std::string& path) const;  // std::nullopt if missing or on error
+  bool putValue(const std::string& path, const std::string& value);
+  bool putValues(const std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>>& data);  // one transaction for all entries
+
+ private:
+  MDB_env* lmdb_env_{nullptr};
+  MDB_dbi lmdb_handle_{};
+  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<LmdbWrapper>::getLogger()};
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::lmdb
diff --git a/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp b/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp index ec9b9c7b03..02084e8770 100644 --- a/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp +++ b/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp @@ -38,7 +38,7 @@ class LmdbContentRepositoryTests : TestController { protected: static constexpr std::string_view test_content_ = "well hello there"; - std::shared_ptr content_repo_ = std::make_shared(); + std::shared_ptr content_repo_ = std::make_shared(); void writeContent(const minifi::ResourceClaim& claim) { auto stream = content_repo_->write(claim); @@ -49,10 +49,10 @@ class LmdbContentRepositoryTests : TestController { TEST_CASE("Invalid or empty dbsize
configuration value is set", "[lmdb]") { TestController controller; - LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); auto db_path = controller.createTempDirectory().string(); auto configuration = std::make_shared(); - auto content_repo = std::make_shared(); + auto content_repo = std::make_shared(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, db_path); SECTION("Invalid value") { configuration->set(minifi::Configure::nifi_content_repository_lmdb_max_db_size, "invalid"); @@ -68,9 +68,9 @@ TEST_CASE("Invalid or empty dbsize configuration value is set", "[lmdb]") { TEST_CASE("Valid dbsize configuration value is set", "[lmdb]") { TestController controller; - LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); auto configuration = std::make_shared(); - auto content_repo = std::make_shared(); + auto content_repo = std::make_shared(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, controller.createTempDirectory().string()); configuration->set(minifi::Configure::nifi_content_repository_lmdb_max_db_size, "100 MB"); REQUIRE(content_repo->initialize(configuration)); @@ -82,7 +82,7 @@ TEST_CASE("Initialize succeeds when target directory already exists", "[lmdb]") auto db_path = controller.createTempDirectory(); REQUIRE(std::filesystem::exists(db_path)); auto configuration = std::make_shared(); - auto content_repo = std::make_shared(); + auto content_repo = std::make_shared(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, db_path.string()); REQUIRE(content_repo->initialize(configuration)); } @@ -99,7 +99,7 @@ TEST_CASE("Initialize fails and is safe to destroy when the directory path is a auto configuration = std::make_shared(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, file_path.string()); - auto content_repo = std::make_shared(); + auto 
content_repo = std::make_shared(); REQUIRE_FALSE(content_repo->initialize(configuration)); } @@ -215,7 +215,7 @@ TEST_CASE("Content persists across LmdbContentRepository re-initialization", "[l std::string claim_path; static constexpr std::string_view content = "persisted content"; { - auto content_repo = std::make_shared(); + auto content_repo = std::make_shared(); REQUIRE(content_repo->initialize(configuration)); auto claim = std::make_shared(content_repo); claim_path = claim->getContentFullPath(); @@ -226,7 +226,7 @@ TEST_CASE("Content persists across LmdbContentRepository re-initialization", "[l content_repo->incrementStreamCount(*claim); } - auto reopened_repo = std::make_shared(); + auto reopened_repo = std::make_shared(); REQUIRE(reopened_repo->initialize(configuration)); auto reopened_claim = std::make_shared(claim_path, reopened_repo); REQUIRE(reopened_repo->exists(*reopened_claim)); @@ -237,22 +237,22 @@ TEST_CASE("Content persists across LmdbContentRepository re-initialization", "[l } TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[lmdb]") { - ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared()); + ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared()); } TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly", "[lmdb]") { - ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared()); - ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared()); + ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared()); + ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared()); } TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "[lmdb]") { - ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared()); + ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared()); } 
TEST_CASE("ProcessSession::write can be cancelled", "[lmdb]") { - ContentRepositoryDependentTests::testOkWrite(std::make_shared()); - ContentRepositoryDependentTests::testErrWrite(std::make_shared()); - ContentRepositoryDependentTests::testCancelWrite(std::make_shared()); + ContentRepositoryDependentTests::testOkWrite(std::make_shared()); + ContentRepositoryDependentTests::testErrWrite(std::make_shared()); + ContentRepositoryDependentTests::testCancelWrite(std::make_shared()); } } // namespace org::apache::nifi::minifi::test diff --git a/extensions/lmdb/tests/LmdbContentSessionTests.cpp b/extensions/lmdb/tests/LmdbContentSessionTests.cpp index f504469934..bc31c43d2f 100644 --- a/extensions/lmdb/tests/LmdbContentSessionTests.cpp +++ b/extensions/lmdb/tests/LmdbContentSessionTests.cpp @@ -31,7 +31,7 @@ namespace org::apache::nifi::minifi::test { class LmdbContentSessionController : public TestController { public: - LmdbContentSessionController() : content_repository_(std::make_shared()) { + LmdbContentSessionController() : content_repository_(std::make_shared()) { auto content_repo_path = createTempDirectory(); auto config = std::make_shared(); config->set(Configure::nifi_dbcontent_repository_directory_default, content_repo_path.string()); @@ -294,7 +294,7 @@ TEST_CASE("LmdbContentRepository::Session commit throws when underlying reposito auto unrelated_repository = std::make_shared(); unrelated_repository->initialize(std::make_shared()); - core::repository::LmdbContentRepository::Session session(unrelated_repository); + extensions::lmdb::LmdbContentRepository::Session session(unrelated_repository); REQUIRE_THROWS_WITH(session.commit(), Catch::Matchers::ContainsSubstring("Session's repository is not an LmdbContentRepository")); } diff --git a/extensions/lmdb/tests/LmdbFlowFileRepositoryTests.cpp b/extensions/lmdb/tests/LmdbFlowFileRepositoryTests.cpp new file mode 100644 index 0000000000..b6930ebc2c --- /dev/null +++ 
b/extensions/lmdb/tests/LmdbFlowFileRepositoryTests.cpp @@ -0,0 +1,470 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LmdbFlowFileRepository.h" +#include "LmdbContentRepository.h" +#include "Connection.h" +#include "FlowFileRecord.h" +#include "ResourceClaim.h" +#include "io/BufferStream.h" +#include "properties/Configure.h" +#include "utils/Id.h" +#include "unit/Catch.h" +#include "unit/TestBase.h" + +namespace org::apache::nifi::minifi::test { + +namespace { +std::shared_ptr createFlowFileWithContent(core::ContentRepository& content_repo, std::string_view content) { + auto flow_file = std::make_shared(); + const auto content_session = content_repo.createSession(); + const auto claim = content_session->create(); + const auto stream = content_session->write(claim); + stream->write(std::as_bytes(std::span(content))); + flow_file->setResourceClaim(claim); + flow_file->setSize(stream->size()); + flow_file->setOffset(0); + stream->close(); + content_session->commit(); + return flow_file; +} +} // namespace + +class LmdbFlowFileRepositoryTests : TestController { + public: + LmdbFlowFileRepositoryTests() { + db_path_ = createTempDirectory(); + auto configuration = std::make_shared(); + 
configuration->set(minifi::Configure::nifi_flowfile_repository_directory_default, db_path_.string()); + REQUIRE(flow_file_repo_->initialize(configuration)); + content_db_path_ = createTempDirectory(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_db_path_.string()); + REQUIRE(content_repo_->initialize(configuration)); + flow_file_repo_->loadComponent(content_repo_); + } + + protected: + std::filesystem::path db_path_; + std::filesystem::path content_db_path_; + std::shared_ptr flow_file_repo_ = std::make_shared(); + std::shared_ptr content_repo_ = std::make_shared(); +}; + +TEST_CASE("Initialize LmdbFlowFileRepository", "[lmdb]") { + TestController controller; + + std::filesystem::path db_path; + SECTION("initialize succeeds when the directory does not exist") { + db_path = controller.createTempDirectory() / "does_not_exist_yet"; + REQUIRE_FALSE(std::filesystem::exists(db_path)); + } + + SECTION("initialize succeeds when the directory already exists") { + db_path = controller.createTempDirectory(); + REQUIRE(std::filesystem::exists(db_path)); + } + + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_flowfile_repository_directory_default, db_path.string()); + + auto flow_file_repo = std::make_shared(); + REQUIRE(flow_file_repo->initialize(configuration)); + REQUIRE(std::filesystem::exists(db_path)); +} + +TEST_CASE("initialize honors a valid max db size and persists data", "[lmdb]") { + TestController controller; + LogTestController::getInstance().setDebug(); + const auto db_path = controller.createTempDirectory(); + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_flowfile_repository_directory_default, db_path.string()); + configuration->set(minifi::Configure::nifi_flowfile_repository_lmdb_max_db_size, "32 MB"); + + auto flow_file_repo = std::make_shared(); + REQUIRE(flow_file_repo->initialize(configuration)); + 
REQUIRE(LogTestController::getInstance().contains("Setting LMDB max DB size to 33554432 bytes")); +} + +TEST_CASE("initialize throws on an invalid max db size", "[lmdb]") { + TestController controller; + const auto db_path = controller.createTempDirectory(); + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_flowfile_repository_directory_default, db_path.string()); + configuration->set(minifi::Configure::nifi_flowfile_repository_lmdb_max_db_size, "not-a-size"); + + auto flow_file_repo = std::make_shared(); + REQUIRE_THROWS(flow_file_repo->initialize(configuration)); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Put on empty repository increases entry count", "[lmdb]") { + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 0); + static constexpr std::string_view payload = "hello flowfile"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(payload.data()), payload.size())); + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 1); + REQUIRE(flow_file_repo_->getRepositorySize() > 0); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Put with same key overwrites and keeps entry count at 1", "[lmdb]") { + static constexpr std::string_view first = "first"; + static constexpr std::string_view second = "second value, longer than first"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(first.data()), first.size())); + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(second.data()), second.size())); + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 1); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Put of multiple distinct keys", "[lmdb]") { + static constexpr std::string_view payload = "data"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(payload.data()), payload.size())); + REQUIRE(flow_file_repo_->Put("key2", reinterpret_cast(payload.data()), payload.size())); + REQUIRE(flow_file_repo_->Put("key3", reinterpret_cast(payload.data()), payload.size())); + 
REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 3); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Put of empty value succeeds", "[lmdb]") { + REQUIRE(flow_file_repo_->Put("key1", nullptr, 0)); + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 1); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Get on empty repository returns false", "[lmdb]") { + std::string value = "untouched"; + REQUIRE_FALSE(flow_file_repo_->Get("missing", value)); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Put then Get round-trips the value", "[lmdb]") { + static constexpr std::string_view payload = "hello flowfile"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(payload.data()), payload.size())); + std::string value; + REQUIRE(flow_file_repo_->Get("key1", value)); + REQUIRE(value == payload); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Get returns false for nonexistent key after other Puts", "[lmdb]") { + static constexpr std::string_view payload = "data"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(payload.data()), payload.size())); + std::string value; + REQUIRE_FALSE(flow_file_repo_->Get("key2", value)); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Get reflects the latest Put for a key", "[lmdb]") { + static constexpr std::string_view first = "first"; + static constexpr std::string_view second = "second value"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(first.data()), first.size())); + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(second.data()), second.size())); + std::string value; + REQUIRE(flow_file_repo_->Get("key1", value)); + REQUIRE(value == second); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "MultiPut with empty vector succeeds and adds nothing", "[lmdb]") { + REQUIRE(flow_file_repo_->MultiPut({})); + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 0); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "MultiPut writes all entries", "[lmdb]") { + std::vector>> data; + 
data.emplace_back("key1", std::make_unique(std::string{"value-one"})); + data.emplace_back("key2", std::make_unique(std::string{"value-two"})); + data.emplace_back("key3", std::make_unique(std::string{"value-three"})); + REQUIRE(flow_file_repo_->MultiPut(data)); + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 3); + std::string value; + REQUIRE(flow_file_repo_->Get("key1", value)); + REQUIRE(value == "value-one"); + REQUIRE(flow_file_repo_->Get("key2", value)); + REQUIRE(value == "value-two"); + REQUIRE(flow_file_repo_->Get("key3", value)); + REQUIRE(value == "value-three"); +} + +TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "MultiPut overwrites existing keys", "[lmdb]") { + static constexpr std::string_view old_value = "old"; + REQUIRE(flow_file_repo_->Put("key1", reinterpret_cast(old_value.data()), old_value.size())); + + std::vector>> data; + data.emplace_back("key1", std::make_unique(std::string{"new-one"})); + data.emplace_back("key2", std::make_unique(std::string{"new-two"})); + REQUIRE(flow_file_repo_->MultiPut(data)); + + REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 2); + std::string value; + REQUIRE(flow_file_repo_->Get("key1", value)); + REQUIRE(value == "new-one"); + REQUIRE(flow_file_repo_->Get("key2", value)); + REQUIRE(value == "new-two"); +} + +TEST_CASE("MultiPut entries persist across LmdbFlowFileRepository re-open", "[lmdb]") { + TestController controller; + auto db_path = controller.createTempDirectory(); + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_flowfile_repository_directory_default, db_path.string()); + + { + auto flow_file_repo = std::make_shared(); + REQUIRE(flow_file_repo->initialize(configuration)); + std::vector>> data; + data.emplace_back("key1", std::make_unique(std::string{"persisted-one"})); + data.emplace_back("key2", std::make_unique(std::string{"persisted-two"})); + REQUIRE(flow_file_repo->MultiPut(data)); + } + + auto reopened_repo = std::make_shared(); + 
REQUIRE(reopened_repo->initialize(configuration));
+  REQUIRE(reopened_repo->getRepositoryEntryCount() == 2);
+  std::string value;
+  REQUIRE(reopened_repo->Get("key1", value));
+  REQUIRE(value == "persisted-one");
+  REQUIRE(reopened_repo->Get("key2", value));
+  REQUIRE(value == "persisted-two");
+}
+
+TEST_CASE("Put persists across LmdbFlowFileRepository re-open", "[lmdb]") {
+  TestController controller;
+  auto db_path = controller.createTempDirectory();
+  auto configuration = std::make_shared();
+  configuration->set(minifi::Configure::nifi_flowfile_repository_directory_default, db_path.string());
+
+  static constexpr std::string_view payload = "persisted flowfile";
+  {
+    auto flow_file_repo = std::make_shared();
+    REQUIRE(flow_file_repo->initialize(configuration));
+    REQUIRE(flow_file_repo->Put("key1", reinterpret_cast(payload.data()), payload.size()));
+    REQUIRE(flow_file_repo->getRepositoryEntryCount() == 1);
+  }
+
+  auto reopened_repo = std::make_shared();
+  REQUIRE(reopened_repo->initialize(configuration));
+  REQUIRE(reopened_repo->getRepositoryEntryCount() == 1);
+}
+
+TEST_CASE_METHOD(LmdbFlowFileRepositoryTests, "Deleting keys is done in batches after flush", "[lmdb]") {
+  std::vector>> data;
+  data.emplace_back("key1", std::make_unique(std::string{"value-one"}));
+  data.emplace_back("key2", std::make_unique(std::string{"value-two"}));
+  data.emplace_back("key3", std::make_unique(std::string{"value-three"}));
+  REQUIRE(flow_file_repo_->MultiPut(data));
+  REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 3);
+  REQUIRE(flow_file_repo_->Delete("key1"));
+  REQUIRE(flow_file_repo_->Delete("key2"));
+  REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 3);  // deletes are only queued until flush()
+  flow_file_repo_->flush();
+  REQUIRE(flow_file_repo_->getRepositoryEntryCount() == 1);
+  std::string value;
+  REQUIRE_FALSE(flow_file_repo_->Get("key1", value));
+  REQUIRE_FALSE(flow_file_repo_->Get("key2", value));  // check the second deleted key, not key1 twice
+  REQUIRE(flow_file_repo_->Get("key3", value));
+  REQUIRE(value == "value-three");
+}
+
+TEST_CASE("loadComponent restores a persisted flow file into its connection", "[lmdb]") { + TestController controller; + const auto ff_dir = controller.createTempDirectory(); + const auto content_dir = controller.createTempDirectory(); + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + const auto connection_id = minifi::utils::IdGenerator::getIdGenerator()->generate(); + minifi::utils::Identifier ff_id; + + { + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + auto connection = std::make_shared(ff_repo, content_repo, "TestConnection", connection_id); + + auto flow_file = createFlowFileWithContent(*content_repo, "hello"); + ff_id = flow_file->getUUID(); + flow_file->setConnection(connection.get()); + REQUIRE(flow_file->Persist(ff_repo)); + ff_repo->flush(); + } + + { + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + auto connection = std::make_shared(ff_repo, content_repo, "TestConnection", connection_id); + + ff_repo->setConnectionMap({{connection_id.to_string(), connection.get()}}); + ff_repo->loadComponent(content_repo); + + REQUIRE(connection->getQueueSize() == 1); + std::set> expired; + auto restored = connection->poll(expired); + REQUIRE(expired.empty()); + REQUIRE(restored); + REQUIRE(restored->getUUID() == ff_id); + } +} + +TEST_CASE("loadComponent purges a flow file whose connection is unknown", "[lmdb]") { + TestController controller; + const auto ff_dir = controller.createTempDirectory(); + const auto content_dir = controller.createTempDirectory(); + auto config = std::make_shared(); + 
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + { + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + auto connection = std::make_shared(ff_repo, content_repo, "TestConnection"); + + auto flow_file = createFlowFileWithContent(*content_repo, "hello"); + flow_file->setConnection(connection.get()); + REQUIRE(flow_file->Persist(ff_repo)); + ff_repo->flush(); + REQUIRE(ff_repo->getRepositoryEntryCount() == 1); + } + + { + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + + // No connection map set, so the persisted flow file has no owning container. + ff_repo->loadComponent(content_repo); + REQUIRE(ff_repo->getRepositoryEntryCount() == 0); + } +} + +TEST_CASE("loadComponent purges an entry that cannot be deserialized", "[lmdb]") { + TestController controller; + const auto ff_dir = controller.createTempDirectory(); + const auto content_dir = controller.createTempDirectory(); + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + static constexpr std::string_view garbage = "this is not a serialized flow file"; + REQUIRE(ff_repo->Put("key1", reinterpret_cast(garbage.data()), garbage.size())); + REQUIRE(ff_repo->getRepositoryEntryCount() == 1); + + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + ff_repo->loadComponent(content_repo); + + REQUIRE(ff_repo->getRepositoryEntryCount() == 0); +} + 
+TEST_CASE("loadComponent applies the content size health check", "[lmdb]") { + TestController controller; + const auto ff_dir = controller.createTempDirectory(); + const auto content_dir = controller.createTempDirectory(); + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + size_t expected_queue_size = 0; + SECTION("health check enabled drops the undersized flow file") { + config->set(minifi::Configure::nifi_flow_file_repository_check_health, "true"); + expected_queue_size = 1; // only the healthy flow file is restored + } + SECTION("health check disabled keeps the undersized flow file") { + config->set(minifi::Configure::nifi_flow_file_repository_check_health, "false"); + expected_queue_size = 2; // both flow files are restored + } + + const auto connection_id = minifi::utils::IdGenerator::getIdGenerator()->generate(); + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + auto connection = std::make_shared(ff_repo, content_repo, "TestConnection", connection_id); + + auto healthy_flow_file = createFlowFileWithContent(*content_repo, "foo"); + healthy_flow_file->setConnection(connection.get()); + REQUIRE(healthy_flow_file->Persist(ff_repo)); + + auto undersized_flow_file = createFlowFileWithContent(*content_repo, "bar"); + undersized_flow_file->setConnection(connection.get()); + undersized_flow_file->setSize(undersized_flow_file->getSize() * 2); // corrupt the flow file so it fails the health check + REQUIRE(undersized_flow_file->Persist(ff_repo)); + + ff_repo->setConnectionMap({{connection_id.to_string(), connection.get()}}); + REQUIRE(connection->getQueueSize() == 0); + ff_repo->loadComponent(content_repo); + REQUIRE(connection->getQueueSize() == expected_queue_size); +} + 
+TEST_CASE("loadComponent clears orphaned content from the content repository", "[lmdb]") { + TestController controller; + const auto ff_dir = controller.createTempDirectory(); + const auto content_dir = controller.createTempDirectory(); + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + // Write content with no referencing flow file + { + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + minifi::ResourceClaimImpl claim(content_repo); + content_repo->write(claim)->write("orphan"); + // ensure that the content is not deleted during resource claim destruction + content_repo->incrementStreamCount(claim); + REQUIRE(content_repo->getRepositoryEntryCount() == 1); + } + + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + REQUIRE(content_repo->getRepositoryEntryCount() == 1); + + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + + ff_repo->loadComponent(content_repo); + REQUIRE(content_repo->getRepositoryEntryCount() == 0); +} + +TEST_CASE("flush decreases the owned count and removes content for a deleted flow file", "[lmdb]") { + TestController controller; + const auto ff_dir = controller.createTempDirectory(); + const auto content_dir = controller.createTempDirectory(); + auto config = std::make_shared(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + auto ff_repo = std::make_shared(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(config)); + ff_repo->loadComponent(content_repo); + + auto flow_file = createFlowFileWithContent(*content_repo, "payload"); + 
REQUIRE(flow_file->Persist(ff_repo)); + REQUIRE(content_repo->getRepositoryEntryCount() == 1); + + REQUIRE(ff_repo->Delete(flow_file)); + ff_repo->flush(); + REQUIRE(ff_repo->getRepositoryEntryCount() == 0); + + CHECK(content_repo->getRepositoryEntryCount() == 1); + flow_file.reset(); // release the claim held by the live flow file, which should trigger deletion of the content + REQUIRE(content_repo->getRepositoryEntryCount() == 0); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/lmdb/tests/LmdbStreamTests.cpp b/extensions/lmdb/tests/LmdbStreamTests.cpp index 3c773fac8c..484c5fa1a8 100644 --- a/extensions/lmdb/tests/LmdbStreamTests.cpp +++ b/extensions/lmdb/tests/LmdbStreamTests.cpp @@ -20,65 +20,27 @@ #include "lmdb.h" #include "unit/Catch.h" #include "unit/TestBase.h" +#include "LmdbWrapper.h" namespace org::apache::nifi::minifi::test { +using LmdbStream = org::apache::nifi::minifi::extensions::lmdb::LmdbStream; +using LmdbWrapper = org::apache::nifi::minifi::extensions::lmdb::LmdbWrapper; + class LmdbStreamTest : TestController { public: LmdbStreamTest() : db_path_(createTempDirectory().string()) { - if (const int rc = mdb_env_create(&lmdb_env_)) { - throw std::runtime_error("Failed to create LMDB environment: " + std::string(mdb_strerror(rc))); - } - if (const int rc = mdb_env_set_mapsize(lmdb_env_, 100ULL * 1024 * 1024); rc != MDB_SUCCESS) { - throw std::runtime_error("Failed to set LMDB map size: " + std::string(mdb_strerror(rc))); - } - - if (const int rc = mdb_env_open(lmdb_env_, db_path_.c_str(), MDB_NOTLS, 0664)) { - throw std::runtime_error("Failed to open LMDB environment " + db_path_ + ": " + std::string(mdb_strerror(rc))); - } - - MDB_txn* init_txn = nullptr; - mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); - if (const auto rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) { - mdb_txn_abort(init_txn); - mdb_env_close(lmdb_env_); - throw std::runtime_error("Failed to open LMDB database: " + 
std::string(mdb_strerror(rc))); - } - mdb_txn_commit(init_txn); - } - - LmdbStreamTest(const LmdbStreamTest&) = delete; - LmdbStreamTest& operator=(const LmdbStreamTest&) = delete; - LmdbStreamTest(LmdbStreamTest&&) = delete; - LmdbStreamTest& operator=(LmdbStreamTest&&) = delete; - - ~LmdbStreamTest() override { - mdb_dbi_close(lmdb_env_, lmdb_handle_); - mdb_env_close(lmdb_env_); - } - - std::optional readValue(const std::string& key) { - MDB_val db_key{key.size(), const_cast(key.data())}; - MDB_val db_value{}; - - MDB_txn* txn = nullptr; - mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); - auto guard = gsl::finally([txn] { mdb_txn_abort(txn); }); - const auto result = mdb_get(txn, lmdb_handle_, &db_key, &db_value); - - if (result == MDB_SUCCESS) { return std::string(static_cast(db_value.mv_data), db_value.mv_size); } - return std::nullopt; + lmdb_wrapper_.initialize(db_path_, 100ULL * 1024 * 1024); // 100 MB } protected: std::string db_path_; - MDB_env* lmdb_env_{nullptr}; - MDB_dbi lmdb_handle_{}; + LmdbWrapper lmdb_wrapper_; }; namespace { -size_t writeString(io::LmdbStream& stream, const std::string& content) { +size_t writeString(LmdbStream& stream, const std::string& content) { return stream.write(reinterpret_cast(content.data()), content.size()); } @@ -98,10 +60,10 @@ TEST_CASE_METHOD(LmdbStreamTest, "Simple write tests") { } { - io::LmdbStream stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(stream, content))); } - auto val = readValue(db_path_); + auto val = lmdb_wrapper_.getValue(db_path_); REQUIRE(val.has_value()); CHECK(val->size() == content.size()); CHECK(*val == content); @@ -112,13 +74,13 @@ TEST_CASE_METHOD(LmdbStreamTest, "Multiple write test") { std::string expected_content; { - io::LmdbStream stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream stream(db_path_, lmdb_wrapper_, true); for (int i = 0; i < 5; ++i) { 
REQUIRE_FALSE(minifi::io::isError(writeString(stream, content))); expected_content += content; } } - auto val = readValue(db_path_); + auto val = lmdb_wrapper_.getValue(db_path_); REQUIRE(val.has_value()); CHECK(val->size() == expected_content.size()); CHECK(*val == expected_content); @@ -133,11 +95,11 @@ TEST_CASE_METHOD(LmdbStreamTest, "Simple read tests") { content = ""; } - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, content))); write_stream.close(); - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); std::vector buffer(content.size()); REQUIRE_FALSE(minifi::io::isError(read_stream.read(buffer))); REQUIRE(bytesToString(buffer, buffer.size()) == content); @@ -145,11 +107,11 @@ TEST_CASE_METHOD(LmdbStreamTest, "Simple read tests") { TEST_CASE_METHOD(LmdbStreamTest, "Read in chunks") { std::string content = "banana"; - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, content))); write_stream.close(); - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); std::vector buffer(2); REQUIRE_FALSE(minifi::io::isError(read_stream.read(buffer))); @@ -165,38 +127,38 @@ TEST_CASE_METHOD(LmdbStreamTest, "Read in chunks") { } TEST_CASE_METHOD(LmdbStreamTest, "Reading a nonexistent key returns STREAM_ERROR") { - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); std::vector buffer(8); REQUIRE(minifi::io::isError(read_stream.read(buffer))); } TEST_CASE_METHOD(LmdbStreamTest, "Reading after EOF returns zero") { - io::LmdbStream write_stream(db_path_, lmdb_env_, 
&lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "abc"))); write_stream.close(); - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); std::vector buffer(3); REQUIRE(read_stream.read(buffer) == 3); REQUIRE(read_stream.read(buffer) == 0); } TEST_CASE_METHOD(LmdbStreamTest, "Reading into an empty buffer returns zero") { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "abc"))); write_stream.close(); - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); std::vector empty_buffer; REQUIRE(read_stream.read(empty_buffer) == 0); } TEST_CASE_METHOD(LmdbStreamTest, "seek and tell control read offset") { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "banana"))); write_stream.close(); - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); read_stream.seek(2); REQUIRE(read_stream.tell() == 2); std::vector buffer(3); @@ -206,7 +168,7 @@ TEST_CASE_METHOD(LmdbStreamTest, "seek and tell control read offset") { } TEST_CASE_METHOD(LmdbStreamTest, "size reflects buffered writes before commit") { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE(write_stream.size() == 0); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "hello"))); REQUIRE(write_stream.size() == 5); @@ -215,29 +177,29 @@ TEST_CASE_METHOD(LmdbStreamTest, "size reflects buffered writes before commit") } 
TEST_CASE_METHOD(LmdbStreamTest, "Writing to a read-only stream returns STREAM_ERROR") { - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); REQUIRE(minifi::io::isError(writeString(read_stream, "anything"))); } TEST_CASE_METHOD(LmdbStreamTest, "Writing nullptr with a non-zero length returns STREAM_ERROR") { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE(minifi::io::isError(write_stream.write(static_cast(nullptr), 4))); } TEST_CASE_METHOD(LmdbStreamTest, "Writing zero bytes is a no-op without error") { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); const std::array dummy{}; REQUIRE(write_stream.write(dummy.data(), 0) == 0); REQUIRE(write_stream.size() == 0); } TEST_CASE_METHOD(LmdbStreamTest, "commit on a read-only stream returns false") { - io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + LmdbStream read_stream(db_path_, lmdb_wrapper_, false); REQUIRE_FALSE(read_stream.commit()); } TEST_CASE_METHOD(LmdbStreamTest, "Repeated commit is a no-op after the first") { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "banana"))); REQUIRE(write_stream.commit()); REQUIRE_FALSE(write_stream.commit()); @@ -245,27 +207,27 @@ TEST_CASE_METHOD(LmdbStreamTest, "Repeated commit is a no-op after the first") { TEST_CASE_METHOD(LmdbStreamTest, "Destructor commits buffered writes") { { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "destroyed-write"))); // Stream goes out of scope here; the destructor must commit } - auto val = 
readValue(db_path_); + auto val = lmdb_wrapper_.getValue(db_path_); REQUIRE(val.has_value()); REQUIRE(*val == "destroyed-write"); } TEST_CASE_METHOD(LmdbStreamTest, "Reopening an existing key in write mode appends to existing value") { { - io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream write_stream(db_path_, lmdb_wrapper_, true); REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "first-"))); } - io::LmdbStream reopened_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + LmdbStream reopened_stream(db_path_, lmdb_wrapper_, true); REQUIRE(reopened_stream.size() == 6); REQUIRE_FALSE(minifi::io::isError(writeString(reopened_stream, "second"))); REQUIRE(reopened_stream.commit()); - auto val = readValue(db_path_); + auto val = lmdb_wrapper_.getValue(db_path_); REQUIRE(val.has_value()); REQUIRE(*val == "first-second"); } diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 491627d075..73c9519c2e 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -56,6 +56,7 @@ const std::unordered_map