From 892fb08b53231a4c760c7c5dd511c1d846f322aa Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 3 Jun 2015 08:44:39 +0200 Subject: [PATCH] Improve codecs #70 --- .../com/lambdaworks/redis/RedisClient.java | 25 ++++++------ .../redis/cluster/RedisClusterClient.java | 25 +++++++++--- .../redis/codec/ByteArrayCodec.java | 38 +++++++++++++++++++ .../redis/codec/Utf8StringCodec.java | 20 ++++++---- .../lambdaworks/redis/HashCommandTest.java | 14 ++++++- 5 files changed, 96 insertions(+), 26 deletions(-) create mode 100644 src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index d9ee6a5f69..b05721aafe 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -34,7 +34,7 @@ */ public class RedisClient extends AbstractRedisClient { - private final RedisCodec codec = new Utf8StringCodec(); + private final RedisCodec codec = newStringStringCodec(); private final RedisURI redisURI; /** @@ -98,7 +98,7 @@ public RedisConnectionPool> pool() { */ public RedisConnectionPool> pool(int maxIdle, int maxActive) { - return pool(codec, maxIdle, maxActive); + return pool(newStringStringCodec(), maxIdle, maxActive); } /** @@ -173,7 +173,7 @@ public RedisConnectionPool> asyncPool() { */ public RedisConnectionPool> asyncPool(int maxIdle, int maxActive) { - return asyncPool(codec, maxIdle, maxActive); + return asyncPool(newStringStringCodec(), maxIdle, maxActive); } /** @@ -225,7 +225,7 @@ public void resourceClosed(Object resource) { */ @SuppressWarnings({ "unchecked", "rawtypes" }) public RedisConnection connect() { - return (RedisConnection) connect((RedisCodec) codec); + return connect(newStringStringCodec()); } /** @@ -253,7 +253,7 @@ public RedisConnection connect(RedisCodec codec) { @SuppressWarnings({ "unchecked", "rawtypes" }) public RedisConnection connect(RedisURI redisURI) { checkValidRedisURI(redisURI); - return (RedisConnection) connect((RedisCodec) codec, redisURI); + return (RedisConnection) connect(newStringStringCodec(), redisURI); } private void checkValidRedisURI(RedisURI redisURI) { @@ -271,7 +271,7 @@ private RedisConnection connect(RedisCodec codec, RedisURI redisURI * @return A new connection. */ public RedisAsyncConnection connectAsync() { - return connectAsync(codec); + return connectAsync(newStringStringCodec()); } /** @@ -297,7 +297,7 @@ public RedisAsyncConnection connectAsync(RedisCodec codec) { */ public RedisAsyncConnection connectAsync(RedisURI redisURI) { checkValidRedisURI(redisURI); - return connectAsync(codec, redisURI); + return connectAsync(newStringStringCodec(), redisURI); } private RedisAsyncConnectionImpl connectAsync(RedisCodec codec, RedisURI redisURI) { @@ -353,7 +353,7 @@ public RedisPubSubConnection connectPubSub() { */ public RedisPubSubConnection connectPubSub(RedisURI redisURI) { checkValidRedisURI(redisURI); - return connectPubSub(codec, redisURI); + return connectPubSub(newStringStringCodec(), redisURI); } /** @@ -389,7 +389,7 @@ protected RedisPubSubConnection connectPubSub(RedisCodec code * @return a new connection. */ public RedisSentinelAsyncConnection connectSentinelAsync() { - return connectSentinelAsync(codec); + return connectSentinelAsync(newStringStringCodec()); } /** @@ -413,9 +413,8 @@ public RedisSentinelAsyncConnection connectSentinelAsync(RedisCodec * @param redisURI the redis server to connect to, must not be {@literal null} * @return A new connection. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) public RedisSentinelAsyncConnection connectSentinelAsync(RedisURI redisURI) { - return (RedisSentinelAsyncConnection) connectSentinelAsyncImpl((RedisCodec) codec, redisURI); + return connectSentinelAsyncImpl(newStringStringCodec(), redisURI); } private RedisSentinelAsyncConnection connectSentinelAsyncImpl(RedisCodec codec, RedisURI redisURI) { @@ -583,4 +582,8 @@ private SocketAddress lookupRedis(String sentinelMasterId) throws InterruptedExc } } + protected Utf8StringCodec newStringStringCodec() { + return new Utf8StringCodec(); + } + } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index 746b1e0c81..c427623c86 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -1,6 +1,8 @@ package com.lambdaworks.redis.cluster; -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import java.net.SocketAddress; import java.util.Collections; @@ -11,7 +13,13 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.lambdaworks.redis.*; +import com.lambdaworks.redis.AbstractRedisClient; +import com.lambdaworks.redis.RedisAsyncConnectionImpl; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.RedisClusterAsyncConnection; +import com.lambdaworks.redis.RedisClusterConnection; +import com.lambdaworks.redis.RedisException; +import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.cluster.models.partitions.ClusterPartitionParser; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; @@ -33,7 +41,7 @@ public class RedisClusterClient extends AbstractRedisClient { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterClient.class); - private final RedisCodec codec = new Utf8StringCodec(); + private Partitions partitions; private List initialUris = Lists.newArrayList(); @@ -72,7 +80,7 @@ public RedisClusterClient(List initialUris) { */ public RedisClusterConnection connectCluster() { - return connectCluster(codec); + return connectCluster(newStringStringCodec()); } /** @@ -96,7 +104,7 @@ public RedisClusterConnection connectCluster(RedisCodec codec * @return A new connection. */ public RedisClusterAsyncConnection connectClusterAsync() { - return connectClusterAsyncImpl(codec, getSocketAddressSupplier()); + return connectClusterAsyncImpl(newStringStringCodec(), getSocketAddressSupplier()); } /** @@ -112,7 +120,7 @@ public RedisClusterAsyncConnection connectClusterAsync(RedisCodec connectAsyncImpl(SocketAddress socketAddress) { - return connectAsyncImpl(codec, socketAddress); + return connectAsyncImpl(newStringStringCodec(), socketAddress); } /** @@ -286,4 +294,9 @@ public SocketAddress get() { } }; } + + protected Utf8StringCodec newStringStringCodec() { + return new Utf8StringCodec(); + } + } diff --git a/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java b/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java new file mode 100644 index 0000000000..3ed990e554 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java @@ -0,0 +1,38 @@ +package com.lambdaworks.redis.codec; + +import java.nio.ByteBuffer; + +/** + * A {@link RedisCodec} that uses plain byte arrays. + * + * @author Mark Paluch + * @since 3.3 + */ +public class ByteArrayCodec extends RedisCodec { + + @Override + public byte[] decodeKey(ByteBuffer bytes) { + return getBytes(bytes); + } + + @Override + public byte[] decodeValue(ByteBuffer bytes) { + return getBytes(bytes); + } + + @Override + public byte[] encodeKey(byte[] key) { + return key; + } + + @Override + public byte[] encodeValue(byte[] value) { + return value; + } + + private static byte[] getBytes(ByteBuffer buffer) { + byte[] b = new byte[buffer.remaining()]; + buffer.get(b); + return b; + } +} \ No newline at end of file diff --git a/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java b/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java index 3c09c879e5..f91fa7c056 100644 --- a/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java +++ b/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java @@ -51,19 +51,23 @@ public byte[] encodeValue(String value) { } private synchronized String decode(ByteBuffer bytes) { - chars.clear(); - bytes.mark(); + chars.clear(); + bytes.mark(); - decoder.reset(); - while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) { - chars = CharBuffer.allocate(chars.capacity() * 2); - bytes.reset(); - } + decoder.reset(); + while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) { + chars = CharBuffer.allocate(chars.capacity() * 2); + bytes.reset(); + } - return chars.flip().toString(); + return chars.flip().toString(); } private byte[] encode(String string) { + + if (string == null) { + return new byte[0]; + } return string.getBytes(charset); } } diff --git a/src/test/java/com/lambdaworks/redis/HashCommandTest.java b/src/test/java/com/lambdaworks/redis/HashCommandTest.java index be1191fedc..7f75ff0b32 100644 --- a/src/test/java/com/lambdaworks/redis/HashCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/HashCommandTest.java @@ -2,7 +2,7 @@ package com.lambdaworks.redis; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; import java.util.List; @@ -148,6 +148,18 @@ public void hmset() throws Exception { assertThat(redis.hmget(key, "one", "two")).isEqualTo(list("1", "2")); } + @Test + public void hmsetWithNulls() throws Exception { + Map hash = new HashMap(); + hash.put("one", null); + assertThat(redis.hmset(key, hash)).isEqualTo("OK"); + assertThat(redis.hmget(key, "one")).isEqualTo(list("")); + + hash.put("one", ""); + assertThat(redis.hmset(key, hash)).isEqualTo("OK"); + assertThat(redis.hmget(key, "one")).isEqualTo(list("")); + } + @Test public void hset() throws Exception { assertThat(redis.hset(key, "one", "1")).isTrue();