diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index b2ad3f1560..e49eedc74f 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -925,6 +925,16 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { key_type: &None, value_type: &None, }), + b"XCLAIM" => { + if cmd.position(b"JUSTID").is_some() { + Some(ExpectedReturnType::ArrayOfStrings) + } else { + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::SimpleString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }) + } + } b"XAUTOCLAIM" => { if cmd.position(b"JUSTID").is_some() { // Value conversion is not needed if the JUSTID arg was passed. @@ -1262,6 +1272,36 @@ mod tests { assert!(converted_4.is_err()); } + #[test] + fn convert_xclaim() { + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XCLAIM") + .arg("key") + .arg("grou") + .arg("consumer") + .arg("0") + .arg("id") + ), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::SimpleString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }) + )); + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XCLAIM") + .arg("key") + .arg("grou") + .arg("consumer") + .arg("0") + .arg("id") + .arg("JUSTID") + ), + Some(ExpectedReturnType::ArrayOfStrings) + )); + } + #[test] fn convert_xrange_xrevrange() { assert!(matches!( diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index 63199878d9..d93d6c7c35 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -243,6 +243,7 @@ enum RequestType { HScan = 202; XAutoClaim = 203; Wait = 208; + XClaim = 209; } message Command { diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index 81e9332ea1..7064e9a0ce 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -213,6 +213,7 @@ pub enum RequestType { HScan = 202, XAutoClaim = 203, Wait = 208, + XClaim = 209, } fn get_two_word_command(first: &str, second: &str) -> Cmd { @@ -429,6 +430,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::HScan => RequestType::HScan, ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim, ProtobufRequestType::Wait => RequestType::Wait, + ProtobufRequestType::XClaim => RequestType::XClaim, } } } @@ -643,6 +645,7 @@ impl RequestType { RequestType::HScan => Some(cmd("HSCAN")), RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")), RequestType::Wait => Some(cmd("WAIT")), + RequestType::XClaim => Some(cmd("XCLAIM")), } } } diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 2d567b0b75..fcc8cc253b 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -6,6 +6,7 @@ import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldReadOnlySubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldSubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs; +import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; import static glide.ffi.resolvers.SocketListenerResolver.getSocket; import static glide.utils.ArrayTransformUtils.cast3DArray; import static glide.utils.ArrayTransformUtils.castArray; @@ -137,6 +138,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Watch; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; +import static redis_request.RedisRequestOuterClass.RequestType.XClaim; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer; @@ -226,6 +228,7 @@ import glide.api.models.commands.scan.SScanOptions; import glide.api.models.commands.scan.ZScanOptions; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; import glide.api.models.commands.stream.StreamRange; @@ -2178,6 +2181,66 @@ public CompletableFuture xpending( XPending, args, response -> castArray(handleArrayResponse(response), Object[].class)); } + @Override + public CompletableFuture> xclaim( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids) { + String[] args = + concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); + return commandManager.submitNewCommand(XClaim, args, this::handleMapResponse); + } + + @Override + public CompletableFuture> xclaim( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids, + @NonNull StreamClaimOptions options) { + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs()); + return commandManager.submitNewCommand(XClaim, args, this::handleMapResponse); + } + + @Override + public CompletableFuture xclaimJustId( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids) { + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + new String[] {JUST_ID_REDIS_API}); + return commandManager.submitNewCommand( + XClaim, args, response -> castArray(handleArrayResponse(response), String.class)); + } + + @Override + public CompletableFuture xclaimJustId( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids, + @NonNull StreamClaimOptions options) { + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + options.toArgs(), + new String[] {JUST_ID_REDIS_API}); + return commandManager.submitNewCommand( + XClaim, args, response -> castArray(handleArrayResponse(response), String.class)); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 9644d11f8c..e92c723cc1 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -4,6 +4,7 @@ import glide.api.models.GlideString; import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; import glide.api.models.commands.stream.StreamRange; @@ -711,4 +712,119 @@ CompletableFuture xpending( StreamRange end, long count, StreamPendingOptions options); + + /** + * Changes the ownership of a pending message. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids A array of entry ids. + * @return A Map of message entries with the format + * {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer. + * @example + *
+     * // read messages from streamId for consumer1
+     * var readResult = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "consumer1").get();
+     * // "entryId" is now read, and we can assign the pending messages to consumer2
+     * Map results = client.xclaim("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}).get();
+     * for (String key: results.keySet()) {
+     *     System.out.println(key);
+     *     for (String[] entry: results.get(key)) {
+     *         System.out.printf("{%s=%s}%n", entry[0], entry[1]);
+     *     }
+     * }
+     * 
+ */ + CompletableFuture> xclaim( + String key, String group, String consumer, long minIdleTime, String[] ids); + + /** + * Changes the ownership of a pending message. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids An array of entry ids. + * @param options Stream claim options {@link StreamClaimOptions}. + * @return A Map of message entries with the format + * {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer. + * @example + *
+     * // assign (force) unread and unclaimed messages to consumer2
+     * StreamClaimOptions options = StreamClaimOptions.builder().force().build();
+     * Map results = client.xclaim("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}, options).get();
+     * for (String key: results.keySet()) {
+     *     System.out.println(key);
+     *     for (String[] entry: results.get(key)) {
+     *         System.out.printf("{%s=%s}%n", entry[0], entry[1]);
+     *     }
+     * }
+     * 
+ */ + CompletableFuture> xclaim( + String key, + String group, + String consumer, + long minIdleTime, + String[] ids, + StreamClaimOptions options); + + /** + * Changes the ownership of a pending message. This function returns an array with + * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids An array of entry ids. + * @return An array of message ids claimed by the consumer. + * @example + *
+     * // read messages from streamId for consumer1
+     * var readResult = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "consumer1").get();
+     * // "entryId" is now read, and we can assign the pending messages to consumer2
+     * String[] results = client.xclaimJustId("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}).get();
+     * for (String id: results) {
+     *     System.out.printf("consumer2 claimed stream entry ID: %s %n", id);
+     * }
+     * 
+ */ + CompletableFuture xclaimJustId( + String key, String group, String consumer, long minIdleTime, String[] ids); + + /** + * Changes the ownership of a pending message. This function returns an array with + * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids An array of entry ids. + * @param options Stream claim options {@link StreamClaimOptions}. + * @return An array of message ids claimed by the consumer. + * @example + *
+     * // assign (force) unread and unclaimed messages to consumer2
+     * StreamClaimOptions options = StreamClaimOptions.builder().force().build();
+     * String[] results = client.xclaimJustId("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}, options).get();
+     * for (String id: results) {
+     *     System.out.printf("consumer2 claimed stream entry ID: %s %n", id);
+     * }
+     */
+    CompletableFuture xclaimJustId(
+            String key,
+            String group,
+            String consumer,
+            long minIdleTime,
+            String[] ids,
+            StreamClaimOptions options);
 }
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
index 593cda2bba..13659f4b4c 100644
--- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
+++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
@@ -20,6 +20,7 @@
 import static glide.api.models.commands.function.FunctionListOptions.LIBRARY_NAME_REDIS_API;
 import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API;
 import static glide.api.models.commands.function.FunctionLoadOptions.REPLACE;
+import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
 import static glide.utils.ArrayTransformUtils.concatenateArrays;
 import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
 import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray;
@@ -163,6 +164,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Wait;
 import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
+import static redis_request.RedisRequestOuterClass.RequestType.XClaim;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
@@ -257,6 +259,7 @@
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
 import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange;
@@ -3330,6 +3333,114 @@ public T xpending(
         return getThis();
     }
 
+    /**
+     * Changes the ownership of a pending message.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @return Command Response - A Map of message entries with the format 
+     *      {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer.
+     */
+    public T xclaim(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids) {
+        String[] args =
+                concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
+    /**
+     * Changes the ownership of a pending message.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @param options Stream claim options {@link StreamClaimOptions}.
+     * @return Command Response - A Map of message entries with the format 
+     *      {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer.
+     */
+    public T xclaim(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids,
+            @NonNull StreamClaimOptions options) {
+        String[] args =
+                concatenateArrays(
+                        new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs());
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
+    /**
+     * Changes the ownership of a pending message. This function returns an array with
+     * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @return Command Response - An array of message ids claimed by the consumer.
+     */
+    public T xclaimJustId(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids) {
+        String[] args =
+                concatenateArrays(
+                        new String[] {key, group, consumer, Long.toString(minIdleTime)},
+                        ids,
+                        new String[] {JUST_ID_REDIS_API});
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
+    /**
+     * Changes the ownership of a pending message. This function returns an array with
+     * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @param options Stream claim options {@link StreamClaimOptions}.
+     * @return Command Response - An array of message ids claimed by the consumer.
+     */
+    public T xclaimJustId(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids,
+            @NonNull StreamClaimOptions options) {
+        String[] args =
+                concatenateArrays(
+                        new String[] {key, group, consumer, Long.toString(minIdleTime)},
+                        ids,
+                        options.toArgs(),
+                        new String[] {JUST_ID_REDIS_API});
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
     /**
      * Returns the remaining time to live of key that has a timeout, in milliseconds.
      *
diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java
new file mode 100644
index 0000000000..9122096582
--- /dev/null
+++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java
@@ -0,0 +1,104 @@
+/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
+package glide.api.models.commands.stream;
+
+import glide.api.commands.StreamBaseCommands;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Builder;
+
+/**
+ * Optional arguments to {@link StreamBaseCommands#xclaim(String, String, String, long, String[],
+ * StreamClaimOptions)}
+ *
+ * @see valkey.io
+ */
+@Builder
+public class StreamClaimOptions {
+
+    /** ValKey API string to designate IDLE time in milliseconds */
+    public static final String IDLE_REDIS_API = "IDLE";
+
+    /** ValKey API string to designate TIME time in unix-milliseconds */
+    public static final String TIME_REDIS_API = "TIME";
+
+    /** ValKey API string to designate RETRYCOUNT */
+    public static final String RETRY_COUNT_REDIS_API = "RETRYCOUNT";
+
+    /** ValKey API string to designate FORCE */
+    public static final String FORCE_REDIS_API = "FORCE";
+
+    /** ValKey API string to designate JUSTID */
+    public static final String JUST_ID_REDIS_API = "JUSTID";
+
+    /**
+     * Set the idle time (last time it was delivered) of the message in milliseconds. If idle
+     *  is not specified, an idle of 0 is assumed, that is, the time
+     * count is reset because the message now has a new owner trying to process it.
+     */
+    private final Long idle; // in milliseconds
+
+    /**
+     * This is the same as {@link #idle} but instead of a relative amount of milliseconds, it sets the
+     * idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF
+     * file generating XCLAIM commands.
+     */
+    private final Long idleUnixTime; // in unix-time milliseconds
+
+    /**
+     * Set the retry counter to the specified value. This counter is incremented every time a message
+     * is delivered again. Normally {@link StreamBaseCommands#xclaim} does not alter this counter,
+     * which is just served to clients when the {@link StreamBaseCommands#xpending} command is called:
+     * this way clients can detect anomalies, like messages that are never processed for some reason
+     * after a big number of delivery attempts.
+     */
+    private final Long retryCount;
+
+    /**
+     * Creates the pending message entry in the PEL even if certain specified IDs are not already in
+     * the PEL assigned to a different client. However, the message must exist in the stream,
+     * otherwise the IDs of non-existing messages are ignored.
+     */
+    private final boolean isForce;
+
+    public static class StreamClaimOptionsBuilder {
+
+        /**
+         * Creates the pending message entry in the PEL even if certain specified IDs are not already in
+         * the PEL assigned to a different client. However, the message must exist in the stream,
+         * otherwise the IDs of non-existing messages are ignored.
+         */
+        public StreamClaimOptionsBuilder force() {
+            return isForce(true);
+        }
+    }
+
+    /**
+     * Converts options for Xclaim into a String[].
+     *
+     * @return String[]
+     */
+    public String[] toArgs() {
+        List optionArgs = new ArrayList<>();
+
+        if (idle != null) {
+            optionArgs.add(IDLE_REDIS_API);
+            optionArgs.add(Long.toString(idle));
+        }
+
+        if (idleUnixTime != null) {
+            optionArgs.add(TIME_REDIS_API);
+            optionArgs.add(Long.toString(idleUnixTime));
+        }
+
+        if (retryCount != null) {
+            optionArgs.add(RETRY_COUNT_REDIS_API);
+            optionArgs.add(Long.toString(retryCount));
+        }
+
+        if (isForce) {
+            optionArgs.add(FORCE_REDIS_API);
+        }
+
+        return optionArgs.toArray(new String[0]);
+    }
+}
diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java
index 4a0b8144cd..51025218d9 100644
--- a/java/client/src/test/java/glide/api/RedisClientTest.java
+++ b/java/client/src/test/java/glide/api/RedisClientTest.java
@@ -43,6 +43,11 @@
 import static glide.api.models.commands.scan.BaseScanOptions.COUNT_OPTION_STRING;
 import static glide.api.models.commands.scan.BaseScanOptions.MATCH_OPTION_STRING;
 import static glide.api.models.commands.stream.StreamAddOptions.NO_MAKE_STREAM_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
 import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
@@ -221,6 +226,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Watch;
 import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
+import static redis_request.RedisRequestOuterClass.RequestType.XClaim;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
@@ -317,6 +323,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange;
@@ -6053,6 +6060,155 @@ public void xack_returns_success() {
         assertEquals(mockResult, payload);
     }
 
+    @SneakyThrows
+    @Test
+    public void xclaim_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        String[] arguments = concatenateArrays(new String[] {key, groupName, consumer, "18"}, ids);
+        Map mockResult = Map.of("1234-0", new String[][] {{"message", "log"}});
+
+        CompletableFuture> testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.>submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture> response =
+                service.xclaim(key, groupName, consumer, minIdleTime, ids);
+        Map payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xclaim_with_options_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        StreamClaimOptions options =
+                StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
+        String[] arguments =
+                new String[] {
+                    key,
+                    groupName,
+                    consumer,
+                    "18",
+                    "testId",
+                    IDLE_REDIS_API,
+                    "11",
+                    TIME_REDIS_API,
+                    "12",
+                    RETRY_COUNT_REDIS_API,
+                    "5",
+                    FORCE_REDIS_API
+                };
+        Map mockResult = Map.of("1234-0", new String[][] {{"message", "log"}});
+
+        CompletableFuture> testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.>submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture> response =
+                service.xclaim(key, groupName, consumer, minIdleTime, ids, options);
+        Map payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xclaimJustId_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        String[] arguments = new String[] {key, groupName, consumer, "18", "testId", JUST_ID_REDIS_API};
+        String[] mockResult = {"message", "log"};
+
+        CompletableFuture testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture response =
+                service.xclaimJustId(key, groupName, consumer, minIdleTime, ids);
+        String[] payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xclaimJustId_with_options_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        StreamClaimOptions options =
+                StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
+        String[] arguments =
+                new String[] {
+                    key,
+                    groupName,
+                    consumer,
+                    "18",
+                    "testId",
+                    IDLE_REDIS_API,
+                    "11",
+                    TIME_REDIS_API,
+                    "12",
+                    RETRY_COUNT_REDIS_API,
+                    "5",
+                    FORCE_REDIS_API,
+                    JUST_ID_REDIS_API
+                };
+        String[] mockResult = {"message", "log"};
+
+        CompletableFuture testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture response =
+                service.xclaimJustId(key, groupName, consumer, minIdleTime, ids, options);
+        String[] payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
     @SneakyThrows
     @Test
     public void xack_binary_returns_success() {
diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java
index 9c70ed1886..ad9e56ac0d 100644
--- a/java/client/src/test/java/glide/api/models/TransactionTests.java
+++ b/java/client/src/test/java/glide/api/models/TransactionTests.java
@@ -31,6 +31,11 @@
 import static glide.api.models.commands.geospatial.GeoAddOptions.CHANGED_REDIS_API;
 import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMLONLAT_VALKEY_API;
 import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMMEMBER_VALKEY_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
 import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
@@ -184,6 +189,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Wait;
 import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
+import static redis_request.RedisRequestOuterClass.RequestType.XClaim;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
@@ -268,6 +274,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange;
@@ -896,6 +903,58 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
                                 "99",
                                 "consumer")));
 
+        transaction.xclaim("key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"});
+        results.add(Pair.of(XClaim, buildArgs("key", "group", "consumer", "99", "12345-1", "98765-4")));
+
+        StreamClaimOptions claimOptions =
+                StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
+        transaction.xclaim(
+                "key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}, claimOptions);
+        results.add(
+                Pair.of(
+                        XClaim,
+                        buildArgs(
+                                "key",
+                                "group",
+                                "consumer",
+                                "99",
+                                "12345-1",
+                                "98765-4",
+                                IDLE_REDIS_API,
+                                "11",
+                                TIME_REDIS_API,
+                                "12",
+                                RETRY_COUNT_REDIS_API,
+                                "5",
+                                FORCE_REDIS_API)));
+
+        transaction.xclaimJustId("key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"});
+        results.add(
+                Pair.of(
+                        XClaim,
+                        buildArgs("key", "group", "consumer", "99", "12345-1", "98765-4", JUST_ID_REDIS_API)));
+
+        transaction.xclaimJustId(
+                "key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}, claimOptions);
+        results.add(
+                Pair.of(
+                        XClaim,
+                        buildArgs(
+                                "key",
+                                "group",
+                                "consumer",
+                                "99",
+                                "12345-1",
+                                "98765-4",
+                                IDLE_REDIS_API,
+                                "11",
+                                TIME_REDIS_API,
+                                "12",
+                                RETRY_COUNT_REDIS_API,
+                                "5",
+                                FORCE_REDIS_API,
+                                JUST_ID_REDIS_API)));
+
         transaction.time();
         results.add(Pair.of(Time, buildArgs()));
 
diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index 83f737be7f..bd2e237a17 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -82,6 +82,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange.IdBound;
@@ -4618,7 +4619,7 @@ public void xack_return_failures(BaseClient client) {
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
-    public void xpending(BaseClient client) {
+    public void xpending_xclaim(BaseClient client) {
 
         String key = UUID.randomUUID().toString();
         String groupName = "group" + UUID.randomUUID();
@@ -4709,9 +4710,52 @@ public void xpending(BaseClient client) {
                 ArrayUtils.remove(pending_results_extended[4], 2));
         assertTrue((Long) pending_results_extended[4][2] >= 0L);
 
-        // acknowledge streams 2-4 and remove them from the xpending results
+        // use claim to claim stream 3 and 5 for consumer 1
+        var claimResults =
+                client.xclaim(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5}).get();
+        assertDeepEquals(
+                Map.of(
+                        streamid_3,
+                        new String[][] {{"field3", "value3"}},
+                        streamid_5,
+                        new String[][] {{"field5", "value5"}}),
+                claimResults);
+
+        var claimResultsJustId =
+                client
+                        .xclaimJustId(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5})
+                        .get();
+        assertArrayEquals(new String[] {streamid_3, streamid_5}, claimResultsJustId);
+
+        // add one more stream
+        String streamid_6 = client.xadd(key, Map.of("field6", "value6")).get();
+        assertNotNull(streamid_6);
+
+        // using force, we can xclaim the message without reading it
+        var claimForceResults =
+                client
+                        .xclaim(
+                                key,
+                                groupName,
+                                consumer2,
+                                0L,
+                                new String[] {streamid_6},
+                                StreamClaimOptions.builder().force().retryCount(99L).build())
+                        .get();
+        assertDeepEquals(Map.of(streamid_6, new String[][] {{"field6", "value6"}}), claimForceResults);
+
+        Object[][] forcePendingResults =
+                client.xpending(key, groupName, IdBound.of(streamid_6), IdBound.of(streamid_6), 1L).get();
+        assertEquals(streamid_6, forcePendingResults[0][0]);
+        assertEquals(consumer2, forcePendingResults[0][1]);
+        assertEquals(99L, forcePendingResults[0][3]);
+
+        // acknowledge streams 2, 3, 4, and 6 and remove them from the xpending results
         assertEquals(
-                3L, client.xack(key, groupName, new String[] {streamid_2, streamid_3, streamid_4}).get());
+                4L,
+                client
+                        .xack(key, groupName, new String[] {streamid_2, streamid_3, streamid_4, streamid_6})
+                        .get());
 
         pending_results_extended =
                 client
@@ -4719,7 +4763,7 @@ public void xpending(BaseClient client) {
                         .get();
         assertEquals(1, pending_results_extended.length);
         assertEquals(streamid_5, pending_results_extended[0][0]);
-        assertEquals(consumer2, pending_results_extended[0][1]);
+        assertEquals(consumer1, pending_results_extended[0][1]);
 
         pending_results_extended =
                 client
@@ -4737,11 +4781,10 @@ public void xpending(BaseClient client) {
                                 InfRangeBound.MIN,
                                 InfRangeBound.MAX,
                                 10L,
-                                StreamPendingOptions.builder().minIdleTime(1L).consumer(consumer2).build())
+                                StreamPendingOptions.builder().minIdleTime(1L).consumer(consumer1).build())
                         .get();
-        assertEquals(1, pending_results_extended.length);
-        assertEquals(streamid_5, pending_results_extended[0][0]);
-        assertEquals(consumer2, pending_results_extended[0][1]);
+        // note: streams ID 1 and 5 are still pending, all others were acknowledged
+        assertEquals(2, pending_results_extended.length);
     }
 
     @SneakyThrows
@@ -4903,6 +4946,149 @@ public void xpending_return_failures(BaseClient client) {
         assertInstanceOf(RequestException.class, executionException.getCause());
     }
 
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xclaim_return_failures(BaseClient client) {
+
+        String key = UUID.randomUUID().toString();
+        String stringkey = UUID.randomUUID().toString();
+        String groupName = "group" + UUID.randomUUID();
+        String zeroStreamId = "0";
+        String consumer1 = "consumer-1-" + UUID.randomUUID();
+        String consumer2 = "consumer-2-" + UUID.randomUUID();
+
+        // create group and consumer for the group
+        assertEquals(
+                OK,
+                client
+                        .xgroupCreate(
+                                key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
+                        .get());
+        assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get());
+
+        // Add stream entry and mark as pending:
+        String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get();
+        assertNotNull(streamid_1);
+        assertNotNull(client.xreadgroup(Map.of(key, ">"), groupName, consumer1).get());
+
+        // claim with invalid stream entry IDs
+        ExecutionException executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client.xclaimJustId(key, groupName, consumer1, 1L, new String[] {"invalid"}).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        // claim with empty stream entry IDs returns no results
+        var emptyClaim = client.xclaimJustId(key, groupName, consumer1, 1L, new String[0]).get();
+        assertEquals(0L, emptyClaim.length);
+
+        // non-existent key throws a RequestError (NOGROUP)
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        final var claimOptions = StreamClaimOptions.builder().idle(1L).build();
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        // Key exists, but it is not a stream
+        assertEquals(OK, client.set(stringkey, "bar").get());
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+    }
+
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
index b55c127247..8c45614e08 100644
--- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java
+++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
@@ -46,6 +46,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamRange;
 import glide.api.models.commands.stream.StreamRange.IdBound;
@@ -841,6 +842,22 @@ private static Object[] streamCommands(BaseTransaction transaction) {
                         groupName1,
                         consumer1,
                         StreamReadGroupOptions.builder().count(2L).build())
+                .xclaim(streamKey1, groupName1, consumer1, 0L, new String[] {"0-1"})
+                .xclaim(
+                        streamKey1,
+                        groupName1,
+                        consumer1,
+                        0L,
+                        new String[] {"0-3"},
+                        StreamClaimOptions.builder().force().build())
+                .xclaimJustId(streamKey1, groupName1, consumer1, 0L, new String[] {"0-3"})
+                .xclaimJustId(
+                        streamKey1,
+                        groupName1,
+                        consumer1,
+                        0L,
+                        new String[] {"0-4"},
+                        StreamClaimOptions.builder().force().build())
                 .xpending(streamKey1, groupName1)
                 .xack(streamKey1, groupName1, new String[] {"0-3"})
                 .xpending(
@@ -894,6 +911,12 @@ private static Object[] streamCommands(BaseTransaction transaction) {
             Map.of(
                     streamKey1,
                     Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options);
+            Map.of(), // xclaim(streamKey1, groupName1, consumer1, 0L, new String[] {"0-1"})
+            Map.of(
+                    "0-3",
+                    new String[][] {{"field3", "value3"}}), // xclaim(streamKey1, ..., {"0-3"}, options)
+            new String[] {"0-3"}, // xclaimJustId(streamKey1, ..., new String[] {"0-3"})
+            new String[0], // xclaimJustId(streamKey1, ..., new String[] {"0-4"}, options)
             new Object[] {
                 1L, "0-3", "0-3", new Object[][] {{consumer1, "1"}}
             }, // xpending(streamKey1, groupName1)