Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recompute cache when the redis connection pool is exhausted #39668

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.mutiny.unchecked.UncheckedFunction;
import io.smallrye.mutiny.vertx.MutinyHelper;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
Expand Down Expand Up @@ -102,6 +103,11 @@ public RedisCacheImpl(RedisCacheInfo cacheInfo, Vertx vertx, Redis redis, Suppli
this.redis = redis;
}

private static boolean isRecomputableError(Throwable error) {
return error instanceof ConnectException
|| error instanceof ConnectionPoolTooBusyException;
}

private Class<?> loadClass(String type) throws ClassNotFoundException {
if (PRIMITIVE_TO_CLASS_MAPPING.containsKey(type)) {
return PRIMITIVE_TO_CLASS_MAPPING.get(type);
Expand Down Expand Up @@ -211,7 +217,7 @@ public Uni<?> apply(V value) {
}
})

.onFailure(ConnectException.class).recoverWithUni(new Function<Throwable, Uni<? extends V>>() {
.onFailure(RedisCacheImpl::isRecomputableError).recoverWithUni(new Function<Throwable, Uni<? extends V>>() {
@Override
public Uni<? extends V> apply(Throwable e) {
log.warn("Unable to connect to Redis, recomputing cached value", e);
Expand Down Expand Up @@ -260,7 +266,7 @@ public Uni<V> apply(RedisConnection connection) {
});
}
})
.onFailure(ConnectException.class).recoverWithUni(e -> {
.onFailure(RedisCacheImpl::isRecomputableError).recoverWithUni(e -> {
log.warn("Unable to connect to Redis, recomputing cached value", e);
return valueLoader.apply(key);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand All @@ -20,8 +22,10 @@
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.json.Json;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.Request;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.RedisOptions;

class RedisCacheImplTest extends RedisCacheTestBase {

Expand Down Expand Up @@ -50,6 +54,33 @@ public void testPutInTheCache() {
assertThat(r).isNotNull();
}

@Test
public void testExhaustConnectionPool() {
String k = UUID.randomUUID().toString();
RedisCacheInfo info = new RedisCacheInfo();
info.name = "foo";
info.valueType = String.class.getName();
info.expireAfterWrite = Optional.of(Duration.ofSeconds(2));

Redis redis = Redis.createClient(vertx, new RedisOptions()
.setMaxPoolSize(1)
.setMaxPoolWaiting(0)
.setConnectionString("redis://" + server.getHost() + ":" + server.getFirstMappedPort()));

RedisCacheImpl cache = new RedisCacheImpl(info, vertx, redis, BLOCKING_ALLOWED);

List<Uni<String>> responses = new ArrayList<>();
for (int i = 0; i < 100; i++) {
responses.add(cache.get(k, s -> "hello"));
}

final var values = Uni.combine().all().unis(responses).with(list -> list).await().indefinitely();
assertThat(values).isNotEmpty().allMatch(value -> value.equals("hello"));

var r = redis.send(Request.cmd(Command.GET).arg("cache:foo:" + k)).await().indefinitely();
assertThat(r).isNotNull();
}

@Test
public void testPutInTheCacheWithoutRedis() {
String k = UUID.randomUUID().toString();
Expand Down