Skip to content

Commit

Permalink
fix isolationPool after reconnect (#2409)
Browse files Browse the repository at this point in the history
* fix #2406 - fix isolationPool after reconnect

* revert breaking change

* fix
  • Loading branch information
leibale authored Apr 26, 2023
1 parent 986a510 commit dc920d3
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 20 deletions.
42 changes: 36 additions & 6 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,41 @@ describe('Client', () => {
}
});

testUtils.testWithClient('executeIsolated', async client => {
const id = await client.clientId(),
isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId());
assert.ok(id !== isolatedId);
}, GLOBAL.SERVERS.OPEN);
describe('isolationPool', () => {
testUtils.testWithClient('executeIsolated', async client => {
const id = await client.clientId(),
isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId());
assert.ok(id !== isolatedId);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('should be able to use pool even before connect', async client => {
await client.executeIsolated(() => Promise.resolve());
// make sure to destroy isolation pool
await client.connect();
await client.disconnect();
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});

testUtils.testWithClient('should work after reconnect (#2406)', async client => {
await client.disconnect();
await client.connect();
await client.executeIsolated(() => Promise.resolve());
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('should throw ClientClosedError after disconnect', async client => {
await client.connect();
await client.disconnect();
await assert.rejects(
client.executeIsolated(() => Promise.resolve()),
ClientClosedError
);
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});
});

async function killClient<
M extends RedisModules,
Expand Down Expand Up @@ -731,7 +761,7 @@ describe('Client', () => {
members.map<MemberTuple>(member => [member.value, member.score]).sort(sort)
);
}, GLOBAL.SERVERS.OPEN);

describe('PubSub', () => {
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
function assertStringListener(message: string, channel: string) {
Expand Down
37 changes: 23 additions & 14 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '.
import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { callbackify } from 'util';

export interface RedisClientOptions<
M extends RedisModules = RedisModules,
Expand Down Expand Up @@ -190,7 +189,7 @@ export default class RedisClient<
readonly #options?: RedisClientOptions<M, F, S>;
readonly #socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
readonly #isolationPool: Pool<RedisClientType<M, F, S>>;
#isolationPool?: Pool<RedisClientType<M, F, S>>;
readonly #v4: Record<string, any> = {};
#selectedDB = 0;

Expand Down Expand Up @@ -223,16 +222,9 @@ export default class RedisClient<
this.#options = this.#initiateOptions(options);
this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket();
this.#isolationPool = createPool({
create: async () => {
const duplicate = this.duplicate({
isolationPoolOptions: undefined
}).on('error', err => this.emit('error', err));
await duplicate.connect();
return duplicate;
},
destroy: client => client.disconnect()
}, options?.isolationPoolOptions);
// should be initiated in connect, not here
// TODO: consider breaking in v5
this.#isolationPool = this.#initiateIsolationPool();
this.#legacyMode();
}

Expand Down Expand Up @@ -337,6 +329,19 @@ export default class RedisClient<
.on('end', () => this.emit('end'));
}

#initiateIsolationPool() {
return createPool({
create: async () => {
const duplicate = this.duplicate({
isolationPoolOptions: undefined
}).on('error', err => this.emit('error', err));
await duplicate.connect();
return duplicate;
},
destroy: client => client.disconnect()
}, this.#options?.isolationPoolOptions);
}

#legacyMode(): void {
if (!this.#options?.legacyMode) return;

Expand Down Expand Up @@ -422,6 +427,8 @@ export default class RedisClient<
}

connect(): Promise<void> {
// see comment in constructor
this.#isolationPool ??= this.#initiateIsolationPool();
return this.#socket.connect();
}

Expand Down Expand Up @@ -704,6 +711,7 @@ export default class RedisClient<
}

executeIsolated<T>(fn: (client: RedisClientType<M, F, S>) => T | Promise<T>): Promise<T> {
if (!this.#isolationPool) return Promise.reject(new ClientClosedError());
return this.#isolationPool.use(fn);
}

Expand Down Expand Up @@ -802,8 +810,9 @@ export default class RedisClient<
}

async #destroyIsolationPool(): Promise<void> {
await this.#isolationPool.drain();
await this.#isolationPool.clear();
await this.#isolationPool!.drain();
await this.#isolationPool!.clear();
this.#isolationPool = undefined;
}

ref(): void {
Expand Down

0 comments on commit dc920d3

Please sign in to comment.