diff --git a/core-framework/include/core/Repository.h b/core-framework/include/core/Repository.h index ee2d3a0403..7a817ae2ff 100644 --- a/core-framework/include/core/Repository.h +++ b/core-framework/include/core/Repository.h @@ -106,8 +106,8 @@ class RepositoryImpl : public core::CoreComponentImpl, public core::RepositoryMe return false; } - bool getElements(std::vector>& /*store*/, size_t& /*max_size*/) override { - return true; + std::vector> getElements(size_t /*max_size*/) override { + return {}; } bool storeElement(const std::shared_ptr& element) override; diff --git a/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp b/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp index 53428988f7..100d60d52d 100644 --- a/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp +++ b/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp @@ -116,7 +116,7 @@ std::expected PushGrafanaLokiGrpc::submitRequest(const std::v for (const auto& flow_file : batched_flow_files) { logproto::EntryAdapter *entry = stream->add_entries(); - auto timestamp_str = std::to_string(flow_file->getlineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1)); + auto timestamp_str = std::to_string(flow_file->getLineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1)); auto timestamp_nanos = std::stoll(timestamp_str); *entry->mutable_timestamp() = google::protobuf::util::TimeUtil::NanosecondsToTimestamp(timestamp_nanos); diff --git a/extensions/grafana-loki/PushGrafanaLokiREST.cpp b/extensions/grafana-loki/PushGrafanaLokiREST.cpp index fd01e15d4c..1a95918882 100644 --- a/extensions/grafana-loki/PushGrafanaLokiREST.cpp +++ b/extensions/grafana-loki/PushGrafanaLokiREST.cpp @@ -133,7 +133,7 @@ std::string PushGrafanaLokiREST::createLokiJson(const std::vectorgetlineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1)); + auto timestamp_str = std::to_string(flow_file->getLineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1)); rapidjson::Value timestamp; timestamp.SetString(timestamp_str.c_str(), gsl::narrow(timestamp_str.length()), allocator); rapidjson::Value log_line_value; diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index e14a97299e..5c380545f7 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -73,28 +73,30 @@ bool ProvenanceRepository::initialize(const std::shared_ptr> &records, size_t &max_size) { +std::vector> ProvenanceRepository::getElements(size_t max_size) { + if (max_size == 0) { + return {}; + } auto opendb = db_->open(); if (!opendb) { - return false; + return {}; } + std::vector> records; rocksdb::ReadOptions options; options.verify_checksums = verify_checksums_in_rocksdb_reads_; std::unique_ptr it(opendb->NewIterator(options)); - size_t requested_batch = max_size; - max_size = 0; for (it->SeekToFirst(); it->Valid(); it->Next()) { - if (max_size >= requested_batch) - break; auto eventRead = ProvenanceEventRecord::create(); const auto slice = it->value(); io::BufferStream stream(std::as_bytes(std::span(slice.data(), slice.size()))); if (eventRead->deserialize(stream)) { - max_size++; records.push_back(eventRead); + if (--max_size == 0) { + break; + } } } - return max_size > 0; + return records; } void ProvenanceRepository::destroy() { diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h index 3984c9911f..950ecd31cf 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.h +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -72,7 +72,7 @@ class ProvenanceRepository : public core::repository::RocksDbRepository { // The repo is cleaned up by itself, there is no need to delete items. return true; } - bool getElements(std::vector> &records, size_t &max_size) override; + std::vector> getElements(size_t max_size) override; void destroy(); diff --git a/extensions/rocksdb-repos/tests/ProvenanceTests.cpp b/extensions/rocksdb-repos/tests/ProvenanceTests.cpp index 95f1298835..fca1926bb3 100644 --- a/extensions/rocksdb-repos/tests/ProvenanceTests.cpp +++ b/extensions/rocksdb-repos/tests/ProvenanceTests.cpp @@ -52,7 +52,7 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv record1->setEventDuration(sample); testRepository->storeElement(record1); - auto record2 = std::make_shared(); + auto record2 = provenance::ProvenanceEventRecord::create(); record2->setEventId(eventId); REQUIRE(record2->loadFromRepository(testRepository) == true); REQUIRE(record2->getEventId() == record1->getEventId()); @@ -77,15 +77,13 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { record1->setEventDuration(sample); testRepository->storeElement(record1); - auto record2 = std::make_shared(); + auto record2 = provenance::ProvenanceEventRecord::create(); record2->setEventId(eventId); REQUIRE(record2->loadFromRepository(testRepository) == true); REQUIRE(record1->getChildrenUuids().size() == 1); REQUIRE(record2->getChildrenUuids().size() == 1); utils::Identifier childId = record2->getChildrenUuids().at(0); REQUIRE(childId == ffr1->getUUID()); - record2->removeChildUuid(childId); - REQUIRE(record2->getChildrenUuids().empty()); } TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { @@ -103,7 +101,7 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro record1->setEventDuration(sample); testRepository->storeElement(record1); - auto record2 = std::make_shared(); + auto record2 = provenance::ProvenanceEventRecord::create(); record2->setEventId(eventId); REQUIRE(record2->loadFromRepository(testRepository) == true); REQUIRE(record2->getEventId() == record1->getEventId()); @@ -129,15 +127,13 @@ TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[Test record1->setEventDuration(sample); testRepository->storeElement(record1); - auto record2 = std::make_shared(); + auto record2 = provenance::ProvenanceEventRecord::create(); record2->setEventId(eventId); REQUIRE(record2->loadFromRepository(testRepository) == true); REQUIRE(record1->getChildrenUuids().size() == 1); REQUIRE(record2->getChildrenUuids().size() == 1); utils::Identifier childId = record2->getChildrenUuids().at(0); REQUIRE(childId == ffr1->getUUID()); - record2->removeChildUuid(childId); - REQUIRE(record2->getChildrenUuids().empty()); } TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { @@ -155,7 +151,7 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena record1->setEventDuration(sample); REQUIRE(testRepository->storeElement(record1)); - auto record2 = std::make_shared(); + auto record2 = provenance::ProvenanceEventRecord::create(); record2->setEventId(eventId); REQUIRE(record2->loadFromRepository(testRepository) == false); } diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp index 66cd7c040a..4d8170569a 100644 --- a/extensions/standard-processors/processors/LogAttribute.cpp +++ b/extensions/standard-processors/processors/LogAttribute.cpp @@ -82,7 +82,7 @@ std::string LogAttribute::generateLogMessage(core::ProcessSession& session, cons message << "\nStandard FlowFile Attributes"; message << "\n" << "UUID:" << flow_file->getUUIDStr(); message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow_file->getEntryDate()); - message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow_file->getlineageStartDate()); + message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow_file->getLineageStartDate()); message << "\n" << "Size:" << flow_file->getSize() << " Offset:" << flow_file->getOffset(); message << "\nFlowFile Attributes Map Content"; for (const auto& [attr_key, attr_value] : flow_file->getAttributes()) { diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp index aa0e048ca5..6dd308c7c7 100644 --- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp +++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp @@ -444,18 +444,18 @@ TEST_CASE("Test Find file", "[getfileCreate3]") { REQUIRE(2 == repo->getRepoMap().size()); for (auto entry : repo->getRepoMap()) { - minifi::provenance::ProvenanceEventRecordImpl newRecord; + auto newRecord = minifi::provenance::ProvenanceEventRecordImpl::create(); minifi::io::BufferStream stream(std::as_bytes(std::span(entry.second))); - newRecord.deserialize(stream); + newRecord->deserialize(stream); bool found = false; for (const auto& provRec : records) { - if (provRec->getEventId() == newRecord.getEventId()) { - REQUIRE(provRec->getEventId() == newRecord.getEventId()); - REQUIRE(provRec->getComponentId() == newRecord.getComponentId()); - REQUIRE(provRec->getComponentType() == newRecord.getComponentType()); - REQUIRE(provRec->getDetails() == newRecord.getDetails()); - REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration()); + if (provRec->getEventId() == newRecord->getEventId()) { + REQUIRE(provRec->getEventId() == newRecord->getEventId()); + REQUIRE(provRec->getComponentId() == newRecord->getComponentId()); + REQUIRE(provRec->getComponentType() == newRecord->getComponentType()); + REQUIRE(provRec->getDetails() == newRecord->getDetails()); + REQUIRE(provRec->getEventDuration() == newRecord->getEventDuration()); found = true; break; } @@ -466,19 +466,15 @@ TEST_CASE("Test Find file", "[getfileCreate3]") { } auto taskReport = &processorReport.get(); taskReport->setBatchSize(1); - std::vector> recordsReport; - recordsReport.push_back(std::make_shared()); processorReport->incrementActiveTasks(); processorReport->setScheduledState(core::ScheduledState::RUNNING); - std::string jsonStr; - std::size_t deserialized = 0; - repo->getElements(recordsReport, deserialized); + auto recordsReport = repo->getElements(1); std::function &, const std::shared_ptr&)> verifyReporter = [&](const std::shared_ptr &context, const std::shared_ptr &session) { - taskReport->getJsonReport(*context, *session, recordsReport, jsonStr); + auto json_str = taskReport->getJsonReport(*context, *session, recordsReport); REQUIRE(recordsReport.size() == 1); REQUIRE(taskReport->getName() == std::string(minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName)); - REQUIRE(jsonStr.find("\"componentType\": \"getfileCreate2\"") != std::string::npos); + REQUIRE(json_str.find("\"componentType\": \"getfileCreate2\"") != std::string::npos); }; testController.runSession(plan, false, verifyReporter); diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h index 72f1aea2f1..931bb10559 100644 --- a/libminifi/include/core/FlowFile.h +++ b/libminifi/include/core/FlowFile.h @@ -115,7 +115,7 @@ class FlowFileImpl : public CoreComponentImpl, public ReferenceContainerImpl, pu * Get lineage start date * @return lineage start date uint64_t */ - [[nodiscard]] std::chrono::system_clock::time_point getlineageStartDate() const override; + [[nodiscard]] std::chrono::system_clock::time_point getLineageStartDate() const override; /** * Sets the lineage start date diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h index 182e6f15d3..29767ad600 100644 --- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -45,9 +45,8 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessGroupPort static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask"; static const char *ProvenanceAppStr; - static void getJsonReport(core::ProcessContext& context, core::ProcessSession& session, std::vector> &records, std::string &report); // NOLINT + static std::string getJsonReport(core::ProcessContext& context, core::ProcessSession& session, const std::vector> &records); // NOLINT - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 37e24f6809..5539dc5c2a 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -45,12 +45,12 @@ class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public public: static const char *ProvenanceEventTypeStr[REPLAY + 1]; - ProvenanceEventRecordImpl(ProvenanceEventType event, std::string componentId, std::string componentType); + ProvenanceEventRecordImpl(ProvenanceEventType event, utils::Identifier component_id, std::string component_type); - ProvenanceEventRecordImpl() - : core::SerializableComponentImpl(core::className()) { - _eventTime = std::chrono::system_clock::now(); - } + ProvenanceEventRecordImpl(const ProvenanceEventRecordImpl&) = delete; + ProvenanceEventRecordImpl(ProvenanceEventRecordImpl&&) = delete; + ProvenanceEventRecordImpl& operator=(const ProvenanceEventRecordImpl&) = delete; + ProvenanceEventRecordImpl& operator=(ProvenanceEventRecordImpl&&) = delete; ~ProvenanceEventRecordImpl() override = default; @@ -63,47 +63,47 @@ class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public } std::map getAttributes() const override { - return _attributes; + return attributes_; } uint64_t getFileSize() const override { - return _size; + return size_; } uint64_t getFileOffset() const override { - return _offset; + return offset_; } std::chrono::system_clock::time_point getFlowFileEntryDate() const override { - return _entryDate; + return entry_date_; } - std::chrono::system_clock::time_point getlineageStartDate() const override { - return _lineageStartDate; + std::chrono::system_clock::time_point getLineageStartDate() const override { + return lineage_start_date_; } std::chrono::system_clock::time_point getEventTime() const override { - return _eventTime; + return event_time_; } std::chrono::milliseconds getEventDuration() const override { - return _eventDuration; + return event_duration_; } void setEventDuration(std::chrono::milliseconds duration) override { - _eventDuration = duration; + event_duration_ = duration; } ProvenanceEventType getEventType() const override { - return _eventType; + return event_type_; } std::string getComponentId() const override { - return _componentId; + return component_id_; } std::string getComponentType() const override { - return _componentType; + return component_type_; } utils::Identifier getFlowFileUuid() const override { @@ -111,69 +111,61 @@ class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public } std::string getContentFullPath() const override { - return _contentFullPath; + return content_full_path; } std::vector getLineageIdentifiers() const override { - return _lineageIdentifiers; + return lineage_identifiers; } std::string getDetails() const override { - return _details; + return details_; } void setDetails(const std::string& details) override { - _details = details; + details_ = details; } std::string getTransitUri() override { - return _transitUri; + return transit_uri_; } void setTransitUri(const std::string& uri) override { - _transitUri = uri; + transit_uri_ = uri; } std::string getSourceSystemFlowFileIdentifier() const override { - return _sourceSystemFlowFileIdentifier; + return source_system_flow_file_identifier_; } void setSourceSystemFlowFileIdentifier(const std::string& identifier) override { - _sourceSystemFlowFileIdentifier = identifier; + source_system_flow_file_identifier_ = identifier; } std::vector getParentUuids() const override { - return _parentUuids; + return parent_uuids_; } void addParentUuid(const utils::Identifier& uuid) override { - if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end()) + if (std::find(parent_uuids_.begin(), parent_uuids_.end(), uuid) != parent_uuids_.end()) return; else - _parentUuids.push_back(uuid); + parent_uuids_.push_back(uuid); } void addParentFlowFile(const core::FlowFile& flow_file) override { addParentUuid(flow_file.getUUID()); } - void removeParentUuid(const utils::Identifier& uuid) override { - _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end()); - } - - void removeParentFlowFile(const core::FlowFile& flow_file) override { - removeParentUuid(flow_file.getUUID()); - } - std::vector getChildrenUuids() const override { - return _childrenUuids; + return children_uuids_; } void addChildUuid(const utils::Identifier& uuid) override { - if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end()) + if (std::find(children_uuids_.begin(), children_uuids_.end(), uuid) != children_uuids_.end()) return; else - _childrenUuids.push_back(uuid); + children_uuids_.push_back(uuid); } void addChildFlowFile(const core::FlowFile& flow_file) override { @@ -181,50 +173,42 @@ class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public return; } - void removeChildUuid(const utils::Identifier& uuid) override { - _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end()); - } - - void removeChildFlowFile(const core::FlowFile& flow_file) override { - removeChildUuid(flow_file.getUUID()); - } - std::string getAlternateIdentifierUri() const override { - return _alternateIdentifierUri; + return alternate_identifier_uri_; } void setAlternateIdentifierUri(const std::string& uri) override { - _alternateIdentifierUri = uri; + alternate_identifier_uri_ = uri; } std::string getRelationship() const override { - return _relationship; + return relationship_; } void setRelationship(const std::string& relation) override { - _relationship = relation; + relationship_ = relation; } std::string getSourceQueueIdentifier() const override { - return _sourceQueueIdentifier; + return source_queue_identifier_; } void setSourceQueueIdentifier(const std::string& identifier) override { - _sourceQueueIdentifier = identifier; + source_queue_identifier_ = identifier; } void fromFlowFile(const core::FlowFile& flow_file) override { - _entryDate = flow_file.getEntryDate(); - _lineageStartDate = flow_file.getlineageStartDate(); - _lineageIdentifiers = flow_file.getlineageIdentifiers(); + entry_date_ = flow_file.getEntryDate(); + lineage_start_date_ = flow_file.getLineageStartDate(); + lineage_identifiers = flow_file.getlineageIdentifiers(); flow_uuid_ = flow_file.getUUID(); - _attributes = flow_file.getAttributes(); - _size = flow_file.getSize(); - _offset = flow_file.getOffset(); + attributes_ = flow_file.getAttributes(); + size_ = flow_file.getSize(); + offset_ = flow_file.getOffset(); if (flow_file.getConnection()) - _sourceQueueIdentifier = flow_file.getConnection()->getName(); + source_queue_identifier_ = flow_file.getConnection()->getName(); if (flow_file.getResourceClaim()) { - _contentFullPath = flow_file.getResourceClaim()->getContentFullPath(); + content_full_path = flow_file.getResourceClaim()->getContentFullPath(); } } @@ -233,69 +217,69 @@ class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public bool loadFromRepository(const std::shared_ptr &repo) override; protected: - ProvenanceEventType _eventType; + ProvenanceEventType event_type_; // Date at which the event was created - std::chrono::system_clock::time_point _eventTime{}; + std::chrono::system_clock::time_point event_time_{}; // Date at which the flow file entered the flow - std::chrono::system_clock::time_point _entryDate{}; + std::chrono::system_clock::time_point entry_date_{}; // Date at which the origin of this flow file entered the flow - std::chrono::system_clock::time_point _lineageStartDate{}; - std::chrono::milliseconds _eventDuration{}; - std::string _componentId; - std::string _componentType; + std::chrono::system_clock::time_point lineage_start_date_{}; + std::chrono::milliseconds event_duration_{}; + std::string component_id_; + std::string component_type_; // Size in bytes of the data corresponding to this flow file - uint64_t _size = 0; + uint64_t size_ = 0; utils::Identifier flow_uuid_; - uint64_t _offset = 0; - std::string _contentFullPath; - std::map _attributes; + uint64_t offset_ = 0; + std::string content_full_path; + std::map attributes_; // UUID string for all parents - std::vector _lineageIdentifiers; - std::string _transitUri; - std::string _sourceSystemFlowFileIdentifier; - std::vector _parentUuids; - std::vector _childrenUuids; - std::string _details; - std::string _sourceQueueIdentifier; - std::string _relationship; - std::string _alternateIdentifierUri; + std::vector lineage_identifiers; + std::string transit_uri_; + std::string source_system_flow_file_identifier_; + std::vector parent_uuids_; + std::vector children_uuids_; + std::string details_; + std::string source_queue_identifier_; + std::string relationship_; + std::string alternate_identifier_uri_; private: - ProvenanceEventRecordImpl(const ProvenanceEventRecordImpl &parent); - ProvenanceEventRecordImpl &operator=(const ProvenanceEventRecordImpl &parent); static std::shared_ptr logger_; static std::shared_ptr id_generator_; }; class ProvenanceReporterImpl : public virtual ProvenanceReporter { public: - ProvenanceReporterImpl(std::shared_ptr repo, std::string componentId, std::string componentType) - : logger_(core::logging::LoggerFactory::getLogger()) { - _componentId = componentId; - _componentType = componentType; - repo_ = repo; - } + ProvenanceReporterImpl(std::shared_ptr repo, utils::Identifier component_id, std::string component_type) + : component_id_(component_id), + component_type_(std::move(component_type)), + logger_(core::logging::LoggerFactory::getLogger()), + repo_(std::move(repo)) {} + + ProvenanceReporterImpl(const ProvenanceReporterImpl&) = delete; + ProvenanceReporterImpl(ProvenanceReporterImpl&&) = delete; + ProvenanceReporterImpl& operator=(const ProvenanceReporterImpl&) = delete; + ProvenanceReporterImpl& operator=(ProvenanceReporterImpl&&) = delete; ~ProvenanceReporterImpl() override { clear(); } std::set> getEvents() const override { - return _events; + return events_; } void add(const std::shared_ptr &event) override { - _events.insert(event); + events_.insert(event); } void remove(const std::shared_ptr &event) override { - if (_events.find(event) != _events.end()) { - _events.erase(event); - } + events_.erase(event); } - void clear() override { - _events.clear(); + void clear() final { + events_.clear(); } void commit() override; @@ -317,23 +301,20 @@ class ProvenanceReporterImpl : public virtual ProvenanceReporter { return nullptr; } - auto event = std::make_shared(eventType, _componentId, _componentType); + auto event = std::make_shared(eventType, component_id_, component_type_); if (event) event->fromFlowFile(flow_file); return event; } - std::string _componentId; - std::string _componentType; + utils::Identifier component_id_; + std::string component_type_; private: std::shared_ptr logger_; - std::set> _events; + std::set> events_; std::shared_ptr repo_; - - ProvenanceReporterImpl(const ProvenanceReporterImpl &parent); - ProvenanceReporterImpl &operator=(const ProvenanceReporterImpl &parent); }; } // namespace org::apache::nifi::minifi::provenance diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index 7f0819a28b..0a19365a33 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -131,7 +131,7 @@ std::chrono::system_clock::time_point FlowFileImpl::getEventTime() const { return event_time_; } // ! Get Lineage Start Date -std::chrono::system_clock::time_point FlowFileImpl::getlineageStartDate() const { +std::chrono::system_clock::time_point FlowFileImpl::getLineageStartDate() const { return lineage_start_date_; } diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index bd59eeaaa3..31c6a6b29b 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -69,7 +69,7 @@ ProcessSessionImpl::ProcessSessionImpl(std::shared_ptr processCo stateManager_(process_context_->getStateManager()) { logger_->log_trace("ProcessSession created for {}", process_context_->getProcessor().getName()); auto repo = process_context_->getProvenanceRepository(); - provenance_report_ = std::make_shared(repo, process_context_->getProcessor().getName(), process_context_->getProcessor().getName()); + provenance_report_ = std::make_shared(repo, process_context_->getProcessor().getUUID(), process_context_->getProcessor().getName()); content_session_ = process_context_->getContentRepository()->createSession(); if (stateManager_ && !stateManager_->beginTransaction()) { @@ -113,7 +113,7 @@ std::shared_ptr ProcessSessionImpl::create(const core::FlowFile* } record->setAttribute(attribute.first, attribute.second); } - record->setLineageStartDate(parent->getlineageStartDate()); + record->setLineageStartDate(parent->getLineageStartDate()); record->setLineageIdentifiers(parent->getlineageIdentifiers()); record->getlineageIdentifiers().push_back(parent->getUUID()); } @@ -163,7 +163,7 @@ std::shared_ptr ProcessSessionImpl::cloneDuringTransfer(const co } record->setAttribute(attribute.first, attribute.second); } - record->setLineageStartDate(parent.getlineageStartDate()); + record->setLineageStartDate(parent.getLineageStartDate()); record->setLineageIdentifiers(parent.getlineageIdentifiers()); record->getlineageIdentifiers().push_back(parent.getUUID()); diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index 06ad9fe112..76d48f4270 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -135,9 +135,9 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .controller_services = {"controllerServices"}, .controller_service_properties = {"properties"}, .remote_process_group = {"remoteProcessGroups"}, - .provenance_reporting = {}, - .provenance_reporting_port_uuid = {}, - .provenance_reporting_batch_size = {}, + .provenance_reporting = {"provenanceReporting"}, + .provenance_reporting_port_uuid = {"portUuid"}, + .provenance_reporting_batch_size = {"batchSize"}, .funnels = {"funnels"}, .input_ports = {"inputPorts"}, .output_ports = {"outputPorts"}, diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index 363fddede1..6fb3d5d3c8 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -88,8 +88,7 @@ void appendJsonStr(const utils::SmallString& value, rapidjson::Value& parent, parent.PushBack(valueVal, alloc); } -void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext&, core::ProcessSession&, - std::vector> &records, std::string &report) { +std::string SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext&, core::ProcessSession&, const std::vector> &records) { rapidjson::Document array(rapidjson::kArrayType); rapidjson::Document::AllocatorType &alloc = array.GetAllocator(); @@ -106,7 +105,7 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext&, cor recordJson.AddMember("timestampMillis", int64_t{std::chrono::duration_cast(record->getEventTime().time_since_epoch()).count()}, alloc); recordJson.AddMember("durationMillis", int64_t{record->getEventDuration().count()}, alloc); - recordJson.AddMember("lineageStart", int64_t{std::chrono::duration_cast(record->getlineageStartDate().time_since_epoch()).count()}, alloc); + recordJson.AddMember("lineageStart", int64_t{std::chrono::duration_cast(record->getLineageStartDate().time_since_epoch()).count()}, alloc); recordJson.AddMember("entitySize", record->getFileSize(), alloc); recordJson.AddMember("entityOffset", record->getFileOffset(), alloc); @@ -148,24 +147,27 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext&, cor rapidjson::PrettyWriter writer(buffer); array.Accept(writer); - report = buffer.GetString(); -} - -void SiteToSiteProvenanceReportingTask::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) { + return buffer.GetString(); } void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { - logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger"); - std::vector> records; - logger_->log_debug("batch size {} records", batch_size_); - size_t deserialized = batch_size_; + auto* state_manager = context.getStateManager(); + if (!state_manager) { + logger_->log_error("Failed to get StateManager"); + context.yield(); + return; + } std::shared_ptr repo = context.getProvenanceRepository(); - if (!repo->getElements(records, deserialized) && deserialized == 0) { + if (!repo) { + throw minifi::Exception(ExceptionType::REPOSITORY_EXCEPTION, "Failed to retrieve provenance repository"); + } + auto records = repo->getElements(batch_size_); + if (records.empty()) { + logger_->log_debug("No new provenance records"); return; } - logger_->log_debug("Captured {} records", deserialized); - std::string jsonStr; - this->getJsonReport(context, session, records, jsonStr); + logger_->log_debug("Captured {} records", records.size()); + std::string jsonStr = getJsonReport(context, session, records); if (jsonStr.empty()) { return; } diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index d8ee5e7239..66bcff9763 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -40,12 +40,12 @@ std::shared_ptr ProvenanceEventRecordImpl::logger_ = core const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", // NOLINT(cppcoreguidelines-avoid-c-arrays) "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" }; -ProvenanceEventRecordImpl::ProvenanceEventRecordImpl(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType) +ProvenanceEventRecordImpl::ProvenanceEventRecordImpl(ProvenanceEventRecord::ProvenanceEventType event, utils::Identifier component_id, std::string component_type) : core::SerializableComponentImpl(core::className()), - _eventType(event), - _eventTime(std::chrono::system_clock::now()), - _componentId(std::move(componentId)), - _componentType(std::move(componentType)) { + event_type_(event), + event_time_(std::chrono::system_clock::now()), + component_id_(std::move(component_id.to_string())), + component_type_(std::move(component_type)) { } bool ProvenanceEventRecordImpl::loadFromRepository(const std::shared_ptr &repo) { @@ -70,9 +70,9 @@ bool ProvenanceEventRecordImpl::loadFromRepository(const std::shared_ptrlog_debug("NiFi Provenance retrieve event {} size {} eventType {} success", getUUIDStr(), stream.size(), magic_enum::enum_name(_eventType)); + logger_->log_debug("NiFi Provenance retrieve event {} size {} eventType {} success", getUUIDStr(), stream.size(), magic_enum::enum_name(event_type_)); } else { - logger_->log_debug("NiFi Provenance retrieve event {} size {} eventType {} fail", getUUIDStr(), stream.size(), magic_enum::enum_name(_eventType)); + logger_->log_debug("NiFi Provenance retrieve event {} size {} eventType {} fail", getUUIDStr(), stream.size(), magic_enum::enum_name(event_type_)); } return ret; @@ -80,79 +80,79 @@ bool ProvenanceEventRecordImpl::loadFromRepository(const std::shared_ptruuid_); + const auto ret = output_stream.write(uuid_); if (ret == 0 || io::isError(ret)) { return false; } } { - uint32_t eventType = this->_eventType; + uint32_t eventType = event_type_; const auto ret = output_stream.write(eventType); if (ret != 4) { return false; } } { - uint64_t event_time_ms = std::chrono::duration_cast(_eventTime.time_since_epoch()).count(); + uint64_t event_time_ms = std::chrono::duration_cast(event_time_.time_since_epoch()).count(); const auto ret = output_stream.write(event_time_ms); if (ret != 8) { return false; } } { - uint64_t entry_date_ms = std::chrono::duration_cast(_entryDate.time_since_epoch()).count(); + uint64_t entry_date_ms = std::chrono::duration_cast(entry_date_.time_since_epoch()).count(); const auto ret = output_stream.write(entry_date_ms); if (ret != 8) { return false; } } { - uint64_t event_duration_ms = this->_eventDuration.count(); + uint64_t event_duration_ms = event_duration_.count(); const auto ret = output_stream.write(event_duration_ms); if (ret != 8) { return false; } } { - uint64_t lineage_start_date_ms = std::chrono::duration_cast(_lineageStartDate.time_since_epoch()).count(); + uint64_t lineage_start_date_ms = std::chrono::duration_cast(lineage_start_date_.time_since_epoch()).count(); const auto ret = output_stream.write(lineage_start_date_ms); if (ret != 8) { return false; } } { - const auto ret = output_stream.write(this->_componentId); + const auto ret = output_stream.write(component_id_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = output_stream.write(this->_componentType); + const auto ret = output_stream.write(component_type_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = output_stream.write(this->flow_uuid_); + const auto ret = output_stream.write(flow_uuid_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = output_stream.write(this->_details); + const auto ret = output_stream.write(details_); if (ret == 0 || io::isError(ret)) { return false; } } // write flow attributes { - const auto numAttributes = gsl::narrow(this->_attributes.size()); + const auto numAttributes = gsl::narrow(attributes_.size()); const auto ret = output_stream.write(numAttributes); if (ret != 4) { return false; } } - for (const auto& itAttribute : _attributes) { + for (const auto& itAttribute : attributes_) { { const auto ret = output_stream.write(itAttribute.first); if (ret == 0 || io::isError(ret)) { @@ -167,71 +167,71 @@ bool ProvenanceEventRecordImpl::serialize(io::OutputStream& output_stream) { } } { - const auto ret = output_stream.write(this->_contentFullPath); + const auto ret = output_stream.write(content_full_path); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = output_stream.write(this->_size); + const auto ret = output_stream.write(size_); if (ret != 8) { return false; } } { - const auto ret = output_stream.write(this->_offset); + const auto ret = output_stream.write(offset_); if (ret != 8) { return false; } } { - const auto ret = output_stream.write(this->_sourceQueueIdentifier); + const auto ret = output_stream.write(source_queue_identifier_); if (ret == 0 || io::isError(ret)) { return false; } } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) { + if (event_type_ == ProvenanceEventRecord::FORK || event_type_ == ProvenanceEventRecord::CLONE || event_type_ == ProvenanceEventRecord::JOIN) { // write UUIDs { - const auto parent_uuids_count = gsl::narrow(this->_parentUuids.size()); + const auto parent_uuids_count = gsl::narrow(parent_uuids_.size()); const auto ret = output_stream.write(parent_uuids_count); if (ret != 4) { return false; } } - for (const auto& parentUUID : _parentUuids) { + for (const auto& parentUUID : parent_uuids_) { const auto ret = output_stream.write(parentUUID); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto children_uuids_count = gsl::narrow(this->_childrenUuids.size()); + const auto children_uuids_count = gsl::narrow(children_uuids_.size()); const auto ret = output_stream.write(children_uuids_count); if (ret != 4) { return false; } } - for (const auto& childUUID : _childrenUuids) { + for (const auto& childUUID : children_uuids_) { const auto ret = output_stream.write(childUUID); if (ret == 0 || io::isError(ret)) { return false; } } - } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { - const auto ret = output_stream.write(this->_transitUri); + } else if (event_type_ == ProvenanceEventRecord::SEND || event_type_ == ProvenanceEventRecord::FETCH) { + const auto ret = output_stream.write(transit_uri_); if (ret == 0 || io::isError(ret)) { return false; } - } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + } else if (event_type_ == ProvenanceEventRecord::RECEIVE) { { - const auto ret = output_stream.write(this->_transitUri); + const auto ret = output_stream.write(transit_uri_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = output_stream.write(this->_sourceSystemFlowFileIdentifier); + const auto ret = output_stream.write(source_system_flow_file_identifier_); if (ret == 0 || io::isError(ret)) { return false; } @@ -258,7 +258,7 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { } if (auto event_type_opt = magic_enum::enum_cast(eventType)) { - _eventType = *event_type_opt; + event_type_ = *event_type_opt; } else { return false; } @@ -269,7 +269,7 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { if (ret != 8) { return false; } - _eventTime = std::chrono::system_clock::time_point() + std::chrono::milliseconds(event_time_in_ms); + event_time_ = std::chrono::system_clock::time_point() + std::chrono::milliseconds(event_time_in_ms); } { @@ -278,7 +278,7 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { if (ret != 8) { return false; } - _entryDate = std::chrono::system_clock::time_point() + std::chrono::milliseconds(entry_date_in_ms); + entry_date_ = std::chrono::system_clock::time_point() + std::chrono::milliseconds(entry_date_in_ms); } { @@ -287,7 +287,7 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { if (ret != 8) { return false; } - _eventDuration = std::chrono::milliseconds(event_duration_ms); + event_duration_ = std::chrono::milliseconds(event_duration_ms); } { @@ -296,32 +296,32 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { if (ret != 8) { return false; } - _lineageStartDate = std::chrono::system_clock::time_point() + std::chrono::milliseconds(lineage_start_date_in_ms); + lineage_start_date_ = std::chrono::system_clock::time_point() + std::chrono::milliseconds(lineage_start_date_in_ms); } { - const auto ret = input_stream.read(this->_componentId); + const auto ret = input_stream.read(component_id_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = input_stream.read(this->_componentType); + const auto ret = input_stream.read(component_type_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = input_stream.read(this->flow_uuid_); + const auto ret = input_stream.read(flow_uuid_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = input_stream.read(this->_details); + const auto ret = input_stream.read(details_); if (ret == 0 || io::isError(ret)) { return false; } @@ -351,38 +351,38 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { return false; } } - this->_attributes[key] = value; + attributes_[key] = value; } { - const auto ret = input_stream.read(this->_contentFullPath); + const auto ret = input_stream.read(content_full_path); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = input_stream.read(this->_size); + const auto ret = input_stream.read(size_); if (ret != 8) { return false; } } { - const auto ret = input_stream.read(this->_offset); + const auto ret = input_stream.read(offset_); if (ret != 8) { return false; } } { - const auto ret = input_stream.read(this->_sourceQueueIdentifier); + const auto ret = input_stream.read(source_queue_identifier_); if (ret == 0 || io::isError(ret)) { return false; } } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) { + if (event_type_ == ProvenanceEventRecord::FORK || event_type_ == ProvenanceEventRecord::CLONE || event_type_ == ProvenanceEventRecord::JOIN) { // read UUIDs uint32_t number = 0; { @@ -400,7 +400,7 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { return false; } } - this->addParentUuid(parentUUID); + addParentUuid(parentUUID); } number = 0; { @@ -417,24 +417,24 @@ bool ProvenanceEventRecordImpl::deserialize(io::InputStream &input_stream) { return false; } } - this->addChildUuid(childUUID); + addChildUuid(childUUID); } - } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { + } else if (event_type_ == ProvenanceEventRecord::SEND || event_type_ == ProvenanceEventRecord::FETCH) { { - const auto ret = input_stream.read(this->_transitUri); + const auto ret = input_stream.read(transit_uri_); if (ret == 0 || io::isError(ret)) { return false; } } - } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + } else if (event_type_ == ProvenanceEventRecord::RECEIVE) { { - const auto ret = input_stream.read(this->_transitUri); + const auto ret = input_stream.read(transit_uri_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = input_stream.read(this->_sourceSystemFlowFileIdentifier); + const auto ret = input_stream.read(source_system_flow_file_identifier_); if (ret == 0 || io::isError(ret)) { return false; } @@ -456,7 +456,7 @@ void ProvenanceReporterImpl::commit() { std::vector>> flowData; - for (auto& event : _events) { + for (auto& event : events_) { auto stramptr = std::make_unique(); event->serialize(*stramptr); @@ -509,7 +509,6 @@ void ProvenanceReporterImpl::clone(const core::FlowFile& parent, const core::Flo if (event) { event->addChildFlowFile(child); - event->addParentFlowFile(parent); add(event); } } @@ -577,7 +576,7 @@ void ProvenanceReporterImpl::fetch(const core::FlowFile& flow_file, const std::s } std::shared_ptr ProvenanceEventRecord::create() { - return std::make_shared(); + return std::make_shared(ProvenanceEventType::CLONE, utils::Identifier{}, ""); } } // namespace org::apache::nifi::minifi::provenance diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h b/libminifi/test/libtest/unit/ProvenanceTestHelper.h index 0d0407a9cd..03d6e0c328 100644 --- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h +++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h @@ -34,6 +34,7 @@ #include "properties/Configure.h" #include "minifi-cpp/SwapManager.h" #include "io/BufferStream.h" +#include "minifi-cpp/provenance/Provenance.h" using namespace std::literals::chrono_literals; @@ -88,19 +89,23 @@ class TestRepositoryBase : public T_BaseRepository { } } - bool getElements(std::vector> &store, size_t &max_size) override { + std::vector> getElements(size_t max_size) override { + if (max_size == 0) { + return {}; + } + std::vector> store; std::lock_guard lock{repository_results_mutex_}; - max_size = 0; for (const auto &entry : repository_results_) { - if (max_size >= store.size()) { - break; - } - const auto eventRead = store.at(max_size); + const auto eventRead = org::apache::nifi::minifi::provenance::ProvenanceEventRecord::create(); org::apache::nifi::minifi::io::BufferStream stream(std::as_bytes(std::span(entry.second))); - eventRead->deserialize(stream); - ++max_size; + if (eventRead->deserialize(stream)) { + store.push_back(std::move(eventRead)); + if (--max_size == 0) { + break; + } + } } - return true; + return store; } std::map getRepoMap() const { diff --git a/minifi-api/include/minifi-cpp/core/FlowFile.h b/minifi-api/include/minifi-cpp/core/FlowFile.h index 811f9e8a48..951829b45f 100644 --- a/minifi-api/include/minifi-cpp/core/FlowFile.h +++ b/minifi-api/include/minifi-cpp/core/FlowFile.h @@ -54,7 +54,7 @@ class FlowFile : public virtual CoreComponent, public virtual ReferenceContainer virtual void setDeleted(bool deleted) = 0; [[nodiscard]] virtual std::chrono::system_clock::time_point getEntryDate() const = 0; [[nodiscard]] virtual std::chrono::system_clock::time_point getEventTime() const = 0; - [[nodiscard]] virtual std::chrono::system_clock::time_point getlineageStartDate() const = 0; + [[nodiscard]] virtual std::chrono::system_clock::time_point getLineageStartDate() const = 0; virtual void setLineageStartDate(std::chrono::system_clock::time_point date) = 0; virtual void setLineageIdentifiers(const std::vector& lineage_Identifiers) = 0; virtual void setAttribute(std::string_view name, std::string value) = 0; diff --git a/minifi-api/include/minifi-cpp/core/Repository.h b/minifi-api/include/minifi-cpp/core/Repository.h index f2177f00c7..33b5c3747a 100644 --- a/minifi-api/include/minifi-cpp/core/Repository.h +++ b/minifi-api/include/minifi-cpp/core/Repository.h @@ -54,7 +54,7 @@ class Repository : public virtual core::CoreComponent, public virtual core::Repo virtual bool Get(const std::string& /*key*/, std::string& /*value*/) = 0; - virtual bool getElements(std::vector>& /*store*/, size_t& /*max_size*/) = 0; + virtual std::vector> getElements(size_t max_size) = 0; virtual bool storeElement(const std::shared_ptr& element) = 0; diff --git a/minifi-api/include/minifi-cpp/provenance/Provenance.h b/minifi-api/include/minifi-cpp/provenance/Provenance.h index ef5c5a9b5f..ef396958a0 100644 --- a/minifi-api/include/minifi-cpp/provenance/Provenance.h +++ b/minifi-api/include/minifi-cpp/provenance/Provenance.h @@ -154,7 +154,7 @@ class ProvenanceEventRecord : public virtual core::SerializableComponent { virtual uint64_t getFileSize() const = 0; virtual uint64_t getFileOffset() const = 0; virtual std::chrono::system_clock::time_point getFlowFileEntryDate() const = 0; - virtual std::chrono::system_clock::time_point getlineageStartDate() const = 0; + virtual std::chrono::system_clock::time_point getLineageStartDate() const = 0; virtual std::chrono::system_clock::time_point getEventTime() const = 0; virtual std::chrono::milliseconds getEventDuration() const = 0; virtual void setEventDuration(std::chrono::milliseconds duration) = 0; @@ -173,13 +173,9 @@ class ProvenanceEventRecord : public virtual core::SerializableComponent { virtual std::vector getParentUuids() const = 0; virtual void addParentUuid(const utils::Identifier& uuid) = 0; virtual void addParentFlowFile(const core::FlowFile& flow_file) = 0; - virtual void removeParentUuid(const utils::Identifier& uuid) = 0; - virtual void removeParentFlowFile(const core::FlowFile& flow_file) = 0; virtual std::vector getChildrenUuids() const = 0; virtual void addChildUuid(const utils::Identifier& uuid) = 0; virtual void addChildFlowFile(const core::FlowFile& flow_file) = 0; - virtual void removeChildUuid(const utils::Identifier& uuid) = 0; - virtual void removeChildFlowFile(const core::FlowFile& flow_file) = 0; virtual std::string getAlternateIdentifierUri() const = 0; virtual void setAlternateIdentifierUri(const std::string& uri) = 0; virtual std::string getRelationship() const = 0;