diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 1ce12dda4..32ec98884 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -11,6 +11,7 @@ Current package versions: - Detect server-mode correctly on Valkey 8+ instances ([#3050 by @wipiano](https://github.com/StackExchange/StackExchange.Redis/pull/3050)) - Add Redis 8.8 stream negative acknowledgements (`XNACK`) ([#3058 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3058)) - Update experimental `GCRA` APIs and wire protocol terminology from "requests" to "tokens", to match server change ([#3051 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3051)) +- Add experimental `Aggregate.Count` support for sorted-set combination operations against Redis 8.8 ([#3059 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3059)) ## 2.12.14 diff --git a/src/StackExchange.Redis/Enums/Aggregate.cs b/src/StackExchange.Redis/Enums/Aggregate.cs index 41e1d435d..3e2aefc14 100644 --- a/src/StackExchange.Redis/Enums/Aggregate.cs +++ b/src/StackExchange.Redis/Enums/Aggregate.cs @@ -1,4 +1,7 @@ -namespace StackExchange.Redis +using System.Diagnostics.CodeAnalysis; +using RESPite; + +namespace StackExchange.Redis { /// /// Specifies how elements should be aggregated when combining sorted sets. @@ -19,5 +22,11 @@ public enum Aggregate /// The greatest value of the combined elements is used. /// Max, + + /// + /// The number of combined element scores is used. + /// + [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] + Count, } } diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 51060b92c..149cd3797 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2620,14 +2620,13 @@ IEnumerable SortedSetScan( /// /// The key of the stream. /// The name of the consumer group that received the message. - /// The name of the consumer releasing the message. /// The negative acknowledge mode to use. /// The ID of the message to negatively acknowledge. /// The flags to use for this operation. - /// The number of messages negatively acknowledged. + /// Returns the number of messages successfully NACKed as a resp integer, regardless of mode (SILENT, FAIL, or FATAL) or options (RETRYCOUNT, FORCE) specified. /// [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] - long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); + long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); /// /// Allow the consumer to release pending messages back to the group without marking them as correctly processed. @@ -2635,14 +2634,13 @@ IEnumerable SortedSetScan( /// /// The key of the stream. /// The name of the consumer group that received the messages. - /// The name of the consumer releasing the messages. /// The negative acknowledge mode to use. /// The IDs of the messages to negatively acknowledge. /// The flags to use for this operation. - /// The number of messages negatively acknowledged. + /// Returns the number of messages successfully NACKed as a resp integer, regardless of mode (SILENT, FAIL, or FATAL) or options (RETRYCOUNT, FORCE) specified. /// [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] - long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); /// /// Adds an entry using the specified values to the given stream key. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 0de5a37ba..af131135f 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -644,13 +644,13 @@ IAsyncEnumerable SortedSetScanAsync( Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 - /// + /// [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] - Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); + Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); - /// + /// [Experimental(Experiments.Server_8_8, UrlFormat = Experiments.UrlFormat)] - Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); /// Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index 093f3423b..cd8171f5a 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -609,11 +609,11 @@ public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, Redi public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags); - public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => - Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageId, flags); + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, mode, messageId, flags); - public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => - Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, consumerName, mode, messageIds, flags); + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledgeAsync(ToInner(key), groupName, mode, messageIds, flags); public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 7308ba568..78e3959d6 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -591,11 +591,11 @@ public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue grou public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags); - public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => - Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageId, flags); + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledge(ToInner(key), groupName, mode, messageId, flags); - public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => - Inner.StreamNegativeAcknowledge(ToInner(key), groupName, consumerName, mode, messageIds, flags); + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => + Inner.StreamNegativeAcknowledge(ToInner(key), groupName, mode, messageIds, flags); public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index e51ab1eb6..2d3c191fa 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -61,6 +61,7 @@ override StackExchange.Redis.SortedSetEntry.Equals(object? obj) -> bool override StackExchange.Redis.SortedSetEntry.GetHashCode() -> int override StackExchange.Redis.SortedSetEntry.ToString() -> string! StackExchange.Redis.Aggregate +[SER006]StackExchange.Redis.Aggregate.Count = 3 -> StackExchange.Redis.Aggregate StackExchange.Redis.Aggregate.Max = 2 -> StackExchange.Redis.Aggregate StackExchange.Redis.Aggregate.Min = 1 -> StackExchange.Redis.Aggregate StackExchange.Redis.Aggregate.Sum = 0 -> StackExchange.Redis.Aggregate @@ -2258,13 +2259,13 @@ static StackExchange.Redis.RedisChannel.KeySpaceSingleKey(in StackExchange.Redis [SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue [SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue [SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void -[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long -[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +[SER006]StackExchange.Redis.IDatabase.StreamNegativeAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long [SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! [SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! [SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER006]StackExchange.Redis.IDatabaseAsync.StreamNegativeAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamNackMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! [SER003]StackExchange.Redis.StreamConfiguration [SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long? [SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index ac3c14bcc..cdf4fc9af 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -4629,6 +4629,10 @@ private void AddWeightsAggregationAndScore(Span values, double[]? we values[i++] = RedisLiterals.AGGREGATE; values[i++] = RedisLiterals.MAX; break; + case Aggregate.Count: + values[i++] = RedisLiterals.AGGREGATE; + values[i++] = RedisLiterals.COUNT; + break; default: throw new ArgumentOutOfRangeException(nameof(aggregate)); } diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs index 01f89618b..1a40cd427 100644 --- a/src/StackExchange.Redis/RedisFeatures.cs +++ b/src/StackExchange.Redis/RedisFeatures.cs @@ -50,7 +50,7 @@ namespace StackExchange.Redis v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240 v8_4_0_rc1 = new Version(8, 3, 224), // 8.4 RC1 is version 8.3.224 v8_6_0 = new Version(8, 6, 0), - v8_8_0 = new Version(8, 8, 0); + v8_8_0 = new Version(8, 7, 225); // 8.8 is version 8.7.225 #pragma warning restore SA1310 // Field names should not contain underscore #pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 56e1973cc..3387d5552 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -193,6 +193,10 @@ public static readonly TimeSpanProcessor public static readonly HashEntryArrayProcessor HashEntryArray = new HashEntryArrayProcessor(); + // If the server reports max (i.e. FATAL), use int.MinValue as a similarly obviously bad value. + private static int ParseStreamDeliveryCount(long deliveryCount) + => deliveryCount == long.MaxValue ? int.MinValue : checked((int)deliveryCount); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822:Mark members as static", Justification = "Conditionally run on instance")] public void ConnectionFail(Message message, ConnectionFailureType fail, Exception? innerException, string? annotation, ConnectionMultiplexer? muxer) { @@ -2715,7 +2719,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes messageId: details.GetNext().AsRedisValue(), consumerName: details.GetNext().AsRedisValue(), idleTimeInMs: (long)details.GetNext().AsRedisValue(), - deliveryCount: (int)details.GetNext().AsRedisValue()); + deliveryCount: ParseStreamDeliveryCount((long)details.GetNext().AsRedisValue())); }); SetResult(message, messageInfoArray); @@ -2763,7 +2767,7 @@ protected static StreamEntry ParseRedisStreamEntry(in RawResult item) id: id, values: values, idleTime: TimeSpan.FromMilliseconds(idleTimeInMs), - deliveryCount: checked((int)deliveryCount)); + deliveryCount: ParseStreamDeliveryCount(deliveryCount)); } return new StreamEntry( id: id, diff --git a/src/StackExchange.Redis/StreamNackMessage.cs b/src/StackExchange.Redis/StreamNackMessage.cs index 7de38e613..991d7e76d 100644 --- a/src/StackExchange.Redis/StreamNackMessage.cs +++ b/src/StackExchange.Redis/StreamNackMessage.cs @@ -5,40 +5,37 @@ namespace StackExchange.Redis; internal partial class RedisDatabase { - public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) - => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageId, flags), ResultProcessor.Int64); + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) + => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, mode, messageId, flags), ResultProcessor.Int64); - public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) - => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageId, flags), ResultProcessor.Int64); + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) + => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, mode, messageId, flags), ResultProcessor.Int64); - public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) - => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageIds, flags), ResultProcessor.Int64); + public long StreamNegativeAcknowledge(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + => ExecuteSync(GetStreamNegativeAcknowledgeMessage(key, groupName, mode, messageIds, flags), ResultProcessor.Int64); - public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) - => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, consumerName, mode, messageIds, flags), ResultProcessor.Int64); + public Task StreamNegativeAcknowledgeAsync(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + => ExecuteAsync(GetStreamNegativeAcknowledgeMessage(key, groupName, mode, messageIds, flags), ResultProcessor.Int64); - private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue messageId, CommandFlags flags) - => new StreamNackMessageSingle(Database, flags, key, groupName, consumerName, mode, messageId); + private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue messageId, CommandFlags flags) + => new StreamNackMessageSingle(Database, flags, key, groupName, mode, messageId); - private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags) + private Message GetStreamNegativeAcknowledgeMessage(RedisKey key, RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds, CommandFlags flags) => messageIds is { Length: 1 } - ? new StreamNackMessageSingle(Database, flags, key, groupName, consumerName, mode, messageIds[0]) - : new StreamNackMessageMulti(Database, flags, key, groupName, consumerName, mode, messageIds); + ? new StreamNackMessageSingle(Database, flags, key, groupName, mode, messageIds[0]) + : new StreamNackMessageMulti(Database, flags, key, groupName, mode, messageIds); internal abstract class StreamNackMessageBase : Message.CommandKeyBase { private readonly RedisValue groupName; - private readonly RedisValue consumerName; private readonly StreamNackMode mode; - protected StreamNackMessageBase(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode) + protected StreamNackMessageBase(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, StreamNackMode mode) : base(db, flags, RedisCommand.XNACK, key) { groupName.AssertNotNull(); - consumerName.AssertNotNull(); this.groupName = groupName; - this.consumerName = consumerName; this.mode = mode; } @@ -51,7 +48,6 @@ protected override void WriteImpl(PhysicalConnection physical) physical.WriteHeader(Command, ArgCount); physical.Write(Key); physical.WriteBulkString(groupName); - physical.WriteBulkString(consumerName); WriteMode(physical); physical.WriteBulkString(StreamConstants.Ids); physical.WriteBulkString(Count); @@ -76,15 +72,15 @@ private void WriteMode(PhysicalConnection physical) } } - public override int ArgCount => 6 + Count; + public override int ArgCount => 5 + Count; } internal sealed class StreamNackMessageSingle : StreamNackMessageBase { private readonly RedisValue messageId; - public StreamNackMessageSingle(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode, in RedisValue messageId) - : base(db, flags, key, groupName, consumerName, mode) + public StreamNackMessageSingle(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, StreamNackMode mode, in RedisValue messageId) + : base(db, flags, key, groupName, mode) { messageId.AssertNotNull(); this.messageId = messageId; @@ -99,8 +95,8 @@ internal sealed class StreamNackMessageMulti : StreamNackMessageBase { private readonly RedisValue[] messageIds; - public StreamNackMessageMulti(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, in RedisValue consumerName, StreamNackMode mode, RedisValue[] messageIds) - : base(db, flags, key, groupName, consumerName, mode) + public StreamNackMessageMulti(int db, CommandFlags flags, in RedisKey key, in RedisValue groupName, StreamNackMode mode, RedisValue[] messageIds) + : base(db, flags, key, groupName, mode) { #if NET ArgumentNullException.ThrowIfNull(messageIds); diff --git a/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs b/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs index 7dcb39845..627ee311f 100644 --- a/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs +++ b/tests/StackExchange.Redis.Tests/GcraIntegrationTests.cs @@ -10,7 +10,7 @@ public class GcraIntegrationTests(ITestOutputHelper output, SharedConnectionFixt [Fact(Timeout = 5000)] public async Task GcraRateLimit_NonDefaultCount() { - await using var conn = Create(require: new Version(8, 8, 0)); + await using var conn = Create(require: RedisFeatures.v8_8_0); var db = conn.GetDatabase(); var key = Me(); db.KeyDelete(key, CommandFlags.FireAndForget); @@ -29,7 +29,7 @@ public async Task GcraRateLimit_SmokeTest() { await using var conn = Create(require: RedisFeatures.v8_8_0); var db = conn.GetDatabase(); - var key = Me(); + var key = Me() + Guid.NewGuid(); db.KeyDelete(key, CommandFlags.FireAndForget); for (int i = 0; i < 15; i++) { diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index 5563b5efc..f117f8c5f 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1033,16 +1033,16 @@ public void StreamAcknowledge_2() [Fact] public void StreamNegativeAcknowledge_1() { - prefixed.StreamNegativeAcknowledge("key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); - mock.Received().StreamNegativeAcknowledge("prefix:key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); + prefixed.StreamNegativeAcknowledge("key", "group", StreamNackMode.Fail, "0-0", CommandFlags.None); + mock.Received().StreamNegativeAcknowledge("prefix:key", "group", StreamNackMode.Fail, "0-0", CommandFlags.None); } [Fact] public void StreamNegativeAcknowledge_2() { var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" }; - prefixed.StreamNegativeAcknowledge("key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); - mock.Received().StreamNegativeAcknowledge("prefix:key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); + prefixed.StreamNegativeAcknowledge("key", "group", StreamNackMode.Fail, messageIds, CommandFlags.None); + mock.Received().StreamNegativeAcknowledge("prefix:key", "group", StreamNackMode.Fail, messageIds, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index 7adbaab2c..625eb022d 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -943,16 +943,16 @@ public async Task StreamAcknowledgeAsync_2() [Fact] public async Task StreamNegativeAcknowledgeAsync_1() { - await prefixed.StreamNegativeAcknowledgeAsync("key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); - await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", "consumer", StreamNackMode.Fail, "0-0", CommandFlags.None); + await prefixed.StreamNegativeAcknowledgeAsync("key", "group", StreamNackMode.Fail, "0-0", CommandFlags.None); + await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", StreamNackMode.Fail, "0-0", CommandFlags.None); } [Fact] public async Task StreamNegativeAcknowledgeAsync_2() { var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" }; - await prefixed.StreamNegativeAcknowledgeAsync("key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); - await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", "consumer", StreamNackMode.Fail, messageIds, CommandFlags.None); + await prefixed.StreamNegativeAcknowledgeAsync("key", "group", StreamNackMode.Fail, messageIds, CommandFlags.None); + await mock.Received().StreamNegativeAcknowledgeAsync("prefix:key", "group", StreamNackMode.Fail, messageIds, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/SortedSetTests.cs b/tests/StackExchange.Redis.Tests/SortedSetTests.cs index a6e6271ea..4dfa3be37 100644 --- a/tests/StackExchange.Redis.Tests/SortedSetTests.cs +++ b/tests/StackExchange.Redis.Tests/SortedSetTests.cs @@ -324,6 +324,48 @@ public async Task SortedSetIntersectionLengthAsync() Assert.Equal(3, inter); } + [Fact] + public async Task SortedSetCombineAggregateCount() + { + await using var conn = Create(require: RedisFeatures.v8_8_0); + + var db = conn.GetDatabase(); + var key1 = Me(); + db.KeyDelete(key1, CommandFlags.FireAndForget); + var key2 = Me() + "2"; + db.KeyDelete(key2, CommandFlags.FireAndForget); + var destination = Me() + "dest"; + db.KeyDelete(destination, CommandFlags.FireAndForget); + + db.SortedSetAdd(key1, entries); + db.SortedSetAdd(key2, entriesPow3); + + var inter = db.SortedSetCombineWithScores(SetOperation.Intersect, [key1, key2], aggregate: Aggregate.Count); + Assert.Equal(5, inter.Length); + Assert.Equal(new SortedSetEntry("a", 2), inter[0]); + Assert.Equal(new SortedSetEntry("c", 2), inter[1]); + Assert.Equal(new SortedSetEntry("e", 2), inter[2]); + Assert.Equal(new SortedSetEntry("g", 2), inter[3]); + Assert.Equal(new SortedSetEntry("i", 2), inter[4]); + + var union = db.SortedSetCombineWithScores(SetOperation.Union, [key1, key2], aggregate: Aggregate.Count); + Assert.Equal(10, union.Length); + Assert.Equal(new SortedSetEntry("b", 1), union[0]); + Assert.Equal(new SortedSetEntry("d", 1), union[1]); + Assert.Equal(new SortedSetEntry("f", 1), union[2]); + Assert.Equal(new SortedSetEntry("h", 1), union[3]); + Assert.Equal(new SortedSetEntry("j", 1), union[4]); + Assert.Equal(new SortedSetEntry("a", 2), union[5]); + Assert.Equal(new SortedSetEntry("c", 2), union[6]); + Assert.Equal(new SortedSetEntry("e", 2), union[7]); + Assert.Equal(new SortedSetEntry("g", 2), union[8]); + Assert.Equal(new SortedSetEntry("i", 2), union[9]); + + var stored = db.SortedSetCombineAndStore(SetOperation.Intersect, destination, [key1, key2], aggregate: Aggregate.Count); + Assert.Equal(5, stored); + Assert.Equal(inter, db.SortedSetRangeByRankWithScores(destination)); + } + [Fact] public async Task SortedSetRangeViaScript() { diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 9ac3812d4..c3a7e6206 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -996,23 +996,23 @@ public async Task StreamConsumerGroupNegativeAcknowledgeMessage(StreamNackMode m Assert.Equal(3, entries.Length); long oneNack = async - ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, id1) - : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, id1); + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, mode, id1) + : db.StreamNegativeAcknowledge(key, groupName, mode, id1); Assert.Equal(1, oneNack); long zeroNack = async - ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, notexist) - : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, notexist); + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, mode, notexist) + : db.StreamNegativeAcknowledge(key, groupName, mode, notexist); Assert.Equal(0, zeroNack); long oneArrayNack = async - ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, [id2]) - : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, [id2]); + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, mode, [id2]) + : db.StreamNegativeAcknowledge(key, groupName, mode, [id2]); Assert.Equal(1, oneArrayNack); long multiArrayNack = async - ? await db.StreamNegativeAcknowledgeAsync(key, groupName, consumer, mode, [id3, notexist]) - : db.StreamNegativeAcknowledge(key, groupName, consumer, mode, [id3, notexist]); + ? await db.StreamNegativeAcknowledgeAsync(key, groupName, mode, [id3, notexist]) + : db.StreamNegativeAcknowledge(key, groupName, mode, [id3, notexist]); Assert.Equal(1, multiArrayNack); var consumerPending = db.StreamPendingMessages(key, groupName, 10, consumer); @@ -1023,6 +1023,10 @@ public async Task StreamConsumerGroupNegativeAcknowledgeMessage(StreamNackMode m Assert.Contains(allPending, x => x.MessageId == id1 && x.ConsumerName.IsNullOrEmpty); Assert.Contains(allPending, x => x.MessageId == id2 && x.ConsumerName.IsNullOrEmpty); Assert.Contains(allPending, x => x.MessageId == id3 && x.ConsumerName.IsNullOrEmpty); + if (mode == StreamNackMode.Fatal) + { + Assert.All(allPending, x => Assert.Equal(int.MinValue, x.DeliveryCount)); + } } [Fact]