Skip to content

Commit

Permalink
Java: Add XREAD command (#1524)
Browse files Browse the repository at this point in the history
* Java: Add XREAD command

Signed-off-by: Andrew Carbonetto <[email protected]>

* Return typing to Object[][]

Signed-off-by: Andrew Carbonetto <[email protected]>

* Restore String return

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update XREAD for review comments

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS

Signed-off-by: Andrew Carbonetto <[email protected]>

* Documentation changes

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored Jun 10, 2024
1 parent 80614d1 commit 17c7f45
Show file tree
Hide file tree
Showing 10 changed files with 473 additions and 38 deletions.
34 changes: 34 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static glide.ffi.resolvers.SocketListenerResolver.getSocket;
import static glide.utils.ArrayTransformUtils.castArray;
import static glide.utils.ArrayTransformUtils.castArrayofArrays;
import static glide.utils.ArrayTransformUtils.castMapOf2DArray;
import static glide.utils.ArrayTransformUtils.castMapOfArrays;
import static glide.utils.ArrayTransformUtils.concatenateArrays;
import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
Expand Down Expand Up @@ -116,6 +117,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
Expand Down Expand Up @@ -176,6 +178,7 @@
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand All @@ -196,6 +199,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -394,6 +398,23 @@ protected <V> Map<String, V> handleMapOrNullResponse(Response response) throws R
Map.class, EnumSet.of(ResponseFlags.IS_NULLABLE, ResponseFlags.ENCODING_UTF8), response);
}

/**
* @param response A Protobuf response
* @return A map of a map of <code>String[][]</code>
*/
protected Map<String, Map<String, String[][]>> handleXReadResponse(Response response)
throws RedisException {
Map<String, Object> mapResponse = handleMapOrNullResponse(response);
if (mapResponse == null) {
return null;
}
return mapResponse.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> castMapOf2DArray((Map<String, Object[][]>) e.getValue(), String.class)));
}

@SuppressWarnings("unchecked") // raw Set cast to Set<String>
protected Set<String> handleSetResponse(Response response) throws RedisException {
return handleRedisResponse(Set.class, EnumSet.of(ResponseFlags.ENCODING_UTF8), response);
Expand Down Expand Up @@ -1289,6 +1310,19 @@ public CompletableFuture<String> xadd(
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
@NonNull Map<String, String> keysAndIds) {
return xread(keysAndIds, StreamReadOptions.builder().build());
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptions options) {
String[] arguments = options.toArgs(keysAndIds);
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Long> xtrim(@NonNull String key, @NonNull StreamTrimOptions options) {
String[] arguments = ArrayUtils.addFirst(options.toArgs(), key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -41,7 +42,7 @@ public interface StreamBaseCommands {
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @param options Stream add options {@link StreamAddOptions}.
* @return The id of the added entry, or <code>null</code> if {@link
* StreamAddOptionsBuilder#makeStream(Boolean)} is set to <code>false</code> and no stream
* with the matching <code>key</code> exists.
Expand All @@ -57,12 +58,68 @@ public interface StreamBaseCommands {
*/
CompletableFuture<String> xadd(String key, Map<String, String> values, StreamAddOptions options);

/**
* Reads entries from the given streams.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @return A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* @example
* <pre>{@code
* Map<String, String> xreadKeys = Map.of("streamKey", "0-0");
* Map<String, Map<String, String[][]>> streamReadResponse = client.xread(xreadKeys).get();
* for (var keyEntry : streamReadResponse.entrySet()) {
* System.out.printf("Key: %s", keyEntry.getKey());
* for (var streamEntry : keyEntry.getValue().entrySet()) {
* Arrays.stream(streamEntry.getValue()).forEach(entity ->
* System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
* );
* }
* }</pre>
*/
CompletableFuture<Map<String, Map<String, String[][]>>> xread(Map<String, String> keysAndIds);

/**
* Reads entries from the given streams.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @param options Options detailing how to read the stream {@link StreamReadOptions}.
* @return A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* @example
* <pre>{@code
* // retrieve streamKey entries and block for 1 second if is no stream data
* Map<String, String> xreadKeys = Map.of("streamKey", "0-0");
* StreamReadOptions options = StreamReadOptions.builder().block(1L).build();
* Map<String, Map<String, String[][]>> streamReadResponse = client.xread(xreadKeys, options).get();
* for (var keyEntry : streamReadResponse.entrySet()) {
* System.out.printf("Key: %s", keyEntry.getKey());
* for (var streamEntry : keyEntry.getValue().entrySet()) {
* Arrays.stream(streamEntry.getValue()).forEach(entity ->
* System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
* );
* }
* }</pre>
*/
CompletableFuture<Map<String, Map<String, String[][]>>> xread(
Map<String, String> keysAndIds, StreamReadOptions options);

/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options.
* @param options Stream trim options {@link StreamTrimOptions}.
* @return The number of entries deleted from the stream.
* @example
* <pre>{@code
Expand Down
36 changes: 34 additions & 2 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
Expand Down Expand Up @@ -215,6 +216,7 @@
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.ReadFrom;
import java.util.Arrays;
Expand Down Expand Up @@ -2688,7 +2690,7 @@ public T xadd(@NonNull String key, @NonNull Map<String, String> values) {
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @param options Stream add options {@link StreamAddOptions}.
* @return Command Response - The id of the added entry, or <code>null</code> if {@link
* StreamAddOptionsBuilder#makeStream(Boolean)} is set to <code>false</code> and no stream
* with the matching <code>key</code> exists.
Expand All @@ -2703,12 +2705,42 @@ public T xadd(
return getThis();
}

/**
* Reads entries from the given streams.
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @return Command Response - A <code>{@literal Map<String, Map<Object[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
*/
public T xread(@NonNull Map<String, String> keysAndIds) {
return xread(keysAndIds, StreamReadOptions.builder().build());
}

/**
* Reads entries from the given streams.
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a> for details.
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @param options options detailing how to read the stream {@link StreamReadOptions}.
* @return Command Response - A <code>{@literal Map<String, Map<Object[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
*/
public T xread(@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptions options) {
protobufTransaction.addCommands(buildCommand(XRead, buildArgs(options.toArgs(keysAndIds))));
return getThis();
}

/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options.
* @param options Stream trim options {@link StreamTrimOptions}.
* @return Command Response - The number of entries deleted from the stream.
*/
public T xtrim(@NonNull String key, @NonNull StreamTrimOptions options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands.stream;

import glide.api.commands.StreamBaseCommands;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Builder;

/**
* Optional arguments for {@link StreamBaseCommands#xread(Map, StreamReadOptions)}
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a>
*/
@Builder
public final class StreamReadOptions {

public static final String READ_COUNT_REDIS_API = "COUNT";
public static final String READ_BLOCK_REDIS_API = "BLOCK";
public static final String READ_STREAMS_REDIS_API = "STREAMS";

/**
* If set, the request will be blocked for the set amount of milliseconds or until the server has
* the required number of entries. Equivalent to <code>BLOCK</code> in the Redis API.
*/
Long block;

/**
* The maximal number of elements requested. Equivalent to <code>COUNT</code> in the Redis API.
*/
Long count;

/**
* Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map,
* StreamReadOptions)} into a String[].
*
* @return String[]
*/
public String[] toArgs(Map<String, String> streams) {
List<String> optionArgs = new ArrayList<>();

if (this.count != null) {
optionArgs.add(READ_COUNT_REDIS_API);
optionArgs.add(count.toString());
}

if (this.block != null) {
optionArgs.add(READ_BLOCK_REDIS_API);
optionArgs.add(block.toString());
}

optionArgs.add(READ_STREAMS_REDIS_API);
Set<Map.Entry<String, String>> entrySet = streams.entrySet();
optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList()));

return optionArgs.toArray(new String[0]);
}
}
27 changes: 21 additions & 6 deletions java/client/src/main/java/glide/utils/ArrayTransformUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,32 @@ public static <T, U extends T> U[][] castArrayofArrays(T[] outerObjectArr, Class
* @param mapOfArrays Map of Array values to cast.
* @param clazz The class of the array values to cast to.
* @return A Map of arrays of type U[], containing the key/values from the input Map.
* @param <T> The base type from which the elements are being cast.
* @param <U> The subtype of T to which the elements are cast.
* @param <T> The target type which the elements are cast.
*/
@SuppressWarnings("unchecked")
public static <T, U extends T> Map<String, U[]> castMapOfArrays(
Map<String, T[]> mapOfArrays, Class<U> clazz) {
public static <T> Map<String, T[]> castMapOfArrays(
Map<String, Object[]> mapOfArrays, Class<T> clazz) {
if (mapOfArrays == null) {
return null;
}
return mapOfArrays.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> castArray(e.getValue(), clazz)));
}

/**
* Maps a Map of Object[][] with value type T[][] to value of U[][].
*
* @param mapOfArrays Map of 2D Array values to cast.
* @param clazz The class of the array values to cast to.
* @return A Map of arrays of type U[][], containing the key/values from the input Map.
* @param <T> The target type which the elements are cast.
*/
public static <T> Map<String, T[][]> castMapOf2DArray(
Map<String, Object[][]> mapOfArrays, Class<T> clazz) {
if (mapOfArrays == null) {
return null;
}
return mapOfArrays.entrySet().stream()
.collect(Collectors.toMap(k -> k.getKey(), e -> castArray(e.getValue(), clazz)));
.collect(Collectors.toMap(Map.Entry::getKey, e -> castArrayofArrays(e.getValue(), clazz)));
}

/**
Expand Down
Loading

0 comments on commit 17c7f45

Please sign in to comment.