Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace ServiceControl.Transport.Tests;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
using Npgsql;
using NUnit.Framework;
using Particular.Approvals;
using Transports;
Expand Down Expand Up @@ -113,7 +114,77 @@ public async Task RunScenario()
reset.Set();
await runScenarioAndAdvanceTime.WaitAsync(token);

// Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion.
// Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion.
Assert.That(total, Is.EqualTo(23));
}

[Test]
public async Task NoNegativeThroughputWhenQueueTableIsDeletedBetweenSnapshots()
{
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(50));
CancellationToken token = cancellationTokenSource.Token;
var dictionary = new Dictionary<string, string>
{
{ PostgreSqlQuery.PostgreSqlSettings.ConnectionString, configuration.ConnectionString }
};

await CreateTestQueue(transportSettings.EndpointName);

query.Initialize(new ReadOnlyDictionary<string, string>(dictionary));

var queueNames = new List<IBrokerQueue>();
await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
{
queueNames.Add(queueName);
}

IBrokerQueue queue = queueNames.Find(name => ((BrokerQueueTable)name).SanitizedName == transportSettings.EndpointName);
Assert.That(queue, Is.Not.Null);

// Send messages so the start snapshot has a positive RowVersion
await SendAndReceiveMessages(transportSettings.EndpointName, 5);

var brokerQueueTable = (BrokerQueueTable)queue;
int advanceCount = 0;
using var done = new ManualResetEventSlim();

var dropTableTask = Task.Run(async () =>
{
while (!done.IsSet)
{
// Drop the table mid-way while GetThroughputPerDay is waiting for the next hour
if (advanceCount == 3)
{
await DropQueueTable(brokerQueueTable.QueueAddress.QualifiedTableName);
}

provider.Advance(TimeSpan.FromHours(1));
advanceCount++;

// Pace advances to give GetThroughputPerDay time to process the iteration
// and register its next Task.Delay before we advance again
await Task.Delay(TimeSpan.FromMilliseconds(500), CancellationToken.None);
}
}, token);

var throughputValues = new List<QueueThroughput>();
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, new DateOnly(), token))
{
throughputValues.Add(queueThroughput);
}

done.Set();
await dropTableTask.WaitAsync(token);

Assert.That(throughputValues, Has.All.Matches<QueueThroughput>(qt => qt.TotalThroughput >= 0));
}

async Task DropQueueTable(string qualifiedTableName)
{
await using var conn = new NpgsqlConnection(configuration.ConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"DROP TABLE IF EXISTS {qualifiedTableName} CASCADE;";
await cmd.ExecuteNonQueryAsync();
}
}
19 changes: 13 additions & 6 deletions src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,21 @@ public async Task<BrokerQueueTableSnapshot> GetSnapshot(BrokerQueueTable brokerQ
{
var table = new BrokerQueueTableSnapshot(brokerQueueTable);

await using var conn = await OpenConnectionAsync(cancellationToken);
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"select last_value from \"{table.SequenceName}\";";
var value = await cmd.ExecuteScalarAsync(cancellationToken);
try
{
await using var conn = await OpenConnectionAsync(cancellationToken);
await using var cmd = conn.CreateCommand();
cmd.CommandText = $"select last_value from \"{table.SequenceName}\";";
var value = await cmd.ExecuteScalarAsync(cancellationToken);

if (value is long longValue)
if (value is long longValue)
{
table.RowVersion = longValue;
}
}
catch (NpgsqlException ex) when (ex.SqlState == "42P01")
{
table.RowVersion = longValue;
// Queue table has been deleted; RowVersion remains null
}

return table;
Expand Down
11 changes: 7 additions & 4 deletions src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
var endData =
await queueTableName.DatabaseDetails.GetSnapshot(queueTableName, cancellationToken);

yield return new QueueThroughput
if (endData.RowVersion.HasValue && startData.RowVersion.HasValue)
{
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
TotalThroughput = endData.RowVersion - startData.RowVersion
};
yield return new QueueThroughput
{
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
TotalThroughput = endData.RowVersion.Value - startData.RowVersion.Value
};
}

startData = endData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ namespace ServiceControl.Transports.PostgreSql;

public class BrokerQueueTableSnapshot(BrokerQueueTable details) : BrokerQueueTable(details.DatabaseDetails, details.QueueAddress)
{
public long RowVersion { get; set; }
public long? RowVersion { get; set; }
}