diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md
index cfdeb81e0..1ce12dda4 100644
--- a/docs/ReleaseNotes.md
+++ b/docs/ReleaseNotes.md
@@ -9,6 +9,7 @@ Current package versions:
## Unreleased
- Detect server-mode correctly on Valkey 8+ instances ([#3050 by @wipiano](https://github.com/StackExchange/StackExchange.Redis/pull/3050))
+- Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#3058 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3058))
- Update experimental `GCRA` APIs and wire protocol terminology from "requests" to "tokens", to match server change ([#3051 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3051))
## 2.12.14
diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs
index 5a9ba4c66..4f294f46b 100644
--- a/src/StackExchange.Redis/Enums/RedisCommand.cs
+++ b/src/StackExchange.Redis/Enums/RedisCommand.cs
@@ -240,6 +240,7 @@ internal enum RedisCommand
XGROUP,
XINFO,
XLEN,
+ XNACK,
XPENDING,
XRANGE,
XREAD,
@@ -561,6 +562,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.XDEL:
case RedisCommand.XDELEX:
case RedisCommand.XGROUP:
+ case RedisCommand.XNACK:
case RedisCommand.XREADGROUP:
case RedisCommand.XTRIM:
return false;
diff --git a/src/StackExchange.Redis/Enums/StreamNackMode.cs b/src/StackExchange.Redis/Enums/StreamNackMode.cs
new file mode 100644
index 000000000..68a8530aa
--- /dev/null
+++ b/src/StackExchange.Redis/Enums/StreamNackMode.cs
@@ -0,0 +1,26 @@
+using System.Diagnostics.CodeAnalysis;
+using RESPite;
+
+namespace StackExchange.Redis;
+
+///
+/// Determines how a stream message is negatively acknowledged back to the consumer group.
+///
+[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
+public enum StreamNackMode
+{
+ ///
+ /// Release the message without counting it as an additional failure.
+ ///
+ Silent = 0,
+
+ ///
+ /// Release the message and treat it as a normal failed delivery.
+ ///
+ Fail = 1,
+
+ ///
+ /// Release the message and mark it as a terminal failure.
+ ///
+ Fatal = 2,
+}
diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs
index d100e396f..51060b92c 100644
--- a/src/StackExchange.Redis/Interfaces/IDatabase.cs
+++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs
@@ -2614,6 +2614,36 @@ IEnumerable SortedSetScan(
StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026
+ ///
+ /// Allow the consumer to release a pending message back to the group without marking it as correctly processed.
+ /// Returns the number of messages negatively acknowledged.
+ ///
+ /// The key of the stream.
+ /// The name of the consumer group that received the message.
+ /// The name of the consumer releasing the message.
+ /// The negative acknowledge mode to use.
+ /// The ID of the message to negatively acknowledge.
+ /// The flags to use for this operation.
+ /// The number of messages negatively acknowledged.
+ ///
+ [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
+ long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
+
+ ///
+ /// Allow the consumer to release pending messages back to the group without marking them as correctly processed.
+ /// Returns the number of messages negatively acknowledged.
+ ///
+ /// The key of the stream.
+ /// The name of the consumer group that received the messages.
+ /// The name of the consumer releasing the messages.
+ /// The negative acknowledge mode to use.
+ /// The IDs of the messages to negatively acknowledge.
+ /// The flags to use for this operation.
+ /// The number of messages negatively acknowledged.
+ ///
+ [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
+ long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
+
///
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
index 43046e1e0..0de5a37ba 100644
--- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
+++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
@@ -644,6 +644,14 @@ IAsyncEnumerable SortedSetScanAsync(
Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026
+ ///
+ [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
+ Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
+
+ ///
+ [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
+ Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
+
///
Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
index 21ede4b36..093f3423b 100644
--- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
+++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
@@ -609,6 +609,12 @@ public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, Redi
public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags);
+ public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageId, flags);
+
+ public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageIds, flags);
+
public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs
index 25d721d15..7308ba568 100644
--- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs
+++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs
@@ -591,6 +591,12 @@ public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue grou
public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags);
+ public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageId, flags);
+
+ public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageIds, flags);
+
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
index 963b9582f..e51ab1eb6 100644
--- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
+++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
@@ -2258,9 +2258,13 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis
[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
[SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void
+[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
+[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
[SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
+[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
+[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
[SER003]StackExchange.Redis.StreamConfiguration
[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long?
[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void
@@ -2279,6 +2283,10 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis
[SER003]StackExchange.Redis.StreamInfo.IidsDuplicates.get -> long
[SER003]StackExchange.Redis.StreamInfo.IidsTracked.get -> long
[SER003]StackExchange.Redis.StreamInfo.PidsTracked.get -> long
+[SER006]StackExchange.Redis.StreamNackMode
+[SER006]StackExchange.Redis.StreamNackMode.Fail = 1 -> StackExchange.Redis.StreamNackMode
+[SER006]StackExchange.Redis.StreamNackMode.Fatal = 2 -> StackExchange.Redis.StreamNackMode
+[SER006]StackExchange.Redis.StreamNackMode.Silent = 0 -> StackExchange.Redis.StreamNackMode
StackExchange.Redis.StreamInfo.EntriesAdded.get -> long
StackExchange.Redis.StreamInfo.MaxDeletedEntryId.get -> StackExchange.Redis.RedisValue
StackExchange.Redis.StreamInfo.RecordedFirstEntryId.get -> StackExchange.Redis.RedisValue
diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs
index d185089e6..01f89618b 100644
--- a/src/StackExchange.Redis/RedisFeatures.cs
+++ b/src/StackExchange.Redis/RedisFeatures.cs
@@ -49,7 +49,8 @@ namespace StackExchange.Redis
v8_0_0_M04 = new Version(7, 9, 227), // 8.0 M04 is version 7.9.227
v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240
v8_4_0_rc1 = new Version(8, 3, 224), // 8.4 RC1 is version 8.3.224
- v8_6_0 = new Version(8, 6, 0);
+ v8_6_0 = new Version(8, 6, 0),
+ v8_8_0 = new Version(8, 8, 0);
#pragma warning restore SA1310 // Field names should not contain underscore
#pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter
diff --git a/src/StackExchange.Redis/StreamNackMessage.cs b/src/StackExchange.Redis/StreamNackMessage.cs
new file mode 100644
index 000000000..7de38e613
--- /dev/null
+++ b/src/StackExchange.Redis/StreamNackMessage.cs
@@ -0,0 +1,130 @@
+using System;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+internal partial class RedisDatabase
+{
+ public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None)
+ => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageId, flags), ResultProcessor.Int64);
+
+ public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None)
+ => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageId, flags), ResultProcessor.Int64);
+
+ public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
+ => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageIds, flags), ResultProcessor.Int64);
+
+ public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None)
+ => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageIds, flags), ResultProcessor.Int64);
+
+ private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags)
+ => new StreamNackMessageSingle(Database, flags, key, groupName, consumerName, mode, messageId);
+
+ private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags)
+ => messageIds is { Length: 1 }
+ ? new StreamNackMessageSingle(Database, flags, key, groupName, consumerName, mode, messageIds[0])
+ : new StreamNackMessageMulti(Database, flags, key, groupName, consumerName, mode, messageIds);
+
+ internal abstract class StreamNackMessageBase : Message.CommandKeyBase
+ {
+ private readonly RedisValue groupName;
+ private readonly RedisValue consumerName;
+ private readonly StreamNackMode mode;
+
+ protected StreamNackMessageBase(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode)
+ : base(db, flags, RedisCommand.XNACK, key)
+ {
+ groupName.AssertNotNull();
+ consumerName.AssertNotNull();
+
+ this.groupName = groupName;
+ this.consumerName = consumerName;
+ this.mode = mode;
+ }
+
+ protected abstract int Count { get; }
+
+ protected abstract void WriteIds(PhysicalConnection physical);
+
+ protected override void WriteImpl(PhysicalConnection physical)
+ {
+ physical.WriteHeader(Command, ArgCount);
+ physical.Write(Key);
+ physical.WriteBulkString(groupName);
+ physical.WriteBulkString(consumerName);
+ WriteMode(physical);
+ physical.WriteBulkString(StreamConstants.Ids);
+ physical.WriteBulkString(Count);
+ WriteIds(physical);
+ }
+
+ private void WriteMode(PhysicalConnection physical)
+ {
+ switch (mode)
+ {
+ case StreamNackMode.Silent:
+ physical.WriteBulkString("SILENT"u8);
+ break;
+ case StreamNackMode.Fail:
+ physical.WriteBulkString("FAIL"u8);
+ break;
+ case StreamNackMode.Fatal:
+ physical.WriteBulkString("FATAL"u8);
+ break;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(mode));
+ }
+ }
+
+ public override int ArgCount => 6 + Count;
+ }
+
+ internal sealed class StreamNackMessageSingle : StreamNackMessageBase
+ {
+ private readonly RedisValue messageId;
+
+ public StreamNackMessageSingle(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode, in RedisValue messageId)
+ : base(db, flags, key, groupName, consumerName, mode)
+ {
+ messageId.AssertNotNull();
+ this.messageId = messageId;
+ }
+
+ protected override int Count => 1;
+
+ protected override void WriteIds(PhysicalConnection physical) => physical.WriteBulkString(messageId);
+ }
+
+ internal sealed class StreamNackMessageMulti : StreamNackMessageBase
+ {
+ private readonly RedisValue[] messageIds;
+
+ public StreamNackMessageMulti(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds)
+ : base(db, flags, key, groupName, consumerName, mode)
+ {
+#if NET
+ ArgumentNullException.ThrowIfNull(messageIds);
+#else
+ if (messageIds == null) throw new ArgumentNullException(nameof(messageIds));
+#endif
+ if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item.");
+
+ for (int i = 0; i < messageIds.Length; i++)
+ {
+ messageIds[i].AssertNotNull();
+ }
+
+ this.messageIds = messageIds;
+ }
+
+ protected override int Count => messageIds.Length;
+
+ protected override void WriteIds(PhysicalConnection physical)
+ {
+ for (int i = 0; i < messageIds.Length; i++)
+ {
+ physical.WriteBulkString(messageIds[i]);
+ }
+ }
+ }
+}
diff --git a/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs b/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs
index 636371832..7dcb39845 100644
--- a/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs
+++ b/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs
@@ -27,7 +27,7 @@ public async Task GcraRateLimit_NonDefaultCount()
[Fact(Timeout = 5000)]
public async Task GcraRateLimit_SmokeTest()
{
- await using var conn = Create(require: new Version(8, 8, 0));
+ await using var conn = Create(require: RedisFeatures.v8_8_0);
var db = conn.GetDatabase();
var key = Me();
db.KeyDelete(key, CommandFlags.FireAndForget);
diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs
index 0b781123c..5563b5efc 100644
--- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs
+++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs
@@ -1030,6 +1030,21 @@ public void StreamAcknowledge_2()
mock.Received().StreamAcknowledge("prefix:key", "group", messageIds, CommandFlags.None);
}
+ [Fact]
+ public void StreamNegativeAcknowledge_1()
+ {
+ prefixed.StreamNegativeAcknowledge("key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None);
+ mock.Received().StreamNegativeAcknowledge("prefix:key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None);
+ }
+
+ [Fact]
+ public void StreamNegativeAcknowledge_2()
+ {
+ var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" };
+ prefixed.StreamNegativeAcknowledge("key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None);
+ mock.Received().StreamNegativeAcknowledge("prefix:key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None);
+ }
+
[Fact]
public void StreamAdd_1()
{
diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs
index 94b54e112..7adbaab2c 100644
--- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs
+++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs
@@ -940,6 +940,21 @@ public async Task StreamAcknowledgeAsync_2()
await mock.Received().StreamAcknowledgeAsync("prefix:key", "group", messageIds, CommandFlags.None);
}
+ [Fact]
+ public async Task StreamNegativeAcknowledgeAsync_1()
+ {
+ await prefixed.StreamNegativeAcknowledgeAsync("key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None);
+ await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None);
+ }
+
+ [Fact]
+ public async Task StreamNegativeAcknowledgeAsync_2()
+ {
+ var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" };
+ await prefixed.StreamNegativeAcknowledgeAsync("key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None);
+ await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None);
+ }
+
[Fact]
public async Task StreamAddAsync_1()
{
diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs
index 7e625d399..9ac3812d4 100644
--- a/tests/StackExchange.Redis.Tests/StreamTests.cs
+++ b/tests/StackExchange.Redis.Tests/StreamTests.cs
@@ -968,6 +968,63 @@ public void StreamConsumerGroupAcknowledgeAndDeleteMessage(StreamTrimMode mode)
Assert.Equal(id2, notAcknowledged[0].Id);
}
+ [Theory]
+ [InlineData(StreamNackMode.Silent, false)]
+ [InlineData(StreamNackMode.Silent, true)]
+ [InlineData(StreamNackMode.Fail, false)]
+ [InlineData(StreamNackMode.Fail, true)]
+ [InlineData(StreamNackMode.Fatal, false)]
+ [InlineData(StreamNackMode.Fatal, true)]
+ public async Task StreamConsumerGroupNegativeAcknowledgeMessage(StreamNackMode mode, bool async)
+ {
+ await using var conn = Create(require: RedisFeatures.v8_8_0);
+
+ var db = conn.GetDatabase();
+ var key = Me() + ":" + mode + ":" + async;
+ await db.KeyDeleteAsync(key, CommandFlags.FireAndForget);
+ const string groupName = "test_group",
+ consumer = "test_consumer";
+
+ var id1 = db.StreamAdd(key, "field1", "value1");
+ var id2 = db.StreamAdd(key, "field2", "value2");
+ var id3 = db.StreamAdd(key, "field3", "value3");
+ RedisValue notexist = "0-0";
+
+ db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning, flags: CommandFlags.FireAndForget);
+
+ var entries = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages);
+ Assert.Equal(3, entries.Length);
+
+ long oneNack = async
+ ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, id1)
+ : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, id1);
+ Assert.Equal(1, oneNack);
+
+ long zeroNack = async
+ ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, notexist)
+ : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, notexist);
+ Assert.Equal(0, zeroNack);
+
+ long oneArrayNack = async
+ ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, [id2])
+ : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, [id2]);
+ Assert.Equal(1, oneArrayNack);
+
+ long multiArrayNack = async
+ ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, [id3, notexist])
+ : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, [id3, notexist]);
+ Assert.Equal(1, multiArrayNack);
+
+ var consumerPending = db.StreamPendingMessages(key, groupName, 10, consumer);
+ Assert.Empty(consumerPending);
+
+ var allPending = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null);
+ Assert.Equal(3, allPending.Length);
+ Assert.Contains(allPending, x => x.MessageId == id1 && x.ConsumerName.IsNullOrEmpty);
+ Assert.Contains(allPending, x => x.MessageId == id2 && x.ConsumerName.IsNullOrEmpty);
+ Assert.Contains(allPending, x => x.MessageId == id3 && x.ConsumerName.IsNullOrEmpty);
+ }
+
[Fact]
public async Task StreamConsumerGroupClaimMessages()
{