Skip to content

Commit

Permalink
redis#437 - Added support for closing Broken Connection
Browse files Browse the repository at this point in the history
  • Loading branch information
giridharkannan committed Jul 22, 2013
1 parent 69f5340 commit 338d11e
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 114 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.6</version>
</dependency>
</dependencies>

<build>
Expand Down
153 changes: 112 additions & 41 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
Expand All @@ -16,6 +19,8 @@
import redis.clients.util.SafeEncoder;

public class Connection {
public static Logger log = LoggerFactory.getLogger(Connection.class);

private String host;
private int port = Protocol.DEFAULT_PORT;
private Socket socket;
Expand Down Expand Up @@ -63,11 +68,12 @@ public Connection(final String host) {
}

protected void flush() {
try {
outputStream.flush();
} catch (IOException e) {
throw new JedisConnectionException(e);
}
try {
outputStream.flush();
} catch (IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

protected Connection sendCommand(final Command cmd, final String... args) {
Expand All @@ -80,16 +86,40 @@ protected Connection sendCommand(final Command cmd, final String... args) {

protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
try {
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
} catch(IOException e) {
close();
throw new JedisConnectionException(e);
}
}

private void close() {
if(socket == null) { return; }

try {
socket.close();
socket = null;
} catch(IOException e) {
if(log.isErrorEnabled()) {
log.error(String.format("error closing socket : %s" +
" for host : %s:%s", toString(), getHost(), getPort()), e);
}
}
}

protected Connection sendCommand(final Command cmd) {
connect();
Protocol.sendCommand(outputStream, cmd, new byte[0][]);
pipelinedCommands++;
return this;
try {
Protocol.sendCommand(outputStream, cmd, new byte[0][]);
pipelinedCommands++;
return this;
} catch (IOException e) {
close();
throw new JedisConnectionException(e);
}
}

public Connection(final String host, final int port) {
Expand Down Expand Up @@ -134,6 +164,7 @@ public void connect() {
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}
Expand All @@ -160,14 +191,19 @@ public boolean isConnected() {
}

protected String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) Protocol.read(inputStream);
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
flush();
try {
pipelinedCommands--;
final byte[] resp = (byte[]) Protocol.read(inputStream);
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
} catch(IOException e) {
close();
throw new JedisConnectionException(e);
}
}

public String getBulkReply() {
Expand All @@ -181,14 +217,24 @@ public String getBulkReply() {

public byte[] getBinaryBulkReply() {
flush();
pipelinedCommands--;
return (byte[]) Protocol.read(inputStream);
try {
pipelinedCommands--;
return (byte[]) Protocol.read(inputStream);
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

public Long getIntegerReply() {
flush();
pipelinedCommands--;
return (Long) Protocol.read(inputStream);
try {
flush();
pipelinedCommands--;
return (Long) Protocol.read(inputStream);
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

public List<String> getMultiBulkReply() {
Expand All @@ -198,22 +244,37 @@ public List<String> getMultiBulkReply() {
@SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<byte[]>) Protocol.read(inputStream);
try {
pipelinedCommands--;
return (List<byte[]>) Protocol.read(inputStream);
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

@SuppressWarnings("unchecked")
public List<Object> getObjectMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<Object>) Protocol.read(inputStream);
try {
pipelinedCommands--;
return (List<Object>) Protocol.read(inputStream);
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

@SuppressWarnings("unchecked")
public List<Long> getIntegerMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<Long>) Protocol.read(inputStream);
try {
pipelinedCommands--;
return (List<Long>) Protocol.read(inputStream);
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

public List<Object> getAll() {
Expand All @@ -223,20 +284,30 @@ public List<Object> getAll() {
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > except) {
try{
all.add(Protocol.read(inputStream));
}catch(JedisDataException e){
all.add(e);
}
pipelinedCommands--;
}
return all;
try {
while (pipelinedCommands > except) {
try{
all.add(Protocol.read(inputStream));
}catch(JedisDataException e){
all.add(e);
}
pipelinedCommands--;
}
return all;
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}

public Object getOne() {
flush();
pipelinedCommands--;
return Protocol.read(inputStream);
try {
pipelinedCommands--;
return Protocol.read(inputStream);
} catch(IOException ex) {
close();
throw new JedisConnectionException(ex);
}
}
}
14 changes: 7 additions & 7 deletions src/main/java/redis/clients/jedis/JedisPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ public JedisPool(final Config poolConfig, final String host, int port, int timeo
final int database) {
super(poolConfig, new JedisFactory(host, port, timeout, password, database));
}


public void returnBrokenResource(final BinaryJedis resource) {
returnBrokenResourceObject(resource);
}

public void returnResource(final BinaryJedis resource) {
returnResourceObject(resource);
public void returnResource(final Jedis resource) {
if(resource.isConnected()) {
returnResourceObject(resource);
} else {
returnBrokenResourceObject(resource);
}
}


/**
* PoolableObjectFactory custom impl.
*/
Expand Down
47 changes: 17 additions & 30 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.ArrayList;
import java.util.List;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
Expand Down Expand Up @@ -35,13 +34,12 @@ private Protocol() {
}

public static void sendCommand(final RedisOutputStream os,
final Command command, final byte[]... args) {
final Command command, final byte[]... args) throws IOException {
sendCommand(os, command.raw, args);
}

private static void sendCommand(final RedisOutputStream os,
final byte[] command, final byte[]... args) {
try {
final byte[] command, final byte[]... args) throws IOException {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
Expand All @@ -55,18 +53,14 @@ private static void sendCommand(final RedisOutputStream os,
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}

private static void processError(final RedisInputStream is) {
private static void processError(final RedisInputStream is) throws IOException {
String message = is.readLine();
throw new JedisDataException(message);
}

private static Object process(final RedisInputStream is) {
try {
private static Object process(final RedisInputStream is) throws IOException {
byte b = is.readByte();
if (b == MINUS_BYTE) {
processError(is);
Expand All @@ -79,45 +73,38 @@ private static Object process(final RedisInputStream is) {
} else if (b == PLUS_BYTE) {
return processStatusCodeReply(is);
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
throw new JedisDataException("Unknown reply: " + (char) b);
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
return null;
return null;
}

private static byte[] processStatusCodeReply(final RedisInputStream is) {
private static byte[] processStatusCodeReply(final RedisInputStream is) throws IOException {
return SafeEncoder.encode(is.readLine());
}

private static byte[] processBulkReply(final RedisInputStream is) {
private static byte[] processBulkReply(final RedisInputStream is) throws IOException {
int len = Integer.parseInt(is.readLine());
if (len == -1) {
return null;
}
byte[] read = new byte[len];
int offset = 0;
try {
while (offset < len) {
offset += is.read(read, offset, (len - offset));
}
// read 2 more bytes for the command delimiter
is.readByte();
is.readByte();
} catch (IOException e) {
throw new JedisConnectionException(e);
}
while (offset < len) {
offset += is.read(read, offset, (len - offset));
}
// read 2 more bytes for the command delimiter
is.readByte();
is.readByte();

return read;
}

private static Long processInteger(final RedisInputStream is) {
private static Long processInteger(final RedisInputStream is) throws IOException {
String num = is.readLine();
return Long.valueOf(num);
}

private static List<Object> processMultiBulkReply(final RedisInputStream is) {
private static List<Object> processMultiBulkReply(final RedisInputStream is) throws IOException {
int num = Integer.parseInt(is.readLine());
if (num == -1) {
return null;
Expand All @@ -133,7 +120,7 @@ private static List<Object> processMultiBulkReply(final RedisInputStream is) {
return ret;
}

public static Object read(final RedisInputStream is) {
public static Object read(final RedisInputStream is) throws IOException {
return process(is);
}

Expand Down
Loading

0 comments on commit 338d11e

Please sign in to comment.