Skip to content

Commit

Permalink
Merge pull request #489 from Ladicek/fix-pool-metrics-for-tainted-con…
Browse files Browse the repository at this point in the history
…nections

fix pool metrics for tainted connections
  • Loading branch information
Ladicek authored Jan 15, 2025
2 parents 8fb30d3 + 7f51ac1 commit 0d9dbdf
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.vertx.redis.client.Response;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A pooled Redis connection
Expand All @@ -20,6 +21,7 @@ public class PooledRedisConnection implements RedisConnection {
private final RedisConnectionInternal connection;
private final PoolMetrics metrics;
private final Object metric;
private final AtomicBoolean ended = new AtomicBoolean();

public PooledRedisConnection(Lease<RedisConnectionInternal> lease, PoolMetrics<?, ?> poolMetrics, Object metric) {
this.lease = lease;
Expand Down Expand Up @@ -88,6 +90,9 @@ public RedisConnection endHandler(@Nullable Handler<Void> endHandler) {
public Future<Void> close() {
if (connection.reset()) {
lease.recycle();
}

if (ended.compareAndSet(false, true)) {
if (metrics != null) {
metrics.end(metric);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,33 @@ public void simpleTest(TestContext should) {
});
}

@Test
public void taintedConnection(TestContext test) {
Async async = test.async();

Redis client = Redis.createClient(rule.vertx(), new RedisOptions().setConnectionString(redis.getRedisUri()));
client.connect()
.compose(conn -> {
test.assertEquals(0, getMetrics().pending());
test.assertEquals(1, getMetrics().inUse());

return conn.send(Request.cmd(Command.SELECT).arg(7)) // taints the connection
.compose(response -> {
test.assertEquals(0, getMetrics().pending());
test.assertEquals(1, getMetrics().inUse());

return conn.close();
}).onComplete(test.asyncAssertSuccess(ignored -> {
test.assertEquals(0, getMetrics().pending());
test.assertEquals(0, getMetrics().inUse());
}));
})
.compose(ignored -> client.close())
.onComplete(test.asyncAssertSuccess(ignored -> {
async.complete();
}));
}

@Test
public void testLifecycle(TestContext should) {
final Async test = should.async();
Expand Down

0 comments on commit 0d9dbdf

Please sign in to comment.