From 018e13209737197e5082113102eb2ef4db035939 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 21 Apr 2026 13:16:43 +0100 Subject: [PATCH 1/2] propose XNACK for 8.8 --- docs/ReleaseNotes.md | 1 + src/StackExchange.Redis/Enums/RedisCommand.cs | 2 + .../Enums/StreamNackMode.cs | 26 ++++ .../Interfaces/IDatabase.cs | 30 ++++ .../Interfaces/IDatabaseAsync.cs | 8 ++ .../KeyspaceIsolation/KeyPrefixed.cs | 6 + .../KeyspaceIsolation/KeyPrefixedDatabase.cs | 6 + .../PublicAPI/PublicAPI.Shipped.txt | 8 ++ src/StackExchange.Redis/RedisFeatures.cs | 3 +- src/StackExchange.Redis/StreamNackMessage.cs | 130 ++++++++++++++++++ .../GcraIntegrationTests.cs | 2 +- .../KeyPrefixedDatabaseTests.cs | 15 ++ .../KeyPrefixedTests.cs | 15 ++ .../StackExchange.Redis.Tests/StreamTests.cs | 57 ++++++++ 14 files changed, 307 insertions(+), 2 deletions(-) create mode 100644 src/StackExchange.Redis/Enums/StreamNackMode.cs create mode 100644 src/StackExchange.Redis/StreamNackMessage.cs diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 83abf5a60..b5ea74329 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -9,6 +9,7 @@ Current package versions: ## Unreleased - Detect server-mode correctly on Valkey 8+ instances ([#3050 by @wipiano](https://github.com/StackExchange/StackExchange.Redis/pull/3050)) +- Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#xxxx by @mgravell](https://github.com/StackExchange/StackExchange.Redis/issues/xxxx)) ## 2.12.14 diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index 5a9ba4c66..4f294f46b 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -240,6 +240,7 @@ internal enum RedisCommand XGROUP, XINFO, XLEN, + XNACK, XPENDING, XRANGE, XREAD, @@ -561,6 +562,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command) case RedisCommand.XDEL: case RedisCommand.XDELEX: case RedisCommand.XGROUP: + case RedisCommand.XNACK: case RedisCommand.XREADGROUP: case RedisCommand.XTRIM: return false; diff --git a/src/StackExchange.Redis/Enums/StreamNackMode.cs b/src/StackExchange.Redis/Enums/StreamNackMode.cs new file mode 100644 index 000000000..68a8530aa --- /dev/null +++ b/src/StackExchange.Redis/Enums/StreamNackMode.cs @@ -0,0 +1,26 @@ +using System.Diagnostics.CodeAnalysis; +using RESPite; + +namespace StackExchange.Redis; + +/// +/// Determines how a stream message is negatively acknowledged back to the consumer group. +/// +[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] +public enum StreamNackMode +{ + /// + /// Release the message without counting it as an additional failure. + /// + Silent = 0, + + /// + /// Release the message and treat it as a normal failed delivery. + /// + Fail = 1, + + /// + /// Release the message and mark it as a terminal failure. + /// + Fatal = 2, +} diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 776741cfa..69ce36476 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2614,6 +2614,36 @@ IEnumerable SortedSetScan( StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 + /// + /// Allow the consumer to release a pending message back to the group without marking it as correctly processed. + /// Returns the number of messages negatively acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// The name of the consumer releasing the message. + /// The negative acknowledge mode to use. + /// The ID of the message to negatively acknowledge. + /// The flags to use for this operation. + /// The number of messages negatively acknowledged. + /// + [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] + long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); + + /// + /// Allow the consumer to release pending messages back to the group without marking them as correctly processed. + /// Returns the number of messages negatively acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the messages. + /// The name of the consumer releasing the messages. + /// The negative acknowledge mode to use. + /// The IDs of the messages to negatively acknowledge. + /// The flags to use for this operation. + /// The number of messages negatively acknowledged. + /// + [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] + long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + /// /// Adds an entry using the specified values to the given stream key. /// If key does not exist, a new key holding a stream is created. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index b8ac17777..e9c133c27 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -644,6 +644,14 @@ IAsyncEnumerable SortedSetScanAsync( Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 + /// + [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] + Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); + + /// + [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] + Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + /// Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index d3314ab7a..de5ce027b 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -609,6 +609,12 @@ public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, Redi public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags); + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageId, flags); + + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageIds, flags); + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 7b98255a3..89d25d538 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -591,6 +591,12 @@ public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue grou public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags); + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageId, flags); + + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageIds, flags); + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 56a9437ba..55f7e7107 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -2258,9 +2258,13 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis [SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue [SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue [SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void +[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long [SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! [SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! [SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! [SER003]StackExchange.Redis.StreamConfiguration [SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long? [SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void @@ -2279,6 +2283,10 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis [SER003]StackExchange.Redis.StreamInfo.IidsDuplicates.get -> long [SER003]StackExchange.Redis.StreamInfo.IidsTracked.get -> long [SER003]StackExchange.Redis.StreamInfo.PidsTracked.get -> long +[SER006]StackExchange.Redis.StreamNackMode +[SER006]StackExchange.Redis.StreamNackMode.Fail = 1 -> StackExchange.Redis.StreamNackMode +[SER006]StackExchange.Redis.StreamNackMode.Fatal = 2 -> StackExchange.Redis.StreamNackMode +[SER006]StackExchange.Redis.StreamNackMode.Silent = 0 -> StackExchange.Redis.StreamNackMode StackExchange.Redis.StreamInfo.EntriesAdded.get -> long StackExchange.Redis.StreamInfo.MaxDeletedEntryId.get -> StackExchange.Redis.RedisValue StackExchange.Redis.StreamInfo.RecordedFirstEntryId.get -> StackExchange.Redis.RedisValue diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs index d185089e6..01f89618b 100644 --- a/src/StackExchange.Redis/RedisFeatures.cs +++ b/src/StackExchange.Redis/RedisFeatures.cs @@ -49,7 +49,8 @@ namespace StackExchange.Redis v8_0_0_M04 = new Version(7, 9, 227), // 8.0 M04 is version 7.9.227 v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240 v8_4_0_rc1 = new Version(8, 3, 224), // 8.4 RC1 is version 8.3.224 - v8_6_0 = new Version(8, 6, 0); + v8_6_0 = new Version(8, 6, 0), + v8_8_0 = new Version(8, 8, 0); #pragma warning restore SA1310 // Field names should not contain underscore #pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter diff --git a/src/StackExchange.Redis/StreamNackMessage.cs b/src/StackExchange.Redis/StreamNackMessage.cs new file mode 100644 index 000000000..7de38e613 --- /dev/null +++ b/src/StackExchange.Redis/StreamNackMessage.cs @@ -0,0 +1,130 @@ +using System; +using System.Threading.Tasks; + +namespace StackExchange.Redis; + +internal partial class RedisDatabase +{ + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) + => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageId, flags), ResultProcessor.Int64); + + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) + => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageId, flags), ResultProcessor.Int64); + + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageIds, flags), ResultProcessor.Int64); + + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageIds, flags), ResultProcessor.Int64); + + private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags) + => new StreamNackMessageSingle(Database, flags, key, groupName, consumerName, mode, messageId); + + private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags) + => messageIds is { Length: 1 } + ? new StreamNackMessageSingle(Database, flags, key, groupName, consumerName, mode, messageIds[0]) + : new StreamNackMessageMulti(Database, flags, key, groupName, consumerName, mode, messageIds); + + internal abstract class StreamNackMessageBase : Message.CommandKeyBase + { + private readonly RedisValue groupName; + private readonly RedisValue consumerName; + private readonly StreamNackMode mode; + + protected StreamNackMessageBase(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode) + : base(db, flags, RedisCommand.XNACK, key) + { + groupName.AssertNotNull(); + consumerName.AssertNotNull(); + + this.groupName = groupName; + this.consumerName = consumerName; + this.mode = mode; + } + + protected abstract int Count { get; } + + protected abstract void WriteIds(PhysicalConnection physical); + + protected override void WriteImpl(PhysicalConnection physical) + { + physical.WriteHeader(Command, ArgCount); + physical.Write(Key); + physical.WriteBulkString(groupName); + physical.WriteBulkString(consumerName); + WriteMode(physical); + physical.WriteBulkString(StreamConstants.Ids); + physical.WriteBulkString(Count); + WriteIds(physical); + } + + private void WriteMode(PhysicalConnection physical) + { + switch (mode) + { + case StreamNackMode.Silent: + physical.WriteBulkString("SILENT"u8); + break; + case StreamNackMode.Fail: + physical.WriteBulkString("FAIL"u8); + break; + case StreamNackMode.Fatal: + physical.WriteBulkString("FATAL"u8); + break; + default: + throw new ArgumentOutOfRangeException(nameof(mode)); + } + } + + public override int ArgCount => 6 + Count; + } + + internal sealed class StreamNackMessageSingle : StreamNackMessageBase + { + private readonly RedisValue messageId; + + public StreamNackMessageSingle(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode, in RedisValue messageId) + : base(db, flags, key, groupName, consumerName, mode) + { + messageId.AssertNotNull(); + this.messageId = messageId; + } + + protected override int Count => 1; + + protected override void WriteIds(PhysicalConnection physical) => physical.WriteBulkString(messageId); + } + + internal sealed class StreamNackMessageMulti : StreamNackMessageBase + { + private readonly RedisValue[] messageIds; + + public StreamNackMessageMulti(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds) + : base(db, flags, key, groupName, consumerName, mode) + { +#if NET + ArgumentNullException.ThrowIfNull(messageIds); +#else + if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); +#endif + if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); + + for (int i = 0; i < messageIds.Length; i++) + { + messageIds[i].AssertNotNull(); + } + + this.messageIds = messageIds; + } + + protected override int Count => messageIds.Length; + + protected override void WriteIds(PhysicalConnection physical) + { + for (int i = 0; i < messageIds.Length; i++) + { + physical.WriteBulkString(messageIds[i]); + } + } + } +} diff --git a/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs b/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs index acbef7e20..9464c4bd6 100644 --- a/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs +++ b/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs @@ -10,7 +10,7 @@ public class GcraIntegrationTests(ITestOutputHelper output, SharedConnectionFixt [Fact(Timeout = 5000)] public async Task GcraRateLimit_SmokeTest() { - await using var conn = Create(require: new Version(8, 8, 0)); + await using var conn = Create(require: RedisFeatures.v8_8_0); var db = conn.GetDatabase(); var key = Me(); db.KeyDelete(key, CommandFlags.FireAndForget); diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index 0b781123c..5563b5efc 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1030,6 +1030,21 @@ public void StreamAcknowledge_2() mock.Received().StreamAcknowledge("prefix:key", "group", messageIds, CommandFlags.None); } + [Fact] + public void StreamNegativeAcknowledge_1() + { + prefixed.StreamNegativeAcknowledge("key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); + mock.Received().StreamNegativeAcknowledge("prefix:key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); + } + + [Fact] + public void StreamNegativeAcknowledge_2() + { + var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" }; + prefixed.StreamNegativeAcknowledge("key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); + mock.Received().StreamNegativeAcknowledge("prefix:key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); + } + [Fact] public void StreamAdd_1() { diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index 94b54e112..7adbaab2c 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -940,6 +940,21 @@ public async Task StreamAcknowledgeAsync_2() await mock.Received().StreamAcknowledgeAsync("prefix:key", "group", messageIds, CommandFlags.None); } + [Fact] + public async Task StreamNegativeAcknowledgeAsync_1() + { + await prefixed.StreamNegativeAcknowledgeAsync("key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); + await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); + } + + [Fact] + public async Task StreamNegativeAcknowledgeAsync_2() + { + var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" }; + await prefixed.StreamNegativeAcknowledgeAsync("key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); + await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); + } + [Fact] public async Task StreamAddAsync_1() { diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 7e625d399..9ac3812d4 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -968,6 +968,63 @@ public void StreamConsumerGroupAcknowledgeAndDeleteMessage(StreamTrimMode mode) Assert.Equal(id2, notAcknowledged[0].Id); } + [Theory] + [InlineData(StreamNackMode.Silent, false)] + [InlineData(StreamNackMode.Silent, true)] + [InlineData(StreamNackMode.Fail, false)] + [InlineData(StreamNackMode.Fail, true)] + [InlineData(StreamNackMode.Fatal, false)] + [InlineData(StreamNackMode.Fatal, true)] + public async Task StreamConsumerGroupNegativeAcknowledgeMessage(StreamNackMode mode, bool async) + { + await using var conn = Create(require: RedisFeatures.v8_8_0); + + var db = conn.GetDatabase(); + var key = Me() + ":" + mode + ":" + async; + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + const string groupName = "test_group", + consumer = "test_consumer"; + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + RedisValue notexist = "0-0"; + + db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning, flags: CommandFlags.FireAndForget); + + var entries = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages); + Assert.Equal(3, entries.Length); + + long oneNack = async + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, id1) + : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, id1); + Assert.Equal(1, oneNack); + + long zeroNack = async + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, notexist) + : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, notexist); + Assert.Equal(0, zeroNack); + + long oneArrayNack = async + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, [id2]) + : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, [id2]); + Assert.Equal(1, oneArrayNack); + + long multiArrayNack = async + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, [id3, notexist]) + : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, [id3, notexist]); + Assert.Equal(1, multiArrayNack); + + var consumerPending = db.StreamPendingMessages(key, groupName, 10, consumer); + Assert.Empty(consumerPending); + + var allPending = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null); + Assert.Equal(3, allPending.Length); + Assert.Contains(allPending, x => x.MessageId == id1 && x.ConsumerName.IsNullOrEmpty); + Assert.Contains(allPending, x => x.MessageId == id2 && x.ConsumerName.IsNullOrEmpty); + Assert.Contains(allPending, x => x.MessageId == id3 && x.ConsumerName.IsNullOrEmpty); + } + [Fact] public async Task StreamConsumerGroupClaimMessages() { From 48423c9f07467189b5ac1dcb49dc250d47c12108 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 21 Apr 2026 13:20:07 +0100 Subject: [PATCH 2/2] PR number --- docs/ReleaseNotes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 6be767228..1ce12dda4 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -9,7 +9,7 @@ Current package versions: ## Unreleased - Detect server-mode correctly on Valkey 8+ instances ([#3050 by @wipiano](https://github.com/StackExchange/StackExchange.Redis/pull/3050)) -- Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#xxxx by @mgravell](https://github.com/StackExchange/StackExchange.Redis/issues/xxxx)) +- Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#3058 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3058)) - Update experimental `GCRA` APIs and wire protocol terminology from "requests" to "tokens", to match server change ([#3051 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3051)) ## 2.12.14