Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge #1

Merged
merged 8 commits into from
Jun 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ atlassian-ide-plugin.xml
redis-git
*.releaseBackup
release.properties
work/
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
lettuce - A scalable Java Redis client
======================================

[![Build Status](https://snap-ci.com/mp911de/lettuce/branch/master/build_image)](https://snap-ci.com/mp911de/lettuce/branch/master)

Lettuce is a scalable thread-safe Redis client providing both synchronous and
asynchronous connections. Multiple threads may share one connection provided
they avoid blocking and transactional operations such as BLPOP, and MULTI/EXEC.
Expand All @@ -17,7 +19,7 @@ Lettuce works with:

Currently following commands are __not yet__ supported:

* Server Commands: DEBUG SEGFAULT, MONITOR, PUBSUB
* Server Commands: MONITOR

I'm pursuing the client code as a fork since the latest changes weren't merged back into https://github/wg/lettuce
I'd love not adding yet another Java redis client, but unfortunately it's the current situation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Closeable;
import java.util.List;
import java.util.Map;

/**
*
Expand All @@ -14,31 +15,120 @@
*/
public interface BaseRedisAsyncConnection<K, V> extends Closeable {

/**
* Post a message to a channel.
*
* @param channel the channel type: key
* @param message the message type: value
* @return RedisFuture<Long> integer-reply the number of clients that received the message.
*/
RedisFuture<Long> publish(K channel, V message);

/**
* Lists the currently *active channels*.
*
* @return RedisFuture<List<K>> array-reply a list of active channels, optionally matching the specified pattern.
*/
RedisFuture<List<K>> pubsubChannels();

/**
* Lists the currently *active channels*.
*
* @param channel the key
* @return RedisFuture<List<K>> array-reply a list of active channels, optionally matching the specified pattern.
*/
RedisFuture<List<K>> pubsubChannels(K channel);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.
*
* @param channels
* @return array-reply a list of channels and number of subscribers for every channel.
*/
RedisFuture<Map<K, Long>> pubsubNumsub(K... channels);

/**
* Returns the number of subscriptions to patterns.
*
* @return RedisFuture<Long> integer-reply the number of patterns all the clients are subscribed to.
*/
RedisFuture<Long> pubsubNumpat();

/**
* Echo the given string.
*
* @param msg the message type: value
* @return RedisFuture<V> bulk-string-reply
*/
RedisFuture<V> echo(V msg);

/**
* Ping the server.
*
* @return RedisFuture<String> simple-string-reply
*/
RedisFuture<String> ping();

/**
* Close the connection.
*
* @return RedisFuture<String> simple-string-reply always OK.
*/
RedisFuture<String> quit();

@Override
void close();

String digest(V script);

/**
* Discard all commands issued after MULTI.
*
* @return RedisFuture<String> simple-string-reply always `OK`.
*/
RedisFuture<String> discard();

/**
* Execute all commands issued after MULTI.
*
* @return RedisFuture<List<Object>> array-reply each element being the reply to each of the commands in the atomic
* transaction.
*
* When using `WATCH`, `EXEC` can return a
*/
RedisFuture<List<Object>> exec();

/**
* Mark the start of a transaction block.
*
* @return RedisFuture<String> simple-string-reply always `OK`.
*/
RedisFuture<String> multi();

/**
* Watch the given keys to determine execution of the MULTI/EXEC block.
*
* @param keys the key
* @return RedisFuture<String> simple-string-reply always `OK`.
*/
RedisFuture<String> watch(K... keys);

/**
* Forget about all watched keys.
*
* @return RedisFuture<String> simple-string-reply always `OK`.
*/
RedisFuture<String> unwatch();

RedisFuture<Long> waitForReplication(int replicas, long timeout);

/**
* Close the connection. The connection will become not usable anymore as soon as this method was called.
*/
@Override
void close();

/**
*
* @return true if the connection is open (connected and not closed).
*/
boolean isOpen();

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Closeable;
import java.util.List;
import java.util.Map;

/**
*
Expand All @@ -14,31 +15,119 @@
*/
public interface BaseRedisConnection<K, V> extends Closeable {

/**
* Post a message to a channel.
*
* @param channel the channel type: key
* @param message the message type: value
* @return Long integer-reply the number of clients that received the message.
*/
Long publish(K channel, V message);

/**
* Lists the currently *active channels*.
*
* @return List<K> array-reply a list of active channels, optionally matching the specified pattern.
*/
List<K> pubsubChannels();

/**
* Lists the currently *active channels*.
*
* @param channel the key
* @return List<K> array-reply a list of active channels, optionally matching the specified pattern.
*/
List<K> pubsubChannels(K channel);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.
*
* @param channels
* @return array-reply a list of channels and number of subscribers for every channel.
*/
Map<K, Long> pubsubNumsub(K... channels);

/**
* Returns the number of subscriptions to patterns.
*
* @return Long integer-reply the number of patterns all the clients are subscribed to.
*/
Long pubsubNumpat();

/**
* Echo the given string.
*
* @param msg the message type: value
* @return V bulk-string-reply
*/
V echo(V msg);

/**
* Ping the server.
*
* @return String simple-string-reply
*/
String ping();

/**
* Close the connection.
*
* @return String simple-string-reply always OK.
*/
String quit();

@Override
void close();

String digest(V script);

/**
* Discard all commands issued after MULTI.
*
* @return String simple-string-reply always `OK`.
*/
String discard();

/**
* Execute all commands issued after MULTI.
*
* @return List<Object> array-reply each element being the reply to each of the commands in the atomic transaction.
*
* When using `WATCH`, `EXEC` can return a
*/
List<Object> exec();

/**
* Mark the start of a transaction block.
*
* @return String simple-string-reply always `OK`.
*/
String multi();

/**
* Watch the given keys to determine execution of the MULTI/EXEC block.
*
* @param keys the key
* @return String simple-string-reply always `OK`.
*/
String watch(K... keys);

/**
* Forget about all watched keys.
*
* @return String simple-string-reply always `OK`.
*/
String unwatch();

Long waitForReplication(int replicas, long timeout);

/**
* Close the connection. The connection will become not usable anymore as soon as this method was called.
*/
@Override
void close();

/**
*
* @return true if the connection is open (connected and not closed).
*/
boolean isOpen();

}
18 changes: 18 additions & 0 deletions lettuce/src/main/java/com/lambdaworks/redis/Connections.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@
import java.util.concurrent.ExecutionException;

/**
* Utility for checking a connection's state.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 14.05.14 22:05
*/
public class Connections {

/**
*
* @param connection
* @return true if the connection is valid (ping works).
*/
public final static boolean isValid(Object connection) {

checkNotNull(connection, "connection must not be null");
Expand Down Expand Up @@ -39,6 +47,11 @@ public final static boolean isValid(Object connection) {
throw new IllegalArgumentException("Connection class " + connection.getClass() + " not supported");
}

/**
*
* @param connection
* @return true if the connection is open.
*/
public final static boolean isOpen(Object connection) {

checkNotNull(connection, "connection must not be null");
Expand All @@ -55,6 +68,11 @@ public final static boolean isOpen(Object connection) {
throw new IllegalArgumentException("Connection class " + connection.getClass() + " not supported");
}

/**
* Closes a connection.
*
* @param connection
*/
public static void close(Object connection) {

checkNotNull(connection, "connection must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,26 @@ public RedisFuture<Long> publish(K channel, V message) {
return dispatch(commandBuilder.publish(channel, message));
}

@Override
public RedisFuture<List<K>> pubsubChannels() {
return dispatch(commandBuilder.pubsubChannels());
}

@Override
public RedisFuture<List<K>> pubsubChannels(K channel) {
return dispatch(commandBuilder.pubsubChannels(channel));
}

@Override
public RedisFuture<Map<K, Long>> pubsubNumsub(K... channels) {
return dispatch(commandBuilder.pubsubNumsub(channels));
}

@Override
public RedisFuture<Long> pubsubNumpat() {
return dispatch(commandBuilder.pubsubNumpat());
}

@Override
public RedisFuture<String> quit() {
return dispatch(commandBuilder.quit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,26 @@ public Command<K, V, Long> publish(K channel, V message) {
return createCommand(PUBLISH, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, List<K>> pubsubChannels() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(CHANNELS);
return createCommand(PUBSUB, new KeyListOutput<K, V>(codec), args);
}

public Command<K, V, List<K>> pubsubChannels(K pattern) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(CHANNELS).addKey(pattern);
return createCommand(PUBSUB, new KeyListOutput<K, V>(codec), args);
}

public Command<K, V, Map<K, Long>> pubsubNumsub(K... pattern) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(NUMSUB).addKeys(pattern);
return createCommand(PUBSUB, (MapOutput) new MapOutput<K, Long>((RedisCodec) codec), args);
}

public Command<K, V, Long> pubsubNumpat() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(NUMPAT);
return createCommand(PUBSUB, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, String> quit() {
return createCommand(QUIT, new StatusOutput<K, V>(codec));
}
Expand Down
Loading