Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve robustness around redis connection issues #26

Merged
merged 1 commit into from
Dec 4, 2012
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/main/java/net/greghaines/jesque/worker/WorkerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ protected enum WorkerState
private static final Logger log = LoggerFactory.getLogger(WorkerImpl.class);
private static final AtomicLong workerCounter = new AtomicLong(0);
protected static final long emptyQueueSleepTime = 500; // 500 ms
private static final long reconnectSleepTime = 5000; // 5s
private static final int reconnectAttempts = 120; // Total time: 10min
protected static final long reconnectSleepTime = 5000; // 5s
protected static final int reconnectAttempts = 120; // Total time: 10min
private static volatile boolean threadNameChangingEnabled = false; // set the thread name to the message for debugging

/**
Expand Down Expand Up @@ -162,7 +162,7 @@ protected static void checkQueues(final Iterable<String> queues)
"Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": ";
private final AtomicReference<Thread> workerThreadRef =
new AtomicReference<Thread>(null);
private final AtomicReference<WorkerExceptionHandler> exceptionHandlerRef =
protected final AtomicReference<WorkerExceptionHandler> exceptionHandlerRef =
new AtomicReference<WorkerExceptionHandler>(new DefaultWorkerExceptionHandler());

/**
Expand Down Expand Up @@ -535,8 +535,16 @@ protected void recoverFromException(final String curQueue, final Exception e)
this.jedis.disconnect();
try { Thread.sleep(reconnectSleepTime); } catch (Exception e2){}
this.jedis.connect();
String pingResult = this.jedis.ping();
if( !pingResult.equals("PONG") ) {
log.info("Unknown redis ping result: " + pingResult);
this.jedis.disconnect();
}
}
catch (JedisConnectionException jce){} // Ignore bad connection attempts
catch (Exception e3) {
log.error("Unknown Exception while trying to reconnect to jedis", e3);
}
} while (++i <= reconAttempts && !this.jedis.isConnected());
if (!this.jedis.isConnected())
{
Expand Down