Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add command XGROUP SETID #2135

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
* Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
* Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))
* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))
* Node: Added XGROUP SETID command ([#2135]((https://github.com/valkey-io/valkey-glide/pull/2135))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
35 changes: 35 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ import {
createZScore,
createZUnion,
createZUnionStore,
createXGroupSetid,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -5214,6 +5215,40 @@ export class BaseClient {
return this.createWritePromise(createXAck(key, group, ids));
}

/**
* Sets the last delivered ID for a consumer group.
*
* @see {@link https://valkey.io/commands/xgroup-setid|valkey.io} for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param id - The stream entry ID that should be set as the last delivered ID for the consumer
* group.
tjzhang-BQ marked this conversation as resolved.
Show resolved Hide resolved
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used.
* @returns `"OK"`.
*
* * @example
* ```typescript
* console.log(await client.xgroupSetId("mystream", "mygroup", "0", 1L)); // Output is "OK"
* ```
*/
public async xgroupSetId(
key: string,
groupName: string,
id: string,
entriesRead?: number,
decoder?: Decoder,
): Promise<"OK"> {
return this.createWritePromise(
createXGroupSetid(key, groupName, id, entriesRead),
{
decoder: decoder,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String decoder should be hardcoded there, because command returns OK. No need to add decoder arg

},
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add string decoder there

}

/** Returns the element at index `index` in the list stored at `key`.
* The index is zero-based, so 0 means the first element, 1 the second element and so on.
* Negative indices can be used to designate elements starting at the tail of the list.
Expand Down
19 changes: 19 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3953,3 +3953,22 @@ export function createXAck(
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}

/**
* @internal
*/
export function createXGroupSetid(
key: string,
groupName: string,
id: string,
entriesRead?: number,
): command_request.Command {
const args = [key, groupName, id];

if (entriesRead !== undefined) {
args.push("ENTRIESREAD");
args.push(entriesRead.toString());
}

return createCommand(RequestType.XGroupSetId, args);
}
25 changes: 25 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ import {
createZScore,
createZUnion,
createZUnionStore,
createXGroupSetid,
} from "./Commands";
import { command_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -2909,6 +2910,30 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXAck(key, group, ids));
}

/**
* Sets the last delivered ID for a consumer group.
*
* @see {@link https://valkey.io/commands/xgroup-setid|valkey.io} for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param id - The stream entry ID that should be set as the last delivered ID for the consumer group.
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
*
* Command Response - `"OK"`.
*/
public xgroupSetId(
key: string,
groupName: string,
id: string,
entriesRead?: number,
): T {
return this.addAndReturn(
createXGroupSetid(key, groupName, id, entriesRead),
);
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
102 changes: 102 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9502,6 +9502,108 @@ export function runBaseTests(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xgroupSetId test %p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
const key = "testKey" + uuidv4();
const nonExistingKey = "group" + uuidv4();
const stringKey = "testKey" + uuidv4();
const groupName = uuidv4();
const consumerName = uuidv4();
const streamid0 = "0";
const streamid1_0 = "1-0";
const streamid1_1 = "1-1";
const streamid1_2 = "1-2";

// Setup: Create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries List
expect(
await client.xadd(key, [["f0", "v0"]], { id: streamid1_0 }),
).toBe(streamid1_0);
expect(
await client.xadd(key, [["f1", "v1"]], { id: streamid1_1 }),
).toBe(streamid1_1);
expect(
await client.xadd(key, [["f2", "v2"]], { id: streamid1_2 }),
).toBe(streamid1_2);

expect(
await client.xgroupCreate(key, groupName, streamid0),
).toBe("OK");

expect(
await client.xreadgroup(groupName, consumerName, {
[key]: ">",
}),
).toEqual({
[key]: {
[streamid1_0]: [["f0", "v0"]],
[streamid1_1]: [["f1", "v1"]],
[streamid1_2]: [["f2", "v2"]],
},
});

// Sanity check: xreadgroup should not return more entries since they're all already in the
// Pending Entries List.
expect(
await client.xreadgroup(groupName, consumerName, {
[key]: ">",
}),
).toBeNull();

// Reset the last delivered ID for the consumer group to "1-1"
if (cluster.checkIfServerVersionLessThan("7.0.0")) {
expect(
await client.xgroupSetId(key, groupName, streamid1_1),
).toBe("OK");
} else {
expect(
await client.xgroupSetId(
key,
groupName,
streamid1_1,
1,
),
).toBe("OK");
}

// xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
const newResult = await client.xreadgroup(
groupName,
consumerName,
{ [key]: ">" },
);
expect(newResult).toEqual({
[key]: {
[streamid1_2]: [["f2", "v2"]],
},
});

// An error is raised if XGROUP SETID is called with a non-existing key
await expect(
client.xgroupSetId(nonExistingKey, groupName, streamid0),
).rejects.toThrow(RequestError);

// An error is raised if XGROUP SETID is called with a non-existing group
await expect(
client.xgroupSetId(key, "non_existing_group", streamid0),
).rejects.toThrow(RequestError);

// Setting the ID to a non-existing ID is allowed
expect(await client.xgroupSetId(key, groupName, "99-99")).toBe(
"OK",
);

// key exists, but is not a stream
expect(await client.set(stringKey, "xgroup setid")).toBe("OK");
await expect(
client.xgroupSetId(stringKey, groupName, streamid1_0),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xpending test_%p`,
async (protocol) => {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,8 @@ export async function transactionTest(

baseTransaction.xack(key9, groupName1, ["0-3"]);
responseData.push(["xack(key9, groupName1, ['0-3'])", 0]);
baseTransaction.xgroupSetId(key9, groupName1, "0-2");
responseData.push(["xgroupSetId(key9, groupName1, '0-2')", "OK"]);
baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]);
baseTransaction.xgroupDestroy(key9, groupName1);
Expand Down
Loading