From e03017f16533ca4f37d7bebafb53149cd87fb42f Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 7 Aug 2024 14:20:36 -0700 Subject: [PATCH 1/3] Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/BaseClient.java | 51 +++++ .../api/commands/PubSubBaseCommands.java | 121 ++++++++++ .../glide/api/models/BaseTransaction.java | 72 ++++++ .../ClusterSubscriptionConfiguration.java | 13 ++ .../test/java/glide/api/GlideClientTest.java | 167 ++++++++++++++ .../glide/api/models/TransactionTests.java | 15 ++ .../src/test/java/glide/PubSubTests.java | 214 +++++++++++++++++- 7 files changed, 652 insertions(+), 1 deletion(-) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 9c6297f0ad..8d0bac8cfa 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -83,6 +83,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfAdd; import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -4495,6 +4498,54 @@ public CompletableFuture publish( }); } + @Override + public CompletableFuture pubsubChannels() { + return commandManager.submitNewCommand( + PubSubChannels, + new String[0], + response -> castArray(handleArrayResponse(response), String.class)); + } + + @Override + public CompletableFuture pubsubChannelsBinary() { + return commandManager.submitNewCommand( + PubSubChannels, + new GlideString[0], + response -> castArray(handleArrayResponse(response), GlideString.class)); + } + + @Override + public CompletableFuture pubsubChannels(@NonNull String pattern) { + return commandManager.submitNewCommand( + PubSubChannels, + new String[] {pattern}, + response -> castArray(handleArrayResponse(response), String.class)); + } + + @Override + public CompletableFuture pubsubChannels(@NonNull GlideString pattern) { + return commandManager.submitNewCommand( + PubSubChannels, + new GlideString[] {pattern}, + response -> castArray(handleArrayResponse(response), GlideString.class)); + } + + @Override + public CompletableFuture pubsubNumPat() { + return commandManager.submitNewCommand(PubSubNumPat, new String[0], this::handleLongResponse); + } + + @Override + public CompletableFuture> pubsubNumSub(@NonNull String[] channels) { + return commandManager.submitNewCommand(PubSubNumSub, channels, this::handleMapResponse); + } + + @Override + public CompletableFuture> pubsubNumSub(@NonNull GlideString[] channels) { + return commandManager.submitNewCommand( + PubSubNumSub, channels, this::handleBinaryStringMapResponse); + } + @Override public CompletableFuture watch(@NonNull String[] keys) { return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse); diff --git a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java index 6906d98e06..5ca571f530 100644 --- a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java @@ -2,6 +2,7 @@ package glide.api.commands; import glide.api.models.GlideString; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -40,4 +41,124 @@ public interface PubSubBaseCommands { * } */ CompletableFuture publish(GlideString message, GlideString channel); + + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @return An Array of all active channels. + * @example + *
{@code
+     * String[] response = client.pubsubChannels().get();
+     * assert Arrays.equals(new String[] { "channel1", "channel2" });
+     * }
+ */ + CompletableFuture pubsubChannels(); + + /** + * Lists the currently active channels.
+ * Dislike of {@link #pubsubChannels()}, returns channel names as {@link GlideString}s. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @return An Array of all active channels. + * @example + *
{@code
+     * GlideString[] response = client.pubsubChannels().get();
+     * assert Arrays.equals(new GlideString[] { "channel1", "channel2" });
+     * }
+ */ + CompletableFuture pubsubChannelsBinary(); + + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @param pattern A glob-style pattern to match active channels. + * @return An Array of currently active channels matching the given pattern. + * @example + *
{@code
+     * String[] response = client.pubsubChannels("news.*").get();
+     * assert Arrays.equals(new String[] { "news.sports", "news.weather" });
+     * }
+ */ + CompletableFuture pubsubChannels(String pattern); + + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @param pattern A glob-style pattern to match active channels. + * @return An Array of currently active channels matching the given pattern. + * @example + *
{@code
+     * GlideString[] response = client.pubsubChannels(gs("news.*")).get();
+     * assert Arrays.equals(new GlideString[] { gs("news.sports"), gs("news.weather") });
+     * }
+ */ + CompletableFuture pubsubChannels(GlideString pattern); + + /** + * Returns the number of unique patterns that are subscribed to by clients. + * + * @apiNote + *
    + *
  • When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + *
  • This is the total number of unique patterns all the clients are subscribed to, not + * the count of clients subscribed to patterns. + *
+ * + * @see valkey.io for details. + * @return The number of unique patterns. + * @example + *
{@code
+     * Long result = client.pubsubNumPat().get();
+     * assert result == 3L;
+     * }
+ */ + CompletableFuture pubsubNumPat(); + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single map. + * @see valkey.io for details. + * @param channels The list of channels to query for the number of subscribers. + * @return A Map where keys are the channel names and values are the numbers of + * subscribers. + * @example + *
{@code
+     * Map result = client.pubsubNumSub(new String[] {"channel1", "channel2"}).get();
+     * assert result.equals(Map.of("channel1", 3L, "channel2", 5L));
+     * }
+ */ + CompletableFuture> pubsubNumSub(String[] channels); + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single map. + * @see valkey.io for details. + * @param channels The list of channels to query for the number of subscribers. + * @return A Map where keys are the channel names and values are the numbers of + * subscribers. + * @example + *
{@code
+     * Map result = client.pubsubNumSub(new GlideString[] {gs("channel1"), gs("channel2")}).get();
+     * assert result.equals(Map.of(gs("channel1"), 3L, gs("channel2"), 5L));
+     * }
+ */ + CompletableFuture> pubsubNumSub(GlideString[] channels); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 6766617426..d0562dc75e 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -105,6 +105,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; import static command_request.CommandRequestOuterClass.RequestType.Ping; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -6295,6 +6298,75 @@ public T publish(@NonNull ArgType message, @NonNull ArgType channel) { return getThis(); } + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @return Command response - An Array of all active channels. + */ + public T pubsubChannels() { + protobufTransaction.addCommands(buildCommand(PubSubChannels)); + return getThis(); + } + + /** + * Lists the currently active channels. + * + * @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type + * will throw {@link IllegalArgumentException}. + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @param pattern A glob-style pattern to match active channels. + * @return Command response - An Array of currently active channels matching the + * given pattern. + */ + public T pubsubChannels(@NonNull ArgType pattern) { + checkTypeOrThrow(pattern); + protobufTransaction.addCommands(buildCommand(PubSubChannels, newArgsBuilder().add(pattern))); + return getThis(); + } + + /** + * Returns the number of unique patterns that are subscribed to by clients. + * + * @apiNote + *
    + *
  • When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + *
  • This is the total number of unique patterns all the clients are subscribed to, not + * the count of clients subscribed to patterns. + *
+ * + * @see valkey.io for details. + * @return Command response - The number of unique patterns. + */ + public T pubsubNumPat() { + protobufTransaction.addCommands(buildCommand(PubSubNumPat)); + return getThis(); + } + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified channels. + * + * @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type + * will throw {@link IllegalArgumentException}. + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single map. + * @see valkey.io for details. + * @param channels The list of channels to query for the number of subscribers. + * @return Command response - A Map where keys are the channel names and values are + * the numbers of subscribers. + */ + public T pubsubNumSub(@NonNull ArgType[] channels) { + checkTypeOrThrow(channels); + protobufTransaction.addCommands(buildCommand(PubSubNumSub, newArgsBuilder().add(channels))); + return getThis(); + } + /** * Gets the union of all the given sets. * diff --git a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java index a29ddd3d83..c45d6abb33 100644 --- a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java @@ -1,6 +1,8 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.models.configuration; +import static glide.api.models.GlideString.gs; + import glide.api.GlideClusterClient; import glide.api.models.GlideString; import java.util.HashMap; @@ -91,6 +93,17 @@ public ClusterSubscriptionConfigurationBuilder subscription( return this; } + /** + * Add a subscription to a channel or to multiple channels if {@link + * PubSubClusterChannelMode#PATTERN} is used.
+ * See {@link ClusterSubscriptionConfiguration#subscriptions}. + */ + public ClusterSubscriptionConfigurationBuilder subscription( + PubSubClusterChannelMode mode, String channelOrPattern) { + addSubscription(subscriptions, mode, gs(channelOrPattern)); + return this; + } + /** * Set all subscriptions in a bulk. Rewrites previously stored configurations.
* See {@link ClusterSubscriptionConfiguration#subscriptions}. diff --git a/java/client/src/test/java/glide/api/GlideClientTest.java b/java/client/src/test/java/glide/api/GlideClientTest.java index cb29738bbb..8190eb29b3 100644 --- a/java/client/src/test/java/glide/api/GlideClientTest.java +++ b/java/client/src/test/java/glide/api/GlideClientTest.java @@ -107,6 +107,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; import static command_request.CommandRequestOuterClass.RequestType.Ping; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -13494,6 +13497,170 @@ public void publish_returns_success() { assertEquals(OK, payload); } + @SneakyThrows + @Test + public void pubsubChannels_returns_success() { + // setup + String[] arguments = new String[0]; + String[] value = new String[] {"ch1", "ch2"}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannels(); + String[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubChannelsBinary_returns_success() { + // setup + GlideString[] arguments = new GlideString[0]; + GlideString[] value = new GlideString[] {gs("ch1"), gs("ch2")}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannelsBinary(); + GlideString[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubChannels_with_pattern_returns_success() { + // setup + String pattern = "ch*"; + String[] arguments = new String[] {pattern}; + String[] value = new String[] {"ch1", "ch2"}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannels(pattern); + String[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubChannelsBinary_with_pattern_returns_success() { + // setup + GlideString pattern = gs("ch*"); + GlideString[] arguments = new GlideString[] {pattern}; + GlideString[] value = new GlideString[] {gs("ch1"), gs("ch2")}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannels(pattern); + GlideString[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubNumPat_returns_success() { + // setup + String[] arguments = new String[0]; + Long value = 42L; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubNumPat), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubNumPat(); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubNumSub_returns_success() { + // setup + String[] arguments = new String[] {"ch1", "ch2"}; + Map value = Map.of(); + + CompletableFuture> testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.>submitNewCommand(eq(PubSubNumSub), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture> response = service.pubsubNumSub(arguments); + Map payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubNumSubBinary_returns_success() { + // setup + GlideString[] arguments = new GlideString[] {gs("ch1"), gs("ch2")}; + Map value = Map.of(); + + CompletableFuture> testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.>submitNewCommand( + eq(PubSubNumSub), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture> response = service.pubsubNumSub(arguments); + Map payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + @SneakyThrows @Test public void sunion_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index aa49a6849b..c112ff7599 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -104,6 +104,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; import static command_request.CommandRequestOuterClass.RequestType.Ping; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -1278,6 +1281,18 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.publish("msg", "ch1"); results.add(Pair.of(Publish, buildArgs("ch1", "msg"))); + transaction.pubsubChannels(); + results.add(Pair.of(PubSubChannels, buildArgs())); + + transaction.pubsubChannels("pattern"); + results.add(Pair.of(PubSubChannels, buildArgs("pattern"))); + + transaction.pubsubNumPat(); + results.add(Pair.of(PubSubNumPat, buildArgs())); + + transaction.pubsubNumSub(new String[] {"ch1", "ch2"}); + results.add(Pair.of(PubSubNumSub, buildArgs("ch1", "ch2"))); + transaction.lcsIdx("key1", "key2"); results.add(Pair.of(LCS, buildArgs("key1", "key2", IDX_COMMAND_STRING))); diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 6ca9b3691f..e5bd04da6c 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -2,11 +2,13 @@ package glide; import static glide.TestConfiguration.SERVER_VERSION; +import static glide.TestUtilities.assertDeepEquals; import static glide.TestUtilities.commonClientConfig; import static glide.TestUtilities.commonClusterClientConfig; import static glide.api.BaseClient.OK; import static glide.api.models.GlideString.gs; import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; @@ -49,6 +51,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -247,6 +250,7 @@ public void exact_happy_path(boolean standalone, MessageReadMethod method) { } /** Similar to `test_pubsub_exact_happy_path_many_channels` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -283,6 +287,7 @@ public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod } /** Similar to `test_sharded_pubsub` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -306,6 +311,7 @@ public void sharded_pubsub(MessageReadMethod method) { } /** Similar to `test_sharded_pubsub_many_channels` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -346,6 +352,7 @@ public void sharded_pubsub_many_channels(MessageReadMethod method) { } /** Similar to `test_pubsub_pattern` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -386,6 +393,7 @@ public void pattern(boolean standalone, MessageReadMethod method) { } /** Similar to `test_pubsub_pattern_many_channels` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -425,6 +433,7 @@ public void pattern_many_channels(boolean standalone, MessageReadMethod method) } /** Similar to `test_pubsub_combined_exact_and_pattern_one_client` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -476,6 +485,7 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea /** * Similar to `test_pubsub_combined_exact_and_pattern_multiple_clients` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -546,6 +556,7 @@ public void combined_exact_and_pattern_multiple_clients( /** * Similar to `test_pubsub_combined_exact_pattern_and_sharded_one_client` in python client tests. */ + @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -604,6 +615,7 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth } /** This test fully covers all `test_pubsub_*_co_existence` tests in python client. */ + @Disabled @SneakyThrows @Test public void coexistense_of_sync_and_async_read() { @@ -686,6 +698,7 @@ public void coexistense_of_sync_and_async_read() { * Similar to `test_pubsub_combined_exact_pattern_and_sharded_multi_client` in python client * tests. */ + @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -803,6 +816,7 @@ public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod me * Similar to `test_pubsub_three_publishing_clients_same_name_with_sharded` in python client * tests. */ + @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -1232,7 +1246,7 @@ public void pubsub_with_binary(boolean standalone) { createClientWithSubscriptions( standalone, subscriptions, Optional.of(callback), Optional.of(callbackMessages)); var sender = createClient(standalone); - clients.addAll(Arrays.asList(listener, listener2, sender)); + clients.addAll(List.of(listener, listener2, sender)); assertEquals(OK, sender.publish(message.getMessage(), channel).get()); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -1241,4 +1255,202 @@ public void pubsub_with_binary(boolean standalone) { assertEquals(1, callbackMessages.size()); assertEquals(message, callbackMessages.get(0)); } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_channels(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + assertEquals(0, client.pubsubChannels().get().length); + assertEquals(0, client.pubsubChannelsBinary().get().length); + assertEquals(0, client.pubsubChannels("**").get().length); + assertEquals(0, client.pubsubChannels(gs("**")).get().length); + + var channels = Set.of("test_channel1", "test_channel2", "some_channel"); + String pattern = "test_*"; + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.EXACT, + channels.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.EXACT, + channels.stream().map(GlideString::gs).collect(Collectors.toSet())); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + clients.addAll(List.of(client, listener)); + + // test without pattern + assertArrayEquals(channels.toArray(), client.pubsubChannels().get()); + assertArrayEquals(channels.toArray(), listener.pubsubChannels().get()); + assertArrayEquals( + channels.stream().map(GlideString::gs).toArray(), client.pubsubChannelsBinary().get()); + assertArrayEquals( + channels.stream().map(GlideString::gs).toArray(), listener.pubsubChannelsBinary().get()); + + // test with pattern + assertArrayEquals( + new String[] {"test_channel1", "test_channel2"}, client.pubsubChannels(pattern).get()); + assertArrayEquals( + new GlideString[] {gs("test_channel1"), gs("test_channel2")}, + client.pubsubChannels(gs(pattern)).get()); + assertArrayEquals( + new String[] {"test_channel1", "test_channel2"}, listener.pubsubChannels(pattern).get()); + assertArrayEquals( + new GlideString[] {gs("test_channel1"), gs("test_channel2")}, + listener.pubsubChannels(gs(pattern)).get()); + + // test with non-matching pattern + assertEquals(0, client.pubsubChannels("non_matching_*").get().length); + assertEquals(0, client.pubsubChannels(gs("non_matching_*")).get().length); + assertEquals(0, listener.pubsubChannels("non_matching_*").get().length); + assertEquals(0, listener.pubsubChannels(gs("non_matching_*")).get().length); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_numpat(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + assertEquals(0, client.pubsubNumPat().get()); + + var patterns = Set.of("news.*", "announcements.*"); + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + clients.addAll(List.of(client, listener)); + + assertEquals(2, client.pubsubNumPat().get()); + assertEquals(2, listener.pubsubNumPat().get()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_numsub(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + var channels = new String[] {"channel1", "channel2", "channel3"}; + assertEquals( + Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), + client.pubsubNumSub(channels).get()); + + Map> subscriptions1 = + standalone + ? Map.of( + PubSubChannelMode.EXACT, Set.of(gs("channel1"), gs("channel2"), gs("channel3"))) + : Map.of( + PubSubClusterChannelMode.EXACT, + Set.of(gs("channel1"), gs("channel2"), gs("channel3"))); + var listener1 = createClientWithSubscriptions(standalone, subscriptions1); + + Map> subscriptions2 = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(gs("channel2"), gs("channel3"))) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(gs("channel2"), gs("channel3"))); + var listener2 = createClientWithSubscriptions(standalone, subscriptions2); + + Map> subscriptions3 = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(gs("channel3"))) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(gs("channel3"))); + var listener3 = createClientWithSubscriptions(standalone, subscriptions3); + + Map> subscriptions4 = + standalone + ? Map.of(PubSubChannelMode.PATTERN, Set.of(gs("channel*"))) + : Map.of(PubSubClusterChannelMode.PATTERN, Set.of(gs("channel*"))); + var listener4 = createClientWithSubscriptions(standalone, subscriptions4); + + clients.addAll(List.of(client, listener1, listener2, listener3, listener4)); + + var expected = Map.of("channel1", 1L, "channel2", 2L, "channel3", 3L, "channel4", 0L); + assertEquals(expected, client.pubsubNumSub(ArrayUtils.addFirst(channels, "channel4")).get()); + assertEquals(expected, listener1.pubsubNumSub(ArrayUtils.addFirst(channels, "channel4")).get()); + + var expectedGs = + Map.of(gs("channel1"), 1L, gs("channel2"), 2L, gs("channel3"), 3L, gs("channel4"), 0L); + assertEquals( + expectedGs, + client + .pubsubNumSub( + new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")}) + .get()); + assertEquals( + expectedGs, + listener2 + .pubsubNumSub( + new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")}) + .get()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_channels_and_numpat_and_numsub_in_transaction(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + var channels = new String[] {"test_channel1", "test_channel2", "some_channel"}; + var patterns = Set.of("news.*", "announcements.*"); + String pattern = "test_*"; + + assertEquals( + Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), + client.pubsubNumSub(channels).get()); + + var transaction = (standalone ? new Transaction() : new ClusterTransaction()) + .pubsubChannels().pubsubChannels(pattern).pubsubNumPat().pubsubNumSub(channels); + + var result = standalone ? ((GlideClient)client).exec((Transaction) transaction) : ((GlideClusterClient)client).exec((ClusterTransaction) transaction); + assertDeepEquals(new Object[] { + new String[0], // pubsubChannels() + new String[0], // pubsubChannels(pattern) + 0, // pubsubNumPat() + Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), // pubsubNumSub(channels) + }, result + ); + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.EXACT, + Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), + PubSubChannelMode.PATTERN, patterns.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.EXACT, + Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), + PubSubClusterChannelMode.PATTERN, patterns.stream().map(GlideString::gs).collect(Collectors.toSet())); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + clients.addAll(List.of(client, listener)); + + result = standalone ? ((GlideClient)client).exec((Transaction) transaction) : ((GlideClusterClient)client).exec((ClusterTransaction) transaction); + + assertDeepEquals(new Object[] { + channels, // pubsubChannels() + new String[] {"test_channel1", "test_channel2"}, // pubsubChannels(pattern) + 1, // pubsubNumPat() + Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 1L)), // pubsubNumSub(channels) + }, result + ); + } } From 709baf6a1e45975147e3a18694546f5d190891ab Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 8 Aug 2024 14:12:58 -0700 Subject: [PATCH 2/3] Signed-off-by: Yury-Fridlyand --- CHANGELOG.md | 1 + .../src/main/java/glide/api/BaseClient.java | 4 +- .../src/main/java/glide/api/GlideClient.java | 10 ++ .../java/glide/api/GlideClusterClient.java | 22 ++- .../src/test/java/glide/PubSubTests.java | 167 ++++++++++-------- 5 files changed, 122 insertions(+), 82 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3403cb5bc9..accd9df3c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105)) * Node: Added EXPIRETIME and PEXPIRETIME commands ([#2063](https://github.com/valkey-io/valkey-glide/pull/2063)) * Node: Added SORT commands ([#2028](https://github.com/valkey-io/valkey-glide/pull/2028)) * Node: Added LASTSAVE command ([#2059](https://github.com/valkey-io/valkey-glide/pull/2059)) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 8d0bac8cfa..91182e797e 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -4511,7 +4511,7 @@ public CompletableFuture pubsubChannelsBinary() { return commandManager.submitNewCommand( PubSubChannels, new GlideString[0], - response -> castArray(handleArrayResponse(response), GlideString.class)); + response -> castArray(handleArrayResponseBinary(response), GlideString.class)); } @Override @@ -4527,7 +4527,7 @@ public CompletableFuture pubsubChannels(@NonNull GlideString patt return commandManager.submitNewCommand( PubSubChannels, new GlideString[] {pattern}, - response -> castArray(handleArrayResponse(response), GlideString.class)); + response -> castArray(handleArrayResponseBinary(response), GlideString.class)); } @Override diff --git a/java/client/src/main/java/glide/api/GlideClient.java b/java/client/src/main/java/glide/api/GlideClient.java index 53eaeb369d..5b7206b7fe 100644 --- a/java/client/src/main/java/glide/api/GlideClient.java +++ b/java/client/src/main/java/glide/api/GlideClient.java @@ -96,6 +96,16 @@ public CompletableFuture customCommand(@NonNull String[] args) { return commandManager.submitNewCommand(CustomCommand, args, this::handleObjectOrNullResponse); } + // @Override + public CompletableFuture customCommand(@NonNull GlideString[] args) { + return commandManager.submitNewCommand( + CustomCommand, + args, + response -> { + return handleBinaryObjectOrNullResponse(response); + }); + } + @Override public CompletableFuture exec(@NonNull Transaction transaction) { if (transaction.isBinaryOutput()) { diff --git a/java/client/src/main/java/glide/api/GlideClusterClient.java b/java/client/src/main/java/glide/api/GlideClusterClient.java index 65b8721ec8..e29e3ef195 100644 --- a/java/client/src/main/java/glide/api/GlideClusterClient.java +++ b/java/client/src/main/java/glide/api/GlideClusterClient.java @@ -110,17 +110,31 @@ public CompletableFuture> customCommand(@NonNull String[] a public CompletableFuture> customCommand( @NonNull String[] args, @NonNull Route route) { return commandManager.submitNewCommand( - CustomCommand, args, route, response -> handleCustomCommandResponse(route, response)); + CustomCommand, + args, + route, + response -> handleCustomCommandResponse(route, response, false)); } - protected ClusterValue handleCustomCommandResponse(Route route, Response response) { + // @Override + public CompletableFuture> customCommand( + @NonNull GlideString[] args, @NonNull Route route) { + return commandManager.submitNewCommand( + CustomCommand, args, route, response -> handleCustomCommandResponse(route, response, true)); + } + + protected ClusterValue handleCustomCommandResponse( + Route route, Response response, boolean bin) { if (route instanceof SingleNodeRoute) { - return ClusterValue.ofSingleValue(handleObjectOrNullResponse(response)); + return ClusterValue.ofSingleValue( + bin ? handleBinaryObjectOrNullResponse(response) : handleObjectOrNullResponse(response)); } if (response.hasConstantResponse()) { return ClusterValue.ofSingleValue(handleStringResponse(response)); } - return ClusterValue.ofMultiValue(handleMapResponse(response)); + return bin + ? ClusterValue.ofMultiValueBinary(handleBinaryStringMapResponse(response)) + : ClusterValue.ofMultiValue(handleMapResponse(response)); } @Override diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index e5bd04da6c..19166cbbf0 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -8,7 +8,6 @@ import static glide.api.BaseClient.OK; import static glide.api.models.GlideString.gs; import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; @@ -29,6 +28,8 @@ import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback; import glide.api.models.configuration.ClusterSubscriptionConfiguration; import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode; +import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute; +import glide.api.models.configuration.RequestRoutingConfiguration.SlotType; import glide.api.models.configuration.StandaloneSubscriptionConfiguration; import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode; import glide.api.models.exceptions.ConfigurationError; @@ -53,7 +54,7 @@ import lombok.SneakyThrows; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.tuple.Pair; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -114,10 +115,9 @@ private BaseClient createClientWithSubscriptions( @SneakyThrows private BaseClient createClient(boolean standalone) { - if (standalone) { - return GlideClient.createClient(commonClientConfig().build()).get(); - } - return GlideClusterClient.createClient(commonClusterClientConfig().build()).get(); + return standalone + ? GlideClient.createClient(commonClientConfig().build()).get() + : GlideClusterClient.createClient(commonClusterClientConfig().build()).get(); } /** @@ -131,17 +131,23 @@ private BaseClient createClient(boolean standalone) { private static final int MESSAGE_DELIVERY_DELAY = 500; // ms - @BeforeEach + @AfterEach @SneakyThrows public void cleanup() { for (var client : clients) { if (client instanceof GlideClusterClient) { - ((GlideClusterClient) client).customCommand(new String[] {"unsubscribe"}, ALL_NODES).get(); - ((GlideClusterClient) client).customCommand(new String[] {"punsubscribe"}, ALL_NODES).get(); - ((GlideClusterClient) client).customCommand(new String[] {"sunsubscribe"}, ALL_NODES).get(); + ((GlideClusterClient) client) + .customCommand(new GlideString[] {gs("unsubscribe")}, ALL_NODES) + .get(); + ((GlideClusterClient) client) + .customCommand(new GlideString[] {gs("punsubscribe")}, ALL_NODES) + .get(); + ((GlideClusterClient) client) + .customCommand(new GlideString[] {gs("sunsubscribe")}, ALL_NODES) + .get(); } else { - ((GlideClient) client).customCommand(new String[] {"unsubscribe"}).get(); - ((GlideClient) client).customCommand(new String[] {"punsubscribe"}).get(); + ((GlideClient) client).customCommand(new GlideString[] {gs("unsubscribe")}).get(); + ((GlideClient) client).customCommand(new GlideString[] {gs("punsubscribe")}).get(); } client.close(); } @@ -250,7 +256,6 @@ public void exact_happy_path(boolean standalone, MessageReadMethod method) { } /** Similar to `test_pubsub_exact_happy_path_many_channels` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -287,7 +292,6 @@ public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod } /** Similar to `test_sharded_pubsub` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -311,7 +315,6 @@ public void sharded_pubsub(MessageReadMethod method) { } /** Similar to `test_sharded_pubsub_many_channels` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -352,7 +355,6 @@ public void sharded_pubsub_many_channels(MessageReadMethod method) { } /** Similar to `test_pubsub_pattern` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -393,7 +395,6 @@ public void pattern(boolean standalone, MessageReadMethod method) { } /** Similar to `test_pubsub_pattern_many_channels` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -433,7 +434,6 @@ public void pattern_many_channels(boolean standalone, MessageReadMethod method) } /** Similar to `test_pubsub_combined_exact_and_pattern_one_client` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -485,7 +485,6 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea /** * Similar to `test_pubsub_combined_exact_and_pattern_multiple_clients` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest(name = "standalone = {0}, read messages via {1}") @MethodSource("getTestScenarios") @@ -556,7 +555,6 @@ public void combined_exact_and_pattern_multiple_clients( /** * Similar to `test_pubsub_combined_exact_pattern_and_sharded_one_client` in python client tests. */ - @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -615,7 +613,6 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth } /** This test fully covers all `test_pubsub_*_co_existence` tests in python client. */ - @Disabled @SneakyThrows @Test public void coexistense_of_sync_and_async_read() { @@ -698,7 +695,6 @@ public void coexistense_of_sync_and_async_read() { * Similar to `test_pubsub_combined_exact_pattern_and_sharded_multi_client` in python client * tests. */ - @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -816,7 +812,6 @@ public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod me * Similar to `test_pubsub_three_publishing_clients_same_name_with_sharded` in python client * tests. */ - @Disabled @SneakyThrows @ParameterizedTest @EnumSource(MessageReadMethod.class) @@ -1285,24 +1280,26 @@ public void pubsub_channels(boolean standalone) { clients.addAll(List.of(client, listener)); // test without pattern - assertArrayEquals(channels.toArray(), client.pubsubChannels().get()); - assertArrayEquals(channels.toArray(), listener.pubsubChannels().get()); - assertArrayEquals( - channels.stream().map(GlideString::gs).toArray(), client.pubsubChannelsBinary().get()); - assertArrayEquals( - channels.stream().map(GlideString::gs).toArray(), listener.pubsubChannelsBinary().get()); + assertEquals(channels, Set.of(client.pubsubChannels().get())); + assertEquals(channels, Set.of(listener.pubsubChannels().get())); + assertEquals( + channels.stream().map(GlideString::gs).collect(Collectors.toSet()), + Set.of(client.pubsubChannelsBinary().get())); + assertEquals( + channels.stream().map(GlideString::gs).collect(Collectors.toSet()), + Set.of(listener.pubsubChannelsBinary().get())); // test with pattern - assertArrayEquals( - new String[] {"test_channel1", "test_channel2"}, client.pubsubChannels(pattern).get()); - assertArrayEquals( - new GlideString[] {gs("test_channel1"), gs("test_channel2")}, - client.pubsubChannels(gs(pattern)).get()); - assertArrayEquals( - new String[] {"test_channel1", "test_channel2"}, listener.pubsubChannels(pattern).get()); - assertArrayEquals( - new GlideString[] {gs("test_channel1"), gs("test_channel2")}, - listener.pubsubChannels(gs(pattern)).get()); + assertEquals( + Set.of("test_channel1", "test_channel2"), Set.of(client.pubsubChannels(pattern).get())); + assertEquals( + Set.of(gs("test_channel1"), gs("test_channel2")), + Set.of(client.pubsubChannels(gs(pattern)).get())); + assertEquals( + Set.of("test_channel1", "test_channel2"), Set.of(listener.pubsubChannels(pattern).get())); + assertEquals( + Set.of(gs("test_channel1"), gs("test_channel2")), + Set.of(listener.pubsubChannels(gs(pattern)).get())); // test with non-matching pattern assertEquals(0, client.pubsubChannels("non_matching_*").get().length); @@ -1407,50 +1404,68 @@ public void pubsub_numsub(boolean standalone) { public void pubsub_channels_and_numpat_and_numsub_in_transaction(boolean standalone) { assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); - // no channels exists yet + var prefix = "{boo}-"; + var route = new SlotKeyRoute(prefix, SlotType.PRIMARY); var client = createClient(standalone); - var channels = new String[] {"test_channel1", "test_channel2", "some_channel"}; - var patterns = Set.of("news.*", "announcements.*"); - String pattern = "test_*"; - - assertEquals( - Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), - client.pubsubNumSub(channels).get()); - - var transaction = (standalone ? new Transaction() : new ClusterTransaction()) - .pubsubChannels().pubsubChannels(pattern).pubsubNumPat().pubsubNumSub(channels); + var channels = + new String[] {prefix + "test_channel1", prefix + "test_channel2", prefix + "some_channel"}; + var patterns = Set.of(prefix + "news.*", prefix + "announcements.*"); + String pattern = prefix + "test_*"; - var result = standalone ? ((GlideClient)client).exec((Transaction) transaction) : ((GlideClusterClient)client).exec((ClusterTransaction) transaction); - assertDeepEquals(new Object[] { - new String[0], // pubsubChannels() - new String[0], // pubsubChannels(pattern) - 0, // pubsubNumPat() - Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), // pubsubNumSub(channels) - }, result - ); + var transaction = + (standalone ? new Transaction() : new ClusterTransaction()) + .pubsubChannels() + .pubsubChannels(pattern) + .pubsubNumPat() + .pubsubNumSub(channels); + // no channels exists yet + var result = + standalone + ? ((GlideClient) client).exec((Transaction) transaction).get() + : ((GlideClusterClient) client).exec((ClusterTransaction) transaction, route).get(); + assertDeepEquals( + new Object[] { + new String[0], // pubsubChannels() + new String[0], // pubsubChannels(pattern) + 0L, // pubsubNumPat() + Arrays.stream(channels) + .collect(Collectors.toMap(c -> c, c -> 0L)), // pubsubNumSub(channels) + }, + result); Map> subscriptions = - standalone - ? Map.of( - PubSubChannelMode.EXACT, - Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), - PubSubChannelMode.PATTERN, patterns.stream().map(GlideString::gs).collect(Collectors.toSet())) - : Map.of( - PubSubClusterChannelMode.EXACT, - Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), - PubSubClusterChannelMode.PATTERN, patterns.stream().map(GlideString::gs).collect(Collectors.toSet())); + standalone + ? Map.of( + PubSubChannelMode.EXACT, + Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), + PubSubChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.EXACT, + Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), + PubSubClusterChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())); var listener = createClientWithSubscriptions(standalone, subscriptions); clients.addAll(List.of(client, listener)); - result = standalone ? ((GlideClient)client).exec((Transaction) transaction) : ((GlideClusterClient)client).exec((ClusterTransaction) transaction); - - assertDeepEquals(new Object[] { - channels, // pubsubChannels() - new String[] {"test_channel1", "test_channel2"}, // pubsubChannels(pattern) - 1, // pubsubNumPat() - Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 1L)), // pubsubNumSub(channels) - }, result - ); + result = + standalone + ? ((GlideClient) client).exec((Transaction) transaction).get() + : ((GlideClusterClient) client).exec((ClusterTransaction) transaction, route).get(); + + // convert arrays to sets, because we can't compare arrays - they received reordered + result[0] = Set.of((Object[]) result[0]); + result[1] = Set.of((Object[]) result[1]); + + assertDeepEquals( + new Object[] { + Set.of(channels), // pubsubChannels() + Set.of("{boo}-test_channel1", "{boo}-test_channel2"), // pubsubChannels(pattern) + 2L, // pubsubNumPat() + Arrays.stream(channels) + .collect(Collectors.toMap(c -> c, c -> 1L)), // pubsubNumSub(channels) + }, + result); } } From 9522deaffb529b47e6311be192fd64b2901a6c85 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 9 Aug 2024 16:07:43 -0700 Subject: [PATCH 3/3] Update java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java Co-authored-by: Andrew Carbonetto Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/commands/PubSubBaseCommands.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java index 5ca571f530..d83038b3b3 100644 --- a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java @@ -59,7 +59,7 @@ public interface PubSubBaseCommands { /** * Lists the currently active channels.
- * Dislike of {@link #pubsubChannels()}, returns channel names as {@link GlideString}s. + * Unlike of {@link #pubsubChannels()}, returns channel names as {@link GlideString}s. * * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response * into a single array.