Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Current package versions:
## Unreleased

- Detect server-mode correctly on Valkey 8+ instances ([#3050 by @wipiano](https://github.com/StackExchange/StackExchange.Redis/pull/3050))
- Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#3058 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3058))
- Update experimental `GCRA` APIs and wire protocol terminology from "requests" to "tokens", to match server change ([#3051 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3051))

## 2.12.14
Expand Down
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ internal enum RedisCommand
XGROUP,
XINFO,
XLEN,
XNACK,
XPENDING,
XRANGE,
XREAD,
Expand Down Expand Up @@ -561,6 +562,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.XDEL:
case RedisCommand.XDELEX:
case RedisCommand.XGROUP:
case RedisCommand.XNACK:
case RedisCommand.XREADGROUP:
case RedisCommand.XTRIM:
return false;
Expand Down
26 changes: 26 additions & 0 deletions src/StackExchange.Redis/Enums/StreamNackMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Diagnostics.CodeAnalysis;
using RESPite;

namespace StackExchange.Redis;

/// <summary>
/// Determines how a stream message is negatively acknowledged back to the consumer group.
/// </summary>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
public enum StreamNackMode
{
/// <summary>
/// Release the message without counting it as an additional failure.
/// </summary>
Silent = 0,

/// <summary>
/// Release the message and treat it as a normal failed delivery.
/// </summary>
Fail = 1,

/// <summary>
/// Release the message and mark it as a terminal failure.
/// </summary>
Fatal = 2,
}
30 changes: 30 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2614,6 +2614,36 @@ IEnumerable<SortedSetEntry> SortedSetScan(
StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Allow the consumer to release a pending message back to the group without marking it as correctly processed.
/// Returns the number of messages negatively acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="consumerName">The name of the consumer releasing the message.</param>
/// <param name="mode">The negative acknowledge mode to use.</param>
/// <param name="messageId">The ID of the message to negatively acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages negatively acknowledged.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Allow the consumer to release pending messages back to the group without marking them as correctly processed.
/// Returns the number of messages negatively acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the messages.</param>
/// <param name="consumerName">The name of the consumer releasing the messages.</param>
/// <param name="mode">The negative acknowledge mode to use.</param>
/// <param name="messageIds">The IDs of the messages to negatively acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages negatively acknowledged.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
Expand Down
8 changes: 8 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,14 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<StreamTrimResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <inheritdoc cref="IDatabase.StreamNegativeAcknowledge(RedisKey, RedisValue, RedisValue, StreamNackMode, RedisValue, CommandFlags)"/>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamNegativeAcknowledge(RedisKey, RedisValue, RedisValue, StreamNackMode, RedisValue[], CommandFlags)"/>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);

Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,12 @@ public Task<StreamTrimResult> StreamAcknowledgeAndDeleteAsync(RedisKey key, Redi
public Task<StreamTrimResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags);

public Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageId, flags);

public Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageIds, flags);

public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,12 @@ public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue grou
public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags);

public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageId, flags);

public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageIds, flags);

public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);

Expand Down
8 changes: 8 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2258,9 +2258,13 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis
[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
[SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void
[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisValue>!
[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisValue>!
[SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
[SER003]StackExchange.Redis.StreamConfiguration
[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long?
[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void
Expand All @@ -2279,6 +2283,10 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis
[SER003]StackExchange.Redis.StreamInfo.IidsDuplicates.get -> long
[SER003]StackExchange.Redis.StreamInfo.IidsTracked.get -> long
[SER003]StackExchange.Redis.StreamInfo.PidsTracked.get -> long
[SER006]StackExchange.Redis.StreamNackMode
[SER006]StackExchange.Redis.StreamNackMode.Fail = 1 -> StackExchange.Redis.StreamNackMode
[SER006]StackExchange.Redis.StreamNackMode.Fatal = 2 -> StackExchange.Redis.StreamNackMode
[SER006]StackExchange.Redis.StreamNackMode.Silent = 0 -> StackExchange.Redis.StreamNackMode
StackExchange.Redis.StreamInfo.EntriesAdded.get -> long
StackExchange.Redis.StreamInfo.MaxDeletedEntryId.get -> StackExchange.Redis.RedisValue
StackExchange.Redis.StreamInfo.RecordedFirstEntryId.get -> StackExchange.Redis.RedisValue
Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/RedisFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ namespace StackExchange.Redis
v8_0_0_M04 = new Version(7, 9, 227), // 8.0 M04 is version 7.9.227
v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240
v8_4_0_rc1 = new Version(8, 3, 224), // 8.4 RC1 is version 8.3.224
v8_6_0 = new Version(8, 6, 0);
v8_6_0 = new Version(8, 6, 0),
v8_8_0 = new Version(8, 8, 0);

#pragma warning restore SA1310 // Field names should not contain underscore
#pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter
Expand Down
Loading
Loading