-
Notifications
You must be signed in to change notification settings - Fork 992
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Handle reconnects on an own executor group (instead of the timer/even…
…t loop group) and honor timeouts when waiting for reconnects #100 Motivation: Performing connect tasks on the event loop can block the channel threads. In consequence, no further channel message processing is possible and threads run into a dead-lock situation. Reconnects are handled now in a own executor group not harming the channel's event loop. The reconnect futures use the connection's timeout for the whole reconnect sequence (TCP connect and initializations such as PING or SSL) to prevent infinite blocking.
- Loading branch information
Showing
4 changed files
with
95 additions
and
37 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
package com.lambdaworks.redis; | ||
|
||
import static com.google.common.base.Preconditions.*; | ||
import static com.google.common.base.Preconditions.checkState; | ||
|
||
import java.net.SocketAddress; | ||
import java.util.List; | ||
|
@@ -16,8 +16,10 @@ | |
import io.netty.channel.ChannelHandler; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelInboundHandlerAdapter; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.group.ChannelGroup; | ||
import io.netty.util.Timer; | ||
import io.netty.util.concurrent.EventExecutorGroup; | ||
|
||
/** | ||
* @author <a href="mailto:[email protected]">Mark Paluch</a> | ||
|
@@ -33,6 +35,7 @@ public class ConnectionBuilder { | |
private Timer timer; | ||
private Bootstrap bootstrap; | ||
private ClientOptions clientOptions; | ||
private EventExecutorGroup workerPool; | ||
private long timeout; | ||
private TimeUnit timeUnit; | ||
|
||
|
@@ -69,6 +72,11 @@ public ConnectionBuilder clientOptions(ClientOptions clientOptions) { | |
return this; | ||
} | ||
|
||
public ConnectionBuilder workerPool(EventExecutorGroup workerPool) { | ||
this.workerPool = workerPool; | ||
return this; | ||
} | ||
|
||
public ConnectionBuilder connectionEvents(ConnectionEvents connectionEvents) { | ||
this.connectionEvents = connectionEvents; | ||
return this; | ||
|
@@ -110,7 +118,8 @@ protected List<ChannelHandler> buildHandlers() { | |
checkState(timer != null, "timer must be set for autoReconnect=true"); | ||
checkState(socketAddressSupplier != null, "socketAddressSupplier must be set for autoReconnect=true"); | ||
|
||
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientOptions, bootstrap, timer, socketAddressSupplier); | ||
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientOptions, bootstrap, timer, workerPool, | ||
socketAddressSupplier); | ||
|
||
watchdog.setListenOnChannelInactive(true); | ||
handlers.add(watchdog); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters