Skip to content

Commit

Permalink
Improve codecs #70
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jun 7, 2015
1 parent c06e976 commit 892fb08
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 26 deletions.
25 changes: 14 additions & 11 deletions src/main/java/com/lambdaworks/redis/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
public class RedisClient extends AbstractRedisClient {

private final RedisCodec<String, String> codec = new Utf8StringCodec();
private final RedisCodec<String, String> codec = newStringStringCodec();
private final RedisURI redisURI;

/**
Expand Down Expand Up @@ -98,7 +98,7 @@ public RedisConnectionPool<RedisConnection<String, String>> pool() {
*/
public RedisConnectionPool<RedisConnection<String, String>> pool(int maxIdle, int maxActive) {

return pool(codec, maxIdle, maxActive);
return pool(newStringStringCodec(), maxIdle, maxActive);
}

/**
Expand Down Expand Up @@ -173,7 +173,7 @@ public RedisConnectionPool<RedisAsyncConnection<String, String>> asyncPool() {
*/
public RedisConnectionPool<RedisAsyncConnection<String, String>> asyncPool(int maxIdle, int maxActive) {

return asyncPool(codec, maxIdle, maxActive);
return asyncPool(newStringStringCodec(), maxIdle, maxActive);
}

/**
Expand Down Expand Up @@ -225,7 +225,7 @@ public void resourceClosed(Object resource) {
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public RedisConnection<String, String> connect() {
return (RedisConnection<String, String>) connect((RedisCodec) codec);
return connect(newStringStringCodec());
}

/**
Expand Down Expand Up @@ -253,7 +253,7 @@ public <K, V> RedisConnection<K, V> connect(RedisCodec<K, V> codec) {
@SuppressWarnings({ "unchecked", "rawtypes" })
public RedisConnection<String, String> connect(RedisURI redisURI) {
checkValidRedisURI(redisURI);
return (RedisConnection<String, String>) connect((RedisCodec) codec, redisURI);
return (RedisConnection<String, String>) connect(newStringStringCodec(), redisURI);
}

private void checkValidRedisURI(RedisURI redisURI) {
Expand All @@ -271,7 +271,7 @@ private <K, V> RedisConnection connect(RedisCodec<K, V> codec, RedisURI redisURI
* @return A new connection.
*/
public RedisAsyncConnection<String, String> connectAsync() {
return connectAsync(codec);
return connectAsync(newStringStringCodec());
}

/**
Expand All @@ -297,7 +297,7 @@ public <K, V> RedisAsyncConnection<K, V> connectAsync(RedisCodec<K, V> codec) {
*/
public RedisAsyncConnection<String, String> connectAsync(RedisURI redisURI) {
checkValidRedisURI(redisURI);
return connectAsync(codec, redisURI);
return connectAsync(newStringStringCodec(), redisURI);
}

private <K, V> RedisAsyncConnectionImpl<K, V> connectAsync(RedisCodec<K, V> codec, RedisURI redisURI) {
Expand Down Expand Up @@ -353,7 +353,7 @@ public RedisPubSubConnection<String, String> connectPubSub() {
*/
public RedisPubSubConnection<String, String> connectPubSub(RedisURI redisURI) {
checkValidRedisURI(redisURI);
return connectPubSub(codec, redisURI);
return connectPubSub(newStringStringCodec(), redisURI);
}

/**
Expand Down Expand Up @@ -389,7 +389,7 @@ protected <K, V> RedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> code
* @return a new connection.
*/
public RedisSentinelAsyncConnection<String, String> connectSentinelAsync() {
return connectSentinelAsync(codec);
return connectSentinelAsync(newStringStringCodec());
}

/**
Expand All @@ -413,9 +413,8 @@ public <K, V> RedisSentinelAsyncConnection<K, V> connectSentinelAsync(RedisCodec
* @param redisURI the redis server to connect to, must not be {@literal null}
* @return A new connection.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public RedisSentinelAsyncConnection<String, String> connectSentinelAsync(RedisURI redisURI) {
return (RedisSentinelAsyncConnection<String, String>) connectSentinelAsyncImpl((RedisCodec) codec, redisURI);
return connectSentinelAsyncImpl(newStringStringCodec(), redisURI);
}

private <K, V> RedisSentinelAsyncConnection<K, V> connectSentinelAsyncImpl(RedisCodec<K, V> codec, RedisURI redisURI) {
Expand Down Expand Up @@ -583,4 +582,8 @@ private SocketAddress lookupRedis(String sentinelMasterId) throws InterruptedExc
}
}

protected Utf8StringCodec newStringStringCodec() {
return new Utf8StringCodec();
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.lambdaworks.redis.cluster;

import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import java.net.SocketAddress;
import java.util.Collections;
Expand All @@ -11,7 +13,13 @@

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.lambdaworks.redis.*;
import com.lambdaworks.redis.AbstractRedisClient;
import com.lambdaworks.redis.RedisAsyncConnectionImpl;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisClusterConnection;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.ClusterPartitionParser;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
Expand All @@ -33,7 +41,7 @@
public class RedisClusterClient extends AbstractRedisClient {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterClient.class);
private final RedisCodec<String, String> codec = new Utf8StringCodec();

private Partitions partitions;

private List<RedisURI> initialUris = Lists.newArrayList();
Expand Down Expand Up @@ -72,7 +80,7 @@ public RedisClusterClient(List<RedisURI> initialUris) {
*/
public RedisClusterConnection<String, String> connectCluster() {

return connectCluster(codec);
return connectCluster(newStringStringCodec());
}

/**
Expand All @@ -96,7 +104,7 @@ public <K, V> RedisClusterConnection<K, V> connectCluster(RedisCodec<K, V> codec
* @return A new connection.
*/
public RedisClusterAsyncConnection<String, String> connectClusterAsync() {
return connectClusterAsyncImpl(codec, getSocketAddressSupplier());
return connectClusterAsyncImpl(newStringStringCodec(), getSocketAddressSupplier());
}

/**
Expand All @@ -112,7 +120,7 @@ public <K, V> RedisClusterAsyncConnection<K, V> connectClusterAsync(RedisCodec<K
}

protected RedisAsyncConnectionImpl<String, String> connectAsyncImpl(SocketAddress socketAddress) {
return connectAsyncImpl(codec, socketAddress);
return connectAsyncImpl(newStringStringCodec(), socketAddress);
}

/**
Expand Down Expand Up @@ -286,4 +294,9 @@ public SocketAddress get() {
}
};
}

protected Utf8StringCodec newStringStringCodec() {
return new Utf8StringCodec();
}

}
38 changes: 38 additions & 0 deletions src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.lambdaworks.redis.codec;

import java.nio.ByteBuffer;

/**
* A {@link RedisCodec} that uses plain byte arrays.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 3.3
*/
public class ByteArrayCodec extends RedisCodec<byte[], byte[]> {

@Override
public byte[] decodeKey(ByteBuffer bytes) {
return getBytes(bytes);
}

@Override
public byte[] decodeValue(ByteBuffer bytes) {
return getBytes(bytes);
}

@Override
public byte[] encodeKey(byte[] key) {
return key;
}

@Override
public byte[] encodeValue(byte[] value) {
return value;
}

private static byte[] getBytes(ByteBuffer buffer) {
byte[] b = new byte[buffer.remaining()];
buffer.get(b);
return b;
}
}
20 changes: 12 additions & 8 deletions src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,23 @@ public byte[] encodeValue(String value) {
}

private synchronized String decode(ByteBuffer bytes) {
chars.clear();
bytes.mark();
chars.clear();
bytes.mark();

decoder.reset();
while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) {
chars = CharBuffer.allocate(chars.capacity() * 2);
bytes.reset();
}
decoder.reset();
while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) {
chars = CharBuffer.allocate(chars.capacity() * 2);
bytes.reset();
}

return chars.flip().toString();
return chars.flip().toString();
}

private byte[] encode(String string) {

if (string == null) {
return new byte[0];
}
return string.getBytes(charset);
}
}
14 changes: 13 additions & 1 deletion src/test/java/com/lambdaworks/redis/HashCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

package com.lambdaworks.redis;

import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -148,6 +148,18 @@ public void hmset() throws Exception {
assertThat(redis.hmget(key, "one", "two")).isEqualTo(list("1", "2"));
}

@Test
public void hmsetWithNulls() throws Exception {
Map<String, String> hash = new HashMap<String, String>();
hash.put("one", null);
assertThat(redis.hmset(key, hash)).isEqualTo("OK");
assertThat(redis.hmget(key, "one")).isEqualTo(list(""));

hash.put("one", "");
assertThat(redis.hmset(key, hash)).isEqualTo("OK");
assertThat(redis.hmget(key, "one")).isEqualTo(list(""));
}

@Test
public void hset() throws Exception {
assertThat(redis.hset(key, "one", "1")).isTrue();
Expand Down

0 comments on commit 892fb08

Please sign in to comment.