From 20aeb06dabe8643b2e428bba5e7f19ebb6d28f7d Mon Sep 17 00:00:00 2001 From: jasontaylordev Date: Wed, 3 Jun 2026 15:38:37 +1000 Subject: [PATCH] Do not return negative throughput when queue table deleted --- .../PostgreSqlQueryTests.cs | 73 ++++++++++++++++++- .../DatabaseDetails.cs | 19 +++-- .../PostgreSqlQuery.cs | 11 ++- .../QueueTableSnapshot.cs | 2 +- 4 files changed, 93 insertions(+), 12 deletions(-) diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs b/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs index bee96d7921..4de9c5c636 100644 --- a/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs +++ b/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs @@ -8,6 +8,7 @@ namespace ServiceControl.Transport.Tests; using System.Threading.Tasks; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Time.Testing; +using Npgsql; using NUnit.Framework; using Particular.Approvals; using Transports; @@ -113,7 +114,77 @@ public async Task RunScenario() reset.Set(); await runScenarioAndAdvanceTime.WaitAsync(token); - // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion. + // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion. Assert.That(total, Is.EqualTo(23)); } + + [Test] + public async Task NoNegativeThroughputWhenQueueTableIsDeletedBetweenSnapshots() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(50)); + CancellationToken token = cancellationTokenSource.Token; + var dictionary = new Dictionary + { + { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, configuration.ConnectionString } + }; + + await CreateTestQueue(transportSettings.EndpointName); + + query.Initialize(new ReadOnlyDictionary(dictionary)); + + var queueNames = new List(); + await foreach (IBrokerQueue queueName in query.GetQueueNames(token)) + { + queueNames.Add(queueName); + } + + IBrokerQueue queue = queueNames.Find(name => ((BrokerQueueTable)name).SanitizedName == transportSettings.EndpointName); + Assert.That(queue, Is.Not.Null); + + // Send messages so the start snapshot has a positive RowVersion + await SendAndReceiveMessages(transportSettings.EndpointName, 5); + + var brokerQueueTable = (BrokerQueueTable)queue; + int advanceCount = 0; + using var done = new ManualResetEventSlim(); + + var dropTableTask = Task.Run(async () => + { + while (!done.IsSet) + { + // Drop the table mid-way while GetThroughputPerDay is waiting for the next hour + if (advanceCount == 3) + { + await DropQueueTable(brokerQueueTable.QueueAddress.QualifiedTableName); + } + + provider.Advance(TimeSpan.FromHours(1)); + advanceCount++; + + // Pace advances to give GetThroughputPerDay time to process the iteration + // and register its next Task.Delay before we advance again + await Task.Delay(TimeSpan.FromMilliseconds(500), CancellationToken.None); + } + }, token); + + var throughputValues = new List(); + await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, new DateOnly(), token)) + { + throughputValues.Add(queueThroughput); + } + + done.Set(); + await dropTableTask.WaitAsync(token); + + Assert.That(throughputValues, Has.All.Matches(qt => qt.TotalThroughput >= 0)); + } + + async Task DropQueueTable(string qualifiedTableName) + { + await using var conn = new NpgsqlConnection(configuration.ConnectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"DROP TABLE IF EXISTS {qualifiedTableName} CASCADE;"; + await cmd.ExecuteNonQueryAsync(); + } } \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs b/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs index 556aba5a43..93474ae67c 100644 --- a/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs +++ b/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs @@ -97,14 +97,21 @@ public async Task GetSnapshot(BrokerQueueTable brokerQ { var table = new BrokerQueueTableSnapshot(brokerQueueTable); - await using var conn = await OpenConnectionAsync(cancellationToken); - await using var cmd = conn.CreateCommand(); - cmd.CommandText = $"select last_value from \"{table.SequenceName}\";"; - var value = await cmd.ExecuteScalarAsync(cancellationToken); + try + { + await using var conn = await OpenConnectionAsync(cancellationToken); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"select last_value from \"{table.SequenceName}\";"; + var value = await cmd.ExecuteScalarAsync(cancellationToken); - if (value is long longValue) + if (value is long longValue) + { + table.RowVersion = longValue; + } + } + catch (NpgsqlException ex) when (ex.SqlState == "42P01") { - table.RowVersion = longValue; + // Queue table has been deleted; RowVersion remains null } return table; diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs index 12ed41546b..7d50e07906 100644 --- a/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs +++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs @@ -50,11 +50,14 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro var endData = await queueTableName.DatabaseDetails.GetSnapshot(queueTableName, cancellationToken); - yield return new QueueThroughput + if (endData.RowVersion.HasValue && startData.RowVersion.HasValue) { - DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), - TotalThroughput = endData.RowVersion - startData.RowVersion - }; + yield return new QueueThroughput + { + DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), + TotalThroughput = endData.RowVersion.Value - startData.RowVersion.Value + }; + } startData = endData; } diff --git a/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs b/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs index ba4fc7180b..6098c606f9 100644 --- a/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs +++ b/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs @@ -2,5 +2,5 @@ namespace ServiceControl.Transports.PostgreSql; public class BrokerQueueTableSnapshot(BrokerQueueTable details) : BrokerQueueTable(details.DatabaseDetails, details.QueueAddress) { - public long RowVersion { get; set; } + public long? RowVersion { get; set; } } \ No newline at end of file