From 0daa66b1d2558b8108d7e74aceb90a7b4eb6734b Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Sat, 22 Aug 2015 12:54:02 +0200 Subject: [PATCH] Improve Codec API #118 --- RELEASE-NOTES.txt | 2 +- .../redis/AbstractRedisReactiveCommands.java | 3 + .../com/lambdaworks/redis/LettuceStrings.java | 11 +++ .../redis/StatefulRedisConnectionImpl.java | 9 +- .../ClusterDistributionChannelWriter.java | 8 +- .../lambdaworks/redis/cluster/SlotHash.java | 24 ++++- .../StatefulRedisClusterConnectionImpl.java | 4 +- .../redis/codec/ByteArrayCodec.java | 10 +- .../lambdaworks/redis/codec/RedisCodec.java | 16 ++-- .../redis/codec/Utf8StringCodec.java | 18 ++-- .../redis/protocol/CommandArgs.java | 93 ++++++++++++------- .../extensibility/MyPubSubConnection.java | 5 +- .../lambdaworks/redis/CustomCodecTest.java | 10 +- .../commands/AsyncCommandInternalsTest.java | 1 - .../redis/commands/CommandInternalsTest.java | 1 - .../redis/commands/KeyCommandTest.java | 1 + 16 files changed, 139 insertions(+), 77 deletions(-) diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt index 8d96679d9d..075c999d4a 100644 --- a/RELEASE-NOTES.txt +++ b/RELEASE-NOTES.txt @@ -169,7 +169,6 @@ Cross-slot command execution is available on the following APIs: * RedisAdvancedClusterAsyncCommands * RedisAdvancedClusterReactiveCommands - Node Selection API/Execution of commands on multiple cluster nodes ------------------------------------------------------------------ The advanced cluster API allows to select nodes and run commands on the node selection: @@ -207,6 +206,7 @@ Enhancements * Drop support for Java 6 and Java 7 #50 * Migrate RedisFuture to CompletionStage #48 * Allow limiting the request queue size #115 +* Improve Codec API #118 Fixes diff --git a/src/main/java/com/lambdaworks/redis/AbstractRedisReactiveCommands.java b/src/main/java/com/lambdaworks/redis/AbstractRedisReactiveCommands.java index ea6131fe73..900791ab75 100644 --- a/src/main/java/com/lambdaworks/redis/AbstractRedisReactiveCommands.java +++ b/src/main/java/com/lambdaworks/redis/AbstractRedisReactiveCommands.java @@ -3,9 +3,12 @@ import static com.lambdaworks.redis.protocol.CommandType.*; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.lambdaworks.redis.api.StatefulConnection; import com.lambdaworks.redis.api.rx.*; diff --git a/src/main/java/com/lambdaworks/redis/LettuceStrings.java b/src/main/java/com/lambdaworks/redis/LettuceStrings.java index b4c33070f0..3c666f9dd1 100644 --- a/src/main/java/com/lambdaworks/redis/LettuceStrings.java +++ b/src/main/java/com/lambdaworks/redis/LettuceStrings.java @@ -1,5 +1,6 @@ package com.lambdaworks.redis; +import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -61,6 +62,16 @@ public static String string(double n) { * @return the Base16 encoded SHA1 value */ public static String digest(byte[] script) { + return digest(ByteBuffer.wrap(script)); + } + + /** + * Create SHA1 digest from Lua script. + * + * @param script the script + * @return the Base16 encoded SHA1 value + */ + public static String digest(ByteBuffer script) { try { MessageDigest md = MessageDigest.getInstance("SHA1"); md.update(script); diff --git a/src/main/java/com/lambdaworks/redis/StatefulRedisConnectionImpl.java b/src/main/java/com/lambdaworks/redis/StatefulRedisConnectionImpl.java index 82a2beaf22..a6dce969db 100644 --- a/src/main/java/com/lambdaworks/redis/StatefulRedisConnectionImpl.java +++ b/src/main/java/com/lambdaworks/redis/StatefulRedisConnectionImpl.java @@ -111,7 +111,6 @@ public RedisCommands sync() { return sync; } - @Override public boolean isMulti() { return multi != null; @@ -142,16 +141,16 @@ public > C dispatch(C cmd) { if (local.getType().name().equals(AUTH.name())) { local = attachOnComplete(local, status -> { - if ("OK".equals(status)) { - this.password = cmd.getArgs().getStrings().get(0).toCharArray(); + if ("OK".equals(status) && cmd.getArgs().getFirstString() != null) { + this.password = cmd.getArgs().getFirstString().toCharArray(); } }); } if (local.getType().name().equals(SELECT.name())) { local = attachOnComplete(local, status -> { - if ("OK".equals(status)) { - this.db = cmd.getArgs().getIntegers().get(0).intValue(); + if ("OK".equals(status) && cmd.getArgs().getFirstInteger() != null) { + this.db = cmd.getArgs().getFirstInteger().intValue(); } }); } diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java index f907934daf..6255ca778d 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java @@ -80,8 +80,8 @@ public > C write(C command) { } } - if (channelWriter == null && args != null && !args.getKeys().isEmpty()) { - int hash = getHash(args.getEncodedKey(0)); + if (channelWriter == null && args != null && args.getFirstEncodedKey() != null) { + int hash = SlotHash.getSlot(args.getFirstEncodedKey()); RedisChannelHandler connection = (RedisChannelHandler) clusterConnectionProvider.getConnection( ClusterConnectionProvider.Intent.WRITE, hash); @@ -126,10 +126,6 @@ private HostAndPort getAskTarget(String errorMessage) { return HostAndPort.fromString(movedMessageParts.get(2)); } - protected int getHash(byte[] encodedKey) { - return SlotHash.getSlot(encodedKey); - } - @Override public void close() { diff --git a/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java b/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java index 4689c923c6..da4f7618de 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java +++ b/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java @@ -1,5 +1,6 @@ package com.lambdaworks.redis.cluster; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -54,14 +55,29 @@ public static final int getSlot(String key) { * @return slot */ public static final int getSlot(byte[] key) { - byte[] finalKey = key; - int start = indexOf(key, SUBKEY_START); + return getSlot(ByteBuffer.wrap(key)); + } + + /** + * Calculate the slot from the given key. + * + * @param key the key + * @return slot + */ + public static final int getSlot(ByteBuffer key) { + + byte[] input = new byte[key.remaining()]; + key.duplicate().get(input); + + byte[] finalKey = input; + + int start = indexOf(input, SUBKEY_START); if (start != -1) { - int end = indexOf(key, start + 1, SUBKEY_END); + int end = indexOf(input, start + 1, SUBKEY_END); if (end != -1 && end != start + 1) { finalKey = new byte[end - (start + 1)]; - System.arraycopy(key, start + 1, finalKey, 0, finalKey.length); + System.arraycopy(input, start + 1, finalKey, 0, finalKey.length); } } return CRC16.crc16(finalKey) % SLOT_COUNT; diff --git a/src/main/java/com/lambdaworks/redis/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/com/lambdaworks/redis/cluster/StatefulRedisClusterConnectionImpl.java index 380639ecbd..dcc7fbbae7 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/com/lambdaworks/redis/cluster/StatefulRedisClusterConnectionImpl.java @@ -161,8 +161,8 @@ public > C dispatch(C cmd) { if (local.getType().name().equals(AUTH.name())) { local = attachOnComplete(local, status -> { - if (status.equals("OK")) { - this.password = cmd.getArgs().getStrings().get(0).toCharArray(); + if (status.equals("OK") && cmd.getArgs().getFirstString() != null) { + this.password = cmd.getArgs().getFirstString().toCharArray(); } }); } diff --git a/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java b/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java index 3ed990e554..2b6c09ad66 100644 --- a/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java +++ b/src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java @@ -8,7 +8,7 @@ * @author Mark Paluch * @since 3.3 */ -public class ByteArrayCodec extends RedisCodec { +public class ByteArrayCodec implements RedisCodec { @Override public byte[] decodeKey(ByteBuffer bytes) { @@ -21,13 +21,13 @@ public byte[] decodeValue(ByteBuffer bytes) { } @Override - public byte[] encodeKey(byte[] key) { - return key; + public ByteBuffer encodeKey(byte[] key) { + return ByteBuffer.wrap(key); } @Override - public byte[] encodeValue(byte[] value) { - return value; + public ByteBuffer encodeValue(byte[] value) { + return ByteBuffer.wrap(value); } private static byte[] getBytes(ByteBuffer buffer) { diff --git a/src/main/java/com/lambdaworks/redis/codec/RedisCodec.java b/src/main/java/com/lambdaworks/redis/codec/RedisCodec.java index ef694c5d59..d92b8158b3 100644 --- a/src/main/java/com/lambdaworks/redis/codec/RedisCodec.java +++ b/src/main/java/com/lambdaworks/redis/codec/RedisCodec.java @@ -5,16 +5,17 @@ import java.nio.ByteBuffer; /** - * A RedisCodec encodes keys and values sent to redis, and decodes keys and values in the command output. + * A {@link RedisCodec} encodes keys and values sent to Redis, and decodes keys and values in the command output. * - * The methods will be called by multiple threads and must be thread-safe. + * The methods are called by multiple threads and must be thread-safe. * * @param Key type. * @param Value type. * * @author Will Glozer + * @author Mark Paluch */ -public abstract class RedisCodec { +public interface RedisCodec { /** * Decode the key output by redis. * @@ -22,7 +23,7 @@ public abstract class RedisCodec { * * @return The decoded key. */ - public abstract K decodeKey(ByteBuffer bytes); + K decodeKey(ByteBuffer bytes); /** * Decode the value output by redis. @@ -31,7 +32,7 @@ public abstract class RedisCodec { * * @return The decoded value. */ - public abstract V decodeValue(ByteBuffer bytes); + V decodeValue(ByteBuffer bytes); /** * Encode the key for output to redis. @@ -40,7 +41,7 @@ public abstract class RedisCodec { * * @return The encoded key. */ - public abstract byte[] encodeKey(K key); + ByteBuffer encodeKey(K key); /** * Encode the value for output to redis. @@ -49,5 +50,6 @@ public abstract class RedisCodec { * * @return The encoded value. */ - public abstract byte[] encodeValue(V value); + ByteBuffer encodeValue(V value); + } diff --git a/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java b/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java index f91fa7c056..137808a000 100644 --- a/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java +++ b/src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java @@ -16,11 +16,15 @@ * * @author Will Glozer */ -public class Utf8StringCodec extends RedisCodec { +public class Utf8StringCodec implements RedisCodec { + + private final static byte[] EMPTY = new byte[0]; + private Charset charset; private CharsetDecoder decoder; private CharBuffer chars; + /** * Initialize a new instance that encodes and decodes strings using the UTF-8 charset; */ @@ -41,12 +45,12 @@ public String decodeValue(ByteBuffer bytes) { } @Override - public byte[] encodeKey(String key) { + public ByteBuffer encodeKey(String key) { return encode(key); } @Override - public byte[] encodeValue(String value) { + public ByteBuffer encodeValue(String value) { return encode(value); } @@ -63,11 +67,11 @@ private synchronized String decode(ByteBuffer bytes) { return chars.flip().toString(); } - private byte[] encode(String string) { - + private ByteBuffer encode(String string) { if (string == null) { - return new byte[0]; + return ByteBuffer.wrap(EMPTY); } - return string.getBytes(charset); + + return charset.encode(string); } } diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandArgs.java b/src/main/java/com/lambdaworks/redis/protocol/CommandArgs.java index 387637b967..ed4c982cac 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandArgs.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandArgs.java @@ -6,14 +6,12 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import com.lambdaworks.redis.codec.RedisCodec; /** - * Redis command argument encoder. + * Redis command arguments. * * @param Key type. * @param Value type. @@ -24,11 +22,11 @@ public class CommandArgs { private final RedisCodec codec; private ByteBuffer buffer; + private ByteBuffer firstEncodedKey; private int count; - private final List keys = new ArrayList(); - private final List strings = new ArrayList(); - private final List integers = new ArrayList(); - private final List keywords = new ArrayList(); + + private Long firstInteger; + private String firstString; public CommandArgs(RedisCodec codec) { this.codec = codec; @@ -45,7 +43,12 @@ public int count() { } public CommandArgs addKey(K key) { - keys.add(key); + + if (firstEncodedKey == null) { + firstEncodedKey = codec.encodeKey(key); + return write(firstEncodedKey.duplicate()); + } + return write(codec.encodeKey(key)); } @@ -80,8 +83,14 @@ public CommandArgs add(Map map) { } for (Map.Entry entry : map.entrySet()) { - keys.add(entry.getKey()); - write(codec.encodeKey(entry.getKey())); + if (firstEncodedKey == null) { + firstEncodedKey = codec.encodeKey(entry.getKey()); + write(firstEncodedKey.duplicate()); + + } else { + write(codec.encodeKey(entry.getKey())); + } + write(codec.encodeValue(entry.getValue())); } @@ -89,12 +98,16 @@ public CommandArgs add(Map map) { } public CommandArgs add(String s) { - strings.add(s); + if (firstString == null) { + firstString = s; + } return write(s); } public CommandArgs add(long n) { - integers.add(n); + if (firstInteger == null) { + firstInteger = n; + } return write(Long.toString(n)); } @@ -107,7 +120,6 @@ public CommandArgs add(byte[] value) { } public CommandArgs add(CommandKeyword keyword) { - keywords.add(keyword); return write(keyword.bytes); } @@ -116,10 +128,36 @@ public CommandArgs add(CommandType type) { } public CommandArgs add(ProtocolKeyword keyword) { - keywords.add(keyword); return write(keyword.getBytes()); } + private CommandArgs write(ByteBuffer arg) { + buffer.mark(); + + if (buffer.remaining() < arg.remaining()) { + int estimate = buffer.remaining() + arg.remaining() + 10; + realloc(max(buffer.capacity() * 2, estimate)); + } + + while (true) { + try { + ByteBuffer toWrite = arg.duplicate(); + buffer.put((byte) '$'); + write(toWrite.remaining()); + buffer.put(CRLF); + buffer.put(toWrite); + buffer.put(CRLF); + break; + } catch (BufferOverflowException e) { + buffer.reset(); + realloc(buffer.capacity() * 2); + } + } + + count++; + return this; + } + private CommandArgs write(byte[] arg) { buffer.mark(); @@ -202,34 +240,27 @@ private void realloc(int size) { this.buffer = newBuffer; } - public List getKeys() { - return keys; - } - - public byte[] getEncodedKey(int index) { - return codec.encodeKey(keys.get(index)); - } - - public List getKeywords() { - return keywords; + public ByteBuffer getFirstEncodedKey() { + if (firstEncodedKey != null) { + return firstEncodedKey.duplicate(); + } + return null; } @Override public String toString() { final StringBuilder sb = new StringBuilder(); sb.append(getClass().getSimpleName()); - sb.append(" [keys=").append(keys); - sb.append(", keywords=").append(keywords); - sb.append(", buffer=").append(new String(buffer.array())); + sb.append(" [buffer=").append(new String(buffer.array())); sb.append(']'); return sb.toString(); } - public List getStrings() { - return strings; + public Long getFirstInteger() { + return firstInteger; } - public List getIntegers() { - return integers; + public String getFirstString() { + return firstString; } } diff --git a/src/test/java/biz/paluch/redis/extensibility/MyPubSubConnection.java b/src/test/java/biz/paluch/redis/extensibility/MyPubSubConnection.java index 71878d369b..523d93a813 100644 --- a/src/test/java/biz/paluch/redis/extensibility/MyPubSubConnection.java +++ b/src/test/java/biz/paluch/redis/extensibility/MyPubSubConnection.java @@ -2,6 +2,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Sets; import com.lambdaworks.redis.RedisChannelWriter; @@ -19,7 +20,7 @@ @SuppressWarnings("unchecked") public class MyPubSubConnection extends StatefulRedisPubSubConnectionImpl { - private Set interceptedChannels = Sets.newHashSet(); + private AtomicInteger subscriptions = new AtomicInteger(); /** * Initialize a new connection. @@ -37,7 +38,7 @@ public MyPubSubConnection(RedisChannelWriter writer, RedisCodec code public > C dispatch(C cmd) { if (cmd.getType() == CommandType.SUBSCRIBE) { - interceptedChannels.addAll(cmd.getArgs().getKeys()); + subscriptions.incrementAndGet(); } return super.dispatch(cmd); diff --git a/src/test/java/com/lambdaworks/redis/CustomCodecTest.java b/src/test/java/com/lambdaworks/redis/CustomCodecTest.java index 493c3db94e..9641443505 100644 --- a/src/test/java/com/lambdaworks/redis/CustomCodecTest.java +++ b/src/test/java/com/lambdaworks/redis/CustomCodecTest.java @@ -38,7 +38,7 @@ public void testByteCodec() throws Exception { assertThat(keys).contains(key.getBytes()); } - public class SerializedObjectCodec extends RedisCodec { + public class SerializedObjectCodec implements RedisCodec { private Charset charset = Charset.forName("UTF-8"); @Override @@ -59,17 +59,17 @@ public Object decodeValue(ByteBuffer bytes) { } @Override - public byte[] encodeKey(String key) { - return charset.encode(key).array(); + public ByteBuffer encodeKey(String key) { + return charset.encode(key); } @Override - public byte[] encodeValue(Object value) { + public ByteBuffer encodeValue(Object value) { try { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(bytes); os.writeObject(value); - return bytes.toByteArray(); + return ByteBuffer.wrap(bytes.toByteArray()); } catch (IOException e) { return null; } diff --git a/src/test/java/com/lambdaworks/redis/commands/AsyncCommandInternalsTest.java b/src/test/java/com/lambdaworks/redis/commands/AsyncCommandInternalsTest.java index 148bba3f99..9961039279 100644 --- a/src/test/java/com/lambdaworks/redis/commands/AsyncCommandInternalsTest.java +++ b/src/test/java/com/lambdaworks/redis/commands/AsyncCommandInternalsTest.java @@ -125,7 +125,6 @@ public void customKeywordWithArgs() throws Exception { codec))); sut.getArgs().add(MyKeywords.DUMMY); assertThat(sut.getArgs().toString()).contains(MyKeywords.DUMMY.name()); - assertThat(sut.getArgs().getKeywords()).contains(MyKeywords.DUMMY); } @Test diff --git a/src/test/java/com/lambdaworks/redis/commands/CommandInternalsTest.java b/src/test/java/com/lambdaworks/redis/commands/CommandInternalsTest.java index c987bb0205..63afce0564 100644 --- a/src/test/java/com/lambdaworks/redis/commands/CommandInternalsTest.java +++ b/src/test/java/com/lambdaworks/redis/commands/CommandInternalsTest.java @@ -76,7 +76,6 @@ public void customKeywordWithArgs() throws Exception { sut = new Command(MyKeywords.DUMMY, null, new CommandArgs(codec)); sut.getArgs().add(MyKeywords.DUMMY); assertThat(sut.getArgs().toString()).contains(MyKeywords.DUMMY.name()); - assertThat(sut.getArgs().getKeywords()).contains(MyKeywords.DUMMY); } @Test(expected = IllegalStateException.class) diff --git a/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java b/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java index 557706b00f..9a2ed0fc82 100644 --- a/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java @@ -23,6 +23,7 @@ public void del() throws Exception { redis.set(key + "1", value); redis.set(key + "2", value); + assertThat(redis.del(key + "1", key + "2")).isEqualTo(2); }