Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Current package versions:
- Detect server-mode correctly on Valkey 8+ instances ([#3050 by @wipiano](https://github.com/StackExchange/StackExchange.Redis/pull/3050))
- Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#3058 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3058))
- Update experimental `GCRA` APIs and wire protocol terminology from "requests" to "tokens", to match server change ([#3051 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3051))
- Add experimental `Aggregate.Count` support for sorted-set combination operations against Redis 8.8 ([#3059 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3059))

## 2.12.14

Expand Down
11 changes: 10 additions & 1 deletion src/StackExchange.Redis/Enums/Aggregate.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace StackExchange.Redis
using System.Diagnostics.CodeAnalysis;
using RESPite;

namespace StackExchange.Redis
{
/// <summary>
/// Specifies how elements should be aggregated when combining sorted sets.
Expand All @@ -19,5 +22,11 @@ public enum Aggregate
/// The greatest value of the combined elements is used.
/// </summary>
Max,

/// <summary>
/// The number of combined element scores is used.
/// </summary>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
Count,
}
}
10 changes: 4 additions & 6 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2620,29 +2620,27 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="consumerName">The name of the consumer releasing the message.</param>
/// <param name="mode">The negative acknowledge mode to use.</param>
/// <param name="messageId">The ID of the message to negatively acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages negatively acknowledged.</returns>
/// <returns>Returns the number of messages successfully NACKed as a resp integer, regardless of mode (SILENT, FAIL, or FATAL) or options (RETRYCOUNT, FORCE) specified.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Allow the consumer to release pending messages back to the group without marking them as correctly processed.
/// Returns the number of messages negatively acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the messages.</param>
/// <param name="consumerName">The name of the consumer releasing the messages.</param>
/// <param name="mode">The negative acknowledge mode to use.</param>
/// <param name="messageIds">The IDs of the messages to negatively acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages negatively acknowledged.</returns>
/// <returns>Returns the number of messages successfully NACKed as a resp integer, regardless of mode (SILENT, FAIL, or FATAL) or options (RETRYCOUNT, FORCE) specified.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Adds an entry using the specified values to the given stream key.
Expand Down
8 changes: 4 additions & 4 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,13 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<StreamTrimResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <inheritdoc cref="IDatabase.StreamNegativeAcknowledge(RedisKey, RedisValue, RedisValue, StreamNackMode, RedisValue, CommandFlags)"/>
/// <inheritdoc cref="IDatabase.StreamNegativeAcknowledge(RedisKey, RedisValue, StreamNackMode, RedisValue, CommandFlags)"/>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamNegativeAcknowledge(RedisKey, RedisValue, RedisValue, StreamNackMode, RedisValue[], CommandFlags)"/>
/// <inheritdoc cref="IDatabase.StreamNegativeAcknowledge(RedisKey, RedisValue, StreamNackMode, RedisValue[], CommandFlags)"/>
[Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)]
Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
Expand Down
8 changes: 4 additions & 4 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,11 @@ public Task<StreamTrimResult> StreamAcknowledgeAndDeleteAsync(RedisKey key, Redi
public Task<StreamTrimResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags);

public Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageId, flags);
public Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, mode, messageId, flags);

public Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageIds, flags);
public Task<long> StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, mode, messageIds, flags);

public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,11 @@ public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue grou
public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags);

public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageId, flags);
public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledge(ToInner(key), groupName, mode, messageId, flags);

public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageIds, flags);
public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
Inner.StreamNegativeAcknowledge(ToInner(key), groupName, mode, messageIds, flags);

public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
Expand Down
9 changes: 5 additions & 4 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ override StackExchange.Redis.SortedSetEntry.Equals(object? obj) -> bool
override StackExchange.Redis.SortedSetEntry.GetHashCode() -> int
override StackExchange.Redis.SortedSetEntry.ToString() -> string!
StackExchange.Redis.Aggregate
[SER006]StackExchange.Redis.Aggregate.Count = 3 -> StackExchange.Redis.Aggregate
StackExchange.Redis.Aggregate.Max = 2 -> StackExchange.Redis.Aggregate
StackExchange.Redis.Aggregate.Min = 1 -> StackExchange.Redis.Aggregate
StackExchange.Redis.Aggregate.Sum = 0 -> StackExchange.Redis.Aggregate
Expand Down Expand Up @@ -2258,13 +2259,13 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis
[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
[SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void
[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisValue>!
[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisValue>!
[SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
[SER003]StackExchange.Redis.StreamConfiguration
[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long?
[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void
Expand Down
4 changes: 4 additions & 0 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4629,6 +4629,10 @@ private void AddWeightsAggregationAndScore(Span<RedisValue> values, double[]? we
values[i++] = RedisLiterals.AGGREGATE;
values[i++] = RedisLiterals.MAX;
break;
case Aggregate.Count:
values[i++] = RedisLiterals.AGGREGATE;
values[i++] = RedisLiterals.COUNT;
break;
default:
throw new ArgumentOutOfRangeException(nameof(aggregate));
}
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/RedisFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace StackExchange.Redis
v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240
v8_4_0_rc1 = new Version(8, 3, 224), // 8.4 RC1 is version 8.3.224
v8_6_0 = new Version(8, 6, 0),
v8_8_0 = new Version(8, 8, 0);
v8_8_0 = new Version(8, 7, 225); // 8.8 is version 8.7.225

#pragma warning restore SA1310 // Field names should not contain underscore
#pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter
Expand Down
8 changes: 6 additions & 2 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public static readonly TimeSpanProcessor
public static readonly HashEntryArrayProcessor
HashEntryArray = new HashEntryArrayProcessor();

// If the server reports max (i.e. FATAL), use int.MinValue as a similarly obviously bad value.
private static int ParseStreamDeliveryCount(long deliveryCount)
=> deliveryCount == long.MaxValue ? int.MinValue : checked((int)deliveryCount);

[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822:Mark members as static", Justification = "Conditionally run on instance")]
public void ConnectionFail(Message message, ConnectionFailureType fail, Exception? innerException, string? annotation, ConnectionMultiplexer? muxer)
{
Expand Down Expand Up @@ -2715,7 +2719,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
messageId: details.GetNext().AsRedisValue(),
consumerName: details.GetNext().AsRedisValue(),
idleTimeInMs: (long)details.GetNext().AsRedisValue(),
deliveryCount: (int)details.GetNext().AsRedisValue());
deliveryCount: ParseStreamDeliveryCount((long)details.GetNext().AsRedisValue()));
});

SetResult(message, messageInfoArray);
Expand Down Expand Up @@ -2763,7 +2767,7 @@ protected static StreamEntry ParseRedisStreamEntry(in RawResult item)
id: id,
values: values,
idleTime: TimeSpan.FromMilliseconds(idleTimeInMs),
deliveryCount: checked((int)deliveryCount));
deliveryCount: ParseStreamDeliveryCount(deliveryCount));
}
return new StreamEntry(
id: id,
Expand Down
Loading
Loading