Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: add PUBSUB CHANNELS, NUMPAT and NUMSUB commands #2105

Merged
merged 6 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105))
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
* Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093))
* Node: Added HRANDFIELD command ([#2096](https://github.com/valkey-io/valkey-glide/pull/2096))
Expand Down
51 changes: 51 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
import static command_request.CommandRequestOuterClass.RequestType.PfAdd;
import static command_request.CommandRequestOuterClass.RequestType.PfCount;
import static command_request.CommandRequestOuterClass.RequestType.PfMerge;
import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub;
import static command_request.CommandRequestOuterClass.RequestType.Publish;
import static command_request.CommandRequestOuterClass.RequestType.RPop;
import static command_request.CommandRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -4495,6 +4498,54 @@ public CompletableFuture<String> publish(
});
}

@Override
public CompletableFuture<String[]> pubsubChannels() {
return commandManager.submitNewCommand(
PubSubChannels,
new String[0],
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> pubsubChannelsBinary() {
return commandManager.submitNewCommand(
PubSubChannels,
new GlideString[0],
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<String[]> pubsubChannels(@NonNull String pattern) {
return commandManager.submitNewCommand(
PubSubChannels,
new String[] {pattern},
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> pubsubChannels(@NonNull GlideString pattern) {
return commandManager.submitNewCommand(
PubSubChannels,
new GlideString[] {pattern},
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<Long> pubsubNumPat() {
return commandManager.submitNewCommand(PubSubNumPat, new String[0], this::handleLongResponse);
}

@Override
public CompletableFuture<Map<String, Long>> pubsubNumSub(@NonNull String[] channels) {
return commandManager.submitNewCommand(PubSubNumSub, channels, this::handleMapResponse);
}

@Override
public CompletableFuture<Map<GlideString, Long>> pubsubNumSub(@NonNull GlideString[] channels) {
return commandManager.submitNewCommand(
PubSubNumSub, channels, this::handleBinaryStringMapResponse);
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
Expand Down
10 changes: 10 additions & 0 deletions java/client/src/main/java/glide/api/GlideClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ public CompletableFuture<Object> customCommand(@NonNull String[] args) {
return commandManager.submitNewCommand(CustomCommand, args, this::handleObjectOrNullResponse);
}

// @Override
public CompletableFuture<Object> customCommand(@NonNull GlideString[] args) {
return commandManager.submitNewCommand(
CustomCommand,
args,
response -> {
return handleBinaryObjectOrNullResponse(response);
});
}

@Override
public CompletableFuture<Object[]> exec(@NonNull Transaction transaction) {
if (transaction.isBinaryOutput()) {
Expand Down
22 changes: 18 additions & 4 deletions java/client/src/main/java/glide/api/GlideClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,31 @@ public CompletableFuture<ClusterValue<Object>> customCommand(@NonNull String[] a
public CompletableFuture<ClusterValue<Object>> customCommand(
@NonNull String[] args, @NonNull Route route) {
return commandManager.submitNewCommand(
CustomCommand, args, route, response -> handleCustomCommandResponse(route, response));
CustomCommand,
args,
route,
response -> handleCustomCommandResponse(route, response, false));
}

protected ClusterValue<Object> handleCustomCommandResponse(Route route, Response response) {
// @Override
public CompletableFuture<ClusterValue<Object>> customCommand(
@NonNull GlideString[] args, @NonNull Route route) {
return commandManager.submitNewCommand(
CustomCommand, args, route, response -> handleCustomCommandResponse(route, response, true));
}

protected ClusterValue<Object> handleCustomCommandResponse(
Route route, Response response, boolean bin) {
if (route instanceof SingleNodeRoute) {
return ClusterValue.ofSingleValue(handleObjectOrNullResponse(response));
return ClusterValue.ofSingleValue(
bin ? handleBinaryObjectOrNullResponse(response) : handleObjectOrNullResponse(response));
}
if (response.hasConstantResponse()) {
return ClusterValue.ofSingleValue(handleStringResponse(response));
}
return ClusterValue.ofMultiValue(handleMapResponse(response));
return bin
? ClusterValue.ofMultiValueBinary(handleBinaryStringMapResponse(response))
: ClusterValue.ofMultiValue(handleMapResponse(response));
}

@Override
Expand Down
121 changes: 121 additions & 0 deletions java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -40,4 +41,124 @@ public interface PubSubBaseCommands {
* }</pre>
*/
CompletableFuture<String> publish(GlideString message, GlideString channel);

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @return An <code>Array</code> of all active channels.
* @example
* <pre>{@code
* String[] response = client.pubsubChannels().get();
* assert Arrays.equals(new String[] { "channel1", "channel2" });
* }</pre>
*/
CompletableFuture<String[]> pubsubChannels();

/**
* Lists the currently active channels.<br>
* Unlike of {@link #pubsubChannels()}, returns channel names as {@link GlideString}s.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @return An <code>Array</code> of all active channels.
* @example
* <pre>{@code
* GlideString[] response = client.pubsubChannels().get();
* assert Arrays.equals(new GlideString[] { "channel1", "channel2" });
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubChannelsBinary();

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active channels.
* @return An <code>Array</code> of currently active channels matching the given pattern.
* @example
* <pre>{@code
* String[] response = client.pubsubChannels("news.*").get();
* assert Arrays.equals(new String[] { "news.sports", "news.weather" });
* }</pre>
*/
CompletableFuture<String[]> pubsubChannels(String pattern);

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active channels.
* @return An <code>Array</code> of currently active channels matching the given pattern.
* @example
* <pre>{@code
* GlideString[] response = client.pubsubChannels(gs("news.*")).get();
* assert Arrays.equals(new GlideString[] { gs("news.sports"), gs("news.weather") });
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubChannels(GlideString pattern);

/**
* Returns the number of unique patterns that are subscribed to by clients.
*
* @apiNote
* <ul>
* <li>When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* <li>This is the total number of unique patterns all the clients are subscribed to, not
* the count of clients subscribed to patterns.
* </ul>
*
* @see <a href="https://valkey.io/commands/pubsub-numpat/">valkey.io</a> for details.
* @return The number of unique patterns.
* @example
* <pre>{@code
* Long result = client.pubsubNumPat().get();
* assert result == 3L;
* }</pre>
*/
CompletableFuture<Long> pubsubNumPat();

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single map.
* @see <a href="https://valkey.io/commands/pubsub-numsub/">valkey.io</a> for details.
* @param channels The list of channels to query for the number of subscribers.
* @return A <code>Map</code> where keys are the channel names and values are the numbers of
* subscribers.
* @example
* <pre>{@code
* Map<String, Long> result = client.pubsubNumSub(new String[] {"channel1", "channel2"}).get();
* assert result.equals(Map.of("channel1", 3L, "channel2", 5L));
* }</pre>
*/
CompletableFuture<Map<String, Long>> pubsubNumSub(String[] channels);

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single map.
* @see <a href="https://valkey.io/commands/pubsub-numsub/">valkey.io</a> for details.
* @param channels The list of channels to query for the number of subscribers.
* @return A <code>Map</code> where keys are the channel names and values are the numbers of
* subscribers.
* @example
* <pre>{@code
* Map<GlideString, Long> result = client.pubsubNumSub(new GlideString[] {gs("channel1"), gs("channel2")}).get();
* assert result.equals(Map.of(gs("channel1"), 3L, gs("channel2"), 5L));
* }</pre>
*/
CompletableFuture<Map<GlideString, Long>> pubsubNumSub(GlideString[] channels);
}
72 changes: 72 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@
import static command_request.CommandRequestOuterClass.RequestType.PfCount;
import static command_request.CommandRequestOuterClass.RequestType.PfMerge;
import static command_request.CommandRequestOuterClass.RequestType.Ping;
import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub;
import static command_request.CommandRequestOuterClass.RequestType.Publish;
import static command_request.CommandRequestOuterClass.RequestType.RPop;
import static command_request.CommandRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -6295,6 +6298,75 @@ public <ArgType> T publish(@NonNull ArgType message, @NonNull ArgType channel) {
return getThis();
}

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @return Command response - An <code>Array</code> of all active channels.
*/
public T pubsubChannels() {
protobufTransaction.addCommands(buildCommand(PubSubChannels));
return getThis();
}

/**
* Lists the currently active channels.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active channels.
* @return Command response - An <code>Array</code> of currently active channels matching the
* given pattern.
*/
public <ArgType> T pubsubChannels(@NonNull ArgType pattern) {
checkTypeOrThrow(pattern);
protobufTransaction.addCommands(buildCommand(PubSubChannels, newArgsBuilder().add(pattern)));
return getThis();
}

/**
* Returns the number of unique patterns that are subscribed to by clients.
*
* @apiNote
* <ul>
* <li>When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* <li>This is the total number of unique patterns all the clients are subscribed to, not
* the count of clients subscribed to patterns.
* </ul>
*
* @see <a href="https://valkey.io/commands/pubsub-numpat/">valkey.io</a> for details.
* @return Command response - The number of unique patterns.
*/
public T pubsubNumPat() {
protobufTransaction.addCommands(buildCommand(PubSubNumPat));
return getThis();
}

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified channels.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single map.
* @see <a href="https://valkey.io/commands/pubsub-numsub/">valkey.io</a> for details.
* @param channels The list of channels to query for the number of subscribers.
* @return Command response - A <code>Map</code> where keys are the channel names and values are
* the numbers of subscribers.
*/
public <ArgType> T pubsubNumSub(@NonNull ArgType[] channels) {
checkTypeOrThrow(channels);
protobufTransaction.addCommands(buildCommand(PubSubNumSub, newArgsBuilder().add(channels)));
return getThis();
}

/**
* Gets the union of all the given sets.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import static glide.api.models.GlideString.gs;

import glide.api.GlideClusterClient;
import glide.api.models.GlideString;
import java.util.HashMap;
Expand Down Expand Up @@ -91,6 +93,17 @@ public ClusterSubscriptionConfigurationBuilder subscription(
return this;
}

/**
* Add a subscription to a channel or to multiple channels if {@link
* PubSubClusterChannelMode#PATTERN} is used.<br>
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
*/
public ClusterSubscriptionConfigurationBuilder subscription(
PubSubClusterChannelMode mode, String channelOrPattern) {
addSubscription(subscriptions, mode, gs(channelOrPattern));
return this;
}

/**
* Set all subscriptions in a bulk. Rewrites previously stored configurations.<br>
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
Expand Down
Loading
Loading