From 17c7f45304c2fc94b2c5af8658149f413954a577 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Mon, 10 Jun 2024 16:39:54 -0700 Subject: [PATCH] Java: Add XREAD command (#1524) * Java: Add XREAD command Signed-off-by: Andrew Carbonetto * Return typing to Object[][] Signed-off-by: Andrew Carbonetto * Restore String return Signed-off-by: Andrew Carbonetto * Update XREAD for review comments Signed-off-by: Andrew Carbonetto * SPOTLESS Signed-off-by: Andrew Carbonetto * Documentation changes Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/api/BaseClient.java | 34 +++++ .../api/commands/StreamBaseCommands.java | 61 +++++++- .../glide/api/models/BaseTransaction.java | 36 ++++- .../commands/stream/StreamReadOptions.java | 61 ++++++++ .../java/glide/utils/ArrayTransformUtils.java | 27 +++- .../test/java/glide/api/RedisClientTest.java | 138 ++++++++++++++---- .../glide/api/models/TransactionTests.java | 21 +++ .../test/java/glide/SharedCommandTests.java | 127 ++++++++++++++++ .../java/glide/TransactionTestUtilities.java | 4 + .../test/java/glide/cluster/CommandTests.java | 2 + 10 files changed, 473 insertions(+), 38 deletions(-) create mode 100644 java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 239b7db1f4..e3e50a1873 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -7,6 +7,7 @@ import static glide.ffi.resolvers.SocketListenerResolver.getSocket; import static glide.utils.ArrayTransformUtils.castArray; import static glide.utils.ArrayTransformUtils.castArrayofArrays; +import static glide.utils.ArrayTransformUtils.castMapOf2DArray; import static glide.utils.ArrayTransformUtils.castMapOfArrays; import static glide.utils.ArrayTransformUtils.concatenateArrays; import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray; @@ -116,6 +117,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; +import static redis_request.RedisRequestOuterClass.RequestType.XRead; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZCard; @@ -176,6 +178,7 @@ import glide.api.models.commands.geospatial.GeospatialData; import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamRange; +import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.exceptions.RedisException; @@ -196,6 +199,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.NonNull; import org.apache.commons.lang3.ArrayUtils; @@ -394,6 +398,23 @@ protected Map handleMapOrNullResponse(Response response) throws R Map.class, EnumSet.of(ResponseFlags.IS_NULLABLE, ResponseFlags.ENCODING_UTF8), response); } + /** + * @param response A Protobuf response + * @return A map of a map of String[][] + */ + protected Map> handleXReadResponse(Response response) + throws RedisException { + Map mapResponse = handleMapOrNullResponse(response); + if (mapResponse == null) { + return null; + } + return mapResponse.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> castMapOf2DArray((Map) e.getValue(), String.class))); + } + @SuppressWarnings("unchecked") // raw Set cast to Set protected Set handleSetResponse(Response response) throws RedisException { return handleRedisResponse(Set.class, EnumSet.of(ResponseFlags.ENCODING_UTF8), response); @@ -1289,6 +1310,19 @@ public CompletableFuture xadd( return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse); } + @Override + public CompletableFuture>> xread( + @NonNull Map keysAndIds) { + return xread(keysAndIds, StreamReadOptions.builder().build()); + } + + @Override + public CompletableFuture>> xread( + @NonNull Map keysAndIds, @NonNull StreamReadOptions options) { + String[] arguments = options.toArgs(keysAndIds); + return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponse); + } + @Override public CompletableFuture xtrim(@NonNull String key, @NonNull StreamTrimOptions options) { String[] arguments = ArrayUtils.addFirst(options.toArgs(), key); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 09a26df937..bdb9f000e1 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -6,6 +6,7 @@ import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -41,7 +42,7 @@ public interface StreamBaseCommands { * @see redis.io for details. * @param key The key of the stream. * @param values Field-value pairs to be added to the entry. - * @param options Stream add options. + * @param options Stream add options {@link StreamAddOptions}. * @return The id of the added entry, or null if {@link * StreamAddOptionsBuilder#makeStream(Boolean)} is set to false and no stream * with the matching key exists. @@ -57,12 +58,68 @@ public interface StreamBaseCommands { */ CompletableFuture xadd(String key, Map values, StreamAddOptions options); + /** + * Reads entries from the given streams. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see redis.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * @example + *
{@code
+     * Map xreadKeys = Map.of("streamKey", "0-0");
+     * Map> streamReadResponse = client.xread(xreadKeys).get();
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+ */ + CompletableFuture>> xread(Map keysAndIds); + + /** + * Reads entries from the given streams. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see redis.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. + * @param options Options detailing how to read the stream {@link StreamReadOptions}. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * @example + *
{@code
+     * // retrieve streamKey entries and block for 1 second if is no stream data
+     * Map xreadKeys = Map.of("streamKey", "0-0");
+     * StreamReadOptions options = StreamReadOptions.builder().block(1L).build();
+     * Map> streamReadResponse = client.xread(xreadKeys, options).get();
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+ */ + CompletableFuture>> xread( + Map keysAndIds, StreamReadOptions options); + /** * Trims the stream by evicting older entries. * * @see redis.io for details. * @param key The key of the stream. - * @param options Stream trim options. + * @param options Stream trim options {@link StreamTrimOptions}. * @return The number of entries deleted from the stream. * @example *
{@code
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
index ad04013935..1862f6f905 100644
--- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
+++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
@@ -142,6 +142,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XLen;
 import static redis_request.RedisRequestOuterClass.RequestType.XRange;
+import static redis_request.RedisRequestOuterClass.RequestType.XRead;
 import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
 import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
@@ -215,6 +216,7 @@
 import glide.api.models.commands.stream.StreamAddOptions;
 import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
 import glide.api.models.commands.stream.StreamRange;
+import glide.api.models.commands.stream.StreamReadOptions;
 import glide.api.models.commands.stream.StreamTrimOptions;
 import glide.api.models.configuration.ReadFrom;
 import java.util.Arrays;
@@ -2688,7 +2690,7 @@ public T xadd(@NonNull String key, @NonNull Map values) {
      * @see redis.io for details.
      * @param key The key of the stream.
      * @param values Field-value pairs to be added to the entry.
-     * @param options Stream add options.
+     * @param options Stream add options {@link StreamAddOptions}.
      * @return Command Response - The id of the added entry, or null if {@link
      *     StreamAddOptionsBuilder#makeStream(Boolean)} is set to false and no stream
      *     with the matching key exists.
@@ -2703,12 +2705,42 @@ public T xadd(
         return getThis();
     }
 
+    /**
+     * Reads entries from the given streams.
+     *
+     * @see redis.io for details.
+     * @param keysAndIds An array of Pairs of keys and entry ids to read from. A 
+     *     pair is composed of a stream's key and the id of the entry after which the stream
+     *     will be read.
+     * @return Command Response - A {@literal Map>} with stream
+     *     keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
+     */
+    public T xread(@NonNull Map keysAndIds) {
+        return xread(keysAndIds, StreamReadOptions.builder().build());
+    }
+
+    /**
+     * Reads entries from the given streams.
+     *
+     * @see redis.io for details.
+     * @param keysAndIds An array of Pairs of keys and entry ids to read from. A 
+     *     pair is composed of a stream's key and the id of the entry after which the stream
+     *     will be read.
+     * @param options options detailing how to read the stream {@link StreamReadOptions}.
+     * @return Command Response - A {@literal Map>} with stream
+     *     keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
+     */
+    public T xread(@NonNull Map keysAndIds, @NonNull StreamReadOptions options) {
+        protobufTransaction.addCommands(buildCommand(XRead, buildArgs(options.toArgs(keysAndIds))));
+        return getThis();
+    }
+
     /**
      * Trims the stream by evicting older entries.
      *
      * @see redis.io for details.
      * @param key The key of the stream.
-     * @param options Stream trim options.
+     * @param options Stream trim options {@link StreamTrimOptions}.
      * @return Command Response - The number of entries deleted from the stream.
      */
     public T xtrim(@NonNull String key, @NonNull StreamTrimOptions options) {
diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java
new file mode 100644
index 0000000000..7baad14121
--- /dev/null
+++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java
@@ -0,0 +1,61 @@
+/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
+package glide.api.models.commands.stream;
+
+import glide.api.commands.StreamBaseCommands;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.Builder;
+
+/**
+ * Optional arguments for {@link StreamBaseCommands#xread(Map, StreamReadOptions)}
+ *
+ * @see redis.io
+ */
+@Builder
+public final class StreamReadOptions {
+
+    public static final String READ_COUNT_REDIS_API = "COUNT";
+    public static final String READ_BLOCK_REDIS_API = "BLOCK";
+    public static final String READ_STREAMS_REDIS_API = "STREAMS";
+
+    /**
+     * If set, the request will be blocked for the set amount of milliseconds or until the server has
+     * the required number of entries. Equivalent to BLOCK in the Redis API.
+     */
+    Long block;
+
+    /**
+     * The maximal number of elements requested. Equivalent to COUNT in the Redis API.
+     */
+    Long count;
+
+    /**
+     * Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map,
+     * StreamReadOptions)} into a String[].
+     *
+     * @return String[]
+     */
+    public String[] toArgs(Map streams) {
+        List optionArgs = new ArrayList<>();
+
+        if (this.count != null) {
+            optionArgs.add(READ_COUNT_REDIS_API);
+            optionArgs.add(count.toString());
+        }
+
+        if (this.block != null) {
+            optionArgs.add(READ_BLOCK_REDIS_API);
+            optionArgs.add(block.toString());
+        }
+
+        optionArgs.add(READ_STREAMS_REDIS_API);
+        Set> entrySet = streams.entrySet();
+        optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
+        optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList()));
+
+        return optionArgs.toArray(new String[0]);
+    }
+}
diff --git a/java/client/src/main/java/glide/utils/ArrayTransformUtils.java b/java/client/src/main/java/glide/utils/ArrayTransformUtils.java
index 372cc4f336..a251693293 100644
--- a/java/client/src/main/java/glide/utils/ArrayTransformUtils.java
+++ b/java/client/src/main/java/glide/utils/ArrayTransformUtils.java
@@ -103,17 +103,32 @@ public static  U[][] castArrayofArrays(T[] outerObjectArr, Class
      * @param mapOfArrays Map of Array values to cast.
      * @param clazz The class of the array values to cast to.
      * @return A Map of arrays of type U[], containing the key/values from the input Map.
-     * @param  The base type from which the elements are being cast.
-     * @param  The subtype of T to which the elements are cast.
+     * @param  The target type which the elements are cast.
      */
-    @SuppressWarnings("unchecked")
-    public static  Map castMapOfArrays(
-            Map mapOfArrays, Class clazz) {
+    public static  Map castMapOfArrays(
+            Map mapOfArrays, Class clazz) {
+        if (mapOfArrays == null) {
+            return null;
+        }
+        return mapOfArrays.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> castArray(e.getValue(), clazz)));
+    }
+
+    /**
+     * Maps a Map of Object[][] with value type T[][] to value of U[][].
+     *
+     * @param mapOfArrays Map of 2D Array values to cast.
+     * @param clazz The class of the array values to cast to.
+     * @return A Map of arrays of type U[][], containing the key/values from the input Map.
+     * @param  The target type which the elements are cast.
+     */
+    public static  Map castMapOf2DArray(
+            Map mapOfArrays, Class clazz) {
         if (mapOfArrays == null) {
             return null;
         }
         return mapOfArrays.entrySet().stream()
-                .collect(Collectors.toMap(k -> k.getKey(), e -> castArray(e.getValue(), clazz)));
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> castArrayofArrays(e.getValue(), clazz)));
     }
 
     /**
diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java
index e2effea9b2..87f33597ad 100644
--- a/java/client/src/test/java/glide/api/RedisClientTest.java
+++ b/java/client/src/test/java/glide/api/RedisClientTest.java
@@ -30,6 +30,9 @@
 import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API;
 import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API;
 import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamReadOptions.READ_BLOCK_REDIS_API;
+import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamReadOptions.READ_STREAMS_REDIS_API;
 import static glide.api.models.commands.stream.StreamTrimOptions.TRIM_EXACT_REDIS_API;
 import static glide.api.models.commands.stream.StreamTrimOptions.TRIM_LIMIT_REDIS_API;
 import static glide.api.models.commands.stream.StreamTrimOptions.TRIM_MAXLEN_REDIS_API;
@@ -172,6 +175,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XLen;
 import static redis_request.RedisRequestOuterClass.RequestType.XRange;
+import static redis_request.RedisRequestOuterClass.RequestType.XRead;
 import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
 import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
@@ -242,6 +246,7 @@
 import glide.api.models.commands.stream.StreamRange;
 import glide.api.models.commands.stream.StreamRange.IdBound;
 import glide.api.models.commands.stream.StreamRange.InfRangeBound;
+import glide.api.models.commands.stream.StreamReadOptions;
 import glide.api.models.commands.stream.StreamTrimOptions;
 import glide.api.models.commands.stream.StreamTrimOptions.MaxLen;
 import glide.api.models.commands.stream.StreamTrimOptions.MinId;
@@ -3728,34 +3733,6 @@ public void zintercard_returns_success() {
         assertEquals(value, payload);
     }
 
-    @SneakyThrows
-    @Test
-    public void xadd_returns_success() {
-        // setup
-        String key = "testKey";
-        Map fieldValues = new LinkedHashMap<>();
-        fieldValues.put("testField1", "testValue1");
-        fieldValues.put("testField2", "testValue2");
-        String[] fieldValuesArgs = convertMapToKeyValueStringArray(fieldValues);
-        String[] arguments = new String[] {key, "*"};
-        arguments = ArrayUtils.addAll(arguments, fieldValuesArgs);
-        String returnId = "testId";
-
-        CompletableFuture testResponse = new CompletableFuture<>();
-        testResponse.complete(returnId);
-
-        // match on protobuf request
-        when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any()))
-                .thenReturn(testResponse);
-
-        // exercise
-        CompletableFuture response = service.xadd(key, fieldValues);
-
-        // verify
-        assertEquals(testResponse, response);
-        assertEquals(returnId, response.get());
-    }
-
     @SneakyThrows
     @Test
     public void zrandmember_returns_success() {
@@ -3856,6 +3833,34 @@ public void zincrby_returns_success() {
         assertEquals(value, payload);
     }
 
+    @SneakyThrows
+    @Test
+    public void xadd_returns_success() {
+        // setup
+        String key = "testKey";
+        Map fieldValues = new LinkedHashMap<>();
+        fieldValues.put("testField1", "testValue1");
+        fieldValues.put("testField2", "testValue2");
+        String[] fieldValuesArgs = convertMapToKeyValueStringArray(fieldValues);
+        String[] arguments = new String[] {key, "*"};
+        arguments = ArrayUtils.addAll(arguments, fieldValuesArgs);
+        String returnId = "testId";
+
+        CompletableFuture testResponse = new CompletableFuture<>();
+        testResponse.complete(returnId);
+
+        // match on protobuf request
+        when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture response = service.xadd(key, fieldValues);
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(returnId, response.get());
+    }
+
     private static List getStreamAddOptions() {
         return List.of(
                 Arguments.of(
@@ -4062,6 +4067,83 @@ public void xlen_returns_success() {
         assertEquals(completedResult, payload);
     }
 
+    @SneakyThrows
+    @Test
+    public void xread_multiple_keys() {
+        // setup
+        String keyOne = "one";
+        String streamIdOne = "id-one";
+        String keyTwo = "two";
+        String streamIdTwo = "id-two";
+        String[][] fieldValues = {{"field", "value"}};
+        Map> completedResult = new LinkedHashMap<>();
+        completedResult.put(keyOne, Map.of(streamIdOne, fieldValues));
+        completedResult.put(keyTwo, Map.of(streamIdTwo, fieldValues));
+        String[] arguments = {READ_STREAMS_REDIS_API, keyOne, keyTwo, streamIdOne, streamIdTwo};
+
+        CompletableFuture>> testResponse =
+                new CompletableFuture<>();
+        testResponse.complete(completedResult);
+
+        // match on protobuf request
+        when(commandManager.>>submitNewCommand(
+                        eq(XRead), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        Map keysAndIds = new LinkedHashMap<>();
+        keysAndIds.put(keyOne, streamIdOne);
+        keysAndIds.put(keyTwo, streamIdTwo);
+        CompletableFuture>> response = service.xread(keysAndIds);
+        Map> payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(completedResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xread_with_options() {
+        // setup
+        String keyOne = "one";
+        String streamIdOne = "id-one";
+        Long block = 2L;
+        Long count = 10L;
+        String[][] fieldValues = {{"field", "value"}};
+        Map> completedResult =
+                Map.of(keyOne, Map.of(streamIdOne, fieldValues));
+        String[] arguments = {
+            READ_COUNT_REDIS_API,
+            count.toString(),
+            READ_BLOCK_REDIS_API,
+            block.toString(),
+            READ_STREAMS_REDIS_API,
+            keyOne,
+            streamIdOne
+        };
+
+        CompletableFuture>> testResponse =
+                new CompletableFuture<>();
+        testResponse.complete(completedResult);
+
+        // match on protobuf request
+        when(commandManager.>>submitNewCommand(
+                        eq(XRead), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture>> response =
+                service.xread(
+                        Map.of(keyOne, streamIdOne),
+                        StreamReadOptions.builder().block(block).count(count).build());
+        Map> payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(completedResult, payload);
+    }
+
     @Test
     @SneakyThrows
     public void xdel_returns_success() {
diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java
index 1807b9f12c..54617e0cf7 100644
--- a/java/client/src/test/java/glide/api/models/TransactionTests.java
+++ b/java/client/src/test/java/glide/api/models/TransactionTests.java
@@ -27,6 +27,9 @@
 import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API;
 import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API;
 import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamReadOptions.READ_BLOCK_REDIS_API;
+import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamReadOptions.READ_STREAMS_REDIS_API;
 import static glide.api.models.commands.stream.StreamTrimOptions.TRIM_EXACT_REDIS_API;
 import static glide.api.models.commands.stream.StreamTrimOptions.TRIM_MINID_REDIS_API;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -152,6 +155,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XLen;
 import static redis_request.RedisRequestOuterClass.RequestType.XRange;
+import static redis_request.RedisRequestOuterClass.RequestType.XRead;
 import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
 import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
 import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
@@ -213,6 +217,7 @@
 import glide.api.models.commands.geospatial.GeospatialData;
 import glide.api.models.commands.stream.StreamAddOptions;
 import glide.api.models.commands.stream.StreamRange.InfRangeBound;
+import glide.api.models.commands.stream.StreamReadOptions;
 import glide.api.models.commands.stream.StreamTrimOptions.MinId;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -701,6 +706,22 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
         transaction.xtrim("key", new MinId(true, "id"));
         results.add(Pair.of(XTrim, buildArgs("key", TRIM_MINID_REDIS_API, TRIM_EXACT_REDIS_API, "id")));
 
+        transaction.xread(Map.of("key", "id"));
+        results.add(Pair.of(XRead, buildArgs(READ_STREAMS_REDIS_API, "key", "id")));
+
+        transaction.xread(Map.of("key", "id"), StreamReadOptions.builder().block(1L).count(2L).build());
+        results.add(
+                Pair.of(
+                        XRead,
+                        buildArgs(
+                                READ_COUNT_REDIS_API,
+                                "2",
+                                READ_BLOCK_REDIS_API,
+                                "1",
+                                READ_STREAMS_REDIS_API,
+                                "key",
+                                "id")));
+
         transaction.xlen("key");
         results.add(Pair.of(XLen, buildArgs("key")));
 
diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index 0679f665c5..b26ada27dc 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -67,6 +67,7 @@
 import glide.api.models.commands.stream.StreamAddOptions;
 import glide.api.models.commands.stream.StreamRange.IdBound;
 import glide.api.models.commands.stream.StreamRange.InfRangeBound;
+import glide.api.models.commands.stream.StreamReadOptions;
 import glide.api.models.commands.stream.StreamTrimOptions.MaxLen;
 import glide.api.models.commands.stream.StreamTrimOptions.MinId;
 import glide.api.models.configuration.NodeAddress;
@@ -3038,6 +3039,132 @@ public void xadd_xlen_and_xtrim(BaseClient client) {
         assertTrue(executionException.getCause() instanceof RequestException);
     }
 
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xread(BaseClient client) {
+        String key1 = "{key}:1" + UUID.randomUUID();
+        String key2 = "{key}:2" + UUID.randomUUID();
+        String field1 = "f1_";
+        String field2 = "f2_";
+        String field3 = "f3_";
+
+        // setup first entries in streams key1 and key2
+        Map timestamp_1_1_map = new LinkedHashMap<>();
+        timestamp_1_1_map.put(field1, field1 + "1");
+        timestamp_1_1_map.put(field3, field3 + "1");
+        String timestamp_1_1 =
+                client.xadd(key1, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
+        assertNotNull(timestamp_1_1);
+
+        String timestamp_2_1 =
+                client
+                        .xadd(key2, Map.of(field2, field2 + "1"), StreamAddOptions.builder().id("2-1").build())
+                        .get();
+        assertNotNull(timestamp_2_1);
+
+        // setup second entries in streams key1 and key2
+        String timestamp_1_2 =
+                client
+                        .xadd(key1, Map.of(field1, field1 + "2"), StreamAddOptions.builder().id("1-2").build())
+                        .get();
+        assertNotNull(timestamp_1_2);
+
+        String timestamp_2_2 =
+                client
+                        .xadd(key2, Map.of(field2, field2 + "2"), StreamAddOptions.builder().id("2-2").build())
+                        .get();
+        assertNotNull(timestamp_2_2);
+
+        // setup third entries in streams key1 and key2
+        Map timestamp_1_3_map = new LinkedHashMap<>();
+        timestamp_1_3_map.put(field1, field1 + "3");
+        timestamp_1_3_map.put(field3, field3 + "3");
+        String timestamp_1_3 =
+                client.xadd(key1, timestamp_1_3_map, StreamAddOptions.builder().id("1-3").build()).get();
+        assertNotNull(timestamp_1_3);
+
+        String timestamp_2_3 =
+                client
+                        .xadd(key2, Map.of(field2, field2 + "3"), StreamAddOptions.builder().id("2-3").build())
+                        .get();
+        assertNotNull(timestamp_2_3);
+
+        Map> result =
+                client.xread(Map.of(key1, timestamp_1_1, key2, timestamp_2_1)).get();
+
+        // check key1
+        Map expected_key1 = new LinkedHashMap<>();
+        expected_key1.put(timestamp_1_2, new String[][] {{field1, field1 + "2"}});
+        expected_key1.put(
+                timestamp_1_3,
+                new String[][] {
+                    {field1, field1 + "3"},
+                    {field3, field3 + "3"}
+                });
+        assertDeepEquals(expected_key1, result.get(key1));
+
+        // check key2
+        Map expected_key2 = new LinkedHashMap<>();
+        expected_key2.put(timestamp_2_2, new String[][] {{field2, field2 + "2"}});
+        expected_key2.put(timestamp_2_3, new String[][] {{field2, field2 + "3"}});
+        assertDeepEquals(expected_key2, result.get(key2));
+    }
+
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xread_return_failures(BaseClient client) {
+        String key1 = "{key}:1" + UUID.randomUUID();
+        String nonStreamKey = "{key}:3" + UUID.randomUUID();
+        String field1 = "f1_";
+
+        // setup first entries in streams key1 and key2
+        Map timestamp_1_1_map = new LinkedHashMap<>();
+        timestamp_1_1_map.put(field1, field1 + "1");
+        String timestamp_1_1 =
+                client.xadd(key1, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
+        assertNotNull(timestamp_1_1);
+
+        // Key exists, but it is not a stream
+        assertEquals(OK, client.set(nonStreamKey, "bar").get());
+        ExecutionException executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () -> client.xread(Map.of(nonStreamKey, timestamp_1_1, key1, timestamp_1_1)).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () -> client.xread(Map.of(key1, timestamp_1_1, nonStreamKey, timestamp_1_1)).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        try (var testClient =
+                client instanceof RedisClient
+                        ? RedisClient.CreateClient(commonClientConfig().build()).get()
+                        : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {
+
+            // ensure that commands doesn't time out even if timeout > request timeout
+            long oneSecondInMS = 1000L;
+            assertNull(
+                    testClient
+                            .xread(
+                                    Map.of(key1, timestamp_1_1),
+                                    StreamReadOptions.builder().block(oneSecondInMS).build())
+                            .get());
+
+            // with 0 timeout (no timeout) should never time out,
+            // but we wrap the test with timeout to avoid test failing or stuck forever
+            assertThrows(
+                    TimeoutException.class, // <- future timeout, not command timeout
+                    () ->
+                            testClient
+                                    .xread(Map.of(key1, timestamp_1_1), StreamReadOptions.builder().block(0L).build())
+                                    .get(3, TimeUnit.SECONDS));
+        }
+    }
+
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
index 05d0dd3bd7..aeed0e1fd9 100644
--- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java
+++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
@@ -650,6 +650,7 @@ private static Object[] streamCommands(BaseTransaction transaction) {
                 .xadd(streamKey1, Map.of("field2", "value2"), StreamAddOptions.builder().id("0-2").build())
                 .xadd(streamKey1, Map.of("field3", "value3"), StreamAddOptions.builder().id("0-3").build())
                 .xlen(streamKey1)
+                .xread(Map.of(streamKey1, "0-2"))
                 .xrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"))
                 .xrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L)
                 .xtrim(streamKey1, new MinId(true, "0-2"))
@@ -660,6 +661,9 @@ private static Object[] streamCommands(BaseTransaction transaction) {
             "0-2", // xadd(streamKey1, Map.of("field2", "value2"), ... .id("0-2").build());
             "0-3", // xadd(streamKey1, Map.of("field3", "value3"), ... .id("0-3").build());
             3L, // xlen(streamKey1)
+            Map.of(
+                    streamKey1,
+                    Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2"));
             Map.of("0-1", new String[] {"field1", "value1"}), // .xrange(streamKey1, "0-1", "0-1")
             Map.of("0-1", new String[] {"field1", "value1"}), // .xrange(streamKey1, "0-1", "0-1", 1l)
             1L, // xtrim(streamKey1, new MinId(true, "0-2"))
diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java
index afc8c46f04..c19f708647 100644
--- a/java/integTest/src/test/java/glide/cluster/CommandTests.java
+++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java
@@ -747,6 +747,8 @@ public static Stream callCrossSlotCommandsWhichShouldFail() {
                 Arguments.of("sintercard", "7.0.0", clusterClient.sintercard(new String[] {"abc", "def"})),
                 Arguments.of(
                         "sintercard", "7.0.0", clusterClient.sintercard(new String[] {"abc", "def"}, 1)),
+                Arguments.of(
+                        "xread", null, clusterClient.xread(Map.of("abc", "stream1", "zxy", "stream2"))),
                 Arguments.of("copy", "6.2.0", clusterClient.copy("abc", "def", true)));
     }