Skip to content

Commit

Permalink
Implement support for Redis 7.2
Browse files Browse the repository at this point in the history
Redis 7.2 was released last week.
Most changes are in the Redis Stack modules (removal of graph, time-series/JSON/search switching to RESP 3, JSON format change.

This commit adds support for Redis 7.2:
- update the dev service and the image used in the tests
- make sure we still support Redis 5, 6.2.x, 7.0 and 7.2
  • Loading branch information
cescoffier authored and holly-cummins committed Feb 8, 2024
1 parent 25f7dac commit 7261d2d
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface DevServicesConfig {
/**
* The container image name to use, for container based DevServices providers.
* If you want to use Redis Stack modules (bloom, graph, search...), use:
* {@code redis/redis-stack-server:latest}.
* {@code redis/redis-stack:latest}.
*/
Optional<String> imageName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@BuildSteps(onlyIfNot = IsNormal.class, onlyIf = { GlobalDevServicesConfig.Enabled.class })
public class DevServicesRedisProcessor {
private static final Logger log = Logger.getLogger(DevServicesRedisProcessor.class);
private static final String REDIS_7_ALPINE = "docker.io/redis:7-alpine";
private static final String REDIS_IMAGE = "docker.io/redis:7";
private static final int REDIS_EXPOSED_PORT = 6379;
private static final String REDIS_SCHEME = "redis://";

Expand Down Expand Up @@ -171,8 +171,8 @@ private RunningDevService startContainer(DockerStatusBuildItem dockerStatusBuild
return null;
}

DockerImageName dockerImageName = DockerImageName.parse(devServicesConfig.imageName().orElse(REDIS_7_ALPINE))
.asCompatibleSubstituteFor(REDIS_7_ALPINE);
DockerImageName dockerImageName = DockerImageName.parse(devServicesConfig.imageName().orElse(REDIS_IMAGE))
.asCompatibleSubstituteFor(REDIS_IMAGE);

Supplier<RunningDevService> defaultRedisServerSupplier = () -> {
QuarkusPortRedisContainer redisContainer = new QuarkusPortRedisContainer(dockerImageName, devServicesConfig.port(),
Expand Down
9 changes: 8 additions & 1 deletion extensions/redis-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
</property>
</activation>
<properties>
<redis.base.image>redis/redis-stack:7.0.2-RC2</redis.base.image>
<redis.base.image>redis/redis-stack:7.2.0-v0</redis.base.image>
</properties>
<build>
<plugins>
Expand Down Expand Up @@ -149,5 +149,12 @@
<redis.base.image>redis:6</redis.base.image>
</properties>
</profile>

<profile>
<id>redis-7.0</id>
<properties>
<redis.base.image>redis/redis-stack:7.0.6-RC9</redis.base.image>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public List<String> toArgs() {

if (bucketTimestamp != null) {
list.add("BUCKETTIMESTAMP");
list.add(bucketTimestamp.toString());
list.add(bucketTimestamp.toString().toLowerCase());
}

if (empty) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkus.redis.runtime.datasource;

import java.util.Set;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.ResponseType;

public class AbstractRedisCommands {

Expand All @@ -17,4 +20,17 @@ public Uni<Response> execute(RedisCommand cmd) {
return redis.execute(cmd.toRequest());
}

static boolean isMap(Response response) {
try {
return response != null && response.type() == ResponseType.MULTI && notEmptyOrNull(response.getKeys());
} catch (Exception ignored) {
// Not a map, but a plain multi
return false;
}
}

private static boolean notEmptyOrNull(Set<String> keys) {
return keys != null && !keys.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.ResponseType;

Expand Down Expand Up @@ -71,40 +72,81 @@ public <T> Uni<T> jsonGet(K key, Class<T> clazz) {
nonNull(clazz, "clazz");
return _jsonGet(key)
.map(r -> {
if (r == null) {
return null;
var m = getJsonObject(r);
if (m != null) {
return m.mapTo(clazz);
}
return Json.decodeValue(r.getDelegate().toBuffer(), clazz);
return null;
});
}

@Override
public Uni<JsonObject> jsonGetObject(K key) {
return _jsonGet(key)
.map(r -> r.toBuffer().toJsonObject());
.map(ReactiveJsonCommandsImpl::getJsonObject);
}

@Override
public Uni<JsonArray> jsonGetArray(K key) {
return _jsonGet(key)
.map(r -> r.toBuffer().toJsonArray());
.map(ReactiveJsonCommandsImpl::getJsonArray);
}

static JsonArray getJsonArray(Response r) {
if (r == null || r.toString().equalsIgnoreCase("null")) { // JSON null
return null;
}
// With Redis 7.2 the response is a BULK (String) but using a nested array.
Buffer buffer = r.toBuffer();
JsonArray array = buffer.toJsonArray();
if (array.size() == 1 && array.getString(0).startsWith("[")) {
return array.getJsonArray(0);
}
return array;
}

static JsonObject getJsonObject(Response r) {
if (r == null || r.toString().equalsIgnoreCase("null")) { // JSON null
return null;
}
// With Redis 7.2 the response is a BULK (String) but using a nested array.
Buffer buffer = r.toBuffer();
if (buffer.toJson() instanceof JsonArray) {
var array = buffer.toJsonArray();
if (array.size() == 0) {
return null;
}
return array.getJsonObject(0);
}
return r.toBuffer().toJsonObject();
}

static JsonArray getJsonArrayFromJsonGet(Response r) {
if (r == null || r.toString().equalsIgnoreCase("null")) { // JSON null
return null;
}
// With Redis 7.2 the response is a BULK (String) but using a nested array.
Buffer buffer = r.toBuffer();
if (buffer.toJson() instanceof JsonArray) {
var array = buffer.toJsonArray();
if (array.size() == 0) {
return new JsonArray();
}
return array;
}
return buffer.toJsonArray();
}

@Override
public Uni<JsonArray> jsonGet(K key, String path) {
return _jsonGet(key, path)
.map(r -> r.toBuffer().toJsonArray());
.map(ReactiveJsonCommandsImpl::getJsonArrayFromJsonGet);
}

@Override
public Uni<JsonObject> jsonGet(K key, String... paths) {
return _jsonGet(key, paths)
.map(r -> {
if (r == null || r.toString().equalsIgnoreCase("null")) { // JSON null
return null;
}
return r.toBuffer().toJsonObject();
});
.map(ReactiveJsonCommandsImpl::getJsonObject);
}

@Override
Expand Down Expand Up @@ -284,7 +326,16 @@ static List<String> decodeTypeResponse(Response r) {
List<String> list = new ArrayList<>();
if (r.type() == ResponseType.MULTI) {
for (Response response : r) {
list.add(response == null ? null : response.toString());
if (response == null) {
list.add(null);
} else if (response.type() == ResponseType.MULTI) {
// Redis 7.2 behavior
for (Response nested : response) {
list.add(nested == null ? null : nested.toString());
}
} else {
list.add(response.toString());
}
}
} else {
list.add(r.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,35 @@ AggregationResponse decodeAggregateResponse(Response response, boolean cursor) {
if (response == null) {
return new AggregationResponse(Collections.emptyList());
}

var payload = response;
var cursorId = -1L;
if (cursor) {
payload = response.get(0);
cursorId = response.get(1).toLong();
}

if (isMap(payload)) {
// Redis 7.2+ behavior
Response results = payload.get("results");
List<AggregateDocument> docs = new ArrayList<>();
if (results != null) {
for (Response result : results) {
Map<String, Document.Property> list = new HashMap<>();
if (result.containsKey("extra_attributes")) {
Response attributes = result.get("extra_attributes");
for (String propertyName : attributes.getKeys()) {
list.put(propertyName, new Document.Property(propertyName, attributes.get(propertyName)));
}
}
AggregateDocument doc = new AggregateDocument(list);
docs.add(doc);
}
}
return new AggregationResponse(cursorId, docs);
}

// Pre-Redis 7.2 behavior:
List<AggregateDocument> docs = new ArrayList<>();
for (int i = 1; i < payload.size(); i++) {
var nested = payload.get(i);
Expand Down Expand Up @@ -186,6 +208,38 @@ SearchQueryResponse decodeSearchQueryResult(Response response, QueryArgs args) {
if (response == null) {
return new SearchQueryResponse(0, Collections.emptyList());
}

if (isMap(response)) {
// Read it as a map - Redis 7.2 behavior
var count = response.get("total_results");
if (count == null || count.toInteger() == 0) {
return new SearchQueryResponse(0, Collections.emptyList());
}
var total = count.toInteger();

List<Document> docs = new ArrayList<>();
Response results = response.get("results");
for (int i = 0; i < results.size(); i++) {
Response result = results.get(i);
Response score = result.get("score");
Response payload = result.get("payload");
Response doc = result.get("extra_attributes");

Map<String, Document.Property> properties = new HashMap<>();
if (doc != null) {
for (String k : doc.getKeys()) {
properties.put(k, new Document.Property(k, doc.get(k)));
}
}
docs.add(
new Document(result.get("id").toString(), score == null ? 1.0 : score.toDouble(), payload, properties));
}

return new SearchQueryResponse(total, docs);

}

// 7.2- behavior
var count = response.get(0).toInteger();
if (count == 0) {
return new SearchQueryResponse(0, Collections.emptyList());
Expand Down Expand Up @@ -249,6 +303,33 @@ SpellCheckResponse decodeSpellcheckResponse(Response response) {
return new SpellCheckResponse(Collections.emptyMap());
}
Map<String, List<SpellCheckResponse.SpellCheckSuggestion>> resp = new LinkedHashMap<>();
if (isMap(response)) {
// Redis 7.2 behavior.
Response results = response.get("results");
Set<String> terms = results.getKeys();
for (String word : terms) {
List<SpellCheckResponse.SpellCheckSuggestion> list = new ArrayList<>();
Response suggestions = results.get(word);
if (suggestions.size() == 0) {
resp.put(word, Collections.emptyList());
} else {
for (Response suggestion : suggestions) {
for (String proposal : suggestion.getKeys()) {
double distance = suggestion.get(proposal).toDouble();
if (!proposal.equals(word)) {
list.add(new SpellCheckResponse.SpellCheckSuggestion(proposal, distance));
}
}
if (!list.isEmpty()) {
resp.put(word, list);
}
}
}

}
return new SpellCheckResponse(resp);
}

for (Response term : response) {
if (!term.get(0).toString().equals("TERM")) {
continue; // Unknown format
Expand Down Expand Up @@ -291,6 +372,20 @@ SynDumpResponse decodeSynDumpResponse(Response r) {
if (r == null || r.size() == 0) {
return new SynDumpResponse(Collections.emptyMap());
}
if (isMap(r)) {
Set<String> keys = r.getKeys();
// Redis 7.2 behavior
Map<String, List<String>> synonyms = new HashMap<>();
for (String key : keys) {
var groups = r.get(key);
for (Response group : groups) {
synonyms.computeIfAbsent(group.toString(), x -> new ArrayList<>()).add(key);
}
}
return new SynDumpResponse(synonyms);

}

Map<String, List<String>> synonyms = new HashMap<>();
String term = null;
for (Response response : r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,28 @@ Map<String, SampleGroup> decodeGroup(Response response) {
return Collections.emptyMap();
}
Map<String, SampleGroup> groups = new HashMap<>();

if (isMap(response)) {
for (String group : response.getKeys()) {
Map<String, String> labels = new HashMap<>();
List<Sample> samples = new ArrayList<>();
var nested = response.get(group);
for (Response label : nested.get(0)) {
labels.put(label.get(0).toString(), label.get(1).toString());
}
for (Response sample : nested.get(nested.size() - 1)) { // samples are last
if (sample.type() == ResponseType.MULTI) {
samples.add(decodeSample(sample));
} else {
samples.add(new Sample(sample.toLong(), nested.get(nested.size() - 1).get(1).toDouble()));
break;
}
}
groups.put(group, new SampleGroup(group, labels, samples));
}
return groups;
}

for (Response nested : response) {
// this should be the group
String group = nested.get(0).toString();
Expand Down
Loading

0 comments on commit 7261d2d

Please sign in to comment.