Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,9 @@ export default class RedisClient<
return new RedisCommandsQueue(
this.#options.RESP ?? DEFAULT_RESP,
this.#options.commandsQueueMaxLength,
(channel, listeners) => this.emit('sharded-channel-moved', channel, listeners),
(channel, listeners) => {
this.emit('sharded-channel-moved', channel, listeners);
},
Comment thread
cursor[bot] marked this conversation as resolved.
clientId
);
}
Expand Down
34 changes: 19 additions & 15 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export default class RedisClusterSlots<
this.clientSideCache = options.clientSideCache;
} else {
this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache)
}
}
}

this.#clientFactory = RedisClient.factory(this.#options);
Expand Down Expand Up @@ -946,20 +946,22 @@ export default class RedisClusterSlots<
}

async #initiateShardedPubSubClient(master: MasterNode<M, F, S, RESP, TYPE_MAPPING>) {
const client = this.#createClient(master, false)
.on('server-sunsubscribe', async (channel, listeners) => {
try {
await this.rediscover(client);
const redirectTo = await this.getShardedPubSubClient(channel);
await redirectTo.extendPubSubChannelListeners(
PUBSUB_TYPE.SHARDED,
channel,
listeners
);
} catch (err) {
this.#emit('sharded-shannel-moved-error', err, channel, listeners);
}
});
const client = this.#createClient(master, false);

client.on('sharded-channel-moved', async (channel, listeners) => {
try {
await this.rediscover(client);
const redirectTo = await this.getShardedPubSubClient(channel);
await redirectTo.extendPubSubChannelListeners(
PUBSUB_TYPE.SHARDED,
channel,
listeners
);
} catch (err) {
this.#emit('sharded-channel-moved-error', err, channel, listeners);
}
});


master.pubSub = {
client,
Expand All @@ -977,6 +979,8 @@ export default class RedisClusterSlots<
return master.pubSub.connectPromise!;
}



async executeShardedUnsubscribeCommand(
channel: string,
unsubscribe: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<void>
Expand Down
44 changes: 44 additions & 0 deletions packages/client/lib/cluster/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { strict as assert } from 'node:assert';
import { setTimeout } from 'node:timers/promises';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisCluster from '.';
import { SQUARE_SCRIPT } from '../client/index.spec';
Expand Down Expand Up @@ -422,6 +423,49 @@ describe('Cluster', () => {
serverArguments: [],
minimumDockerVersion: [7]
});

// Regression for #3311: subscribe FIRST, then migrate the slot in place.
// The server pushes SUNSUBSCRIBE to the already-subscribed client, which
// must drive the cluster to rediscover and reattach the listener on the
// new owner. The dead `server-sunsubscribe` listener meant this never
// happened and the subscription was silently lost after migration.
testUtils.testWithCluster('should resubscribe a sharded channel after in-place slot migration (#3311)', async cluster => {
const SLOT = 10328, // slot of `channel`
migrating = cluster.slots[SLOT].master,
importing = cluster.masters.find(master => master !== migrating)!,
[migratingClient, importingClient] = await Promise.all([
cluster.nodeClient(migrating),
cluster.nodeClient(importing)
]);

const listener = spy();

// subscribe BEFORE migration -> the sharded PubSub client attaches to `migrating`
await cluster.sSubscribe('channel', listener);

// move the slot in-place to `importing`; `migrating` loses the slot and
// pushes SUNSUBSCRIBE to the subscribed client
await Promise.all([
migratingClient.clusterDelSlots(SLOT),
importingClient.clusterDelSlots(SLOT),
importingClient.clusterAddSlots(SLOT),
migratingClient.clusterSetSlot(SLOT, 'NODE', importing.id),
importingClient.clusterSetSlot(SLOT, 'NODE', importing.id)
]);

// the reattach is async and sharded PubSub does not buffer, so
// publish until the resubscribed listener receives the message.
// With the bug this never reattaches and the loop exhausts -> assertion fails.
for (let i = 0; i < 50 && !listener.called; i++) {
await cluster.sPublish('channel', 'message');
await setTimeout(100);
}

assert.ok(listener.calledWithExactly('message', 'channel'));
}, {
serverArguments: [],
minimumDockerVersion: [7]
});
});

describe('clusterEvents', () => {
Expand Down