Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Redis Stream command support #30703

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/redis-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ As mentioned above, the API is divided into groups:
- pubsub - `pubsub()`
- set - `.set(memberType)`
- sorted-set - `.sortedSet(memberType)`
- stream (not available yet)
- string - `.value(valueType)`
- stream - `.stream(`valueType`)
- transactions - `withTransaction`
- json - `.json()` (requires the https://redis.com/modules/redis-json/[RedisJSON] module on the server side)
- bloom - `.bloom()` (requires the https://redis.com/modules/redis-bloom/[RedisBloom] module on the server side)
Expand Down
81 changes: 81 additions & 0 deletions extensions/redis-client/runtime/src/etc/RedisCommandGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
cescoffier marked this conversation as resolved.
Show resolved Hide resolved
//DEPS io.smallrye.reactive:smallrye-mutiny-vertx-redis-client:2.24.1
//DEPS info.picocli:picocli:4.6.3
//DEPS org.slf4j:slf4j-simple:1.7.36

import static java.lang.System.*;

import java.util.List;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.RedisOptions;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;


@Command(name = "RedisCommandGenerator", mixinStandardHelpOptions = true, version = "RedisCommandGenerator 0.1", description = "Generate REdis Command Javadoc and signatures")
public class RedisCommandGenerator implements Callable<Integer> {

static Logger logger = LoggerFactory.getLogger("👻 >> ");


@Option(names = {"--redis"},
description = "Redis connection string (redis://localhost:6379). Start Redis with: `docker run -p 6379:6379 redis:latest`",
defaultValue = "redis://localhost:6379")
private String url;

@Option(names = "--command", description = "The command name from https://redis.io/commands/", required = true)
private String command;

public Integer call() {
logger.info("Connecting to Redis");

Vertx vertx = Vertx.vertx();
Redis client = Redis.createClient(vertx, new RedisOptions().setConnectionString(url));
RedisAPI api = RedisAPI.api(client);

Response response = api.commandAndAwait(List.of("DOCS", command.toLowerCase()));
System.out.println(javadoc(command, response.get(command)));

vertx.closeAndAwait();
return 0;
}

private String javadoc(String cmd, Response response) {
String content = "/**\n";
content += String.format(" * Execute the command <a href=\"https://redis.io/commands/%s\">$s</a>.\n", cmd.toLowerCase(), cmd.toUpperCase());
content += String.format(" * Summary: %s\n", response.get("summary").toString());
content += String.format(" * Group: %s\n", response.get("group").toString());
if (response.get("since") != null) {
content += String.format(" * Requires Redis %s+\n", response.get("since").toString());
}
boolean deprecated = false;
if (response.get("deprecated_since") != null) {
content += String.format(" * Deprecated since Redis %s\n", response.get("deprecated_since").toString());
deprecated = true;
}
content += " * <p>\n";
for (Response arg: response.get("arguments")) {
content += String.format(" * @param %s %s\n", arg.get("name").toString(), arg.get("type"));
}
content += " * @return TODO\n";
if (deprecated) {
content += " * @deprecated";
}
content += " */\n";
return content;
}

public static void main(String... args) {
int exitCode = new CommandLine(new RedisCommandGenerator()).execute(args);
System.exit(exitCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.quarkus.redis.datasource.search.ReactiveSearchCommands;
import io.quarkus.redis.datasource.set.ReactiveSetCommands;
import io.quarkus.redis.datasource.sortedset.ReactiveSortedSetCommands;
import io.quarkus.redis.datasource.stream.ReactiveStreamCommands;
import io.quarkus.redis.datasource.string.ReactiveStringCommands;
import io.quarkus.redis.datasource.timeseries.ReactiveTimeSeriesCommands;
import io.quarkus.redis.datasource.topk.ReactiveTopKCommands;
Expand Down Expand Up @@ -381,6 +382,29 @@ default ReactiveBitMapCommands<String> bitmap() {
return bitmap(String.class);
}

/**
* Gets the object to execute commands manipulating streams.
*
* @param redisKeyType the class of the keys
* @param fieldType the class of the fields included in the message exchanged on the streams
* @param valueType the class of the values included in the message exchanged on the streams
* @param <K> the type of the redis key
* @param <F> the type of the fields (map's keys)
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
<K, F, V> ReactiveStreamCommands<K, F, V> stream(Class<K> redisKeyType, Class<F> fieldType, Class<V> valueType);

/**
* Gets the object to execute commands manipulating streams, using a string key, and string fields.
*
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
default <V> ReactiveStreamCommands<String, String, V> stream(Class<V> typeOfValue) {
return stream(String.class, String.class, typeOfValue);
}

/**
* Gets the object to publish and receive messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.quarkus.redis.datasource.search.SearchCommands;
import io.quarkus.redis.datasource.set.SetCommands;
import io.quarkus.redis.datasource.sortedset.SortedSetCommands;
import io.quarkus.redis.datasource.stream.StreamCommands;
import io.quarkus.redis.datasource.string.StringCommands;
import io.quarkus.redis.datasource.timeseries.TimeSeriesCommands;
import io.quarkus.redis.datasource.topk.TopKCommands;
Expand Down Expand Up @@ -379,6 +380,29 @@ default BitMapCommands<String> bitmap() {
return bitmap(String.class);
}

/**
* Gets the object to execute commands manipulating streams.
*
* @param redisKeyType the class of the keys
* @param fieldType the class of the fields included in the message exchanged on the streams
* @param valueType the class of the values included in the message exchanged on the streams
* @param <K> the type of the redis key
* @param <F> the type of the fields (map's keys)
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
<K, F, V> StreamCommands<K, F, V> stream(Class<K> redisKeyType, Class<F> fieldType, Class<V> valueType);

/**
* Gets the object to execute commands manipulating streams, using a string key, and string fields.
*
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
default <V> StreamCommands<String, String, V> stream(Class<V> typeOfValue) {
return stream(String.class, String.class, typeOfValue);
}

/**
* Gets the object to manipulate JSON values.
* This group requires the <a href="https://redis.io/docs/stack/json/">RedisJSON module</a>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import java.util.ArrayList;
import java.util.List;

import io.quarkus.redis.datasource.RedisCommandExtraArguments;

/**
* Arguments for the Redis <a href="https://redis.io/commands/copy">COPY</a> command.
*/
public class CopyArgs {
public class CopyArgs implements RedisCommandExtraArguments {

private long destinationDb = -1;

Expand Down Expand Up @@ -34,6 +36,7 @@ public CopyArgs replace(boolean replace) {
return this;
}

@Override
public List<String> toArgs() {
List<String> args = new ArrayList<>();
if (destinationDb != -1) {
Expand All @@ -47,4 +50,4 @@ public List<String> toArgs() {
return args;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.redis.datasource.stream;

import java.util.List;

/**
* Represents claimed messages
*
* @param <K> the type of the key
* @param <F> the field type for the payload
* @param <V> the value type for the payload
*/
public class ClaimedMessages<K, F, V> {
private final String id;
private final List<StreamMessage<K, F, V>> messages;

public ClaimedMessages(String id, List<StreamMessage<K, F, V>> messages) {
this.id = id;
this.messages = messages;
}

public String getId() {
return this.id;
}

public List<StreamMessage<K, F, V>> getMessages() {
return this.messages;
}
}
Loading