Skip to content

Commit

Permalink
feat(cluster): apply provided connection name to internal connections
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Apr 8, 2021
1 parent 81b9be0 commit 2e388db
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 7 deletions.
6 changes: 2 additions & 4 deletions lib/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { EventEmitter } from "events";
import ConnectionPool from "./ConnectionPool";
import { getNodeKey } from "./util";
import { getConnectionName, getNodeKey } from "./util";
import { sample, noop, Debug } from "../utils";
import Redis from "../redis";

const debug = Debug("cluster:subscriber");

const SUBSCRIBER_CONNECTION_NAME = "ioredisClusterSubscriber";

export default class ClusterSubscriber {
private started = false;
private subscriber: any = null;
Expand Down Expand Up @@ -81,7 +79,7 @@ export default class ClusterSubscriber {
username: options.username,
password: options.password,
enableReadyCheck: true,
connectionName: SUBSCRIBER_CONNECTION_NAME,
connectionName: getConnectionName("subscriber", options.connectionName),
lazyConnect: true,
tls: options.tls,
});
Expand Down
6 changes: 5 additions & 1 deletion lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
nodeKeyToRedisOptions,
groupSrvRecords,
weightSrvRecords,
getConnectionName,
} from "./util";
import ClusterSubscriber from "./ClusterSubscriber";
import DelayQueue from "./DelayQueue";
Expand Down Expand Up @@ -791,7 +792,10 @@ class Cluster extends EventEmitter {
enableOfflineQueue: true,
enableReadyCheck: false,
retryStrategy: null,
connectionName: "ioredisClusterRefresher",
connectionName: getConnectionName(
"refresher",
this.options.redisOptions && this.options.redisOptions.connectionName
),
});

// Ignore error events since we will handle
Expand Down
5 changes: 5 additions & 0 deletions lib/cluster/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,8 @@ export function weightSrvRecords(recordsGroup: ISrvRecordsGroup): SrvRecord {
}
}
}

export function getConnectionName(component, nodeConnectionName) {
const prefix = `ioredis-cluster(${component})`;
return nodeConnectionName ? `${prefix}:${nodeConnectionName}` : prefix;
}
42 changes: 42 additions & 0 deletions test/functional/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,46 @@ describe("ClusterSubscriber", () => {
subscriber.stop();
pool.reset([]);
});

it("sets correct connection name when connectionName is set", async () => {
const pool = new ConnectionPool({ connectionName: "test" });
const subscriber = new ClusterSubscriber(pool, new EventEmitter());

const clientNames = [];
new MockServer(30000, (argv) => {
if (argv[0] === "client" && argv[1] === "setname") {
clientNames.push(argv[2]);
}
});

pool.findOrCreate({ host: "127.0.0.1", port: 30000 });

subscriber.start();
await subscriber.getInstance().subscribe("foo");
subscriber.stop();
pool.reset([]);

expect(clientNames).to.eql(["ioredis-cluster(subscriber):test"]);
});

it("sets correct connection name when connectionName is absent", async () => {
const pool = new ConnectionPool({});
const subscriber = new ClusterSubscriber(pool, new EventEmitter());

const clientNames = [];
new MockServer(30000, (argv) => {
if (argv[0] === "client" && argv[1] === "setname") {
clientNames.push(argv[2]);
}
});

pool.findOrCreate({ host: "127.0.0.1", port: 30000 });

subscriber.start();
await subscriber.getInstance().subscribe("foo");
subscriber.stop();
pool.reset([]);

expect(clientNames).to.eql(["ioredis-cluster(subscriber)"]);
});
});
4 changes: 2 additions & 2 deletions test/functional/cluster/pub_sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe("cluster:pub/sub", function () {
const sub = new Cluster(options);

sub.subscribe("test cluster", function () {
node1.write(node1.findClientByName("ioredisClusterSubscriber"), [
node1.write(node1.findClientByName("ioredis-cluster(subscriber)"), [
"message",
"test channel",
"hi",
Expand Down Expand Up @@ -61,7 +61,7 @@ describe("cluster:pub/sub", function () {
}
if (argv[0] === "subscribe") {
expect(c.password).to.eql("abc");
expect(getConnectionName(c)).to.eql("ioredisClusterSubscriber");
expect(getConnectionName(c)).to.eql("ioredis-cluster(subscriber)");
}
if (argv[0] === "cluster" && argv[1] === "slots") {
return [[0, 16383, ["127.0.0.1", 30001]]];
Expand Down

0 comments on commit 2e388db

Please sign in to comment.