From a98f360489282c44610b53c7488e70d57eb52431 Mon Sep 17 00:00:00 2001 From: Calvin Date: Sat, 13 Sep 2014 11:11:48 +0800 Subject: [PATCH] =?UTF-8?q?#393=20=E6=96=B0=E7=9A=84Connection=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E5=8F=8APool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/showcase/pom.xml | 4 + .../demos/redis/JedisPoolFactory.java | 27 -- .../demos/redis/RedisCounterBenchmark.java | 9 +- .../demos/redis/RedisSessionBenchmark.java | 7 +- .../redis/elector/MasterElectorDemo.java | 9 +- .../AdvancedJobConsumerBatchPopDemo.java | 6 +- .../AdvancedJobConsumerSinglePopDemo.java | 6 +- .../job/consumer/SimpleJobConsumerDemo.java | 9 +- .../dispatcher/ReliableJobDispatcherDemo.java | 9 +- .../dispatcher/SimpleJobDispatcherDemo.java | 9 +- .../redis/job/producer/JobProducerDemo.java | 9 +- modules/parent/pom.xml | 5 + .../modules/nosql/redis/JedisTemplate.java | 2 +- .../modules/nosql/redis/JedisUtils.java | 57 ++- .../nosql/redis/pool/ConnectionInfo.java | 72 +--- .../modules/nosql/redis/pool/JedisPool.java | 65 +++- .../nosql/redis/pool/JedisPoolBuilder.java | 175 +++++++++ .../nosql/redis/pool/JedisSentinelPool.java | 333 +++++++++--------- .../redis/scheduler/AdvancedJobConsumer.java | 2 +- 19 files changed, 467 insertions(+), 348 deletions(-) delete mode 100644 examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/JedisPoolFactory.java create mode 100644 modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPoolBuilder.java diff --git a/examples/showcase/pom.xml b/examples/showcase/pom.xml index 1f7f2e258..d83ae14bc 100644 --- a/examples/showcase/pom.xml +++ b/examples/showcase/pom.xml @@ -35,6 +35,10 @@ org.springside springside-metrics + + org.springside + springside-redis + diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/JedisPoolFactory.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/JedisPoolFactory.java deleted file mode 100644 index 18fa847ae..000000000 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/JedisPoolFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.examples.showcase.demos.redis; - -import org.springside.modules.nosql.redis.JedisUtils; - -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; - -public class JedisPoolFactory { - - public static JedisPool createJedisPool(String defaultHost, int defaultPort, int defaultTimeout, int threadCount) { - // 合并命令行传入的系统变量与默认值 - String host = System.getProperty("redis.host", defaultHost); - String port = System.getProperty("redis.port", String.valueOf(defaultPort)); - String timeout = System.getProperty("redis.timeout", String.valueOf(defaultTimeout)); - - // 设置Pool大小,设为与线程数等大,并屏蔽掉idle checking - JedisPoolConfig poolConfig = JedisUtils.createPoolConfig(threadCount, threadCount); - - // create jedis pool - return new JedisPool(poolConfig, host, Integer.valueOf(port), Integer.valueOf(timeout)); - } -} diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisCounterBenchmark.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisCounterBenchmark.java index 32f975a02..381c750cb 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisCounterBenchmark.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisCounterBenchmark.java @@ -6,12 +6,11 @@ package org.springside.examples.showcase.demos.redis; import org.springside.modules.nosql.redis.JedisTemplate; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.test.benchmark.BenchmarkTask; import org.springside.modules.test.benchmark.ConcurrentBenchmark; -import redis.clients.jedis.JedisPool; - /** * 测试Redis用于计数器时incr()方法的性能. * @@ -39,8 +38,8 @@ public RedisCounterBenchmark() { @Override protected void setUp() { - pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, threadCount); + + pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(threadCount).buildPool(); jedisTemplate = new JedisTemplate(pool); // 重置Counter diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisSessionBenchmark.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisSessionBenchmark.java index c92495de7..d19ce81cd 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisSessionBenchmark.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/RedisSessionBenchmark.java @@ -10,12 +10,12 @@ import org.springside.modules.mapper.JsonMapper; import org.springside.modules.nosql.redis.JedisTemplate; import org.springside.modules.nosql.redis.JedisTemplate.JedisActionNoResult; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.test.benchmark.BenchmarkTask; import org.springside.modules.test.benchmark.ConcurrentBenchmark; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; /** * 测试Redis用于Session管理的setEx()与get()方法性能, 使用JSON格式存储数据. @@ -44,8 +44,7 @@ public RedisSessionBenchmark() { @Override protected void setUp() { - pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, threadCount); + pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(threadCount).buildPool(); jedisTemplate = new JedisTemplate(pool); // 清空数据库 diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/elector/MasterElectorDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/elector/MasterElectorDemo.java index 4b19e49ee..f6b2ad0c4 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/elector/MasterElectorDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/elector/MasterElectorDemo.java @@ -5,18 +5,15 @@ *******************************************************************************/ package org.springside.examples.showcase.demos.redis.elector; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; -import org.springside.modules.nosql.redis.JedisUtils; import org.springside.modules.nosql.redis.elector.MasterElector; - -import redis.clients.jedis.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; public class MasterElectorDemo { public static void main(String[] args) throws Exception { - JedisPool pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, 1); + JedisPool pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(1).buildPool(); try { MasterElector masterElector = new MasterElector(pool, 5); diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerBatchPopDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerBatchPopDemo.java index e9ae7ac4d..90e6d049c 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerBatchPopDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerBatchPopDemo.java @@ -10,8 +10,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.nosql.redis.scheduler.AdvancedJobConsumer; import org.springside.modules.nosql.redis.scheduler.SimpleJobConsumer; import org.springside.modules.test.benchmark.ConcurrentBenchmark; @@ -41,8 +40,7 @@ public static void main(String[] args) throws Exception { batchSize = Integer.parseInt(System.getProperty("batchsize", String.valueOf(AdvancedJobConsumer.DEFAULT_BATCH_SIZE))); - pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, threadCount); + pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(threadCount).buildPool(); ExecutorService threadPool = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount; i++) { diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerSinglePopDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerSinglePopDemo.java index fd1579581..be9b7ad75 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerSinglePopDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/AdvancedJobConsumerSinglePopDemo.java @@ -9,8 +9,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.nosql.redis.scheduler.AdvancedJobConsumer; import org.springside.modules.nosql.redis.scheduler.SimpleJobConsumer; import org.springside.modules.test.benchmark.ConcurrentBenchmark; @@ -32,8 +31,7 @@ public static void main(String[] args) throws Exception { threadCount = Integer.parseInt(System.getProperty(ConcurrentBenchmark.THREAD_COUNT_NAME, String.valueOf(THREAD_COUNT))); - pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, threadCount); + pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(threadCount).buildPool(); ExecutorService threadPool = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount; i++) { diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/SimpleJobConsumerDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/SimpleJobConsumerDemo.java index 0fa5dd0d9..a1f1cdbab 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/SimpleJobConsumerDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/consumer/SimpleJobConsumerDemo.java @@ -10,13 +10,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.nosql.redis.scheduler.SimpleJobConsumer; import org.springside.modules.test.benchmark.ConcurrentBenchmark; -import redis.clients.jedis.JedisPool; - import com.google.common.util.concurrent.RateLimiter; /** @@ -49,8 +47,7 @@ public static void main(String[] args) throws Exception { threadCount = Integer.parseInt(System.getProperty(ConcurrentBenchmark.THREAD_COUNT_NAME, String.valueOf(THREAD_COUNT))); - pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, threadCount); + pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(threadCount).buildPool(); ExecutorService threadPool = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount; i++) { diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/ReliableJobDispatcherDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/ReliableJobDispatcherDemo.java index 412a51e02..5bf1f5c4e 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/ReliableJobDispatcherDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/ReliableJobDispatcherDemo.java @@ -10,13 +10,11 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.nosql.redis.scheduler.JobDispatcher; import org.springside.modules.nosql.redis.scheduler.JobStatistics; -import redis.clients.jedis.JedisPool; - /** * 运行JobDispatcher,每秒将Job从"job:ss:scheduled" sorted set 发布到"job:ss:ready" list. * 如果有任务已被领取而长期没有被执行,会从"job:ss:locked" sorted set取回并重新发布到"job:ss:ready" list. @@ -31,8 +29,7 @@ public class ReliableJobDispatcherDemo { public static void main(String[] args) throws Exception { - JedisPool pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, 1); + JedisPool pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(1).buildPool(); try { JobDispatcher dispatcher = new JobDispatcher("ss", pool); dispatcher.setReliable(true); diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/SimpleJobDispatcherDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/SimpleJobDispatcherDemo.java index 994791f03..ebb09f7a5 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/SimpleJobDispatcherDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/dispatcher/SimpleJobDispatcherDemo.java @@ -10,13 +10,11 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.nosql.redis.scheduler.JobDispatcher; import org.springside.modules.nosql.redis.scheduler.JobStatistics; -import redis.clients.jedis.JedisPool; - /** * 运行JobDispatcher,每秒将Job从"job:ss:scheduled" sorted set 发布到"job:ss:ready" list. * @@ -31,8 +29,7 @@ public class SimpleJobDispatcherDemo { public static void main(String[] args) throws Exception { - JedisPool pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, 1); + JedisPool pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(1).buildPool(); try { JobDispatcher dispatcher = new JobDispatcher("ss", pool); JobStatistics statistics = new JobStatistics("ss", pool); diff --git a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/producer/JobProducerDemo.java b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/producer/JobProducerDemo.java index 69b65cded..048b98be4 100644 --- a/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/producer/JobProducerDemo.java +++ b/examples/showcase/src/main/java/org/springside/examples/showcase/demos/redis/job/producer/JobProducerDemo.java @@ -8,15 +8,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.springside.examples.showcase.demos.redis.JedisPoolFactory; import org.springside.examples.showcase.demos.redis.job.dispatcher.SimpleJobDispatcherDemo; -import org.springside.modules.nosql.redis.JedisUtils; +import org.springside.modules.nosql.redis.pool.JedisPool; +import org.springside.modules.nosql.redis.pool.JedisPoolBuilder; import org.springside.modules.nosql.redis.scheduler.JobProducer; import org.springside.modules.test.benchmark.BenchmarkTask; import org.springside.modules.test.benchmark.ConcurrentBenchmark; -import redis.clients.jedis.JedisPool; - /** * 运行JobProducer产生新的Job。 * @@ -49,8 +47,7 @@ public JobProducerDemo() { @Override protected void setUp() { - pool = JedisPoolFactory.createJedisPool(JedisUtils.DEFAULT_HOST, JedisUtils.DEFAULT_PORT, - JedisUtils.DEFAULT_TIMEOUT, threadCount); + pool = new JedisPoolBuilder().setDirectHostAndPort("localhost", "6379").setPoolSize(threadCount).buildPool(); jobProducer = new JobProducer("ss", pool); } diff --git a/modules/parent/pom.xml b/modules/parent/pom.xml index e5e88005f..6eaa92aef 100644 --- a/modules/parent/pom.xml +++ b/modules/parent/pom.xml @@ -99,6 +99,11 @@ springside-metrics ${springside.version} + + org.springside + springside-redis + ${springside.version} + diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java index 9df299c63..8759959d0 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java @@ -80,7 +80,7 @@ protected void closeResource(Jedis jedis, boolean connectionBroken) { } } catch (Exception e) { logger.error("Error happen when return jedis to pool, try to close it directly.", e); - JedisUtils.closeJedis(jedis); + JedisUtils.destroyJedis(jedis); } } } diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java index a79098b6e..cf639c9f2 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java @@ -5,53 +5,28 @@ *******************************************************************************/ package org.springside.modules.nosql.redis; +import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; +import org.springside.modules.nosql.redis.pool.JedisPool; + import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPoolConfig; -import redis.clients.jedis.Protocol; +import redis.clients.jedis.exceptions.JedisException; public class JedisUtils { - public static final String DEFAULT_HOST = "localhost"; - public static final int DEFAULT_PORT = Protocol.DEFAULT_PORT; - public static final int DEFAULT_TIMEOUT = Protocol.DEFAULT_TIMEOUT; private static final String OK_CODE = "OK"; private static final String OK_MULTI_CODE = "+OK"; /** - * 快速设置JedisPoolConfig, 不执行idle checking。 - */ - public static JedisPoolConfig createPoolConfig(int maxIdle, int maxTotal) { - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxIdle(maxIdle); - poolConfig.setMaxTotal(maxTotal); - poolConfig.setTimeBetweenEvictionRunsMillis(-1); - return poolConfig; - } - - /** - * 快速设置JedisPoolConfig, 设置执行idle checking的间隔和可被清除的idle时间. - * 默认的checkingIntervalSecs是30秒,可被清除时间是60秒。 - */ - public static JedisPoolConfig createPoolConfig(int maxIdle, int maxTotal, int checkingIntervalSecs, - int evictableIdleTimeSecs) { - JedisPoolConfig poolConfig = createPoolConfig(maxIdle, maxTotal); - - poolConfig.setTimeBetweenEvictionRunsMillis(checkingIntervalSecs * 1000); - poolConfig.setMinEvictableIdleTimeMillis(evictableIdleTimeSecs * 1000); - return poolConfig; - } - - /** - * 判断 是 OK 或 +OK. + * 判断 返回值是否ok. */ public static boolean isStatusOk(String status) { return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status)); } /** - * 退出然后关闭Jedis连接。如果Jedis为null则无动作。 + * 在Pool以外强行销毁Jedis. */ - public static void closeJedis(Jedis jedis) { + public static void destroyJedis(Jedis jedis) { if ((jedis != null) && jedis.isConnected()) { try { try { @@ -63,4 +38,22 @@ public static void closeJedis(Jedis jedis) { } } } + + /** + * Ping the jedis instance, return true is the result is PONG. + */ + public static boolean ping(JedisPool pool) { + JedisTemplate template = new JedisTemplate(pool); + try { + String result = template.execute(new JedisAction() { + @Override + public String action(Jedis jedis) { + return jedis.ping(); + } + }); + return (result != null) && result.equals("PONG"); + } catch (JedisException e) { + return false; + } + } } diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java index f4d0b9963..74eb63411 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java @@ -9,60 +9,25 @@ public class ConnectionInfo { public static final String DEFAULT_PASSWORD = null; - private String host; - private int port; - private int timeout; - private String password; - private int database; - private String clientName; + private int database = Protocol.DEFAULT_DATABASE; + private String password = DEFAULT_PASSWORD; + private int timeout = Protocol.DEFAULT_TIMEOUT; - public ConnectionInfo(String host) { - this(host, Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, DEFAULT_PASSWORD, Protocol.DEFAULT_DATABASE, null); + public ConnectionInfo() { } - public ConnectionInfo(String host, int port) { - this(host, port, Protocol.DEFAULT_TIMEOUT, DEFAULT_PASSWORD, Protocol.DEFAULT_DATABASE, null); - } - - public ConnectionInfo(String host, int port, int timeout) { - this(host, port, timeout, DEFAULT_PASSWORD, Protocol.DEFAULT_DATABASE, null); - } - - public ConnectionInfo(String host, int port, int timeout, String password, int database, String clientName) { - this.host = host; - this.port = port; + public ConnectionInfo(int database, String password, int timeout) { this.timeout = timeout; this.password = password; this.database = database; - this.clientName = clientName; } - public String getHostAndPort() { - return new StringBuilder().append(host).append(":").append(port).toString(); - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public int getTimeout() { - return timeout; + public int getDatabase() { + return database; } - public void setTimeout(int timeout) { - this.timeout = timeout; + public void setDatabase(int database) { + this.database = database; } public String getPassword() { @@ -73,25 +38,16 @@ public void setPassword(String password) { this.password = password; } - public int getDatabase() { - return database; - } - - public void setDatabase(int database) { - this.database = database; - } - - public String getClientName() { - return clientName; + public int getTimeout() { + return timeout; } - public void setClientName(String clientName) { - this.clientName = clientName; + public void setTimeout(int timeout) { + this.timeout = timeout; } @Override public String toString() { - return "ConnectionInfo [host=" + host + ", port=" + port + ", timeout=" + timeout + ", password=" + password - + ", database=" + database + ", clientName=" + clientName + "]"; + return "ConnectionInfo [database=" + database + ", password=" + password + ", timeout=" + timeout + "]"; } } diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java index 5ac8934be..259241004 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java @@ -2,29 +2,70 @@ import org.apache.commons.pool2.impl.GenericObjectPool; +import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPoolConfig; import redis.clients.util.Pool; /** - * Pool which connect to redis instance directly. + * Jedis Pool base class. */ public class JedisPool extends Pool { - private String hostAndPort; + protected HostAndPort address; - public JedisPool(ConnectionInfo connectionInfo, JedisPoolConfig config) { - this.hostAndPort = connectionInfo.getHostAndPort(); + protected ConnectionInfo connectionInfo; - JedisFactory factory = new JedisFactory(connectionInfo.getHost(), connectionInfo.getPort(), - connectionInfo.getTimeout(), connectionInfo.getPassword(), connectionInfo.getDatabase(), - connectionInfo.getClientName()); + /** + * Create a JedisPoolConfig with new maxPoolSize becasuse JedisPoolConfig's default maxPoolSize is only 8. + * Also reset the idle checking time to 10 minutes, the default value is half minute. + * Also rest the max idle to zero, the default value is 8 too. + * The default idle time is 60 seconds. + */ + public static JedisPoolConfig createPoolConfig(int maxPoolSize) { + JedisPoolConfig config = new JedisPoolConfig(); + config.setMaxTotal(maxPoolSize); + config.setMaxIdle(maxPoolSize); + + config.setTimeBetweenEvictionRunsMillis(600 * 1000); + + return config; + } + + protected JedisPool() { + + } + + public JedisPool(HostAndPort address, JedisPoolConfig config) { + initInternalPool(address, new ConnectionInfo(), config); + } + + public JedisPool(HostAndPort address, ConnectionInfo connectionInfo, JedisPoolConfig config) { + initInternalPool(address, connectionInfo, config); + } + + /** + * Initialize the internal pool with connection info and pool config. + */ + protected void initInternalPool(HostAndPort address, ConnectionInfo connectionInfo, JedisPoolConfig config) { + this.address = address; + this.connectionInfo = connectionInfo; + JedisFactory factory = new JedisFactory(address.getHost(), address.getPort(), connectionInfo.getTimeout(), + connectionInfo.getPassword(), connectionInfo.getDatabase()); internalPool = new GenericObjectPool(factory, config); } /** - * Return an available jedis connection back to pool. + * Return a broken jedis connection back to pool. + */ + @Override + public void returnBrokenResource(final Jedis resource) { + returnBrokenResourceObject(resource); + } + + /** + * Return a available jedis connection back to pool. */ @Override public void returnResource(final Jedis resource) { @@ -32,7 +73,11 @@ public void returnResource(final Jedis resource) { returnResourceObject(resource); } - public String getHostAndPort() { - return hostAndPort; + public HostAndPort getAddress() { + return address; + } + + public ConnectionInfo getConnectionInfo() { + return connectionInfo; } } diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPoolBuilder.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPoolBuilder.java new file mode 100644 index 000000000..791ecd422 --- /dev/null +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPoolBuilder.java @@ -0,0 +1,175 @@ +package org.springside.modules.nosql.redis.pool; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.Protocol; + +/** + * Build JedisPool smartly。 + * Depends on masterName whether prefix with "direct:", it will build JedisSentinelPool or JedisDirectPool. + */ +public class JedisPoolBuilder { + + public static final String DIRECT_POOL_PREFIX = "direct:"; + + private static Logger logger = LoggerFactory.getLogger(JedisPoolBuilder.class); + + private String[] sentinelHosts; + private int sentinelPort = Protocol.DEFAULT_SENTINEL_PORT; + + private String masterName; + private String[] shardedMasterNames; + + private int poolSize = -1; + + private int database = Protocol.DEFAULT_DATABASE; + private String password = ConnectionInfo.DEFAULT_PASSWORD; + private int timeout = Protocol.DEFAULT_TIMEOUT; + + public JedisPoolBuilder setHosts(String[] hosts) { + this.sentinelHosts = hosts; + return this; + } + + public JedisPoolBuilder setHosts(String hosts) { + if (hosts != null) { + this.sentinelHosts = hosts.split(","); + } + return this; + } + + public JedisPoolBuilder setPort(int port) { + this.sentinelPort = port; + return this; + } + + public JedisPoolBuilder setMasterName(String masterName) { + this.masterName = masterName; + return this; + } + + public JedisPoolBuilder setShardedMasterNames(String[] shardedMasterNames) { + this.shardedMasterNames = shardedMasterNames; + return this; + } + + public JedisPoolBuilder setShardedMasterNames(String shardedMasterNames) { + if (shardedMasterNames != null) { + this.shardedMasterNames = shardedMasterNames.split(","); + } + return this; + } + + public JedisPoolBuilder setDirectHostAndPort(String host, String port) { + this.masterName = host + ":" + port; + return this; + } + + public JedisPoolBuilder setPoolSize(int poolSize) { + this.poolSize = poolSize; + return this; + } + + public JedisPoolBuilder setDatabase(int database) { + this.database = database; + return this; + } + + public JedisPoolBuilder setPassword(String password) { + this.password = password; + return this; + } + + public JedisPoolBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public JedisPool buildPool() { + + if ((masterName == null) || "".equals(masterName)) { + throw new IllegalArgumentException("masterName is null or empty"); + } + + if (poolSize < 1) { + throw new IllegalArgumentException("poolSize is less then one"); + } + + JedisPoolConfig config = JedisPool.createPoolConfig(poolSize); + ConnectionInfo connectionInfo = new ConnectionInfo(database, password, timeout); + + if (isDirect(masterName)) { + return buildDirectPool(masterName, connectionInfo, config); + } else { + if ((sentinelHosts == null) || (sentinelHosts.length == 0)) { + throw new IllegalArgumentException("sentinelHosts is null or empty"); + } + return buildSentinelPool(masterName, connectionInfo, config); + } + } + + public List buildShardedPools() { + + if ((shardedMasterNames == null) || (shardedMasterNames.length == 0) || "".equals(shardedMasterNames[0])) { + throw new IllegalArgumentException("shardedMasterNames is null or empty"); + } + + if (poolSize < 1) { + throw new IllegalArgumentException("poolSize is less then one"); + } + + JedisPoolConfig config = JedisPool.createPoolConfig(poolSize); + ConnectionInfo connectionInfo = new ConnectionInfo(database, password, timeout); + + List jedisPools = new ArrayList(); + + if (isDirect(shardedMasterNames[0])) { + for (String theMasterName : shardedMasterNames) { + jedisPools.add(buildDirectPool(theMasterName, connectionInfo, config)); + } + } else { + + if ((sentinelHosts == null) || (sentinelHosts.length == 0)) { + throw new IllegalArgumentException("sentinelHosts is null or empty"); + } + + for (String theMasterName : shardedMasterNames) { + jedisPools.add(buildSentinelPool(theMasterName, connectionInfo, config)); + } + } + return jedisPools; + } + + private JedisPool buildDirectPool(String directMasterName, ConnectionInfo connectionInfo, JedisPoolConfig config) { + String hostPortStr = directMasterName.substring(directMasterName.indexOf(":") + 1, directMasterName.length()); + String[] hostPort = hostPortStr.split(":"); + + logger.info("Building JedisDirectPool, on redis server " + hostPort[0] + " ,sentinelPort is " + hostPort[1]); + + HostAndPort masterAddress = new HostAndPort(hostPort[0], Integer.parseInt(hostPort[1])); + return new JedisPool(masterAddress, config); + } + + private JedisPool buildSentinelPool(String sentinelMasterName, ConnectionInfo connectionInfo, JedisPoolConfig config) { + logger.info("Building JedisSentinelPool, on sentinel sentinelHosts:" + Arrays.toString(sentinelHosts) + + " ,sentinelPort is " + sentinelPort + " ,masterName is " + sentinelMasterName); + + HostAndPort[] sentinelAddress = new HostAndPort[sentinelHosts.length]; + for (int i = 0; i < sentinelHosts.length; i++) { + sentinelAddress[i] = new HostAndPort(sentinelHosts[i], sentinelPort); + } + + return new JedisSentinelPool(sentinelAddress, sentinelMasterName, connectionInfo, config); + } + + private static boolean isDirect(String masterName) { + return masterName.startsWith(DIRECT_POOL_PREFIX); + } +} diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java index 696426f4f..07c599837 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java @@ -1,79 +1,104 @@ package org.springside.modules.nosql.redis.pool; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.pool2.impl.GenericObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springside.modules.nosql.redis.JedisTemplate; +import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; import org.springside.modules.nosql.redis.JedisUtils; +import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisPubSub; import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisException; -import redis.clients.util.Pool; /** - * Pool which to get redis master address from sentinel instances. + * Pool which to get redis master address get from sentinel instances. */ -public class JedisSentinelPool extends Pool { +public final class JedisSentinelPool extends JedisPool { - private static final String UNAVAILABLE_MASTER_ADDRESS = "All sentinel down"; + private static final String NO_ADDRESS_YET = "I dont know because no sentinel up"; private static Logger logger = LoggerFactory.getLogger(JedisSentinelPool.class); - private String masterName; - private ConnectionInfo[] sentinelInfos; - private ConnectionInfo masterAddtionalInfo; - private JedisPoolConfig masterPoolConfig; - + private List sentinelPools = new ArrayList(); private MasterSwitchListener masterSwitchListener; - /** - * see {@link #JedisSentinelPool(ConnectionInfo[], String, ConnectionInfo, JedisPoolConfig)} - */ - public JedisSentinelPool(ConnectionInfo sentinelInfo, String masterName, ConnectionInfo masterAddtionalInfo, - JedisPoolConfig masterPoolConfig) { - this(new ConnectionInfo[] { sentinelInfo }, masterName, masterAddtionalInfo, masterPoolConfig); - } + private String masterName; + private JedisPoolConfig masterPoolConfig; + private ConnectionInfo masterConnectionInfo; + private CountDownLatch poolInitLock = new CountDownLatch(1); /** * Creates a new instance of JedisSentinelPool. * - * @param sentinelInfos Array of connection information to sentinel instances. - * @param masterName One sentinel can monitor several redis master-slave pair, use master name to identify it. - * @param masterAddtionalInfo The master host and port would dynamic get from sentinel, and the other information - * like password, timeout store in it. - * @param masterPoolConfig Configuration of redis pool. + * All parameters can be null or empty. + * + * @param sentinelAddresses Array of HostAndPort to sentinel instances. + * @param masterName One sentinel can monitor several redis master-slave pair, use master name to identify them. + * @param masterConnectionInfo The the other information like password,timeout. + * @param masterPoolConfig Config of redis pool. + * */ - public JedisSentinelPool(ConnectionInfo[] sentinelInfos, String masterName, ConnectionInfo masterAddtionalInfo, + public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, ConnectionInfo masterConnectionInfo, JedisPoolConfig masterPoolConfig) { - // check and assign parameter, all parameter can't not be null or empty - assertArgument(((sentinelInfos != null) && (sentinelInfos.length != 0)), "seintinelInfos is not set"); - this.sentinelInfos = sentinelInfos; + // sentinelAddresses + assertArgument(((sentinelAddresses == null) || (sentinelAddresses.length == 0)), "seintinelInfos is not set"); - assertArgument(masterAddtionalInfo != null, "masterAddtionalInfo is not set"); - this.masterAddtionalInfo = masterAddtionalInfo; + for (HostAndPort sentinelInfo : sentinelAddresses) { + JedisPool sentinelPool = new JedisPool(sentinelInfo, new JedisPoolConfig()); + sentinelPools.add(sentinelPool); + } + + // masterConnectionInfo + assertArgument(masterConnectionInfo == null, "masterConnectionInfo is not set"); + this.masterConnectionInfo = masterConnectionInfo; - assertArgument(((masterName != null) && !masterName.isEmpty()), "masterName is not set"); + // masterName + assertArgument(((masterName == null) || masterName.isEmpty()), "masterName is not set"); this.masterName = masterName; - assertArgument(masterPoolConfig != null, "masterPoolConfig is not set"); + // poolConfig + assertArgument(masterPoolConfig == null, "masterPoolConfig is not set"); this.masterPoolConfig = masterPoolConfig; // Start MasterSwitchListener thread ,internal poll will be start in the thread masterSwitchListener = new MasterSwitchListener(); masterSwitchListener.start(); + try { + boolean result = poolInitLock.await(10, TimeUnit.SECONDS); + if (!result) { + logger.warn("jedis pool can't not init after 10 seconds"); + } + } catch (InterruptedException e) { + return; + } + } + + public JedisSentinelPool(HostAndPort[] sentinelAddresses, String masterName, JedisPoolConfig masterPoolConfig) { + this(sentinelAddresses, masterName, new ConnectionInfo(), masterPoolConfig); } @Override public void destroy() { + // shutdown the listener thread masterSwitchListener.shutdown(); - destroyInternalPool(); + // destroy sentinel pools + for (JedisPool sentinel : sentinelPools) { + sentinel.destroy(); + } + + // destroy redis pool + destroyInternelPool(); + // wait for the masterSwitchListener thread finish try { logger.info("Waiting for MasterSwitchListener thread finish"); masterSwitchListener.join(); @@ -83,199 +108,163 @@ public void destroy() { } } - @Override - public void returnResource(final Jedis resource) { - resource.resetState(); - returnResourceObject(resource); - } - - private void initInternalPool(ConnectionInfo masterConnectionInfo) { - JedisFactory factory = new JedisFactory(masterConnectionInfo.getHost(), masterConnectionInfo.getPort(), - masterConnectionInfo.getTimeout(), masterConnectionInfo.getPassword(), - masterConnectionInfo.getDatabase(), masterConnectionInfo.getClientName()); - - internalPool = new GenericObjectPool(factory, masterPoolConfig); - } - - private void destroyInternalPool() { + protected void destroyInternelPool() { super.destroy(); + address = null; + connectionInfo = null; + internalPool = null; } /** - * Assert the argument, throw IllegalArgumentException if the expression is false. + * Assert the argurment, throw IllegalArgumentException if the expression is true. */ - private static void assertArgument(boolean expression, String message) throws IllegalArgumentException { - if (!expression) { + private static void assertArgument(boolean expression, String message) { + if (expression) { throw new IllegalArgumentException(message); } } - // method for test - MasterSwitchListener getMasterSwitchListener() { + // for test + public MasterSwitchListener getMasterSwitchListener() { return masterSwitchListener; } /** * Listener thread to listen master switch message from sentinel. */ - class MasterSwitchListener extends Thread { - public static final String THREAD_NAME_PREFIX = "RedisMasterSwitchListener-"; - public static final int RETRY_WAIT_TIME_MILLS = 1000; + public class MasterSwitchListener extends Thread { + public static final String THREAD_NAME_PREFIX = "MasterSwitchListener-"; - private ConnectionInfo sentinelInfo; - private Jedis sentinelJedis; private JedisPubSub subscriber; - + private JedisPool sentinelPool; + private Jedis sentinelJedis; private AtomicBoolean running = new AtomicBoolean(true); - - private ConnectionInfo previousMasterConnectionInfo; + private HostAndPort previousMasterAddress; public MasterSwitchListener() { super(THREAD_NAME_PREFIX + masterName); } + // stop the blocking subscription and interrupt the thread + public void shutdown() { + // interrupt the thread + running.getAndSet(false); + this.interrupt(); + + // stop the blocking subscription + try { + if (subscriber != null) { + subscriber.unsubscribe(); + } + } finally { + JedisUtils.destroyJedis(sentinelJedis); + } + } + @Override public void run() { while (running.get()) { try { - boolean avalibleSentinelExist = selectSentinel(); - if (avalibleSentinelExist) { - try { - ConnectionInfo masterConnectionInfo = queryMasterAddress(); - if ((internalPool != null) && isMasterAddressChanged(masterConnectionInfo)) { - logger.info("The internalPool {} had changed, destroy it now.", - previousMasterConnectionInfo.getHostAndPort()); - destroyInternalPool(); - } - - if ((internalPool == null) || isMasterAddressChanged(masterConnectionInfo)) { - logger.info("The internalPool {} is not init or the address had changed, init it now.", - masterConnectionInfo.getHostAndPort()); - initInternalPool(masterConnectionInfo); - } - - previousMasterConnectionInfo = masterConnectionInfo; - - // blocking listen master switch message until exception happen. - subscriber = new MasterSwitchSubscriber(); - sentinelJedis.subscribe(subscriber, "+switch-master", "+redirect-to-master"); - } catch (JedisConnectionException e) { - JedisUtils.closeJedis(sentinelJedis); - - if (running.get()) { - logger.error("Lost connection with Sentinel " + sentinelInfo.getHostAndPort() - + ", sleep 1000ms and try to connect another one."); - sleep(RETRY_WAIT_TIME_MILLS); - } - } catch (Exception e) { - JedisUtils.closeJedis(sentinelJedis); - - if (running.get()) { - logger.error( - "Unexpected Exception happen, current Sentinel is" - + sentinelInfo.getHostAndPort() + ", sleep 1000ms and try again.", e); - sleep(RETRY_WAIT_TIME_MILLS); - } + sentinelPool = pickupSentinel(); + + if (sentinelPool != null) { + + HostAndPort masterAddress = queryMasterAddress(); + + if ((internalPool != null) && isAddressChange(masterAddress)) { + logger.info("The internalPool {} had changed, destroy it now.", previousMasterAddress); + destroyInternelPool(); + } + + if ((internalPool == null) || isAddressChange(masterAddress)) { + logger.info("The internalPool {} is not init or the address had changed, init it now.", + masterAddress); + initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig); + poolInitLock.countDown(); } + + previousMasterAddress = masterAddress; + + sentinelJedis = sentinelPool.getResource(); + subscriber = new MasterSwitchSubscriber(); + sentinelJedis.subscribe(subscriber, "+switch-master", "+redirect-to-master"); } else { - logger.info("All sentinels down, sleep 1000ms and try to select again."); - // when the system startup but the sentinels not yet, init an ugly address to prevent null point - // exception. + logger.info("All sentinels down, sleep 2 seconds and try to connect again."); + // When the system startup but the sentinels not yet, init a urgly address to prevent null point + // exception. change the logic later. if (internalPool == null) { - ConnectionInfo masterConnectionInfo = new ConnectionInfo(UNAVAILABLE_MASTER_ADDRESS); - initInternalPool(masterConnectionInfo); - previousMasterConnectionInfo = masterConnectionInfo; + HostAndPort masterAddress = new HostAndPort(NO_ADDRESS_YET, 6379); + initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig); + previousMasterAddress = masterAddress; } - sleep(RETRY_WAIT_TIME_MILLS); + sleep(2000); + } + } catch (JedisConnectionException e) { + + if (sentinelJedis != null) { + sentinelPool.returnBrokenResource(sentinelJedis); + } + + if (running.get()) { + logger.error("Lost connection with Sentinel " + sentinelPool.getAddress() + + ", sleep 1 seconds and try to connect other one. "); + sleep(1000); } } catch (Exception e) { - logger.error("Unexpected exception happen", e); - sleep(RETRY_WAIT_TIME_MILLS); + logger.error(e.getMessage(), e); + sleep(1000); } } } - /** - * Interrupt the thread and stop the blocking subscription. - */ - private void shutdown() { - running.getAndSet(false); - this.interrupt(); - - try { - if (subscriber != null) { - subscriber.unsubscribe(); - } - } finally { - JedisUtils.closeJedis(sentinelJedis); - } + public HostAndPort getCurrentMasterAddress() { + return previousMasterAddress; } /** - * Pickup the first available sentinel, if all sentinel down, return false. + * Pickup the first available sentinel, if all sentinel down, return null. */ - private boolean selectSentinel() { - for (ConnectionInfo info : sentinelInfos) { - if (ping(info)) { - sentinelInfo = info; - sentinelJedis = new Jedis(sentinelInfo.getHost(), sentinelInfo.getPort()); - return true; + private JedisPool pickupSentinel() { + for (JedisPool pool : sentinelPools) { + if (JedisUtils.ping(pool)) { + return pool; } } - return false; + return null; } - /** - * Ping the jedis instance, return true if the result is PONG. - */ - private boolean ping(ConnectionInfo connectionInfo) { - Jedis jedis = new Jedis(connectionInfo.getHost(), connectionInfo.getPort()); - try { - String result = jedis.ping(); - return (result != null) && result.equals("PONG"); - } catch (JedisException e) { - return false; - } finally { - JedisUtils.closeJedis(jedis); + private boolean isAddressChange(HostAndPort currentMasterAddress) { + if (previousMasterAddress == null) { + return true; } + + return !previousMasterAddress.equals(currentMasterAddress); } /** * Query master address from sentinel. */ - private ConnectionInfo queryMasterAddress() { - - List address = sentinelJedis.sentinelGetMasterAddrByName(masterName); + private HostAndPort queryMasterAddress() { + JedisTemplate sentinelTemplate = new JedisTemplate(sentinelPool); + List address = sentinelTemplate.execute(new JedisAction>() { + @Override + public List action(Jedis jedis) { + return jedis.sentinelGetMasterAddrByName(masterName); + } + }); if ((address == null) || address.isEmpty()) { throw new IllegalArgumentException("Master name " + masterName + " is not in sentinel.conf"); } - return buildMasterConnectionInfo(address.get(0), address.get(1)); - } - - /** - * Combine the host & port with the masterAddtionalInfo which store the other properties. - */ - private ConnectionInfo buildMasterConnectionInfo(String host, String port) { - return new ConnectionInfo(host, Integer.valueOf(port), masterAddtionalInfo.getTimeout(), - masterAddtionalInfo.getPassword(), masterAddtionalInfo.getDatabase(), - masterAddtionalInfo.getClientName()); - } - - private boolean isMasterAddressChanged(ConnectionInfo currentMasterConnectionInfo) { - if (previousMasterConnectionInfo == null) { - return true; - } - - return !previousMasterConnectionInfo.getHostAndPort().equals(currentMasterConnectionInfo.getHostAndPort()); + return new HostAndPort(address.get(0), Integer.valueOf(address.get(1))); } private void sleep(int millseconds) { try { Thread.sleep(millseconds); - } catch (InterruptedException e) { + } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } } @@ -287,20 +276,20 @@ private class MasterSwitchSubscriber extends JedisPubSub { @Override public void onMessage(String channel, String message) { // message example: +switch-master: mymaster 127.0.0.1 6379 127.0.0.1 6380 - // +redirect-to-master mymaster 127.0.0.1 6380 127.0.0.1 6381 (if slave-master fail-over quick enough) - logger.info("Sentinel " + sentinelInfo.getHostAndPort() + " published: " + message); + // +redirect-to-master default 127.0.0.1 6380 127.0.0.1 6381 (if slave-master fail-over quick enough) + logger.info("Sentinel " + sentinelPool.getAddress() + " published: " + message); String[] switchMasterMsg = message.split(" "); - // if the switeched master name equals my master name, destroy the old pool and init a new pool + // if the master name equals my master name, destroy the old pool and init a new pool if (masterName.equals(switchMasterMsg[0])) { - destroyInternalPool(); + destroyInternelPool(); - ConnectionInfo masterConnectionInfo = buildMasterConnectionInfo(switchMasterMsg[3], - switchMasterMsg[4]); - logger.info("Switch master to " + masterConnectionInfo.getHostAndPort()); + HostAndPort masterAddress = new HostAndPort(switchMasterMsg[3], + Integer.parseInt(switchMasterMsg[4])); + logger.info("Switch master to " + masterAddress); - initInternalPool(masterConnectionInfo); + initInternalPool(masterAddress, masterConnectionInfo, masterPoolConfig); - previousMasterConnectionInfo = masterConnectionInfo; + previousMasterAddress = masterAddress; } } diff --git a/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java index ed7b78dcd..04d2bc64b 100644 --- a/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java @@ -9,9 +9,9 @@ import org.springside.modules.nosql.redis.JedisScriptExecutor; import org.springside.modules.nosql.redis.JedisTemplate; +import org.springside.modules.nosql.redis.pool.JedisPool; import org.springside.modules.utils.Threads; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.exceptions.JedisConnectionException; import com.google.common.collect.Lists;