Skip to content

Commit

Permalink
Merge pull request quarkusio#30703 from cescoffier/redis-stream-commands
Browse files Browse the repository at this point in the history
Implement Redis Stream command support
  • Loading branch information
cescoffier authored Jan 31, 2023
2 parents b3cb9ec + 5927a3b commit fc7933e
Show file tree
Hide file tree
Showing 34 changed files with 5,358 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/redis-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ As mentioned above, the API is divided into groups:
- pubsub - `pubsub()`
- set - `.set(memberType)`
- sorted-set - `.sortedSet(memberType)`
- stream (not available yet)
- string - `.value(valueType)`
- stream - `.stream(`valueType`)
- transactions - `withTransaction`
- json - `.json()` (requires the https://redis.com/modules/redis-json/[RedisJSON] module on the server side)
- bloom - `.bloom()` (requires the https://redis.com/modules/redis-bloom/[RedisBloom] module on the server side)
Expand Down
81 changes: 81 additions & 0 deletions extensions/redis-client/runtime/src/etc/RedisCommandGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:smallrye-mutiny-vertx-redis-client:2.24.1
//DEPS info.picocli:picocli:4.6.3
//DEPS org.slf4j:slf4j-simple:1.7.36

import static java.lang.System.*;

import java.util.List;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.RedisOptions;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;


@Command(name = "RedisCommandGenerator", mixinStandardHelpOptions = true, version = "RedisCommandGenerator 0.1", description = "Generate REdis Command Javadoc and signatures")
public class RedisCommandGenerator implements Callable<Integer> {

static Logger logger = LoggerFactory.getLogger("👻 >> ");


@Option(names = {"--redis"},
description = "Redis connection string (redis://localhost:6379). Start Redis with: `docker run -p 6379:6379 redis:latest`",
defaultValue = "redis://localhost:6379")
private String url;

@Option(names = "--command", description = "The command name from https://redis.io/commands/", required = true)
private String command;

public Integer call() {
logger.info("Connecting to Redis");

Vertx vertx = Vertx.vertx();
Redis client = Redis.createClient(vertx, new RedisOptions().setConnectionString(url));
RedisAPI api = RedisAPI.api(client);

Response response = api.commandAndAwait(List.of("DOCS", command.toLowerCase()));
System.out.println(javadoc(command, response.get(command)));

vertx.closeAndAwait();
return 0;
}

private String javadoc(String cmd, Response response) {
String content = "/**\n";
content += String.format(" * Execute the command <a href=\"https://redis.io/commands/%s\">$s</a>.\n", cmd.toLowerCase(), cmd.toUpperCase());
content += String.format(" * Summary: %s\n", response.get("summary").toString());
content += String.format(" * Group: %s\n", response.get("group").toString());
if (response.get("since") != null) {
content += String.format(" * Requires Redis %s+\n", response.get("since").toString());
}
boolean deprecated = false;
if (response.get("deprecated_since") != null) {
content += String.format(" * Deprecated since Redis %s\n", response.get("deprecated_since").toString());
deprecated = true;
}
content += " * <p>\n";
for (Response arg: response.get("arguments")) {
content += String.format(" * @param %s %s\n", arg.get("name").toString(), arg.get("type"));
}
content += " * @return TODO\n";
if (deprecated) {
content += " * @deprecated";
}
content += " */\n";
return content;
}

public static void main(String... args) {
int exitCode = new CommandLine(new RedisCommandGenerator()).execute(args);
System.exit(exitCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.quarkus.redis.datasource.search.ReactiveSearchCommands;
import io.quarkus.redis.datasource.set.ReactiveSetCommands;
import io.quarkus.redis.datasource.sortedset.ReactiveSortedSetCommands;
import io.quarkus.redis.datasource.stream.ReactiveStreamCommands;
import io.quarkus.redis.datasource.string.ReactiveStringCommands;
import io.quarkus.redis.datasource.timeseries.ReactiveTimeSeriesCommands;
import io.quarkus.redis.datasource.topk.ReactiveTopKCommands;
Expand Down Expand Up @@ -381,6 +382,29 @@ default ReactiveBitMapCommands<String> bitmap() {
return bitmap(String.class);
}

/**
* Gets the object to execute commands manipulating streams.
*
* @param redisKeyType the class of the keys
* @param fieldType the class of the fields included in the message exchanged on the streams
* @param valueType the class of the values included in the message exchanged on the streams
* @param <K> the type of the redis key
* @param <F> the type of the fields (map's keys)
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
<K, F, V> ReactiveStreamCommands<K, F, V> stream(Class<K> redisKeyType, Class<F> fieldType, Class<V> valueType);

/**
* Gets the object to execute commands manipulating streams, using a string key, and string fields.
*
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
default <V> ReactiveStreamCommands<String, String, V> stream(Class<V> typeOfValue) {
return stream(String.class, String.class, typeOfValue);
}

/**
* Gets the object to publish and receive messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.quarkus.redis.datasource.search.SearchCommands;
import io.quarkus.redis.datasource.set.SetCommands;
import io.quarkus.redis.datasource.sortedset.SortedSetCommands;
import io.quarkus.redis.datasource.stream.StreamCommands;
import io.quarkus.redis.datasource.string.StringCommands;
import io.quarkus.redis.datasource.timeseries.TimeSeriesCommands;
import io.quarkus.redis.datasource.topk.TopKCommands;
Expand Down Expand Up @@ -379,6 +380,29 @@ default BitMapCommands<String> bitmap() {
return bitmap(String.class);
}

/**
* Gets the object to execute commands manipulating streams.
*
* @param redisKeyType the class of the keys
* @param fieldType the class of the fields included in the message exchanged on the streams
* @param valueType the class of the values included in the message exchanged on the streams
* @param <K> the type of the redis key
* @param <F> the type of the fields (map's keys)
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
<K, F, V> StreamCommands<K, F, V> stream(Class<K> redisKeyType, Class<F> fieldType, Class<V> valueType);

/**
* Gets the object to execute commands manipulating streams, using a string key, and string fields.
*
* @param <V> the type of the value
* @return the object to execute commands manipulating streams.
*/
default <V> StreamCommands<String, String, V> stream(Class<V> typeOfValue) {
return stream(String.class, String.class, typeOfValue);
}

/**
* Gets the object to manipulate JSON values.
* This group requires the <a href="https://redis.io/docs/stack/json/">RedisJSON module</a>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import java.util.ArrayList;
import java.util.List;

import io.quarkus.redis.datasource.RedisCommandExtraArguments;

/**
* Arguments for the Redis <a href="https://redis.io/commands/copy">COPY</a> command.
*/
public class CopyArgs {
public class CopyArgs implements RedisCommandExtraArguments {

private long destinationDb = -1;

Expand Down Expand Up @@ -34,6 +36,7 @@ public CopyArgs replace(boolean replace) {
return this;
}

@Override
public List<String> toArgs() {
List<String> args = new ArrayList<>();
if (destinationDb != -1) {
Expand All @@ -47,4 +50,4 @@ public List<String> toArgs() {
return args;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.redis.datasource.stream;

import java.util.List;

/**
* Represents claimed messages
*
* @param <K> the type of the key
* @param <F> the field type for the payload
* @param <V> the value type for the payload
*/
public class ClaimedMessages<K, F, V> {
private final String id;
private final List<StreamMessage<K, F, V>> messages;

public ClaimedMessages(String id, List<StreamMessage<K, F, V>> messages) {
this.id = id;
this.messages = messages;
}

public String getId() {
return this.id;
}

public List<StreamMessage<K, F, V>> getMessages() {
return this.messages;
}
}
Loading

0 comments on commit fc7933e

Please sign in to comment.