diff --git a/CHANGELOG.md b/CHANGELOG.md index 11da86683d..bf77700860 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122)) * Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105)) * Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109)) * Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 83042ebdce..9c78351a34 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -34,7 +34,7 @@ import { GeoUnit, GeospatialData, InsertPosition, - KeyWeight, // eslint-disable-line @typescript-eslint/no-unused-vars + KeyWeight, LPosOptions, ListDirection, MemberOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars @@ -176,6 +176,7 @@ import { createXGroupDelConsumer, createXGroupDestroy, createXInfoConsumers, + createXInfoGroups, createXInfoStream, createXLen, createXPending, @@ -4405,6 +4406,45 @@ export class BaseClient { return this.createWritePromise(createXInfoConsumers(key, group)); } + /** + * Returns the list of all consumer groups and their attributes for the stream stored at `key`. + * + * @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details. + * + * @param key - The key of the stream. + * @returns An array of maps, where each mapping represents the + * attributes of a consumer group for the stream at `key`. + * @example + * ```typescript + *
{@code + * const result = await client.xinfoGroups("my_stream"); + * console.log(result); // Output: + * // [ + * // { + * // "name": "mygroup", + * // "consumers": 2, + * // "pending": 2, + * // "last-delivered-id": "1638126030001-0", + * // "entries-read": 2, // Added in version 7.0.0 + * // "lag": 0 // Added in version 7.0.0 + * // }, + * // { + * // "name": "some-other-group", + * // "consumers": 1, + * // "pending": 0, + * // "last-delivered-id": "0-0", + * // "entries-read": null, // Added in version 7.0.0 + * // "lag": 1 // Added in version 7.0.0 + * // } + * // ] + * ``` + */ + public async xinfoGroups( + key: string, + ): Promise[]> { + return this.createWritePromise(createXInfoGroups(key)); + } + /** * Changes the ownership of a pending message. * diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 0ed3310fe8..936d0eab73 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2453,6 +2453,11 @@ export function createXInfoStream( return createCommand(RequestType.XInfoStream, args); } +/** @internal */ +export function createXInfoGroups(key: string): command_request.Command { + return createCommand(RequestType.XInfoGroups, [key]); +} + /** * @internal */ diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index e83233746a..986aabff9b 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -212,6 +212,7 @@ import { createXGroupDelConsumer, createXGroupDestroy, createXInfoConsumers, + createXInfoGroups, createXInfoStream, createXLen, createXPending, @@ -2331,6 +2332,20 @@ export class BaseTransaction > { return this.addAndReturn(createXInfoStream(key, fullOptions ?? false)); } + /** + * Returns the list of all consumer groups and their attributes for the stream stored at `key`. + * + * @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details. + * + * @param key - The key of the stream. + * + * Command Response - An `Array` of `Records`, where each mapping represents the + * attributes of a consumer group for the stream at `key`. + */ + public xinfoGroups(key: string): T { + return this.addAndReturn(createXInfoGroups(key)); + } + /** Returns the server time. * @see {@link https://valkey.io/commands/time/|valkey.io} for details. * diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 4720c77d3c..49833cbd24 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -5570,7 +5570,6 @@ export function runBaseTests (config: { "last-entry": (string | number | string[])[]; groups: number; }; - console.log(result); // verify result: expect(result.length).toEqual(1); @@ -8232,6 +8231,197 @@ export function runBaseTests (config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xinfogroups xinfo groups %p`, + async (protocol) => { + await runTest(async (client: BaseClient, cluster) => { + const key = uuidv4(); + const stringKey = uuidv4(); + const groupName1 = uuidv4(); + const consumer1 = uuidv4(); + const streamId1 = "0-1"; + const streamId2 = "0-2"; + const streamId3 = "0-3"; + + expect( + await client.xgroupCreate(key, groupName1, "0-0", { + mkStream: true, + }), + ).toEqual("OK"); + + // one empty group exists + expect(await client.xinfoGroups(key)).toEqual( + cluster.checkIfServerVersionLessThan("7.0.0") + ? [ + { + name: groupName1, + consumers: 0, + pending: 0, + "last-delivered-id": "0-0", + }, + ] + : [ + { + name: groupName1, + consumers: 0, + pending: 0, + "last-delivered-id": "0-0", + "entries-read": null, + lag: 0, + }, + ], + ); + + expect( + await client.xadd( + key, + [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + { id: streamId1 }, + ), + ).toEqual(streamId1); + + expect( + await client.xadd( + key, + [ + ["entry2_field1", "entry2_value1"], + ["entry2_field2", "entry2_value2"], + ], + { id: streamId2 }, + ), + ).toEqual(streamId2); + + expect( + await client.xadd( + key, + [["entry3_field1", "entry3_value1"]], + { id: streamId3 }, + ), + ).toEqual(streamId3); + + // same as previous check, bug lag = 3, there are 3 messages unread + expect(await client.xinfoGroups(key)).toEqual( + cluster.checkIfServerVersionLessThan("7.0.0") + ? [ + { + name: groupName1, + consumers: 0, + pending: 0, + "last-delivered-id": "0-0", + }, + ] + : [ + { + name: groupName1, + consumers: 0, + pending: 0, + "last-delivered-id": "0-0", + "entries-read": null, + lag: 3, + }, + ], + ); + + expect( + await client.customCommand([ + "XREADGROUP", + "GROUP", + groupName1, + consumer1, + "STREAMS", + key, + ">", + ]), + ).toEqual({ + [key]: { + [streamId1]: [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + [streamId2]: [ + ["entry2_field1", "entry2_value1"], + ["entry2_field2", "entry2_value2"], + ], + [streamId3]: [["entry3_field1", "entry3_value1"]], + }, + }); + // after reading, `lag` is reset, and `pending`, consumer count and last ID are set + expect(await client.xinfoGroups(key)).toEqual( + cluster.checkIfServerVersionLessThan("7.0.0") + ? [ + { + name: groupName1, + consumers: 1, + pending: 3, + "last-delivered-id": streamId3, + }, + ] + : [ + { + name: groupName1, + consumers: 1, + pending: 3, + "last-delivered-id": streamId3, + "entries-read": 3, + lag: 0, + }, + ], + ); + + expect( + await client.customCommand([ + "XACK", + key, + groupName1, + streamId1, + ]), + ).toEqual(1); + // once message ack'ed, pending counter decreased + expect(await client.xinfoGroups(key)).toEqual( + cluster.checkIfServerVersionLessThan("7.0.0") + ? [ + { + name: groupName1, + consumers: 1, + pending: 2, + "last-delivered-id": streamId3, + }, + ] + : [ + { + name: groupName1, + consumers: 1, + pending: 2, + "last-delivered-id": streamId3, + "entries-read": 3, + lag: 0, + }, + ], + ); + + // key exists, but it is not a stream + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect(client.xinfoGroups(stringKey)).rejects.toThrow( + RequestError, + ); + + // Passing a non-existing key raises an error + const key2 = uuidv4(); + await expect(client.xinfoGroups(key2)).rejects.toThrow( + RequestError, + ); + // create a second stream + await client.xadd(key2, [["a", "b"]]); + // no group yet exists + expect(await client.xinfoGroups(key2)).toEqual([]); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `xpending test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 83ac24c825..29147cdac4 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -1101,6 +1101,8 @@ export async function transactionTest( 'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }', 1, ]); + baseTransaction.xinfoGroups(key9); + responseData.push(["xinfoGroups(key9)", []]); baseTransaction.xgroupCreate(key9, groupName1, "0-0"); responseData.push(['xgroupCreate(key9, groupName1, "0-0")', "OK"]); baseTransaction.xgroupCreate(key9, groupName2, "0-0", { mkStream: true });