Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions SmartMeter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.Server.Core", "source\CreativeCoders.SmartMeter.Server.Core\CreativeCoders.SmartMeter.Server.Core.csproj", "{29638431-6971-4757-BE3B-A83D96300ED4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{675F198B-B173-421F-A53B-F7B98C8D0E4F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CreativeCoders.SmartMeter.DataProcessing.Tests", "tests\CreativeCoders.SmartMeter.DataProcessing.Tests\CreativeCoders.SmartMeter.DataProcessing.Tests.csproj", "{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -35,6 +39,7 @@ Global
{9452116E-6A8B-42D2-BBDD-BF465097AEA1} = {B259CB14-56CC-45FA-9756-64A195F4F789}
{7AD8C940-B783-4FE9-B437-CE7FB87A97CA} = {B259CB14-56CC-45FA-9756-64A195F4F789}
{29638431-6971-4757-BE3B-A83D96300ED4} = {B259CB14-56CC-45FA-9756-64A195F4F789}
{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB} = {675F198B-B173-421F-A53B-F7B98C8D0E4F}
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{9CFDCB58-C344-4BF8-9377-60B4D2A684A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand All @@ -55,5 +60,9 @@ Global
{29638431-6971-4757-BE3B-A83D96300ED4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{29638431-6971-4757-BE3B-A83D96300ED4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{29638431-6971-4757-BE3B-A83D96300ED4}.Release|Any CPU.Build.0 = Release|Any CPU
{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{29653CEF-A35C-4F4F-88EB-9336B7ABA9FB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CreativeCoders.Net" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageReference Include="MQTTnet" Version="4.3.7.1207" />
<PackageReference Include="CreativeCoders.Net" Version="4.3.0"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2"/>
<PackageReference Include="MQTTnet" Version="4.3.7.1207"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\CreativeCoders.SmartMeter.Sml\CreativeCoders.SmartMeter.Sml.csproj" />
<ProjectReference Include="..\CreativeCoders.SmartMeter.Sml\CreativeCoders.SmartMeter.Sml.csproj"/>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Globalization;
using System.Text;
using System.Text.Json;
using CreativeCoders.Core;
Expand All @@ -23,14 +24,24 @@ public class MqttValuePublisher : IObserver<SmartMeterValue>

public MqttValuePublisher(MqttPublisherOptions options, ILogger<MqttValuePublisher> logger)
{
_options = Ensure.NotNull(options, nameof(options));
_logger = Ensure.NotNull(logger, nameof(logger));
_options = Ensure.NotNull(options);
_logger = Ensure.NotNull(logger);

_client = new MqttFactory().CreateMqttClient();

_publishingQueue = new BlockingCollection<SmartMeterValue>();

_workerThread = new Thread(DoWork);
_workerThread = new Thread(async void () =>
{
try
{
await DoWorkAsync();
}
catch (Exception e)
{
_logger.LogError(e, "Error in MqttValuePublisher worker thread");
}
});
}

public async Task InitAsync()
Expand All @@ -41,6 +52,8 @@ public async Task InitAsync()
.WithConnectionUri(_options.Server)
.Build());

_client.DisconnectedAsync += _ => TryReconnectAsync();

if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
throw new InvalidOperationException(
Expand All @@ -50,24 +63,57 @@ public async Task InitAsync()
_workerThread.Start();
}

private async void DoWork()
private async Task TryReconnectAsync()
{
_logger.LogWarning("Mqtt client disconnected. Reconnecting...");

await Task.Delay(1000);

await _client.ConnectAsync(new MqttClientOptionsBuilder()
.WithClientId(_options.ClientName)
.WithConnectionUri(_options.Server)
.Build());
}

private async Task DoWorkAsync()
{
foreach (var value in _publishingQueue.GetConsumingEnumerable())
{
_logger.LogDebug($"Publish value: {value.Type} = {value.Value}");
_logger.LogDebug("Publish value: {ValueType} = {Value}", value.Type, value.Value);

var payload = value.WriteAsJson
? JsonSerializer.Serialize(new { value.Value })
: value.Value.ToString(CultureInfo.InvariantCulture);

var message = new MqttApplicationMessage
{
Topic = string.Format(_options.TopicTemplate, value.Type),
//ContentType = ContentMediaTypes.Application.Json,
Payload = Encoding.UTF8.GetBytes(payload)
};

var publishResult = await _client.PublishAsync(
new MqttApplicationMessage
{
Topic = string.Format(_options.TopicTemplate, value.Type),
ContentType = ContentMediaTypes.Application.Json,
PayloadSegment = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { value.Value }))
});
var publishResult = await SendMessageAsync(message);

_logger.LogDebug($"Publishing result: {publishResult.ReasonCode} {publishResult.ReasonString}");
_logger.LogDebug("Publishing result: {ReasonCode} {ReasonString}", publishResult.ReasonCode,
publishResult.ReasonString);
}
}

private async Task<MqttClientPublishResult> SendMessageAsync(MqttApplicationMessage message)
{
try
{
return await _client.PublishAsync(message).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error sending message");
}

return new MqttClientPublishResult(null, MqttClientPublishReasonCode.UnspecifiedError,
"Sending message failed with exception", []);
}

public void OnCompleted()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ namespace CreativeCoders.SmartMeter.DataProcessing;

public static class SmartMeterReactiveExtensions
{
public static IObservable<SmartMeterValue> SelectSmartMeterValues(this IObservable<SmlValue> observable)
public static IObservable<SmartMeterValue> SelectSmartMeterValues(this IObservable<SmlValue> observable,
TimeProvider? timeProvider = null)
{
return new SmlValueProcessor(observable);
return new SmlValueProcessor(observable, timeProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public SmartMeterValue(SmartMeterValueType type)
}

public SmartMeterValueType Type { get; }

public decimal Value { get; init; }
}

public bool WriteAsJson { get; set; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ public enum SmartMeterValueType
TotalPurchasedEnergy,
TotalSoldEnergy,
CurrentPurchasingPower,
CurrentSellingPower
CurrentSellingPower,
GridPowerBalance
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ namespace CreativeCoders.SmartMeter.DataProcessing;

public class SmlValueProcessor : IObservable<SmartMeterValue>
{
private readonly ValueHistory _valueHistory;
private readonly Subject<SmartMeterValue> _valueSubject;
private readonly TimeProvider _timeProvider;

public SmlValueProcessor(IObservable<SmlValue> observable)
{
_valueSubject = new Subject<SmartMeterValue>();
private readonly ValueHistory _valueHistory = new ValueHistory();

private readonly Subject<SmartMeterValue> _valueSubject = new Subject<SmartMeterValue>();

_valueHistory = new ValueHistory();
public SmlValueProcessor(IObservable<SmlValue> observable, TimeProvider? timeProvider = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;

observable
.Select(Observable.Return)
Expand All @@ -27,7 +28,7 @@ private void ProcessValue(SmlValue smlValue)

CalcCurrentPower(smlValue, historyData);

var now = DateTimeOffset.Now;
var now = _timeProvider.GetUtcNow();

historyData.DataSets.Add(new ValueHistoryDataSet(smlValue) { TimeStamp = now });

Expand All @@ -39,7 +40,7 @@ private void ProcessValue(SmlValue smlValue)
return;
}

_valueSubject.OnNext(CreateTotalSmartMeterValue(smlValue));
_valueSubject.OnNext(CreateTotalSmartMeterValue(smlValue.ValueType, smlValue.Value));

historyData.LastValue = smlValue;
historyData.LastValueTimeStamp = now;
Expand All @@ -50,7 +51,7 @@ private void CalcCurrentPower(SmlValue smlValue, ValueHistoryData historyData)
foreach (var dataSet in historyData.DataSets)
{
var valueDiff = smlValue.Value - dataSet.Value.Value;
var timeDiff = DateTimeOffset.Now - dataSet.TimeStamp;
var timeDiff = _timeProvider.GetUtcNow() - dataSet.TimeStamp;

if (valueDiff > 10 || timeDiff > TimeSpan.FromSeconds(20))
{
Expand All @@ -61,13 +62,41 @@ private void CalcCurrentPower(SmlValue smlValue, ValueHistoryData historyData)
var value = (decimal)((double)valueDiff * mp);
value = Math.Round(value, 0);

_valueSubject.OnNext(CreateCurrentSmartMeterValue(smlValue.ValueType, value));
PushNewCurrentValue(CreateCurrentSmartMeterValue(smlValue.ValueType, value));

break;
}
}
}

private void PushNewCurrentValue(SmartMeterValue value)
{
_valueSubject.OnNext(value);

if (value.Value == 0)
{
return;
}

switch (value.Type)
{
case SmartMeterValueType.CurrentPurchasingPower:
_valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance)
{
Value = value.Value,
WriteAsJson = false
});
break;
case SmartMeterValueType.CurrentSellingPower:
_valueSubject.OnNext(new SmartMeterValue(SmartMeterValueType.GridPowerBalance)
{
Value = value.Value * -1,
WriteAsJson = false
});
break;
}
}

private static SmartMeterValue CreateCurrentSmartMeterValue(SmlValueType smlValueType, decimal value)
{
return smlValueType switch
Expand All @@ -84,19 +113,19 @@ private static SmartMeterValue CreateCurrentSmartMeterValue(SmlValueType smlValu
};
}

private static SmartMeterValue CreateTotalSmartMeterValue(SmlValue smlValue)
private static SmartMeterValue CreateTotalSmartMeterValue(SmlValueType smlValueType, decimal value)
{
return smlValue.ValueType switch
return smlValueType switch
{
SmlValueType.PurchasedEnergy => new SmartMeterValue(SmartMeterValueType.TotalPurchasedEnergy)
{
Value = smlValue.Value
Value = value
},
SmlValueType.SoldEnergy => new SmartMeterValue(SmartMeterValueType.TotalSoldEnergy)
{
Value = smlValue.Value
Value = value
},
_ => throw new ArgumentOutOfRangeException(nameof(smlValue.ValueType))
_ => throw new ArgumentOutOfRangeException(nameof(smlValueType))
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ public class ValueHistoryDataSet
{
public ValueHistoryDataSet(SmlValue value)
{
Value = Ensure.NotNull(value, nameof(value));
Value = Ensure.NotNull(value);
}

public DateTimeOffset TimeStamp { get; init; }

public SmlValue Value { get; }
}
}
Loading
Loading