Skip to content

Commit

Permalink
Improve Codec API #118
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Aug 25, 2015
1 parent 40e7184 commit 0daa66b
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 77 deletions.
2 changes: 1 addition & 1 deletion RELEASE-NOTES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ Cross-slot command execution is available on the following APIs:
* RedisAdvancedClusterAsyncCommands
* RedisAdvancedClusterReactiveCommands


Node Selection API/Execution of commands on multiple cluster nodes
------------------------------------------------------------------
The advanced cluster API allows to select nodes and run commands on the node selection:
Expand Down Expand Up @@ -207,6 +206,7 @@ Enhancements
* Drop support for Java 6 and Java 7 #50
* Migrate RedisFuture to CompletionStage #48
* Allow limiting the request queue size #115
* Improve Codec API #118


Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import static com.lambdaworks.redis.protocol.CommandType.*;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.lambdaworks.redis.api.StatefulConnection;
import com.lambdaworks.redis.api.rx.*;
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/lambdaworks/redis/LettuceStrings.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.lambdaworks.redis;

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

Expand Down Expand Up @@ -61,6 +62,16 @@ public static String string(double n) {
* @return the Base16 encoded SHA1 value
*/
public static String digest(byte[] script) {
return digest(ByteBuffer.wrap(script));
}

/**
* Create SHA1 digest from Lua script.
*
* @param script the script
* @return the Base16 encoded SHA1 value
*/
public static String digest(ByteBuffer script) {
try {
MessageDigest md = MessageDigest.getInstance("SHA1");
md.update(script);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public RedisCommands<K, V> sync() {
return sync;
}


@Override
public boolean isMulti() {
return multi != null;
Expand Down Expand Up @@ -142,16 +141,16 @@ public <T, C extends RedisCommand<K, V, T>> C dispatch(C cmd) {

if (local.getType().name().equals(AUTH.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
this.password = cmd.getArgs().getStrings().get(0).toCharArray();
if ("OK".equals(status) && cmd.getArgs().getFirstString() != null) {
this.password = cmd.getArgs().getFirstString().toCharArray();
}
});
}

if (local.getType().name().equals(SELECT.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
this.db = cmd.getArgs().getIntegers().get(0).intValue();
if ("OK".equals(status) && cmd.getArgs().getFirstInteger() != null) {
this.db = cmd.getArgs().getFirstInteger().intValue();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public <T, C extends RedisCommand<K, V, T>> C write(C command) {
}
}

if (channelWriter == null && args != null && !args.getKeys().isEmpty()) {
int hash = getHash(args.getEncodedKey(0));
if (channelWriter == null && args != null && args.getFirstEncodedKey() != null) {
int hash = SlotHash.getSlot(args.getFirstEncodedKey());
RedisChannelHandler<K, V> connection = (RedisChannelHandler<K, V>) clusterConnectionProvider.getConnection(
ClusterConnectionProvider.Intent.WRITE, hash);

Expand Down Expand Up @@ -126,10 +126,6 @@ private HostAndPort getAskTarget(String errorMessage) {
return HostAndPort.fromString(movedMessageParts.get(2));
}

protected int getHash(byte[] encodedKey) {
return SlotHash.getSlot(encodedKey);
}

@Override
public void close() {

Expand Down
24 changes: 20 additions & 4 deletions src/main/java/com/lambdaworks/redis/cluster/SlotHash.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.lambdaworks.redis.cluster;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -54,14 +55,29 @@ public static final int getSlot(String key) {
* @return slot
*/
public static final int getSlot(byte[] key) {
byte[] finalKey = key;
int start = indexOf(key, SUBKEY_START);
return getSlot(ByteBuffer.wrap(key));
}

/**
* Calculate the slot from the given key.
*
* @param key the key
* @return slot
*/
public static final int getSlot(ByteBuffer key) {

byte[] input = new byte[key.remaining()];
key.duplicate().get(input);

byte[] finalKey = input;

int start = indexOf(input, SUBKEY_START);
if (start != -1) {
int end = indexOf(key, start + 1, SUBKEY_END);
int end = indexOf(input, start + 1, SUBKEY_END);
if (end != -1 && end != start + 1) {

finalKey = new byte[end - (start + 1)];
System.arraycopy(key, start + 1, finalKey, 0, finalKey.length);
System.arraycopy(input, start + 1, finalKey, 0, finalKey.length);
}
}
return CRC16.crc16(finalKey) % SLOT_COUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ public <T, C extends RedisCommand<K, V, T>> C dispatch(C cmd) {

if (local.getType().name().equals(AUTH.name())) {
local = attachOnComplete(local, status -> {
if (status.equals("OK")) {
this.password = cmd.getArgs().getStrings().get(0).toCharArray();
if (status.equals("OK") && cmd.getArgs().getFirstString() != null) {
this.password = cmd.getArgs().getFirstString().toCharArray();
}
});
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/lambdaworks/redis/codec/ByteArrayCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 3.3
*/
public class ByteArrayCodec extends RedisCodec<byte[], byte[]> {
public class ByteArrayCodec implements RedisCodec<byte[], byte[]> {

@Override
public byte[] decodeKey(ByteBuffer bytes) {
Expand All @@ -21,13 +21,13 @@ public byte[] decodeValue(ByteBuffer bytes) {
}

@Override
public byte[] encodeKey(byte[] key) {
return key;
public ByteBuffer encodeKey(byte[] key) {
return ByteBuffer.wrap(key);
}

@Override
public byte[] encodeValue(byte[] value) {
return value;
public ByteBuffer encodeValue(byte[] value) {
return ByteBuffer.wrap(value);
}

private static byte[] getBytes(ByteBuffer buffer) {
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/com/lambdaworks/redis/codec/RedisCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@
import java.nio.ByteBuffer;

/**
* A RedisCodec encodes keys and values sent to redis, and decodes keys and values in the command output.
* A {@link RedisCodec} encodes keys and values sent to Redis, and decodes keys and values in the command output.
*
* The methods will be called by multiple threads and must be thread-safe.
* The methods are called by multiple threads and must be thread-safe.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
* @author Mark Paluch
*/
public abstract class RedisCodec<K, V> {
public interface RedisCodec<K, V> {
/**
* Decode the key output by redis.
*
* @param bytes Raw bytes of the key.
*
* @return The decoded key.
*/
public abstract K decodeKey(ByteBuffer bytes);
K decodeKey(ByteBuffer bytes);

/**
* Decode the value output by redis.
Expand All @@ -31,7 +32,7 @@ public abstract class RedisCodec<K, V> {
*
* @return The decoded value.
*/
public abstract V decodeValue(ByteBuffer bytes);
V decodeValue(ByteBuffer bytes);

/**
* Encode the key for output to redis.
Expand All @@ -40,7 +41,7 @@ public abstract class RedisCodec<K, V> {
*
* @return The encoded key.
*/
public abstract byte[] encodeKey(K key);
ByteBuffer encodeKey(K key);

/**
* Encode the value for output to redis.
Expand All @@ -49,5 +50,6 @@ public abstract class RedisCodec<K, V> {
*
* @return The encoded value.
*/
public abstract byte[] encodeValue(V value);
ByteBuffer encodeValue(V value);

}
18 changes: 11 additions & 7 deletions src/main/java/com/lambdaworks/redis/codec/Utf8StringCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*
* @author Will Glozer
*/
public class Utf8StringCodec extends RedisCodec<String, String> {
public class Utf8StringCodec implements RedisCodec<String, String> {

private final static byte[] EMPTY = new byte[0];

private Charset charset;
private CharsetDecoder decoder;
private CharBuffer chars;


/**
* Initialize a new instance that encodes and decodes strings using the UTF-8 charset;
*/
Expand All @@ -41,12 +45,12 @@ public String decodeValue(ByteBuffer bytes) {
}

@Override
public byte[] encodeKey(String key) {
public ByteBuffer encodeKey(String key) {
return encode(key);
}

@Override
public byte[] encodeValue(String value) {
public ByteBuffer encodeValue(String value) {
return encode(value);
}

Expand All @@ -63,11 +67,11 @@ private synchronized String decode(ByteBuffer bytes) {
return chars.flip().toString();
}

private byte[] encode(String string) {

private ByteBuffer encode(String string) {
if (string == null) {
return new byte[0];
return ByteBuffer.wrap(EMPTY);
}
return string.getBytes(charset);

return charset.encode(string);
}
}
Loading

0 comments on commit 0daa66b

Please sign in to comment.