From 941d5e5297f0b58dd4ec9c4be0d0be5b25ead4e6 Mon Sep 17 00:00:00 2001 From: Matt Kemmerer Date: Tue, 13 Apr 2021 15:08:23 -0400 Subject: [PATCH] fix(cluster): autopipeline when prefix is used Previously the building of pipelines ignored the key prefix. It was possible that two keys, foo and bar, might be set into the same pipeline. However, after being prefixed by a configured "keyPrefix" value, they may no longer belong to the same pipeline. This led to the error: "All keys in the pipeline should belong to the same slots allocation group" Amended version of https://github.com/luin/ioredis/pull/1335/files This may fix #1264 and #1248. --- lib/autoPipelining.ts | 10 ++++++++- test/functional/cluster/autopipelining.ts | 25 +++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 238fe1149..8b160cb52 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -104,7 +104,15 @@ export function executeWithAutoPipelining( } // If we have slot information, we can improve routing by grouping slots served by the same subset of nodes - const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main'; + const slotKey = client.isCluster + ? client.slots[ + calculateSlot( + client.options.keyPrefix + ? `${client.options.keyPrefix}${args[0]}` + : args[0] + ) + ].join(",") + : "main"; if (!client._autoPipelines.has(slotKey)) { const pipeline = client.pipeline(); diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 02f933d64..a8cf347a8 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -35,6 +35,10 @@ describe("autoPipelining for cluster", function () { if (argv[0] === "get" && argv[1] === "foo6") { return "bar6"; } + + if (argv[0] === "get" && argv[1] === "baz:foo10") { + return "bar10"; + } }); new MockServer(30002, function (argv) { @@ -68,6 +72,10 @@ describe("autoPipelining for cluster", function () { return "bar5"; } + if (argv[0] === "get" && argv[1] === "baz:foo1") { + return "bar1"; + } + if (argv[0] === "evalsha") { return argv.slice(argv.length - 4); } @@ -177,6 +185,23 @@ describe("autoPipelining for cluster", function () { cluster.disconnect(); }); + it("should support building pipelines when a prefix is used", async () => { + const cluster = new Cluster(hosts, { + enableAutoPipelining: true, + keyPrefix: "baz:", + }); + await new Promise((resolve) => cluster.once("connect", resolve)); + + await cluster.set("foo1", "bar1"); + await cluster.set("foo10", "bar10"); + + expect( + await Promise.all([cluster.get("foo1"), cluster.get("foo10")]) + ).to.eql(["bar1", "bar10"]); + + cluster.disconnect(); + }); + it("should support commands queued after a pipeline is already queued for execution", (done) => { const cluster = new Cluster(hosts, { enableAutoPipelining: true });