Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core-framework/include/core/Repository.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ class RepositoryImpl : public core::CoreComponentImpl, public core::RepositoryMe
return false;
}

bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/) override {
return true;
std::vector<std::shared_ptr<core::SerializableComponent>> getElements(size_t /*max_size*/) override {
return {};
}

bool storeElement(const std::shared_ptr<core::SerializableComponent>& element) override;
Expand Down
2 changes: 1 addition & 1 deletion extensions/grafana-loki/PushGrafanaLokiGrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ std::expected<void, std::string> PushGrafanaLokiGrpc::submitRequest(const std::v

for (const auto& flow_file : batched_flow_files) {
logproto::EntryAdapter *entry = stream->add_entries();
auto timestamp_str = std::to_string(flow_file->getlineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1));
auto timestamp_str = std::to_string(flow_file->getLineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1));
auto timestamp_nanos = std::stoll(timestamp_str);
*entry->mutable_timestamp() = google::protobuf::util::TimeUtil::NanosecondsToTimestamp(timestamp_nanos);

Expand Down
2 changes: 1 addition & 1 deletion extensions/grafana-loki/PushGrafanaLokiREST.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ std::string PushGrafanaLokiREST::createLokiJson(const std::vector<std::shared_pt
for (const auto& flow_file : batched_flow_files) {
rapidjson::Value log_line(rapidjson::kArrayType);

auto timestamp_str = std::to_string(flow_file->getlineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1));
auto timestamp_str = std::to_string(flow_file->getLineageStartDate().time_since_epoch() / std::chrono::nanoseconds(1));
rapidjson::Value timestamp;
timestamp.SetString(timestamp_str.c_str(), gsl::narrow<rapidjson::SizeType>(timestamp_str.length()), allocator);
rapidjson::Value log_line_value;
Expand Down
18 changes: 10 additions & 8 deletions extensions/rocksdb-repos/ProvenanceRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,28 +73,30 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m
return true;
}

bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) {
std::vector<std::shared_ptr<core::SerializableComponent>> ProvenanceRepository::getElements(size_t max_size) {
if (max_size == 0) {
return {};
}
auto opendb = db_->open();
if (!opendb) {
return false;
return {};
}
Comment on lines 80 to 83
std::vector<std::shared_ptr<core::SerializableComponent>> records;
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(options));
size_t requested_batch = max_size;
max_size = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
if (max_size >= requested_batch)
break;
auto eventRead = ProvenanceEventRecord::create();
const auto slice = it->value();
io::BufferStream stream(std::as_bytes(std::span(slice.data(), slice.size())));
if (eventRead->deserialize(stream)) {
max_size++;
records.push_back(eventRead);
if (--max_size == 0) {
break;
}
}
}
return max_size > 0;
return records;
}

void ProvenanceRepository::destroy() {
Expand Down
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/ProvenanceRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ProvenanceRepository : public core::repository::RocksDbRepository {
// The repo is cleaned up by itself, there is no need to delete items.
return true;
}
bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) override;
std::vector<std::shared_ptr<core::SerializableComponent>> getElements(size_t max_size) override;

void destroy();

Expand Down
14 changes: 5 additions & 9 deletions extensions/rocksdb-repos/tests/ProvenanceTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
record1->setEventDuration(sample);

testRepository->storeElement(record1);
auto record2 = std::make_shared<provenance::ProvenanceEventRecordImpl>();
auto record2 = provenance::ProvenanceEventRecord::create();
record2->setEventId(eventId);
Comment on lines +55 to 56
REQUIRE(record2->loadFromRepository(testRepository) == true);
REQUIRE(record2->getEventId() == record1->getEventId());
Expand All @@ -77,15 +77,13 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
record1->setEventDuration(sample);

testRepository->storeElement(record1);
auto record2 = std::make_shared<provenance::ProvenanceEventRecordImpl>();
auto record2 = provenance::ProvenanceEventRecord::create();
record2->setEventId(eventId);
REQUIRE(record2->loadFromRepository(testRepository) == true);
REQUIRE(record1->getChildrenUuids().size() == 1);
REQUIRE(record2->getChildrenUuids().size() == 1);
utils::Identifier childId = record2->getChildrenUuids().at(0);
REQUIRE(childId == ffr1->getUUID());
record2->removeChildUuid(childId);
REQUIRE(record2->getChildrenUuids().empty());
}

TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
Expand All @@ -103,7 +101,7 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro
record1->setEventDuration(sample);

testRepository->storeElement(record1);
auto record2 = std::make_shared<provenance::ProvenanceEventRecordImpl>();
auto record2 = provenance::ProvenanceEventRecord::create();
record2->setEventId(eventId);
REQUIRE(record2->loadFromRepository(testRepository) == true);
REQUIRE(record2->getEventId() == record1->getEventId());
Expand All @@ -129,15 +127,13 @@ TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[Test
record1->setEventDuration(sample);

testRepository->storeElement(record1);
auto record2 = std::make_shared<provenance::ProvenanceEventRecordImpl>();
auto record2 = provenance::ProvenanceEventRecord::create();
record2->setEventId(eventId);
REQUIRE(record2->loadFromRepository(testRepository) == true);
REQUIRE(record1->getChildrenUuids().size() == 1);
REQUIRE(record2->getChildrenUuids().size() == 1);
utils::Identifier childId = record2->getChildrenUuids().at(0);
REQUIRE(childId == ffr1->getUUID());
record2->removeChildUuid(childId);
REQUIRE(record2->getChildrenUuids().empty());
}

TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
Expand All @@ -155,7 +151,7 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena
record1->setEventDuration(sample);

REQUIRE(testRepository->storeElement(record1));
auto record2 = std::make_shared<provenance::ProvenanceEventRecordImpl>();
auto record2 = provenance::ProvenanceEventRecord::create();
record2->setEventId(eventId);
REQUIRE(record2->loadFromRepository(testRepository) == false);
}
2 changes: 1 addition & 1 deletion extensions/standard-processors/processors/LogAttribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ std::string LogAttribute::generateLogMessage(core::ProcessSession& session, cons
message << "\nStandard FlowFile Attributes";
message << "\n" << "UUID:" << flow_file->getUUIDStr();
message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow_file->getEntryDate());
message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow_file->getlineageStartDate());
message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow_file->getLineageStartDate());
message << "\n" << "Size:" << flow_file->getSize() << " Offset:" << flow_file->getOffset();
message << "\nFlowFile Attributes Map Content";
for (const auto& [attr_key, attr_value] : flow_file->getAttributes()) {
Expand Down
26 changes: 11 additions & 15 deletions extensions/standard-processors/tests/unit/ProcessorTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,18 +444,18 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
REQUIRE(2 == repo->getRepoMap().size());

for (auto entry : repo->getRepoMap()) {
minifi::provenance::ProvenanceEventRecordImpl newRecord;
auto newRecord = minifi::provenance::ProvenanceEventRecordImpl::create();
minifi::io::BufferStream stream(std::as_bytes(std::span(entry.second)));
newRecord.deserialize(stream);
newRecord->deserialize(stream);

bool found = false;
for (const auto& provRec : records) {
if (provRec->getEventId() == newRecord.getEventId()) {
REQUIRE(provRec->getEventId() == newRecord.getEventId());
REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
REQUIRE(provRec->getDetails() == newRecord.getDetails());
REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
if (provRec->getEventId() == newRecord->getEventId()) {
REQUIRE(provRec->getEventId() == newRecord->getEventId());
REQUIRE(provRec->getComponentId() == newRecord->getComponentId());
REQUIRE(provRec->getComponentType() == newRecord->getComponentType());
REQUIRE(provRec->getDetails() == newRecord->getDetails());
REQUIRE(provRec->getEventDuration() == newRecord->getEventDuration());
found = true;
break;
}
Expand All @@ -466,19 +466,15 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
}
auto taskReport = &processorReport.get();
taskReport->setBatchSize(1);
std::vector<std::shared_ptr<core::SerializableComponent>> recordsReport;
recordsReport.push_back(std::make_shared<minifi::provenance::ProvenanceEventRecordImpl>());
processorReport->incrementActiveTasks();
processorReport->setScheduledState(core::ScheduledState::RUNNING);
std::string jsonStr;
std::size_t deserialized = 0;
repo->getElements(recordsReport, deserialized);
auto recordsReport = repo->getElements(1);
std::function<void(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession>&)> verifyReporter =
[&](const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
taskReport->getJsonReport(*context, *session, recordsReport, jsonStr);
auto json_str = taskReport->getJsonReport(*context, *session, recordsReport);
REQUIRE(recordsReport.size() == 1);
REQUIRE(taskReport->getName() == std::string(minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
REQUIRE(jsonStr.find("\"componentType\": \"getfileCreate2\"") != std::string::npos);
REQUIRE(json_str.find("\"componentType\": \"getfileCreate2\"") != std::string::npos);
};

testController.runSession(plan, false, verifyReporter);
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/core/FlowFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class FlowFileImpl : public CoreComponentImpl, public ReferenceContainerImpl, pu
* Get lineage start date
* @return lineage start date uint64_t
*/
[[nodiscard]] std::chrono::system_clock::time_point getlineageStartDate() const override;
[[nodiscard]] std::chrono::system_clock::time_point getLineageStartDate() const override;

/**
* Sets the lineage start date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessGroupPort
static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask";
static const char *ProvenanceAppStr;

static void getJsonReport(core::ProcessContext& context, core::ProcessSession& session, std::vector<std::shared_ptr<core::SerializableComponent>> &records, std::string &report); // NOLINT
static std::string getJsonReport(core::ProcessContext& context, core::ProcessSession& session, const std::vector<std::shared_ptr<core::SerializableComponent>> &records); // NOLINT

void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

void initialize() override;
Expand Down
Loading
Loading