diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 907a6b378c..ac58137281 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -150,7 +150,7 @@ export default class RedisClusterSlots< this.clientSideCache = options.clientSideCache; } else { this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache) - } + } } this.#clientFactory = RedisClient.factory(this.#options); @@ -946,20 +946,22 @@ export default class RedisClusterSlots< } async #initiateShardedPubSubClient(master: MasterNode) { - const client = this.#createClient(master, false) - .on('server-sunsubscribe', async (channel, listeners) => { - try { - await this.rediscover(client); - const redirectTo = await this.getShardedPubSubClient(channel); - await redirectTo.extendPubSubChannelListeners( - PUBSUB_TYPE.SHARDED, - channel, - listeners - ); - } catch (err) { - this.#emit('sharded-shannel-moved-error', err, channel, listeners); - } - }); + const client = this.#createClient(master, false); + + client.on('sharded-channel-moved', async (channel, listeners) => { + try { + await this.rediscover(client); + const redirectTo = await this.getShardedPubSubClient(channel); + await redirectTo.extendPubSubChannelListeners( + PUBSUB_TYPE.SHARDED, + channel, + listeners + ); + } catch (err) { + this.#emit('sharded-channel-moved-error', err, channel, listeners); + } + }); + master.pubSub = { client, @@ -977,6 +979,8 @@ export default class RedisClusterSlots< return master.pubSub.connectPromise!; } + + async executeShardedUnsubscribeCommand( channel: string, unsubscribe: (client: RedisClientType) => Promise diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index a71cd0f904..918ed05a32 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'node:assert'; +import { setTimeout } from 'node:timers/promises'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisCluster from '.'; import { SQUARE_SCRIPT } from '../client/index.spec'; @@ -422,6 +423,49 @@ describe('Cluster', () => { serverArguments: [], minimumDockerVersion: [7] }); + + // Regression for #3311: subscribe FIRST, then migrate the slot in place. + // The server pushes SUNSUBSCRIBE to the already-subscribed client, which + // must drive the cluster to rediscover and reattach the listener on the + // new owner. The dead `server-sunsubscribe` listener meant this never + // happened and the subscription was silently lost after migration. + testUtils.testWithCluster('should resubscribe a sharded channel after in-place slot migration (#3311)', async cluster => { + const SLOT = 10328, // slot of `channel` + migrating = cluster.slots[SLOT].master, + importing = cluster.masters.find(master => master !== migrating)!, + [migratingClient, importingClient] = await Promise.all([ + cluster.nodeClient(migrating), + cluster.nodeClient(importing) + ]); + + const listener = spy(); + + // subscribe BEFORE migration -> the sharded PubSub client attaches to `migrating` + await cluster.sSubscribe('channel', listener); + + // move the slot in-place to `importing`; `migrating` loses the slot and + // pushes SUNSUBSCRIBE to the subscribed client + await Promise.all([ + migratingClient.clusterDelSlots(SLOT), + importingClient.clusterDelSlots(SLOT), + importingClient.clusterAddSlots(SLOT), + migratingClient.clusterSetSlot(SLOT, 'NODE', importing.id), + importingClient.clusterSetSlot(SLOT, 'NODE', importing.id) + ]); + + // the reattach is async and sharded PubSub does not buffer, so + // publish until the resubscribed listener receives the message. + // With the bug this never reattaches and the loop exhausts -> assertion fails. + for (let i = 0; i < 50 && !listener.called; i++) { + await cluster.sPublish('channel', 'message'); + await setTimeout(100); + } + + assert.ok(listener.calledWithExactly('message', 'channel')); + }, { + serverArguments: [], + minimumDockerVersion: [7] + }); }); describe('clusterEvents', () => {