From 73a3f64379f459155b6709261a66f25e4d7164e9 Mon Sep 17 00:00:00 2001 From: Aarti Sonigra <23amtics292@gmail.com> Date: Sat, 6 Jun 2026 18:59:25 +0530 Subject: [PATCH 1/6] fix(search): support exact match with params --- packages/search/lib/commands/SEARCH.spec.ts | 5 ++- packages/search/lib/commands/SEARCH.ts | 43 ++++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/packages/search/lib/commands/SEARCH.spec.ts b/packages/search/lib/commands/SEARCH.spec.ts index 758ac2a1b2..277d20788c 100644 --- a/packages/search/lib/commands/SEARCH.spec.ts +++ b/packages/search/lib/commands/SEARCH.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'node:assert'; + import testUtils, { GLOBAL } from '../test-utils'; import SEARCH from './SEARCH'; import { parseArgs } from '@redis/client/lib/commands/generic-transformers'; @@ -260,7 +261,9 @@ describe('FT.SEARCH', () => { number: 1 } }), - ['FT.SEARCH', 'index', 'query', 'PARAMS', '6', 'string', 'string', 'buffer', Buffer.from('buffer'), 'number', '1', 'DIALECT', DEFAULT_DIALECT] + + ['FT.SEARCH', 'index', 'query', 'PARAMS', '3', 'string', 'string', 'buffer', Buffer.from('buffer'), 'number', '1', 'DIALECT', DEFAULT_DIALECT] + ); }); diff --git a/packages/search/lib/commands/SEARCH.ts b/packages/search/lib/commands/SEARCH.ts index 8f8da9d9bc..fc4791e1e8 100644 --- a/packages/search/lib/commands/SEARCH.ts +++ b/packages/search/lib/commands/SEARCH.ts @@ -8,24 +8,37 @@ import { getMapValue, mapLikeToObject, mapLikeValues, parseDocumentValue, parseS export type FtSearchParams = Record; export function parseParamsArgument(parser: CommandParser, params?: FtSearchParams) { - if (params) { - parser.push('PARAMS'); - - const args: Array = []; - for (const key in params) { - if (!Object.hasOwn(params, key)) continue; - - const value = params[key]; - args.push( - key, - typeof value === 'number' ? value.toString() : value - ); - } - - parser.pushVariadicWithLength(args); + if (!params) return; + + parser.push('PARAMS'); + + // FT.SEARCH expects: PARAMS ... + // Where is the *number of pairs*. + // + // The previous implementation incorrectly used `pushVariadicWithLength(args)` + // which sets the length to the number of *arguments*, not the required + // number of pairs. This causes exact-match queries like `@field:"$x"` + // to bind parameters incorrectly on some Redis Stack/FT versions. + const pairArgs: Array = []; + let pairs = 0; + + for (const key in params) { + if (!Object.hasOwn(params, key)) continue; + + const value = params[key]; + pairArgs.push( + key, + typeof value === 'number' ? value.toString() : value + ); + pairs++; } + + // is the number of pairs, so it must be `pairs`. + parser.push(pairs.toString()); + parser.pushVariadic(pairArgs); } + export interface FtSearchOptions { VERBATIM?: boolean; NOSTOPWORDS?: boolean; From f9228b9eb0d499c8ee55a768b1b74c02ca394b9f Mon Sep 17 00:00:00 2001 From: Aarti Sonigra <23amtics292@gmail.com> Date: Thu, 18 Jun 2026 16:24:20 +0530 Subject: [PATCH 2/6] fix(cluster): resolve sharded pubsub migration event handling (#3311) --- packages/client/lib/client/index.ts | 4 +++ packages/client/lib/cluster/cluster-slots.ts | 31 +++++++++++--------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index ce04064068..cde71a2b66 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -748,6 +748,10 @@ export default class RedisClient< return new RedisCommandsQueue( this.#options.RESP ?? DEFAULT_RESP, this.#options.commandsQueueMaxLength, + (channel, listeners) => { + this.emit('sharded-channel-moved', channel, listeners); + this.emit('server-sunsubscribe', channel, listeners); + }, (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners), clientId ); diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 907a6b378c..3a5e93aae1 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -946,20 +946,21 @@ export default class RedisClusterSlots< } async #initiateShardedPubSubClient(master: MasterNode) { - const client = this.#createClient(master, false) - .on('server-sunsubscribe', async (channel, listeners) => { - try { - await this.rediscover(client); - const redirectTo = await this.getShardedPubSubClient(channel); - await redirectTo.extendPubSubChannelListeners( - PUBSUB_TYPE.SHARDED, - channel, - listeners - ); - } catch (err) { - this.#emit('sharded-shannel-moved-error', err, channel, listeners); - } - }); + const client = this.#createClient(master, false); + + client.on('server-sunsubscribe', async (channel, listeners) => { + try { + await this.rediscover(client); + const redirectTo = await this.getShardedPubSubClient(channel); + await redirectTo.extendPubSubChannelListeners( + PUBSUB_TYPE.SHARDED, + channel, + listeners + ); + } catch (err) { + this.#emit('sharded-channel-moved-error', err, channel, listeners); + } + }); master.pubSub = { client, @@ -977,6 +978,8 @@ export default class RedisClusterSlots< return master.pubSub.connectPromise!; } + + async executeShardedUnsubscribeCommand( channel: string, unsubscribe: (client: RedisClientType) => Promise From 41e2901af7d186b9a2fb78d5ee99e4be125479ff Mon Sep 17 00:00:00 2001 From: Aarti Sonigra <23amtics292@gmail.com> Date: Tue, 23 Jun 2026 16:49:30 +0530 Subject: [PATCH 3/6] Remove unrelated SEARCH changes --- packages/search/lib/commands/SEARCH.spec.ts | 5 +-- packages/search/lib/commands/SEARCH.ts | 43 +++++++-------------- 2 files changed, 16 insertions(+), 32 deletions(-) diff --git a/packages/search/lib/commands/SEARCH.spec.ts b/packages/search/lib/commands/SEARCH.spec.ts index 277d20788c..758ac2a1b2 100644 --- a/packages/search/lib/commands/SEARCH.spec.ts +++ b/packages/search/lib/commands/SEARCH.spec.ts @@ -1,5 +1,4 @@ import { strict as assert } from 'node:assert'; - import testUtils, { GLOBAL } from '../test-utils'; import SEARCH from './SEARCH'; import { parseArgs } from '@redis/client/lib/commands/generic-transformers'; @@ -261,9 +260,7 @@ describe('FT.SEARCH', () => { number: 1 } }), - - ['FT.SEARCH', 'index', 'query', 'PARAMS', '3', 'string', 'string', 'buffer', Buffer.from('buffer'), 'number', '1', 'DIALECT', DEFAULT_DIALECT] - + ['FT.SEARCH', 'index', 'query', 'PARAMS', '6', 'string', 'string', 'buffer', Buffer.from('buffer'), 'number', '1', 'DIALECT', DEFAULT_DIALECT] ); }); diff --git a/packages/search/lib/commands/SEARCH.ts b/packages/search/lib/commands/SEARCH.ts index fc4791e1e8..8f8da9d9bc 100644 --- a/packages/search/lib/commands/SEARCH.ts +++ b/packages/search/lib/commands/SEARCH.ts @@ -8,37 +8,24 @@ import { getMapValue, mapLikeToObject, mapLikeValues, parseDocumentValue, parseS export type FtSearchParams = Record; export function parseParamsArgument(parser: CommandParser, params?: FtSearchParams) { - if (!params) return; - - parser.push('PARAMS'); - - // FT.SEARCH expects: PARAMS ... - // Where is the *number of pairs*. - // - // The previous implementation incorrectly used `pushVariadicWithLength(args)` - // which sets the length to the number of *arguments*, not the required - // number of pairs. This causes exact-match queries like `@field:"$x"` - // to bind parameters incorrectly on some Redis Stack/FT versions. - const pairArgs: Array = []; - let pairs = 0; - - for (const key in params) { - if (!Object.hasOwn(params, key)) continue; - - const value = params[key]; - pairArgs.push( - key, - typeof value === 'number' ? value.toString() : value - ); - pairs++; - } + if (params) { + parser.push('PARAMS'); + + const args: Array = []; + for (const key in params) { + if (!Object.hasOwn(params, key)) continue; + + const value = params[key]; + args.push( + key, + typeof value === 'number' ? value.toString() : value + ); + } - // is the number of pairs, so it must be `pairs`. - parser.push(pairs.toString()); - parser.pushVariadic(pairArgs); + parser.pushVariadicWithLength(args); + } } - export interface FtSearchOptions { VERBATIM?: boolean; NOSTOPWORDS?: boolean; From 2a19ad9538f4cd92ff4e4aa96237bb46a285ffdf Mon Sep 17 00:00:00 2001 From: Aarti Sonigra <23amtics292@gmail.com> Date: Tue, 23 Jun 2026 17:15:00 +0530 Subject: [PATCH 4/6] Fix sharded pubsub migration event handling --- packages/client/lib/client/index.ts | 2 -- packages/client/lib/cluster/cluster-slots.ts | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index cde71a2b66..ac4449ad68 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -750,9 +750,7 @@ export default class RedisClient< this.#options.commandsQueueMaxLength, (channel, listeners) => { this.emit('sharded-channel-moved', channel, listeners); - this.emit('server-sunsubscribe', channel, listeners); }, - (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners), clientId ); } diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 3a5e93aae1..e9ed4c548e 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -948,7 +948,7 @@ export default class RedisClusterSlots< async #initiateShardedPubSubClient(master: MasterNode) { const client = this.#createClient(master, false); - client.on('server-sunsubscribe', async (channel, listeners) => { + client.on('sharded-channel-moved', async (channel, listeners) => { try { await this.rediscover(client); const redirectTo = await this.getShardedPubSubClient(channel); @@ -962,6 +962,7 @@ export default class RedisClusterSlots< } }); + master.pubSub = { client, connectPromise: client.connect() From 2b01ecf07973d85d7dc24bdcaf69180de11fe6e4 Mon Sep 17 00:00:00 2001 From: Aarti Sonigra <23amtics292@gmail.com> Date: Tue, 23 Jun 2026 18:09:50 +0530 Subject: [PATCH 5/6] Fix sharded pubsub resubscribe on in-place slot migration (#3311) --- packages/client/lib/cluster/cluster-slots.ts | 2 +- packages/client/lib/cluster/index.spec.ts | 44 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index e9ed4c548e..ac58137281 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -150,7 +150,7 @@ export default class RedisClusterSlots< this.clientSideCache = options.clientSideCache; } else { this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache) - } + } } this.#clientFactory = RedisClient.factory(this.#options); diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index a71cd0f904..918ed05a32 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'node:assert'; +import { setTimeout } from 'node:timers/promises'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisCluster from '.'; import { SQUARE_SCRIPT } from '../client/index.spec'; @@ -422,6 +423,49 @@ describe('Cluster', () => { serverArguments: [], minimumDockerVersion: [7] }); + + // Regression for #3311: subscribe FIRST, then migrate the slot in place. + // The server pushes SUNSUBSCRIBE to the already-subscribed client, which + // must drive the cluster to rediscover and reattach the listener on the + // new owner. The dead `server-sunsubscribe` listener meant this never + // happened and the subscription was silently lost after migration. + testUtils.testWithCluster('should resubscribe a sharded channel after in-place slot migration (#3311)', async cluster => { + const SLOT = 10328, // slot of `channel` + migrating = cluster.slots[SLOT].master, + importing = cluster.masters.find(master => master !== migrating)!, + [migratingClient, importingClient] = await Promise.all([ + cluster.nodeClient(migrating), + cluster.nodeClient(importing) + ]); + + const listener = spy(); + + // subscribe BEFORE migration -> the sharded PubSub client attaches to `migrating` + await cluster.sSubscribe('channel', listener); + + // move the slot in-place to `importing`; `migrating` loses the slot and + // pushes SUNSUBSCRIBE to the subscribed client + await Promise.all([ + migratingClient.clusterDelSlots(SLOT), + importingClient.clusterDelSlots(SLOT), + importingClient.clusterAddSlots(SLOT), + migratingClient.clusterSetSlot(SLOT, 'NODE', importing.id), + importingClient.clusterSetSlot(SLOT, 'NODE', importing.id) + ]); + + // the reattach is async and sharded PubSub does not buffer, so + // publish until the resubscribed listener receives the message. + // With the bug this never reattaches and the loop exhausts -> assertion fails. + for (let i = 0; i < 50 && !listener.called; i++) { + await cluster.sPublish('channel', 'message'); + await setTimeout(100); + } + + assert.ok(listener.calledWithExactly('message', 'channel')); + }, { + serverArguments: [], + minimumDockerVersion: [7] + }); }); describe('clusterEvents', () => { From 3b96e5437e0e834be7f83b2a28e6ffced3021ea7 Mon Sep 17 00:00:00 2001 From: Aarti Sonigra <23amtics292@gmail.com> Date: Tue, 23 Jun 2026 22:01:58 +0530 Subject: [PATCH 6/6] revert: remove index.ts changes --- packages/client/lib/client/index.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index ac4449ad68..ce04064068 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -748,9 +748,7 @@ export default class RedisClient< return new RedisCommandsQueue( this.#options.RESP ?? DEFAULT_RESP, this.#options.commandsQueueMaxLength, - (channel, listeners) => { - this.emit('sharded-channel-moved', channel, listeners); - }, + (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners), clientId ); }