Skip to content

Commit

Permalink
#393 rollback race condition,only wait for init
Browse files Browse the repository at this point in the history
  • Loading branch information
Calvin committed Sep 14, 2014
1 parent 6af30e2 commit aaa0833
Showing 1 changed file with 15 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,25 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.nosql.redis.JedisTemplate;
import org.springside.modules.nosql.redis.JedisTemplate.JedisAction;
import org.springside.modules.nosql.redis.JedisUtils;
import org.springside.modules.utils.Threads;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
* Pool which to get redis master address get from sentinel instances.
*/
/**
* Pool which to get redis master address get from sentinel instances.
*/
Expand All @@ -37,7 +41,7 @@ public final class JedisSentinelPool extends JedisPool {
private String masterName;
private JedisPoolConfig masterPoolConfig;
private ConnectionInfo masterConnectionInfo;
private AtomicBoolean poolInit = new AtomicBoolean(false);
private CountDownLatch poolInit = new CountDownLatch(1);

/**
* Creates a new instance of <code>JedisSentinelPool</code>.
Expand Down Expand Up @@ -76,31 +80,19 @@ public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, Con
masterSwitchListener = new MasterSwitchListener();
masterSwitchListener.start();

waitForPoolInit(5000);
try {
if (!poolInit.await(5, TimeUnit.SECONDS)) {
logger.warn("the sentiel pool can't not init in 5 seconds");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, JedisPoolConfig masterPoolConfig) {
this(sentinelAddresses, masterName, new ConnectionInfo(), masterPoolConfig);
}

@Override
public Jedis getResource() {
if (!poolInit.get()) {
waitForPoolInit(1000);
}
return super.getResource();
}

private void waitForPoolInit(long mills) {
long startTime = System.currentTimeMillis();
while (!poolInit.get() && ((System.currentTimeMillis() - startTime) < mills)) {
Threads.sleep(100);
}
if (!poolInit.get()) {
logger.warn("Wait for pool init but timeout");
}
}

@Override
public void destroy() {
// shutdown the listener thread
Expand Down Expand Up @@ -190,15 +182,14 @@ public void run() {

if ((internalPool != null) && isAddressChange(masterAddress)) {
logger.info("The internalPool {} had changed, destroy it now.", previousMasterAddress);
poolInit.set(false);
destroyInternelPool();
}

if (internalPool == null) {
logger.info("The internalPool {} is not init or the address had changed, init it now.",
masterAddress);
initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig);
poolInit.set(true);
poolInit.countDown();
}

previousMasterAddress = masterAddress;
Expand Down Expand Up @@ -303,10 +294,8 @@ public void onMessage(String channel, String message) {
HostAndPort masterAddress = new HostAndPort(switchMasterMsg[3],
Integer.parseInt(switchMasterMsg[4]));
logger.info("Switch master to " + masterAddress);
poolInit.set(false);
destroyInternelPool();
initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig);
poolInit.set(true);
previousMasterAddress = masterAddress;
}
}
Expand All @@ -332,4 +321,4 @@ public void onPSubscribe(String pattern, int subscribedChannels) {
}
}
}
}
}

0 comments on commit aaa0833

Please sign in to comment.