Skip to content

Commit

Permalink
#393 add race condition when pool init and switch
Browse files Browse the repository at this point in the history
  • Loading branch information
Calvin committed Sep 13, 2014
1 parent a98f360 commit 897aff8
Showing 1 changed file with 31 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.nosql.redis.JedisTemplate;
import org.springside.modules.nosql.redis.JedisTemplate.JedisAction;
import org.springside.modules.nosql.redis.JedisUtils;
import org.springside.modules.utils.Threads;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
Expand All @@ -33,7 +32,7 @@ public final class JedisSentinelPool extends JedisPool {
private String masterName;
private JedisPoolConfig masterPoolConfig;
private ConnectionInfo masterConnectionInfo;
private CountDownLatch poolInitLock = new CountDownLatch(1);
private AtomicBoolean poolInit = new AtomicBoolean(false);

/**
* Creates a new instance of <code>JedisSentinelPool</code>.
Expand All @@ -43,7 +42,7 @@ public final class JedisSentinelPool extends JedisPool {
* @param sentinelAddresses Array of HostAndPort to sentinel instances.
* @param masterName One sentinel can monitor several redis master-slave pair, use master name to identify them.
* @param masterConnectionInfo The the other information like password,timeout.
* @param masterPoolConfig Config of redis pool.
* @param masterPoolConfig Configuration of redis pool.
*
*/
public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, ConnectionInfo masterConnectionInfo,
Expand Down Expand Up @@ -71,20 +70,32 @@ public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, Con
// Start MasterSwitchListener thread ,internal poll will be start in the thread
masterSwitchListener = new MasterSwitchListener();
masterSwitchListener.start();
try {
boolean result = poolInitLock.await(10, TimeUnit.SECONDS);
if (!result) {
logger.warn("jedis pool can't not init after 10 seconds");
}
} catch (InterruptedException e) {
return;
}

waitForPoolInit(5000);
}

public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, JedisPoolConfig masterPoolConfig) {
this(sentinelAddresses, masterName, new ConnectionInfo(), masterPoolConfig);
}

@Override
public Jedis getResource() {
if (!poolInit.get()) {
waitForPoolInit(1000);
}
return super.getResource();
}

private void waitForPoolInit(long mills) {
long startTime = System.currentTimeMillis();
while (!poolInit.get() && ((System.currentTimeMillis() - startTime) < mills)) {
Threads.sleep(100);
}
if (!poolInit.get()) {
logger.warn("Wait for pool init but timeout");
}
}

@Override
public void destroy() {
// shutdown the listener thread
Expand All @@ -109,7 +120,8 @@ public void destroy() {
}

protected void destroyInternelPool() {
super.destroy();
closeInternalPool();

address = null;
connectionInfo = null;
internalPool = null;
Expand Down Expand Up @@ -173,14 +185,15 @@ public void run() {

if ((internalPool != null) && isAddressChange(masterAddress)) {
logger.info("The internalPool {} had changed, destroy it now.", previousMasterAddress);
poolInit.set(false);
destroyInternelPool();
}

if ((internalPool == null) || isAddressChange(masterAddress)) {
if (internalPool == null) {
logger.info("The internalPool {} is not init or the address had changed, init it now.",
masterAddress);
initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig);
poolInitLock.countDown();
poolInit.set(true);
}

previousMasterAddress = masterAddress;
Expand Down Expand Up @@ -281,14 +294,14 @@ public void onMessage(String channel, String message) {
String[] switchMasterMsg = message.split(" ");
// if the master name equals my master name, destroy the old pool and init a new pool
if (masterName.equals(switchMasterMsg[0])) {
destroyInternelPool();

HostAndPort masterAddress = new HostAndPort(switchMasterMsg[3],
Integer.parseInt(switchMasterMsg[4]));
logger.info("Switch master to " + masterAddress);

poolInit.set(false);
destroyInternelPool();
initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig);

poolInit.set(true);
previousMasterAddress = masterAddress;
}
}
Expand Down

0 comments on commit 897aff8

Please sign in to comment.