Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add overloads for XADD to allow duplicate entry keys #1970

Merged
merged 15 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@
import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray;
import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArrayBinary;
import static glide.utils.ArrayTransformUtils.convertNestedArrayToKeyValueGlideStringArray;
import static glide.utils.ArrayTransformUtils.convertNestedArrayToKeyValueStringArray;
import static glide.utils.ArrayTransformUtils.mapGeoDataToArray;
import static glide.utils.ArrayTransformUtils.mapGeoDataToGlideStringArray;

Expand Down Expand Up @@ -2588,12 +2590,23 @@ public CompletableFuture<String> xadd(@NonNull String key, @NonNull Map<String,
return xadd(key, values, StreamAddOptions.builder().build());
}

@Override
public CompletableFuture<String> xadd(@NonNull String key, @NonNull String[][] values) {
return xadd(key, values, StreamAddOptions.builder().build());
}

@Override
public CompletableFuture<GlideString> xadd(
@NonNull GlideString key, @NonNull Map<GlideString, GlideString> values) {
return xadd(key, values, StreamAddOptionsBinary.builder().build());
}

@Override
public CompletableFuture<GlideString> xadd(
@NonNull GlideString key, @NonNull GlideString[][] values) {
return xadd(key, values, StreamAddOptionsBinary.builder().build());
}

@Override
public CompletableFuture<String> xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
Expand All @@ -2603,6 +2616,16 @@ public CompletableFuture<String> xadd(
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<String> xadd(
@NonNull String key, @NonNull String[][] values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key),
convertNestedArrayToKeyValueStringArray(values));
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<GlideString> xadd(
@NonNull GlideString key,
Expand All @@ -2618,6 +2641,21 @@ public CompletableFuture<GlideString> xadd(
return commandManager.submitNewCommand(XAdd, arguments, this::handleGlideStringOrNullResponse);
}

@Override
public CompletableFuture<GlideString> xadd(
@NonNull GlideString key,
@NonNull GlideString[][] values,
@NonNull StreamAddOptionsBinary options) {
GlideString[] arguments =
new ArgsBuilder()
.add(key)
.add(options.toArgs())
.add(convertNestedArrayToKeyValueGlideStringArray(values))
.toArray();

return commandManager.submitNewCommand(XAdd, arguments, this::handleGlideStringOrNullResponse);
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
@NonNull Map<String, String> keysAndIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ public interface StreamBaseCommands {
*/
CompletableFuture<String> xadd(String key, Map<String, String> values);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
* @example
* <pre>{@code
* String streamId = client.xadd("key", {{"name", "Sara"}, {"surname", "OConnor"}}).get();
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* System.out.println("Stream: " + streamId);
* }</pre>
*/
CompletableFuture<String> xadd(String key, String[][] values);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
Expand All @@ -59,6 +75,22 @@ public interface StreamBaseCommands {
*/
CompletableFuture<GlideString> xadd(GlideString key, Map<GlideString, GlideString> values);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
* @example
* <pre>{@code
* String streamId = client.xadd(gs("key"), {{gs("name"), gs("Sara")}, {gs("surname"), gs("OConnor")}}).get();
* System.out.println("Stream: " + streamId);
* }</pre>
*/
CompletableFuture<GlideString> xadd(GlideString key, GlideString[][] values);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
Expand All @@ -82,6 +114,29 @@ public interface StreamBaseCommands {
*/
CompletableFuture<String> xadd(String key, Map<String, String> values, StreamAddOptions options);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
* @return The id of the added entry, or <code>null</code> if {@link
* StreamAddOptionsBuilder#makeStream(Boolean)} is set to <code>false</code> and no stream
* with the matching <code>key</code> exists.
* @example
* <pre>{@code
* // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
* StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* String streamId = client.xadd("key", {{"name", "Sara"}, {"surname", "OConnor"}}, options).get();
* if (streamId != null) {
* assert streamId.equals("sid");
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* }
* }</pre>
*/
CompletableFuture<String> xadd(String key, String[][] values, StreamAddOptions options);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
Expand All @@ -106,6 +161,30 @@ public interface StreamBaseCommands {
CompletableFuture<GlideString> xadd(
GlideString key, Map<GlideString, GlideString> values, StreamAddOptionsBinary options);

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
* @return The id of the added entry, or <code>null</code> if {@link
* StreamAddOptionsBinaryBuilder#makeStream(Boolean)} is set to <code>false</code> and no
* stream with the matching <code>key</code> exists.
* @example
* <pre>{@code
* // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
* StreamAddOptionsBinary options = StreamAddOptions.builder().id(gs("sid")).makeStream(Boolean.FALSE).build();
* String streamId = client.xadd(gs("key"), {{gs("name"), gs("Sara")}, {gs("surname"), gs("OConnor")}}, options).get();
* if (streamId != null) {
* assert streamId.equals("sid");
* }
* }</pre>
*/
CompletableFuture<GlideString> xadd(
GlideString key, GlideString[][] values, StreamAddOptionsBinary options);

/**
* Reads entries from the given streams.
*
Expand Down
43 changes: 43 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
import static glide.utils.ArrayTransformUtils.flattenAllKeysFollowedByAllValues;
import static glide.utils.ArrayTransformUtils.flattenMapToGlideStringArray;
import static glide.utils.ArrayTransformUtils.flattenMapToGlideStringArrayValueFirst;
import static glide.utils.ArrayTransformUtils.flattenNestedArrayToGlideStringArray;
import static glide.utils.ArrayTransformUtils.mapGeoDataToGlideStringArray;

import command_request.CommandRequestOuterClass.Command;
Expand Down Expand Up @@ -3356,6 +3357,21 @@ public <ArgType> T xadd(@NonNull ArgType key, @NonNull Map<ArgType, ArgType> val
return xadd(key, values, StreamAddOptions.builder().build());
}

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* If the <code>key</code> doesn't exist, the stream is created.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return Command Response - The id of the added entry.
*/
public <ArgType> T xadd(@NonNull ArgType key, @NonNull ArgType[][] values) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
return xadd(key, values, StreamAddOptions.builder().build());
}

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
Expand Down Expand Up @@ -3385,6 +3401,33 @@ public <ArgType> T xadd(
return getThis();
}

/**
* Adds an entry to the specified stream stored at <code>key</code>.<br>
* If the <code>key</code> doesn't exist, the stream is created.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @see <a href="https://valkey.io/commands/xadd/">valkey.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options {@link StreamAddOptions}.
* @return Command Response - The id of the added entry, or <code>null</code> if {@link
* StreamAddOptionsBuilder#makeStream(Boolean)} is set to <code>false</code> and no stream
* with the matching <code>key</code> exists.
*/
public <ArgType> T xadd(
@NonNull ArgType key, @NonNull ArgType[][] values, @NonNull StreamAddOptions options) {
checkTypeOrThrow(key);
protobufTransaction.addCommands(
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
buildCommand(
XAdd,
newArgsBuilder()
.add(key)
.add(options.toArgs())
.add(flattenNestedArrayToGlideStringArray(values))));
return getThis();
}

/**
* Reads entries from the given streams.
*
Expand Down
57 changes: 57 additions & 0 deletions java/client/src/main/java/glide/utils/ArrayTransformUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,44 @@ public static GlideString[] convertMapToKeyValueGlideStringArray(Map<GlideString
.toArray(GlideString[]::new);
}

/**
* Converts a nested array of string keys and values of any type that can be converted in to an
* array of strings with alternating keys and values.
*
* @param args Nested array of string keys to values of any type to convert.
* @return Array of strings [key1, value1.toString(), key2, value2.toString(), ...].
*/
public static String[] convertNestedArrayToKeyValueStringArray(String[][] args) {
for (String[] entry : args) {
if (entry.length != 2) {
throw new IllegalArgumentException(
"Array entry had the wrong length. Expected length 2 but got length " + entry.length);
}
}
return Arrays.stream(args)
.flatMap(entry -> Stream.of(entry[0], entry[1]))
.toArray(String[]::new);
}

/**
* Converts a nested array of GlideString keys and values of any type in to an array of
* GlideStrings with alternating keys and values.
*
* @param args Nested array of GlideString keys to values of any type to convert.
* @return Array of strings [key1, gs(value1.toString()), key2, gs(value2.toString()), ...].
*/
public static GlideString[] convertNestedArrayToKeyValueGlideStringArray(GlideString[][] args) {
for (GlideString[] entry : args) {
if (entry.length != 2) {
throw new IllegalArgumentException(
"Array entry had the wrong length. Expected length 2 but got length " + entry.length);
}
}
return Arrays.stream(args)
.flatMap(entry -> Stream.of(entry[0], GlideString.gs(entry[1].toString())))
.toArray(GlideString[]::new);
}

/**
* Converts a map of string keys and values of any type into an array of strings with alternating
* values and keys.
Expand Down Expand Up @@ -250,6 +288,25 @@ public static GlideString[] flattenMapToGlideStringArray(Map<?, ?> args) {
.toArray(GlideString[]::new);
}

/**
* Converts a nested array of any type of keys and values in to an array of GlideString with
* alternating keys and values.
*
* @param args Nested array of keys to values of any type to convert.
* @return Array of GlideString [key1, value1, key2, value2, ...].
*/
public static <T> GlideString[] flattenNestedArrayToGlideStringArray(T[][] args) {
for (T[] entry : args) {
if (entry.length != 2) {
throw new IllegalArgumentException(
"Array entry had the wrong length. Expected length 2 but got length " + entry.length);
}
}
return Arrays.stream(args)
.flatMap(entry -> Stream.of(GlideString.of(entry[0]), GlideString.of(entry[1])))
.toArray(GlideString[]::new);
}

/**
* Converts a map of any type of keys and values in to an array of GlideString with alternating
* values and keys.
Expand Down
56 changes: 56 additions & 0 deletions java/client/src/test/java/glide/api/GlideClientTest.java
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@
import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray;
import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArrayBinary;
import static glide.utils.ArrayTransformUtils.convertNestedArrayToKeyValueGlideStringArray;
import static glide.utils.ArrayTransformUtils.convertNestedArrayToKeyValueStringArray;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -7174,6 +7176,32 @@ public void xadd_returns_success() {
assertEquals(returnId, response.get());
}

@SneakyThrows
@Test
public void xadd_list_of_pairs_returns_success() {
// setup
String key = "testKey";
String[][] fieldValues = {{"testField1", "testValue1"}, {"testField2", "testValue2"}};
String[] fieldValuesArgs = convertNestedArrayToKeyValueStringArray(fieldValues);
String[] arguments = new String[] {key, "*"};
arguments = ArrayUtils.addAll(arguments, fieldValuesArgs);
String returnId = "testId";

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(returnId);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(XAdd), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.xadd(key, fieldValues);

// verify
assertEquals(testResponse, response);
assertEquals(returnId, response.get());
}

@SneakyThrows
@Test
public void xadd_binary_returns_success() {
Expand Down Expand Up @@ -7202,6 +7230,34 @@ public void xadd_binary_returns_success() {
assertEquals(returnId, response.get());
}

@SneakyThrows
@Test
public void xadd_list_of_pairs_binary_returns_success() {
// setup
GlideString key = gs("testKey");
GlideString[][] fieldValues = {
{gs("testField1"), gs("testValue1")}, {gs("testField2"), gs("testValue2")}
};
GlideString[] fieldValuesArgs = convertNestedArrayToKeyValueGlideStringArray(fieldValues);
GlideString[] arguments = new GlideString[] {key, gs("*")};
arguments = ArrayUtils.addAll(arguments, fieldValuesArgs);
GlideString returnId = gs("testId");

CompletableFuture<GlideString> testResponse = new CompletableFuture<>();
testResponse.complete(returnId);

// match on protobuf request
when(commandManager.<GlideString>submitNewCommand(eq(XAdd), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<GlideString> response = service.xadd(key, fieldValues);

// verify
assertEquals(testResponse, response);
assertEquals(returnId, response.get());
}

private static List<Arguments> getStreamAddOptions() {
return List.of(
Arguments.of(
Expand Down
Loading
Loading