diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index d75282b1..c237170f 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -28,7 +28,10 @@ function findAutoPipeline( } // We have slot information, we can improve routing by grouping slots served by the same subset of nodes - return client.slots[calculateSlot(args[0])].join(","); + const prefixedKey = client.options.keyPrefix + ? `${client.options.keyPrefix}${args[0]}` + : args[0]; + return client.slots[calculateSlot(prefixedKey)].join(","); } function executeAutoPipeline(client, slotKey: string) { diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 4039c9bf..455c8f05 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -30,6 +30,10 @@ describe("autoPipelining for cluster", function () { if (argv[0] === "get" && argv[1] === "foo6") { return "bar6"; } + + if (argv[0] === "get" && argv[1] === "baz:foo10") { + return "bar10"; + } }); new MockServer(30002, function (argv) { @@ -63,6 +67,10 @@ describe("autoPipelining for cluster", function () { return "bar5"; } + if (argv[0] === "get" && argv[1] === "baz:foo1") { + return "bar1"; + } + if (argv[0] === "evalsha") { return argv.slice(argv.length - 4); } @@ -172,6 +180,23 @@ describe("autoPipelining for cluster", function () { cluster.disconnect(); }); + it("should support building pipelines when a prefix is used", async () => { + const cluster = new Cluster(hosts, { + enableAutoPipelining: true, + keyPrefix: "baz:", + }); + await new Promise((resolve) => cluster.once("connect", resolve)); + + await cluster.set("foo1", "bar1"); + await cluster.set("foo10", "bar10"); + + expect( + await Promise.all([cluster.get("foo1"), cluster.get("foo10")]) + ).to.eql(["bar1", "bar10"]); + + cluster.disconnect(); + }); + it("should support commands queued after a pipeline is already queued for execution", (done) => { const cluster = new Cluster(hosts, { enableAutoPipelining: true });