From f4ae87adf56a80066054526aa2108c27aff7345e Mon Sep 17 00:00:00 2001 From: darthsharp <48331467+darthsharp@users.noreply.github.com> Date: Sat, 23 Nov 2024 16:30:20 +0100 Subject: [PATCH 1/6] Refactor async handling in MqttValuePublisher Replaced the synchronous worker thread with an asynchronous method in MqttValuePublisher to enhance error handling and logging. Simplified the Ensure.NotNull calls and adjusted logging statements for better readability. Added minor formatting changes to improve code consistency. --- .../MqttValuePublisher.cs | 23 +++++++++++----- .../SmlValueProcessor.cs | 1 + .../ValueHistoryDataSet.cs | 4 +-- .../SmartMeterServer.cs | 26 +++++++++---------- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs index 6e4adae..8ac6a32 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs @@ -23,14 +23,24 @@ public class MqttValuePublisher : IObserver public MqttValuePublisher(MqttPublisherOptions options, ILogger logger) { - _options = Ensure.NotNull(options, nameof(options)); - _logger = Ensure.NotNull(logger, nameof(logger)); + _options = Ensure.NotNull(options); + _logger = Ensure.NotNull(logger); _client = new MqttFactory().CreateMqttClient(); _publishingQueue = new BlockingCollection(); - _workerThread = new Thread(DoWork); + _workerThread = new Thread(async void () => + { + try + { + await DoWorkAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Error in MqttValuePublisher worker thread"); + } + }); } public async Task InitAsync() @@ -50,11 +60,11 @@ public async Task InitAsync() _workerThread.Start(); } - private async void DoWork() + private async Task DoWorkAsync() { foreach (var value in _publishingQueue.GetConsumingEnumerable()) { - _logger.LogDebug($"Publish value: {value.Type} = {value.Value}"); + _logger.LogDebug("Publish value: {ValueType} = {Value}", value.Type, value.Value); var publishResult = await _client.PublishAsync( new MqttApplicationMessage @@ -64,7 +74,8 @@ private async void DoWork() PayloadSegment = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { value.Value })) }); - _logger.LogDebug($"Publishing result: {publishResult.ReasonCode} {publishResult.ReasonString}"); + _logger.LogDebug("Publishing result: {ReasonCode} {ReasonString}", publishResult.ReasonCode, + publishResult.ReasonString); } } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs index 84b0b91..ff07b3a 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs @@ -7,6 +7,7 @@ namespace CreativeCoders.SmartMeter.DataProcessing; public class SmlValueProcessor : IObservable { private readonly ValueHistory _valueHistory; + private readonly Subject _valueSubject; public SmlValueProcessor(IObservable observable) diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs b/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs index c6e1d6d..d0de85a 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs @@ -7,10 +7,10 @@ public class ValueHistoryDataSet { public ValueHistoryDataSet(SmlValue value) { - Value = Ensure.NotNull(value, nameof(value)); + Value = Ensure.NotNull(value); } public DateTimeOffset TimeStamp { get; init; } public SmlValue Value { get; } -} \ No newline at end of file +} diff --git a/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs b/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs index 4d866e8..9ee5566 100644 --- a/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs +++ b/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs @@ -13,26 +13,26 @@ namespace CreativeCoders.SmartMeter.Server.Core; [UsedImplicitly] public class SmartMeterServer : IDaemonService { - private readonly ILogger _publisherLogger; private readonly ILogger _logger; - - private IDisposable? _subscription; - private readonly ReactiveSerialPort _serialPort; - private readonly MqttPublisherOptions _mqttPublisherOptions; + private readonly ILogger _publisherLogger; + + private readonly ReactiveSerialPort _serialPort; + + private IDisposable? _subscription; public SmartMeterServer(ILogger logger, IOptions mqttPublisherOptions, ILogger publisherLogger) { - _logger = Ensure.NotNull(logger, nameof(logger)); + _logger = Ensure.NotNull(logger); _mqttPublisherOptions = mqttPublisherOptions.Value; - _publisherLogger = Ensure.NotNull(publisherLogger, nameof(publisherLogger)); - + _publisherLogger = Ensure.NotNull(publisherLogger); + _serialPort = new ReactiveSerialPort("/dev/ttyUSB0"); } - + public async Task StartAsync() { _logger.LogInformation("Starting SmartMeter server"); @@ -40,23 +40,23 @@ public async Task StartAsync() var mqttValuePublisher = new MqttValuePublisher(_mqttPublisherOptions, _publisherLogger); await mqttValuePublisher.InitAsync(); - + _subscription ??= _serialPort .SelectSmlMessages() .SelectSmlValues() .SelectSmartMeterValues() .SubscribeOn(new TaskPoolScheduler(new TaskFactory())) .Subscribe(mqttValuePublisher); - + _serialPort.Open(); } public Task StopAsync() { _logger.LogInformation("Stopping SmartMeter server"); - + _serialPort.Close(); - + if (_subscription != null) { _subscription.Dispose(); From 80348205a2b567122a2c715dff2227132cceae30 Mon Sep 17 00:00:00 2001 From: darthsharp <48331467+darthsharp@users.noreply.github.com> Date: Sun, 24 Nov 2024 19:07:20 +0100 Subject: [PATCH 2/6] Add unit tests for DataProcessing and enhance SmlValueProcessor Introduced new unit tests for DataProcessing to validate SmlValueProcessor behavior. Updated SmlValueProcessor to track GridPowerBalance and use a customizable TimeProvider. Made minor refactors and included new test project setup in the solution file. --- SmartMeter.sln | 9 ++ .../SmartMeterReactiveExtensions.cs | 5 +- .../SmartMeterValueType.cs | 3 +- .../SmlValueProcessor.cs | 53 ++++++--- .../Reactive/ReactiveSerialPort.cs | 3 +- ...ers.SmartMeter.DataProcessing.Tests.csproj | 32 ++++++ .../SmlValueProcessorTests.cs | 106 ++++++++++++++++++ 7 files changed, 191 insertions(+), 20 deletions(-) create mode 100644 tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj create mode 100644 tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs diff --git a/SmartMeter.sln b/SmartMeter.sln index 16832bd..baf4f94 100644 --- a/SmartMeter.sln +++ b/SmartMeter.sln @@ -24,6 +24,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.Server.Core", "source\CreativeCoders.SmartMeter.Server.Core\CreativeCoders.SmartMeter.Server.Core.csproj", "{29638431-6971-4757-BE3B-A83D96300ED4}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{675F198B-B173-421F-A53B-F7B98C8D0E4F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.DataProcessing.Tests", "tests\CreativeCoders.SmartMeter.DataProcessing.Tests\CreativeCoders.SmartMeter.DataProcessing.Tests.csproj", "{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,6 +39,7 @@ Global {9452116E-6A8B-42D2-BBDD-BF465097AEA1} = {B259CB14-56CC-45FA-9756-64A195F4F789} {7AD8C940-B783-4FE9-B437-CE7FB87A97CA} = {B259CB14-56CC-45FA-9756-64A195F4F789} {29638431-6971-4757-BE3B-A83D96300ED4} = {B259CB14-56CC-45FA-9756-64A195F4F789} + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB} = {675F198B-B173-421F-A53B-F7B98C8D0E4F} EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {9CFDCB58-C344-4BF8-9377-60B4D2A684A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -55,5 +60,9 @@ Global {29638431-6971-4757-BE3B-A83D96300ED4}.Debug|Any CPU.Build.0 = Debug|Any CPU {29638431-6971-4757-BE3B-A83D96300ED4}.Release|Any CPU.ActiveCfg = Release|Any CPU {29638431-6971-4757-BE3B-A83D96300ED4}.Release|Any CPU.Build.0 = Release|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs index 25f0dc4..770c7aa 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs @@ -4,8 +4,9 @@ namespace CreativeCoders.SmartMeter.DataProcessing; public static class SmartMeterReactiveExtensions { - public static IObservable SelectSmartMeterValues(this IObservable observable) + public static IObservable SelectSmartMeterValues(this IObservable observable, + TimeProvider? timeProvider = null) { - return new SmlValueProcessor(observable); + return new SmlValueProcessor(observable, timeProvider); } } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs index ca61106..bab02ea 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs @@ -5,5 +5,6 @@ public enum SmartMeterValueType TotalPurchasedEnergy, TotalSoldEnergy, CurrentPurchasingPower, - CurrentSellingPower + CurrentSellingPower, + GridPowerBalance } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs index ff07b3a..1914dda 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs @@ -6,20 +6,20 @@ namespace CreativeCoders.SmartMeter.DataProcessing; public class SmlValueProcessor : IObservable { - private readonly ValueHistory _valueHistory; + private readonly TimeProvider _timeProvider; - private readonly Subject _valueSubject; + private readonly ValueHistory _valueHistory = new ValueHistory(); - public SmlValueProcessor(IObservable observable) - { - _valueSubject = new Subject(); + private readonly Subject _valueSubject = new Subject(); - _valueHistory = new ValueHistory(); + public SmlValueProcessor(IObservable observable, TimeProvider? timeProvider = null) + { + _timeProvider = timeProvider ?? TimeProvider.System; observable .Select(Observable.Return) .Concat() - .Subscribe(ProcessValue); + .Subscribe(ProcessValue, () => _valueSubject.OnCompleted()); } private void ProcessValue(SmlValue smlValue) @@ -28,7 +28,7 @@ private void ProcessValue(SmlValue smlValue) CalcCurrentPower(smlValue, historyData); - var now = DateTimeOffset.Now; + var now = _timeProvider.GetUtcNow(); historyData.DataSets.Add(new ValueHistoryDataSet(smlValue) { TimeStamp = now }); @@ -40,7 +40,7 @@ private void ProcessValue(SmlValue smlValue) return; } - _valueSubject.OnNext(CreateTotalSmartMeterValue(smlValue)); + _valueSubject.OnNext(CreateTotalSmartMeterValue(smlValue.ValueType, smlValue.Value)); historyData.LastValue = smlValue; historyData.LastValueTimeStamp = now; @@ -51,7 +51,7 @@ private void CalcCurrentPower(SmlValue smlValue, ValueHistoryData historyData) foreach (var dataSet in historyData.DataSets) { var valueDiff = smlValue.Value - dataSet.Value.Value; - var timeDiff = DateTimeOffset.Now - dataSet.TimeStamp; + var timeDiff = _timeProvider.GetUtcNow() - dataSet.TimeStamp; if (valueDiff > 10 || timeDiff > TimeSpan.FromSeconds(20)) { @@ -62,13 +62,34 @@ private void CalcCurrentPower(SmlValue smlValue, ValueHistoryData historyData) var value = (decimal)((double)valueDiff * mp); value = Math.Round(value, 0); - _valueSubject.OnNext(CreateCurrentSmartMeterValue(smlValue.ValueType, value)); + PushNewCurrentValue(CreateCurrentSmartMeterValue(smlValue.ValueType, value)); break; } } } + private void PushNewCurrentValue(SmartMeterValue value) + { + _valueSubject.OnNext(value); + + switch (value.Type) + { + case SmartMeterValueType.CurrentPurchasingPower: + _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) + { + Value = value.Value * -1 + }); + break; + case SmartMeterValueType.CurrentSellingPower: + _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) + { + Value = value.Value + }); + break; + } + } + private static SmartMeterValue CreateCurrentSmartMeterValue(SmlValueType smlValueType, decimal value) { return smlValueType switch @@ -85,19 +106,19 @@ private static SmartMeterValue CreateCurrentSmartMeterValue(SmlValueType smlValu }; } - private static SmartMeterValue CreateTotalSmartMeterValue(SmlValue smlValue) + private static SmartMeterValue CreateTotalSmartMeterValue(SmlValueType smlValueType, decimal value) { - return smlValue.ValueType switch + return smlValueType switch { SmlValueType.PurchasedEnergy => new SmartMeterValue(SmartMeterValueType.TotalPurchasedEnergy) { - Value = smlValue.Value + Value = value }, SmlValueType.SoldEnergy => new SmartMeterValue(SmartMeterValueType.TotalSoldEnergy) { - Value = smlValue.Value + Value = value }, - _ => throw new ArgumentOutOfRangeException(nameof(smlValue.ValueType)) + _ => throw new ArgumentOutOfRangeException(nameof(smlValueType)) }; } diff --git a/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs b/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs index afd6598..324c65b 100644 --- a/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs +++ b/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs @@ -7,6 +7,7 @@ namespace CreativeCoders.SmartMeter.Sml.Reactive; public sealed class ReactiveSerialPort : IObservable, IDisposable { private readonly IObservable _dataObservable; + private readonly SerialPort _serialPort; public ReactiveSerialPort(string portName) : this(new SerialPort(portName)) @@ -15,7 +16,7 @@ public ReactiveSerialPort(string portName) : this(new SerialPort(portName)) public ReactiveSerialPort(SerialPort serialPort) { - _serialPort = Ensure.NotNull(serialPort, nameof(serialPort)); + _serialPort = Ensure.NotNull(serialPort); _dataObservable = Observable .FromEvent( diff --git a/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj new file mode 100644 index 0000000..75791c5 --- /dev/null +++ b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj @@ -0,0 +1,32 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + diff --git a/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs new file mode 100644 index 0000000..45db059 --- /dev/null +++ b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs @@ -0,0 +1,106 @@ +using System.Reactive.Subjects; +using CreativeCoders.SmartMeter.Sml; +using FluentAssertions; +using Microsoft.Extensions.Time.Testing; +using Xunit; + +namespace CreativeCoders.SmartMeter.DataProcessing.Tests; + +public class SmlValueProcessorTests +{ + [Fact] + public void Subscribe_WithPurchasedEnergyValue_ShouldReturnSmartMeterValueWithTotalPurchasedEnergy() + { + // Arrange + SmartMeterValue? resultValue = null; + + var input = new Subject(); + + var smlValue = new SmlValue(SmlValueType.PurchasedEnergy) + { + Value = 123.45m + }; + + var smlValueProcessor = new SmlValueProcessor(input); + + // Act + smlValueProcessor.Subscribe(x => resultValue = x); + + input.OnNext(smlValue); + + // Assert + resultValue + .Should() + .NotBeNull(); + + resultValue!.Type + .Should() + .Be(SmartMeterValueType.TotalPurchasedEnergy); + } + + [Theory] + [InlineData(SmlValueType.PurchasedEnergy, 100, 200)] + [InlineData(SmlValueType.PurchasedEnergy, 250, 300)] + [InlineData(SmlValueType.SoldEnergy, 200, 250)] + [InlineData(SmlValueType.SoldEnergy, 15, 250)] + public void Subscribe_WithTwoPurchasedEnergyValues_ShouldReturnTotalAndCurrentAndBalancePurchasedEnergy( + SmlValueType smlValueType, decimal smlValueValue1, decimal smlValueValue2) + { + // Arrange + var expectedBalanceValue = (smlValueValue2 - smlValueValue1) * 60; + if (smlValueType == SmlValueType.PurchasedEnergy) + { + expectedBalanceValue *= -1; + } + + var expectedSmartMeterValueType = smlValueType == SmlValueType.PurchasedEnergy + ? SmartMeterValueType.CurrentPurchasingPower + : SmartMeterValueType.CurrentSellingPower; + + List resultValues = []; + var fakeTimeProvider = new FakeTimeProvider(DateTimeOffset.Now); + + var input = new Subject(); + + var smlValue1 = new SmlValue(smlValueType) + { + Value = smlValueValue1 + }; + + var smlValue2 = new SmlValue(smlValueType) + { + Value = smlValueValue2 + }; + + var smlValueProcessor = new SmlValueProcessor(input, fakeTimeProvider); + + // Act + smlValueProcessor.Subscribe(x => resultValues.Add(x)); + + input.OnNext(smlValue1); + + fakeTimeProvider.Advance(TimeSpan.FromSeconds(60)); + + input.OnNext(smlValue2); + input.OnCompleted(); + + // Assert + resultValues + .Should() + .HaveCount(4); + + var gridPowerBalanceValue = resultValues + .Single(x => x.Type == SmartMeterValueType.GridPowerBalance); + + gridPowerBalanceValue.Value + .Should() + .Be(expectedBalanceValue); + + var currentPowerValue = resultValues + .Single(x => x.Type == expectedSmartMeterValueType); + + currentPowerValue.Value + .Should() + .Be((smlValueValue2 - smlValueValue1) * 60); + } +} From 59eef4be0e0ee4f5f5bd641fe67735f61415fca6 Mon Sep 17 00:00:00 2001 From: darthsharp <48331467+darthsharp@users.noreply.github.com> Date: Sun, 24 Nov 2024 19:20:19 +0100 Subject: [PATCH 3/6] Move existing install directory to backup before reinstall Previously, the script deleted the contents of the installation directory directly. Now, it renames the existing installation directory to a backup location before proceeding, ensuring no files are lost until the new installation is confirmed successful. --- source/CreativeCoders.SmartMeter.Server.Linux/install.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/CreativeCoders.SmartMeter.Server.Linux/install.sh b/source/CreativeCoders.SmartMeter.Server.Linux/install.sh index 56ff21b..19bc275 100644 --- a/source/CreativeCoders.SmartMeter.Server.Linux/install.sh +++ b/source/CreativeCoders.SmartMeter.Server.Linux/install.sh @@ -12,7 +12,8 @@ fi if [ -d "$APP_DIR" ]; then echo "Delete existing installation files" - rm -rf "${APP_DIR:?}/"* + rm -rf "${APP_DIR:?}.bak/"* + mv -f "$APP_DIR" "${APP_DIR:?}.bak" else echo "Create installation target directory" mkdir "$APP_DIR" From 5db28e9d4e489df575e36747a9480718117415a9 Mon Sep 17 00:00:00 2001 From: darthsharp <48331467+darthsharp@users.noreply.github.com> Date: Sun, 24 Nov 2024 19:47:31 +0100 Subject: [PATCH 4/6] Fix payload handling, processing logic, and package versions Removed the redundant ContentType setting when publishing MQTT messages for cleaner code. Updated SmlValueProcessor to ignore zero values and removed unnecessary complete action in the subscription. Adjusted package versions in the project file to match the correct dependencies. --- .../CreativeCoders.SmartMeter.DataProcessing.csproj | 8 ++++---- .../MqttValuePublisher.cs | 4 ++-- .../SmlValueProcessor.cs | 7 ++++++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj b/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj index c2fc72e..752256a 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj +++ b/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj @@ -7,13 +7,13 @@ - - - + + + - + diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs index 8ac6a32..5d04093 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs @@ -70,8 +70,8 @@ private async Task DoWorkAsync() new MqttApplicationMessage { Topic = string.Format(_options.TopicTemplate, value.Type), - ContentType = ContentMediaTypes.Application.Json, - PayloadSegment = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { value.Value })) + //ContentType = ContentMediaTypes.Application.Json, + Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { value.Value })) }); _logger.LogDebug("Publishing result: {ReasonCode} {ReasonString}", publishResult.ReasonCode, diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs index 1914dda..1907dcb 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs @@ -19,7 +19,7 @@ public SmlValueProcessor(IObservable observable, TimeProvider? timePro observable .Select(Observable.Return) .Concat() - .Subscribe(ProcessValue, () => _valueSubject.OnCompleted()); + .Subscribe(ProcessValue); } private void ProcessValue(SmlValue smlValue) @@ -73,6 +73,11 @@ private void PushNewCurrentValue(SmartMeterValue value) { _valueSubject.OnNext(value); + if (value.Value == 0) + { + return; + } + switch (value.Type) { case SmartMeterValueType.CurrentPurchasingPower: From df2777707a7ed9d49e0b769115269375630178d8 Mon Sep 17 00:00:00 2001 From: darthsharp <48331467+darthsharp@users.noreply.github.com> Date: Tue, 26 Nov 2024 16:17:40 +0100 Subject: [PATCH 5/6] Refactor MQTT handling and add reconnect feature Added a new async method to handle MQTT client reconnection on disconnection. Modified message publishing to support both JSON and string payloads with a new `WriteAsJson` flag in `SmartMeterValue`. Updated MQTTnet package version and refactored server stop procedures for better code organization. --- ...iveCoders.SmartMeter.DataProcessing.csproj | 2 +- .../MqttValuePublisher.cs | 49 ++++++++++++++++--- .../SmartMeterValue.cs | 6 ++- .../SmlValueProcessor.cs | 6 ++- .../SmartMeterServer.cs | 31 +++++++++--- 5 files changed, 76 insertions(+), 18 deletions(-) diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj b/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj index 752256a..3d63d05 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj +++ b/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj @@ -9,7 +9,7 @@ - + diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs index 5d04093..19a61bf 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Globalization; using System.Text; using System.Text.Json; using CreativeCoders.Core; @@ -51,6 +52,8 @@ public async Task InitAsync() .WithConnectionUri(_options.Server) .Build()); + _client.DisconnectedAsync += _ => TryReconnectAsync(); + if (connectResult.ResultCode != MqttClientConnectResultCode.Success) { throw new InvalidOperationException( @@ -60,25 +63,57 @@ public async Task InitAsync() _workerThread.Start(); } + private async Task TryReconnectAsync() + { + _logger.LogWarning("Mqtt client disconnected. Reconnecting..."); + + await Task.Delay(1000); + + await _client.ConnectAsync(new MqttClientOptionsBuilder() + .WithClientId(_options.ClientName) + .WithConnectionUri(_options.Server) + .Build()); + } + private async Task DoWorkAsync() { foreach (var value in _publishingQueue.GetConsumingEnumerable()) { _logger.LogDebug("Publish value: {ValueType} = {Value}", value.Type, value.Value); - var publishResult = await _client.PublishAsync( - new MqttApplicationMessage - { - Topic = string.Format(_options.TopicTemplate, value.Type), - //ContentType = ContentMediaTypes.Application.Json, - Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { value.Value })) - }); + var payload = value.WriteAsJson + ? JsonSerializer.Serialize(new { value.Value }) + : value.Value.ToString(CultureInfo.InvariantCulture); + + var message = new MqttApplicationMessage + { + Topic = string.Format(_options.TopicTemplate, value.Type), + //ContentType = ContentMediaTypes.Application.Json, + Payload = Encoding.UTF8.GetBytes(payload) + }; + + var publishResult = await SendMessageAsync(message); _logger.LogDebug("Publishing result: {ReasonCode} {ReasonString}", publishResult.ReasonCode, publishResult.ReasonString); } } + private async Task SendMessageAsync(MqttApplicationMessage message) + { + try + { + return await _client.PublishAsync(message).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error sending message"); + } + + return new MqttClientPublishResult(null, MqttClientPublishReasonCode.UnspecifiedError, + "Sending message failed with exception", []); + } + public void OnCompleted() { } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs index bd14d78..2182b65 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs @@ -8,6 +8,8 @@ public SmartMeterValue(SmartMeterValueType type) } public SmartMeterValueType Type { get; } - + public decimal Value { get; init; } -} \ No newline at end of file + + public bool WriteAsJson { get; set; } = true; +} diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs index 1907dcb..5478b59 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs @@ -83,13 +83,15 @@ private void PushNewCurrentValue(SmartMeterValue value) case SmartMeterValueType.CurrentPurchasingPower: _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) { - Value = value.Value * -1 + Value = value.Value * -1, + WriteAsJson = false }); break; case SmartMeterValueType.CurrentSellingPower: _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) { - Value = value.Value + Value = value.Value, + WriteAsJson = false }); break; } diff --git a/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs b/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs index 9ee5566..1a151a6 100644 --- a/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs +++ b/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs @@ -33,6 +33,27 @@ public SmartMeterServer(ILogger logger, _serialPort = new ReactiveSerialPort("/dev/ttyUSB0"); } + private void CloseSerialPort() + { + _logger.LogInformation("Closing serial port..."); + _serialPort.Close(); + _logger.LogInformation("Serial port closed"); + } + + private void DisposingSubscription() + { + if (_subscription != null) + { + _logger.LogInformation("Disposing subscription..."); + + _subscription.Dispose(); + + _logger.LogInformation("Subscription disposed"); + + _subscription = null; + } + } + public async Task StartAsync() { _logger.LogInformation("Starting SmartMeter server"); @@ -55,13 +76,11 @@ public Task StopAsync() { _logger.LogInformation("Stopping SmartMeter server"); - _serialPort.Close(); + DisposingSubscription(); - if (_subscription != null) - { - _subscription.Dispose(); - _subscription = null; - } + CloseSerialPort(); + + _logger.LogInformation("SmartMeter server stopped"); return Task.CompletedTask; } From c8e503f4044092c886f47dc45346339d8c0e083f Mon Sep 17 00:00:00 2001 From: darthsharp <48331467+darthsharp@users.noreply.github.com> Date: Sat, 30 Nov 2024 16:20:38 +0100 Subject: [PATCH 6/6] Fix grid power balance sign inversion. Corrected the sign inversion logic for grid power balance calculation in SmlValueProcessor. The update ensures that current purchasing power and current selling power values are processed with the correct sign. This resolves issues with interpreting power flow direction. --- .../SmlValueProcessor.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs index 5478b59..099cc49 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs @@ -83,14 +83,14 @@ private void PushNewCurrentValue(SmartMeterValue value) case SmartMeterValueType.CurrentPurchasingPower: _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) { - Value = value.Value * -1, + Value = value.Value, WriteAsJson = false }); break; case SmartMeterValueType.CurrentSellingPower: _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) { - Value = value.Value, + Value = value.Value * -1, WriteAsJson = false }); break;