From 5cdba6273fa11d380aa7c005c42e8a72d84dd1b8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:44:32 +0200 Subject: [PATCH 01/11] Add call unified reusable workflow --- .github/workflows/ci-downstream.yml | 24 ++++++++++++++++++++++++ CMakeLists.txt | 4 ++++ CMakePresets.json | 14 ++++++++++++++ 3 files changed, 42 insertions(+) create mode 100644 .github/workflows/ci-downstream.yml diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml new file mode 100644 index 00000000..623f4b10 --- /dev/null +++ b/.github/workflows/ci-downstream.yml @@ -0,0 +1,24 @@ +name: CI Downstream - MQTT Streaming + +on: + workflow_dispatch: + inputs: + opendaq-ref: + description: "openDAQ SDK commit, branch or tag" + required: false + default: "" + pull_request: + +jobs: + call-opendaq-reusable: + name: MQTT Streaming + uses: openDAQ/openDAQ-CI/.github/workflows/reusable.yml@jira/TBBAS-3031-reusable-ci + with: + opendaq-ref: ${{ github.event.inputs.opendaq-ref || '' }} + cmake-presets: > + [ + { + "configure-preset": "module", + "test-preset": "module-test" + } + ] diff --git a/CMakeLists.txt b/CMakeLists.txt index e8075729..50cd37ed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,10 @@ option(${REPO_OPTION_PREFIX}_ENABLE_SSL "Enable building with openSSL" OFF) opendaq_common_compile_targets_settings() opendaq_setup_compiler_flags(${REPO_OPTION_PREFIX}) +if (CMAKE_CXX_COMPILER_ID MATCHES "Clang|AppleClang" AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 10) + add_compile_options(-Wno-unknown-warning-option) +endif() + if (${REPO_OPTION_PREFIX}_ENABLE_TESTS) message(STATUS "Unit tests in ${REPO_NAME} are ENABLED") enable_testing() diff --git a/CMakePresets.json b/CMakePresets.json index 25d76765..6bc5cf45 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -178,6 +178,20 @@ "clang", "ninja" ] + }, + { + "name": "module", + "hidden": true, + "cacheVariables": { + "DAQMODULES_MQTT_ENABLE_TESTS": "ON" + } + } + ], + "testPresets": [ + { + "name": "module-test", + "hidden": true, + "configurePreset": "module" } ] } From c913cf1bba7ee5108f1a01e771975ce0ac00200d Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:44:32 +0200 Subject: [PATCH 02/11] Run test only for ubuntu --- .github/workflows/ci-downstream.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index 623f4b10..0e7462bd 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -15,10 +15,20 @@ jobs: uses: openDAQ/openDAQ-CI/.github/workflows/reusable.yml@jira/TBBAS-3031-reusable-ci with: opendaq-ref: ${{ github.event.inputs.opendaq-ref || '' }} + packages: > + [ + { + "match-jobs": ["ubuntu-*"], + "apt-install": ["mosquitto"] + } + ] cmake-presets: > [ { - "configure-preset": "module", + "configure-preset": "module" + }, + { + "match-jobs": ["ubuntu-*"], "test-preset": "module-test" } ] From cdf54e894506f7e53f24d18d06448cd07fcc9006 Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:44:32 +0200 Subject: [PATCH 03/11] Run mosquitto after install on Ubuntu --- .github/workflows/ci-downstream.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index 0e7462bd..1b2a8967 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -19,7 +19,8 @@ jobs: [ { "match-jobs": ["ubuntu-*"], - "apt-install": ["mosquitto"] + "apt-install": ["mosquitto"], + "run": "mosquitto -d" } ] cmake-presets: > From 660b3cea7915c1cb6365a8b1f99c580672914ad3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:44:32 +0200 Subject: [PATCH 04/11] Investigate mosquitto tests on Windows --- .github/workflows/ci-downstream.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index 1b2a8967..21a59540 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -21,15 +21,21 @@ jobs: "match-jobs": ["ubuntu-*"], "apt-install": ["mosquitto"], "run": "mosquitto -d" + }, + { + "match-jobs": ["windows-*"], + "choco-install": ["mosquitto"], + "run": "net start mosquitto" } ] cmake-presets: > [ { - "configure-preset": "module" + "configure-preset": "module", + "test-preset": "module-test" }, { - "match-jobs": ["ubuntu-*"], - "test-preset": "module-test" + "match-jobs": ["macos-*"], + "test-preset": "" } ] From d44bb3a4a534145c410780a7d98271fc0cf08daf Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:44:32 +0200 Subject: [PATCH 05/11] Install mosquitto via winget --- .github/workflows/ci-downstream.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index 21a59540..d76b77c0 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -24,8 +24,7 @@ jobs: }, { "match-jobs": ["windows-*"], - "choco-install": ["mosquitto"], - "run": "net start mosquitto" + "run": "winget install --id EclipseFoundation.Mosquitto --accept-source-agreements --accept-package-agreements && net start mosquitto" } ] cmake-presets: > From 82a9acaee55df6b01a514c739f0c0f789e7199ef Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:44:32 +0200 Subject: [PATCH 06/11] Mosquitto is run by winget --- .github/workflows/ci-downstream.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index d76b77c0..786addb1 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -24,7 +24,7 @@ jobs: }, { "match-jobs": ["windows-*"], - "run": "winget install --id EclipseFoundation.Mosquitto --accept-source-agreements --accept-package-agreements && net start mosquitto" + "run": "winget install --id EclipseFoundation.Mosquitto --accept-source-agreements --accept-package-agreements" } ] cmake-presets: > From c51d05d809bc465790cdcaac45105833e6973b92 Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 18:08:28 +0200 Subject: [PATCH 07/11] Fix GCC-7 false unused compile time check --- modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp b/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp index 028d5bc2..84c01c3c 100644 --- a/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp +++ b/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp @@ -451,7 +451,7 @@ class MqttPublisherFbHelper : public DaqTestHelper } template - static std::string valueToString(const vT& value, bool quoteString = true) + static std::string valueToString(const vT& value, [[maybe_unused]] bool quoteString = true) { std::string result; if constexpr (std::is_same_v) From d8d94c45be296c5e8543741391f44958a783419a Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Thu, 9 Apr 2026 17:17:37 +0200 Subject: [PATCH 08/11] Add mosquitto for macOS --- .github/workflows/ci-downstream.yml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index 786addb1..9907f257 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -24,7 +24,12 @@ jobs: }, { "match-jobs": ["windows-*"], - "run": "winget install --id EclipseFoundation.Mosquitto --accept-source-agreements --accept-package-agreements" + "winget-install": ["EclipseFoundation.Mosquitto"] + }, + { + "match-jobs": ["macos-*"], + "brew-install": ["mosquitto"], + "run": "$(brew --prefix mosquitto)/sbin/mosquitto -d" } ] cmake-presets: > @@ -32,9 +37,5 @@ jobs: { "configure-preset": "module", "test-preset": "module-test" - }, - { - "match-jobs": ["macos-*"], - "test-preset": "" } ] From cffe1cde8687591e7995845c2f9b45955602a3ee Mon Sep 17 00:00:00 2001 From: viacheslauK Date: Wed, 15 Apr 2026 13:38:03 +0300 Subject: [PATCH 09/11] MQTT subscriber fb: test fixes (#16) --- .../mqtt_subscriber_fb_impl.h | 1 + .../src/mqtt_subscriber_fb_impl.cpp | 3 +- .../tests/test_mqtt_subscriber_fb.cpp | 133 +++++++++--------- 3 files changed, 72 insertions(+), 65 deletions(-) diff --git a/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h b/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h index f6988dbe..04c61bd3 100644 --- a/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h +++ b/modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h @@ -86,6 +86,7 @@ class MqttSubscriberFbImpl final : public FunctionBlock std::mutex queueMutex; std::condition_variable queueCv; mutable std::recursive_mutex processingMutex; + std::mutex componentStatusMutex; DAQ_MQTT_STREAM_MODULE_API void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg); diff --git a/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp b/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp index e389ed3f..203d2c2e 100644 --- a/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp +++ b/modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp @@ -101,6 +101,8 @@ void MqttSubscriberFbImpl::updateStatuses() if (!statuses->isUpdated()) return; + std::scoped_lock lock(componentStatusMutex); + if (!jsonConfigErr.ok()) { setComponentStatusWithMessage(ComponentStatus::Error, jsonConfigErr.buildStatusMessage()); @@ -508,7 +510,6 @@ void MqttSubscriberFbImpl::processMessageImpl(const mqtt::MqttMessage& msg, cons decoderFb->processMessage(jsonObjStr, epochTime); } } - updateStatuses(); } void MqttSubscriberFbImpl::processingLoop() diff --git a/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp b/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp index 57a9306c..ac27cf9b 100644 --- a/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp +++ b/modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp @@ -1,7 +1,6 @@ #include "mqtt_streaming_module/mqtt_subscriber_fb_impl.h" #include "test_daq_test_helper.h" #include "test_data.h" -#include #include #include #include @@ -72,6 +71,35 @@ class MqttSubscriberFbHelper fb->updateStatuses(); return dp; } + + bool waitStatusTheSame(size_t ms, daq::FunctionBlockPtr fb, EnumerationPtr status) { + helper::utils::Timer timer(ms); + auto getComponentStatus = [&fb]() { return fb.getStatusContainer().getStatus("ComponentStatus"); }; + bool result = true; + while (!timer.expired()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + result = (getComponentStatus() == status); + if (result == false) + break; + } + return result; + } + + bool waitStatusChange(size_t ms, daq::FunctionBlockPtr fb, EnumerationPtr status) { + helper::utils::Timer timer(ms); + auto getComponentStatus = [&fb]() { return fb.getStatusContainer().getStatus("ComponentStatus"); }; + bool result = false; + while (!timer.expired()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + result = (getComponentStatus() == status); + if (result == true) + break; + } + return result; + + } }; class MqttSubscriberFbTest : public testing::Test, public DaqTestHelper, public MqttSubscriberFbHelper @@ -185,8 +213,7 @@ TEST_F(MqttSubscriberFbTest, CreationWithDefaultConfig) daq::FunctionBlockPtr subFb; ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME)); EXPECT_EQ(subFb.getSignals(daq::search::Any()).getCount(), 0u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); } TEST_F(MqttSubscriberFbTest, CreationWithPartialConfig) @@ -198,8 +225,7 @@ TEST_F(MqttSubscriberFbTest, CreationWithPartialConfig) config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, String(buildTopicName()))); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getSignals(daq::search::Any()).getCount(), 0u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } TEST_F(MqttSubscriberFbTest, CreationWithCustomConfig) @@ -213,8 +239,7 @@ TEST_F(MqttSubscriberFbTest, CreationWithCustomConfig) config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL_TS_MODE, static_cast(SDSM::SystemTime)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getSignals(daq::search::Any()).getCount(), 2u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } TEST_F(MqttSubscriberFbTest, PreviewSignal) @@ -261,8 +286,7 @@ TEST_F(MqttSubscriberFbTest, SubscriptionStatusWaitingForData) config.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, buildTopicName()); daq::FunctionBlockPtr subFb; ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } TEST_P(MqttSubscriberFbTopicPTest, CheckSubscriberFbTopic) @@ -279,8 +303,7 @@ TEST_P(MqttSubscriberFbTopicPTest, CheckSubscriberFbTopic) auto signals = fb.getSignals(); ASSERT_EQ(signals.getCount(), 1); const auto expectedComponentStatus = result ? "Warning" : "Error"; - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", expectedComponentStatus, daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", expectedComponentStatus, daqInstance.getContext().getTypeManager()))); } INSTANTIATE_TEST_SUITE_P(TopicTest, @@ -323,16 +346,14 @@ TEST_F(MqttSubscriberFbTest, TwoFbCreation) auto config = PropertyObject(); config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, buildTopicName("0"))); ASSERT_NO_THROW(fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } { daq::FunctionBlockPtr fb; auto config = PropertyObject(); config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, buildTopicName("1"))); ASSERT_NO_THROW(fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } auto fbs = clientMqttFb.getFunctionBlocks(); ASSERT_EQ(fbs.getCount(), 2u); @@ -347,8 +368,7 @@ TEST_F(MqttSubscriberFbTest, PropertyChanged) auto topic = buildTopicName("0"); config.addProperty(StringProperty(PROPERTY_NAME_SUB_TOPIC, topic)); ASSERT_NO_THROW(fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, fb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); auto subFb = reinterpret_cast(*fb); ASSERT_EQ(topic, subFb->getSubscribedTopic()); @@ -365,8 +385,7 @@ TEST_F(MqttSubscriberFbTest, JsonInit0) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG, String(VALID_JSON_1_TOPIC_0)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); ASSERT_EQ(subFb.getFunctionBlocks().getCount(), 3u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); auto lambda = [&](FunctionBlockPtr nestedFb, std::string value, std::string ts, std::string symbol) { EXPECT_EQ(nestedFb.getSignals()[0].getName().toStdString(), value); @@ -391,8 +410,7 @@ TEST_F(MqttSubscriberFbTest, JsonInit1) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG, String(VALID_JSON_1_TOPIC_1)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); ASSERT_EQ(subFb.getFunctionBlocks().getCount(), 3u); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); auto lambda = [&](FunctionBlockPtr nestedFb, std::string value, std::string ts, std::string symbol) { EXPECT_EQ(nestedFb.getSignals()[0].getName().toStdString(), value); @@ -418,8 +436,8 @@ TEST_P(MqttSubscriberFbConfigPTest, JsonWrongInit) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG, String(configJson)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getFunctionBlocks().getCount(), 0u); - EXPECT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); + ASSERT_TRUE(waitStatusTheSame(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); EXPECT_EQ(subFb.getPropertyValue(PROPERTY_NAME_SUB_TOPIC).asPtr().toStdString(), ""); } @@ -441,8 +459,7 @@ TEST_P(MqttSubscriberFbConfigFilePTest, JsonInitFromFile) auto config = clientMqttFb.getAvailableFunctionBlockTypes().get(SUB_FB_NAME).createDefaultConfig(); config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG_FILE, String(configJson)); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); } INSTANTIATE_TEST_SUITE_P(JsonConfigTest, @@ -459,8 +476,7 @@ TEST_F(MqttSubscriberFbTest, JsonInitFromFileWithChecking) auto config = clientMqttFb.getAvailableFunctionBlockTypes().get(SUB_FB_NAME).createDefaultConfig(); config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG_FILE, String("data/public-example0.json")); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); - ASSERT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); ASSERT_EQ(subFb.getFunctionBlocks().getCount(), 3u); auto lambda = [&](FunctionBlockPtr nestedFb, std::string value, std::string ts, std::string symbol) { @@ -486,8 +502,8 @@ TEST_F(MqttSubscriberFbTest, JsonInitFromFileWrongPath) config.setPropertyValue(PROPERTY_NAME_SUB_JSON_CONFIG_FILE, String("/justWrongPath/wrongFile.txt")); ASSERT_NO_THROW(subFb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config)); EXPECT_EQ(subFb.getFunctionBlocks().getCount(), 0u); - EXPECT_EQ(subFb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_TRUE(waitStatusChange(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); + ASSERT_TRUE(waitStatusTheSame(1500, subFb, Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()))); EXPECT_EQ(subFb.getPropertyValue(PROPERTY_NAME_SUB_TOPIC).asPtr().toStdString(), ""); } @@ -568,39 +584,39 @@ TEST_F(MqttSubscriberFbTest, WaitingData) auto rawFB = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); MqttAsyncClientWrapper publisher("testPublisherId"); ASSERT_TRUE(publisher.connect("127.0.0.1")); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[0], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(waitStatusChange(1500, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(waitStatusChange(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[1], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusChange(1500, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[2], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusTheSame(150, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_TRUE(waitStatusChange(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(waitStatusTheSame(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); rawFB.setPropertyValue(PROPERTY_NAME_SUB_DATA_TIMEOUT, 0); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(waitStatusTheSame(600, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); ASSERT_TRUE(publisher.publishMsg(mqtt::MqttMessage{topic, dataToSend[3], 1, 0})); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(waitStatusChange(1500, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(waitStatusTheSame(600, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received"), std::string::npos); } @@ -672,6 +688,7 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring) MqttAsyncClientWrapper publisher("testPublisherId"); ASSERT_TRUE(publisher.connect("127.0.0.1")); + ASSERT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); mqtt::MqttMessage msg = {topic0, dataToSend[0], 2, 0}; @@ -695,14 +712,9 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring) } } }; - helper::utils::Timer tmr(1000, true); - - bool hasData = false; - while (tmr.expired() == false && hasData == false) - hasData = (rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); - - EXPECT_TRUE(hasData); + EXPECT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); + EXPECT_TRUE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); readerLambda(); ASSERT_EQ(dataToReceive.size(), 1u); @@ -711,19 +723,14 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring) dataToReceive.clear(); ASSERT_NO_THROW(rawFB.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, topic1)); - EXPECT_EQ(rawFB.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager())); + EXPECT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()))); EXPECT_NE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Waiting for data"), std::string::npos); msg = {topic1, dataToSend[1], 2, 0}; ASSERT_TRUE(publisher.publishMsg(msg)); - tmr.restart(); - - hasData = false; - while (tmr.expired() == false && hasData == false) - hasData = (rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); - EXPECT_TRUE(hasData); + EXPECT_TRUE(waitStatusChange(1000, rawFB, Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()))); + EXPECT_TRUE(rawFB.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Data has been received") != std::string::npos); readerLambda(); ASSERT_EQ(dataToReceive.size(), 1u); @@ -742,44 +749,42 @@ TEST_F(MqttSubscriberFbTest, DomainDataPacketWithTheSameTS) config.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, topic); config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL, True); config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL_TS_MODE, static_cast(SDSM::SystemTime)); + config.setPropertyValue(PROPERTY_NAME_SUB_DATA_TIMEOUT, 0); auto fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config); auto getTime = []() { return duration_cast(system_clock::now().time_since_epoch()).count(); }; - auto getComponentStatus = [&]() { return fb.getStatusContainer().getStatus("ComponentStatus"); }; auto getStatusMsg = [&]() { return fb.getStatusContainer().getStatusMessage("ComponentStatus").toStdString(); }; + const auto warning = Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager()); const auto ok = Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()); const auto ts = getTime(); auto packet = createDomainDataPacket(fb, ts); - - ASSERT_EQ(getComponentStatus(), ok); + ASSERT_TRUE(waitStatusTheSame(1500, fb, ok)); packet = createDomainDataPacket(fb, ts); - - EXPECT_EQ(getComponentStatus(), warning); + ASSERT_TRUE(waitStatusChange(1500, fb, warning)); EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos); packet = createDomainDataPacket(fb, getTime()); - - EXPECT_EQ(getComponentStatus(), warning); + ASSERT_TRUE(waitStatusTheSame(1500, fb, warning)); EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos); // reconfiguring should reset warning fb.setPropertyValue(PROPERTY_NAME_SUB_QOS, 2); - EXPECT_EQ(getComponentStatus(), warning); + ASSERT_TRUE(waitStatusTheSame(1500, fb, warning)); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); EXPECT_NE(getStatusMsg().find("Waiting for data"), std::string::npos); packet = createDomainDataPacket(fb, getTime()); - EXPECT_EQ(getComponentStatus(), ok); + ASSERT_TRUE(waitStatusChange(1500, fb, ok)); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); packet = createDomainDataPacket(fb, getTime()); - EXPECT_EQ(getComponentStatus(), ok); + ASSERT_TRUE(waitStatusTheSame(1500, fb, ok)); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); } From 6e7990c63f0385ec65937616c472979525c62140 Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:18:10 +0200 Subject: [PATCH 10/11] Fix signed/unsigned cast in MQTT publisher FB --- modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp b/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp index 84c01c3c..b808626d 100644 --- a/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp +++ b/modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp @@ -1709,7 +1709,7 @@ TEST_F(MqttPublisherFbTest, DISABLED_MultiReaderTest) auto send = [&]() { std::cout << "---------------sending-data---------------" << std::endl; - for (size_t i = 0; i < samples * divider; i++) + for (size_t i = 0; i < static_cast(samples * divider); i++) { uint64_t ts0, ts1; if (i % divider == 0) From fa19daa8c12969df1d3ed1dbbd8ee739f0167e85 Mon Sep 17 00:00:00 2001 From: Aliaksandr Adziareika <8034372+alexadereyko@users.noreply.github.com> Date: Wed, 15 Apr 2026 17:38:19 +0200 Subject: [PATCH 11/11] Change unified workflow ref @v1 --- .github/workflows/ci-downstream.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-downstream.yml b/.github/workflows/ci-downstream.yml index 9907f257..a1032834 100644 --- a/.github/workflows/ci-downstream.yml +++ b/.github/workflows/ci-downstream.yml @@ -12,7 +12,7 @@ on: jobs: call-opendaq-reusable: name: MQTT Streaming - uses: openDAQ/openDAQ-CI/.github/workflows/reusable.yml@jira/TBBAS-3031-reusable-ci + uses: openDAQ/openDAQ-CI/.github/workflows/reusable.yml@v1 with: opendaq-ref: ${{ github.event.inputs.opendaq-ref || '' }} packages: >