diff --git a/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java b/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java index 98cf88906e..7dbb0704e8 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java @@ -106,10 +106,11 @@ protected void initChannel(Channel ch) throws Exception { redisBootstrap.connect(redisAddress).get(); - connection.registerCloseables(closeableResources, connection); + connection.registerCloseables(closeableResources, connection, handler); return connection; } catch (Exception e) { + connection.close(); throw new RedisException("Unable to connect", e); } } @@ -119,14 +120,14 @@ protected void initChannel(Channel ch) throws Exception { */ public void shutdown() { - ImmutableList autoCloseables = ImmutableList.copyOf(closeableResources); - for (Closeable closeableResource : autoCloseables) { + while (!closeableResources.isEmpty()) { + Closeable closeableResource = closeableResources.iterator().next(); try { closeableResource.close(); } catch (Exception e) { logger.debug("Exception on Close: " + e.getMessage(), e); - } + closeableResources.remove(closeableResource); } for (Channel c : channels) { diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java index 69b9673576..f3da07da8e 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java @@ -1,6 +1,7 @@ package com.lambdaworks.redis; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.TimeUnit; @@ -20,7 +21,7 @@ * @author Mark Paluch * @since 15.05.14 16:09 */ -public abstract class RedisChannelHandler extends ChannelInboundHandlerAdapter { +public abstract class RedisChannelHandler extends ChannelInboundHandlerAdapter implements Closeable { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class); @@ -65,7 +66,6 @@ public synchronized void close() { if (!closed) { active = false; closed = true; - channelWriter.close(); closeEvents.fireEventClosed(this); closeEvents = null; } @@ -92,8 +92,19 @@ public void registerCloseables(final Collection registry, final Close addListener(new CloseEvents.CloseListener() { @Override public void resourceClosed(Object resource) { - registry.removeAll(Arrays.asList(closeables)); + for (Closeable closeable : closeables) { + if (closeable == RedisChannelHandler.this) { + continue; + } + + try { + closeable.close(); + } catch (IOException e) { + logger.debug(e.toString(), e); + } + } + registry.removeAll(Arrays.asList(closeables)); } }); } diff --git a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index f5f836f150..9b8ca307c9 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -261,7 +261,6 @@ public void close() { logger.debug("close()"); if (closed) { - logger.warn("Client is already closed"); return; } @@ -276,12 +275,14 @@ public void close() { buffer = null; } - if (!closed && channel != null) { + closed = true; + + if (channel != null) { ConnectionWatchdog watchdog = channel.pipeline().get(ConnectionWatchdog.class); if (watchdog != null) { watchdog.setReconnect(false); } - closed = true; + try { channel.close().sync(); } catch (InterruptedException e) { @@ -289,9 +290,7 @@ public void close() { } channel = null; - } - } public boolean isClosed() { diff --git a/lettuce/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java b/lettuce/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java index 2159d96121..2b32a5d66b 100644 --- a/lettuce/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java +++ b/lettuce/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java @@ -124,18 +124,18 @@ public void testResourceCleaning() throws Exception { pool1.allocateConnection(); assertEquals(1, redisClient.getChannelCount()); - assertEquals(2, redisClient.getResourceCount()); + assertEquals(3, redisClient.getResourceCount()); RedisConnectionPool> pool2 = redisClient.pool(); - assertEquals(3, redisClient.getResourceCount()); + assertEquals(4, redisClient.getResourceCount()); pool2.allocateConnection(); - assertEquals(4, redisClient.getResourceCount()); + assertEquals(6, redisClient.getResourceCount()); redisClient.pool().close(); - assertEquals(4, redisClient.getResourceCount()); + assertEquals(6, redisClient.getResourceCount()); redisClient.shutdown();