diff --git a/SmartMeter.sln b/SmartMeter.sln index 16832bd..baf4f94 100644 --- a/SmartMeter.sln +++ b/SmartMeter.sln @@ -24,6 +24,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.Server.Core", "source\CreativeCoders.SmartMeter.Server.Core\CreativeCoders.SmartMeter.Server.Core.csproj", "{29638431-6971-4757-BE3B-A83D96300ED4}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{675F198B-B173-421F-A53B-F7B98C8D0E4F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.DataProcessing.Tests", "tests\CreativeCoders.SmartMeter.DataProcessing.Tests\CreativeCoders.SmartMeter.DataProcessing.Tests.csproj", "{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,6 +39,7 @@ Global {9452116E-6A8B-42D2-BBDD-BF465097AEA1} = {B259CB14-56CC-45FA-9756-64A195F4F789} {7AD8C940-B783-4FE9-B437-CE7FB87A97CA} = {B259CB14-56CC-45FA-9756-64A195F4F789} {29638431-6971-4757-BE3B-A83D96300ED4} = {B259CB14-56CC-45FA-9756-64A195F4F789} + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB} = {675F198B-B173-421F-A53B-F7B98C8D0E4F} EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {9CFDCB58-C344-4BF8-9377-60B4D2A684A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -55,5 +60,9 @@ Global {29638431-6971-4757-BE3B-A83D96300ED4}.Debug|Any CPU.Build.0 = Debug|Any CPU {29638431-6971-4757-BE3B-A83D96300ED4}.Release|Any CPU.ActiveCfg = Release|Any CPU {29638431-6971-4757-BE3B-A83D96300ED4}.Release|Any CPU.Build.0 = Release|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj b/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj index c2fc72e..3d63d05 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj +++ b/source/CreativeCoders.SmartMeter.DataProcessing/CreativeCoders.SmartMeter.DataProcessing.csproj @@ -7,13 +7,13 @@ - - - + + + - + diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs index 6e4adae..19a61bf 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/MqttValuePublisher.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Globalization; using System.Text; using System.Text.Json; using CreativeCoders.Core; @@ -23,14 +24,24 @@ public class MqttValuePublisher : IObserver public MqttValuePublisher(MqttPublisherOptions options, ILogger logger) { - _options = Ensure.NotNull(options, nameof(options)); - _logger = Ensure.NotNull(logger, nameof(logger)); + _options = Ensure.NotNull(options); + _logger = Ensure.NotNull(logger); _client = new MqttFactory().CreateMqttClient(); _publishingQueue = new BlockingCollection(); - _workerThread = new Thread(DoWork); + _workerThread = new Thread(async void () => + { + try + { + await DoWorkAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Error in MqttValuePublisher worker thread"); + } + }); } public async Task InitAsync() @@ -41,6 +52,8 @@ public async Task InitAsync() .WithConnectionUri(_options.Server) .Build()); + _client.DisconnectedAsync += _ => TryReconnectAsync(); + if (connectResult.ResultCode != MqttClientConnectResultCode.Success) { throw new InvalidOperationException( @@ -50,24 +63,57 @@ public async Task InitAsync() _workerThread.Start(); } - private async void DoWork() + private async Task TryReconnectAsync() + { + _logger.LogWarning("Mqtt client disconnected. Reconnecting..."); + + await Task.Delay(1000); + + await _client.ConnectAsync(new MqttClientOptionsBuilder() + .WithClientId(_options.ClientName) + .WithConnectionUri(_options.Server) + .Build()); + } + + private async Task DoWorkAsync() { foreach (var value in _publishingQueue.GetConsumingEnumerable()) { - _logger.LogDebug($"Publish value: {value.Type} = {value.Value}"); + _logger.LogDebug("Publish value: {ValueType} = {Value}", value.Type, value.Value); + + var payload = value.WriteAsJson + ? JsonSerializer.Serialize(new { value.Value }) + : value.Value.ToString(CultureInfo.InvariantCulture); + + var message = new MqttApplicationMessage + { + Topic = string.Format(_options.TopicTemplate, value.Type), + //ContentType = ContentMediaTypes.Application.Json, + Payload = Encoding.UTF8.GetBytes(payload) + }; - var publishResult = await _client.PublishAsync( - new MqttApplicationMessage - { - Topic = string.Format(_options.TopicTemplate, value.Type), - ContentType = ContentMediaTypes.Application.Json, - PayloadSegment = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { value.Value })) - }); + var publishResult = await SendMessageAsync(message); - _logger.LogDebug($"Publishing result: {publishResult.ReasonCode} {publishResult.ReasonString}"); + _logger.LogDebug("Publishing result: {ReasonCode} {ReasonString}", publishResult.ReasonCode, + publishResult.ReasonString); } } + private async Task SendMessageAsync(MqttApplicationMessage message) + { + try + { + return await _client.PublishAsync(message).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error sending message"); + } + + return new MqttClientPublishResult(null, MqttClientPublishReasonCode.UnspecifiedError, + "Sending message failed with exception", []); + } + public void OnCompleted() { } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs index 25f0dc4..770c7aa 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterReactiveExtensions.cs @@ -4,8 +4,9 @@ namespace CreativeCoders.SmartMeter.DataProcessing; public static class SmartMeterReactiveExtensions { - public static IObservable SelectSmartMeterValues(this IObservable observable) + public static IObservable SelectSmartMeterValues(this IObservable observable, + TimeProvider? timeProvider = null) { - return new SmlValueProcessor(observable); + return new SmlValueProcessor(observable, timeProvider); } } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs index bd14d78..2182b65 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValue.cs @@ -8,6 +8,8 @@ public SmartMeterValue(SmartMeterValueType type) } public SmartMeterValueType Type { get; } - + public decimal Value { get; init; } -} \ No newline at end of file + + public bool WriteAsJson { get; set; } = true; +} diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs index ca61106..bab02ea 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmartMeterValueType.cs @@ -5,5 +5,6 @@ public enum SmartMeterValueType TotalPurchasedEnergy, TotalSoldEnergy, CurrentPurchasingPower, - CurrentSellingPower + CurrentSellingPower, + GridPowerBalance } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs index 84b0b91..099cc49 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/SmlValueProcessor.cs @@ -6,14 +6,15 @@ namespace CreativeCoders.SmartMeter.DataProcessing; public class SmlValueProcessor : IObservable { - private readonly ValueHistory _valueHistory; - private readonly Subject _valueSubject; + private readonly TimeProvider _timeProvider; - public SmlValueProcessor(IObservable observable) - { - _valueSubject = new Subject(); + private readonly ValueHistory _valueHistory = new ValueHistory(); + + private readonly Subject _valueSubject = new Subject(); - _valueHistory = new ValueHistory(); + public SmlValueProcessor(IObservable observable, TimeProvider? timeProvider = null) + { + _timeProvider = timeProvider ?? TimeProvider.System; observable .Select(Observable.Return) @@ -27,7 +28,7 @@ private void ProcessValue(SmlValue smlValue) CalcCurrentPower(smlValue, historyData); - var now = DateTimeOffset.Now; + var now = _timeProvider.GetUtcNow(); historyData.DataSets.Add(new ValueHistoryDataSet(smlValue) { TimeStamp = now }); @@ -39,7 +40,7 @@ private void ProcessValue(SmlValue smlValue) return; } - _valueSubject.OnNext(CreateTotalSmartMeterValue(smlValue)); + _valueSubject.OnNext(CreateTotalSmartMeterValue(smlValue.ValueType, smlValue.Value)); historyData.LastValue = smlValue; historyData.LastValueTimeStamp = now; @@ -50,7 +51,7 @@ private void CalcCurrentPower(SmlValue smlValue, ValueHistoryData historyData) foreach (var dataSet in historyData.DataSets) { var valueDiff = smlValue.Value - dataSet.Value.Value; - var timeDiff = DateTimeOffset.Now - dataSet.TimeStamp; + var timeDiff = _timeProvider.GetUtcNow() - dataSet.TimeStamp; if (valueDiff > 10 || timeDiff > TimeSpan.FromSeconds(20)) { @@ -61,13 +62,41 @@ private void CalcCurrentPower(SmlValue smlValue, ValueHistoryData historyData) var value = (decimal)((double)valueDiff * mp); value = Math.Round(value, 0); - _valueSubject.OnNext(CreateCurrentSmartMeterValue(smlValue.ValueType, value)); + PushNewCurrentValue(CreateCurrentSmartMeterValue(smlValue.ValueType, value)); break; } } } + private void PushNewCurrentValue(SmartMeterValue value) + { + _valueSubject.OnNext(value); + + if (value.Value == 0) + { + return; + } + + switch (value.Type) + { + case SmartMeterValueType.CurrentPurchasingPower: + _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) + { + Value = value.Value, + WriteAsJson = false + }); + break; + case SmartMeterValueType.CurrentSellingPower: + _valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance) + { + Value = value.Value * -1, + WriteAsJson = false + }); + break; + } + } + private static SmartMeterValue CreateCurrentSmartMeterValue(SmlValueType smlValueType, decimal value) { return smlValueType switch @@ -84,19 +113,19 @@ private static SmartMeterValue CreateCurrentSmartMeterValue(SmlValueType smlValu }; } - private static SmartMeterValue CreateTotalSmartMeterValue(SmlValue smlValue) + private static SmartMeterValue CreateTotalSmartMeterValue(SmlValueType smlValueType, decimal value) { - return smlValue.ValueType switch + return smlValueType switch { SmlValueType.PurchasedEnergy => new SmartMeterValue(SmartMeterValueType.TotalPurchasedEnergy) { - Value = smlValue.Value + Value = value }, SmlValueType.SoldEnergy => new SmartMeterValue(SmartMeterValueType.TotalSoldEnergy) { - Value = smlValue.Value + Value = value }, - _ => throw new ArgumentOutOfRangeException(nameof(smlValue.ValueType)) + _ => throw new ArgumentOutOfRangeException(nameof(smlValueType)) }; } diff --git a/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs b/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs index c6e1d6d..d0de85a 100644 --- a/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs +++ b/source/CreativeCoders.SmartMeter.DataProcessing/ValueHistoryDataSet.cs @@ -7,10 +7,10 @@ public class ValueHistoryDataSet { public ValueHistoryDataSet(SmlValue value) { - Value = Ensure.NotNull(value, nameof(value)); + Value = Ensure.NotNull(value); } public DateTimeOffset TimeStamp { get; init; } public SmlValue Value { get; } -} \ No newline at end of file +} diff --git a/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs b/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs index 4d866e8..1a151a6 100644 --- a/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs +++ b/source/CreativeCoders.SmartMeter.Server.Core/SmartMeterServer.cs @@ -13,26 +13,47 @@ namespace CreativeCoders.SmartMeter.Server.Core; [UsedImplicitly] public class SmartMeterServer : IDaemonService { - private readonly ILogger _publisherLogger; private readonly ILogger _logger; - - private IDisposable? _subscription; - private readonly ReactiveSerialPort _serialPort; - private readonly MqttPublisherOptions _mqttPublisherOptions; + private readonly ILogger _publisherLogger; + + private readonly ReactiveSerialPort _serialPort; + + private IDisposable? _subscription; public SmartMeterServer(ILogger logger, IOptions mqttPublisherOptions, ILogger publisherLogger) { - _logger = Ensure.NotNull(logger, nameof(logger)); + _logger = Ensure.NotNull(logger); _mqttPublisherOptions = mqttPublisherOptions.Value; - _publisherLogger = Ensure.NotNull(publisherLogger, nameof(publisherLogger)); - + _publisherLogger = Ensure.NotNull(publisherLogger); + _serialPort = new ReactiveSerialPort("/dev/ttyUSB0"); } - + + private void CloseSerialPort() + { + _logger.LogInformation("Closing serial port..."); + _serialPort.Close(); + _logger.LogInformation("Serial port closed"); + } + + private void DisposingSubscription() + { + if (_subscription != null) + { + _logger.LogInformation("Disposing subscription..."); + + _subscription.Dispose(); + + _logger.LogInformation("Subscription disposed"); + + _subscription = null; + } + } + public async Task StartAsync() { _logger.LogInformation("Starting SmartMeter server"); @@ -40,28 +61,26 @@ public async Task StartAsync() var mqttValuePublisher = new MqttValuePublisher(_mqttPublisherOptions, _publisherLogger); await mqttValuePublisher.InitAsync(); - + _subscription ??= _serialPort .SelectSmlMessages() .SelectSmlValues() .SelectSmartMeterValues() .SubscribeOn(new TaskPoolScheduler(new TaskFactory())) .Subscribe(mqttValuePublisher); - + _serialPort.Open(); } public Task StopAsync() { _logger.LogInformation("Stopping SmartMeter server"); - - _serialPort.Close(); - - if (_subscription != null) - { - _subscription.Dispose(); - _subscription = null; - } + + DisposingSubscription(); + + CloseSerialPort(); + + _logger.LogInformation("SmartMeter server stopped"); return Task.CompletedTask; } diff --git a/source/CreativeCoders.SmartMeter.Server.Linux/install.sh b/source/CreativeCoders.SmartMeter.Server.Linux/install.sh index 56ff21b..19bc275 100644 --- a/source/CreativeCoders.SmartMeter.Server.Linux/install.sh +++ b/source/CreativeCoders.SmartMeter.Server.Linux/install.sh @@ -12,7 +12,8 @@ fi if [ -d "$APP_DIR" ]; then echo "Delete existing installation files" - rm -rf "${APP_DIR:?}/"* + rm -rf "${APP_DIR:?}.bak/"* + mv -f "$APP_DIR" "${APP_DIR:?}.bak" else echo "Create installation target directory" mkdir "$APP_DIR" diff --git a/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs b/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs index afd6598..324c65b 100644 --- a/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs +++ b/source/CreativeCoders.SmartMeter.Sml/Reactive/ReactiveSerialPort.cs @@ -7,6 +7,7 @@ namespace CreativeCoders.SmartMeter.Sml.Reactive; public sealed class ReactiveSerialPort : IObservable, IDisposable { private readonly IObservable _dataObservable; + private readonly SerialPort _serialPort; public ReactiveSerialPort(string portName) : this(new SerialPort(portName)) @@ -15,7 +16,7 @@ public ReactiveSerialPort(string portName) : this(new SerialPort(portName)) public ReactiveSerialPort(SerialPort serialPort) { - _serialPort = Ensure.NotNull(serialPort, nameof(serialPort)); + _serialPort = Ensure.NotNull(serialPort); _dataObservable = Observable .FromEvent( diff --git a/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj new file mode 100644 index 0000000..75791c5 --- /dev/null +++ b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/CreativeCoders.SmartMeter.DataProcessing.Tests.csproj @@ -0,0 +1,32 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + diff --git a/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs new file mode 100644 index 0000000..45db059 --- /dev/null +++ b/tests/CreativeCoders.SmartMeter.DataProcessing.Tests/SmlValueProcessorTests.cs @@ -0,0 +1,106 @@ +using System.Reactive.Subjects; +using CreativeCoders.SmartMeter.Sml; +using FluentAssertions; +using Microsoft.Extensions.Time.Testing; +using Xunit; + +namespace CreativeCoders.SmartMeter.DataProcessing.Tests; + +public class SmlValueProcessorTests +{ + [Fact] + public void Subscribe_WithPurchasedEnergyValue_ShouldReturnSmartMeterValueWithTotalPurchasedEnergy() + { + // Arrange + SmartMeterValue? resultValue = null; + + var input = new Subject(); + + var smlValue = new SmlValue(SmlValueType.PurchasedEnergy) + { + Value = 123.45m + }; + + var smlValueProcessor = new SmlValueProcessor(input); + + // Act + smlValueProcessor.Subscribe(x => resultValue = x); + + input.OnNext(smlValue); + + // Assert + resultValue + .Should() + .NotBeNull(); + + resultValue!.Type + .Should() + .Be(SmartMeterValueType.TotalPurchasedEnergy); + } + + [Theory] + [InlineData(SmlValueType.PurchasedEnergy, 100, 200)] + [InlineData(SmlValueType.PurchasedEnergy, 250, 300)] + [InlineData(SmlValueType.SoldEnergy, 200, 250)] + [InlineData(SmlValueType.SoldEnergy, 15, 250)] + public void Subscribe_WithTwoPurchasedEnergyValues_ShouldReturnTotalAndCurrentAndBalancePurchasedEnergy( + SmlValueType smlValueType, decimal smlValueValue1, decimal smlValueValue2) + { + // Arrange + var expectedBalanceValue = (smlValueValue2 - smlValueValue1) * 60; + if (smlValueType == SmlValueType.PurchasedEnergy) + { + expectedBalanceValue *= -1; + } + + var expectedSmartMeterValueType = smlValueType == SmlValueType.PurchasedEnergy + ? SmartMeterValueType.CurrentPurchasingPower + : SmartMeterValueType.CurrentSellingPower; + + List resultValues = []; + var fakeTimeProvider = new FakeTimeProvider(DateTimeOffset.Now); + + var input = new Subject(); + + var smlValue1 = new SmlValue(smlValueType) + { + Value = smlValueValue1 + }; + + var smlValue2 = new SmlValue(smlValueType) + { + Value = smlValueValue2 + }; + + var smlValueProcessor = new SmlValueProcessor(input, fakeTimeProvider); + + // Act + smlValueProcessor.Subscribe(x => resultValues.Add(x)); + + input.OnNext(smlValue1); + + fakeTimeProvider.Advance(TimeSpan.FromSeconds(60)); + + input.OnNext(smlValue2); + input.OnCompleted(); + + // Assert + resultValues + .Should() + .HaveCount(4); + + var gridPowerBalanceValue = resultValues + .Single(x => x.Type == SmartMeterValueType.GridPowerBalance); + + gridPowerBalanceValue.Value + .Should() + .Be(expectedBalanceValue); + + var currentPowerValue = resultValues + .Single(x => x.Type == expectedSmartMeterValueType); + + currentPowerValue.Value + .Should() + .Be((smlValueValue2 - smlValueValue1) * 60); + } +}