Skip to content

Commit

Permalink
Merge pull request #1 from dangdangdotcom/master
Browse files Browse the repository at this point in the history
1
  • Loading branch information
TonnyFeng committed Jun 1, 2016
2 parents 85b9ab1 + 8ee2f9d commit 45a3335
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public JValidator(URL url) {
}

public void validate(String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Exception {
String methodClassName = clazz.getName() + "_" + toUpperMethoName(methodName);
String methodClassName = clazz.getName() + "$" + toUpperMethoName(methodName);
Class<?> methodClass = null;
try {
methodClass = Class.forName(methodClassName, false, Thread.currentThread().getContextClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.dubbo.common.utils.StringUtils;
import org.apache.commons.pool.impl.GenericObjectPool;

import redis.clients.jedis.Jedis;
Expand All @@ -48,10 +49,11 @@
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.dubbo.rpc.RpcException;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
* RedisRegistry
*
*
* @author william.liangf
*/
public class RedisRegistry extends FailbackRegistry {
Expand All @@ -65,26 +67,26 @@ public class RedisRegistry extends FailbackRegistry {
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));

private final ScheduledFuture<?> expireFuture;

private final String root;

private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();

private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();

private final int reconnectPeriod;

private final int expirePeriod;

private volatile boolean admin = false;

private boolean replicate;

public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
throw new IllegalStateException("registry address == null");
}
GenericObjectPool.Config config = new GenericObjectPool.Config();
config.testOnBorrow = url.getParameter("test.on.borrow", true);
config.testOnReturn = url.getParameter("test.on.return", false);
Expand All @@ -103,19 +105,22 @@ public RedisRegistry(URL url) {
config.timeBetweenEvictionRunsMillis = url.getParameter("time.between.eviction.runs.millis", 0);
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.minEvictableIdleTimeMillis = url.getParameter("min.evictable.idle.time.millis", 0);

String cluster = url.getParameter("cluster", "failover");
if (! "failover".equals(cluster) && ! "replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);

List<String> addresses = new ArrayList<String>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
addresses.addAll(Arrays.asList(backups));
}

// 增加Redis密码支持
String password = url.getPassword();
for (String address : addresses) {
int i = address.indexOf(':');
String host;
Expand All @@ -127,10 +132,16 @@ public RedisRegistry(URL url) {
host = address;
port = DEFAULT_REDIS_PORT;
}
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));
if (StringUtils.isEmpty(password)) {
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));
} else {
// 使用密码连接。 此处要求备用redis与主要redis使用相同的密码
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), password));
}
}

this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
Expand All @@ -140,7 +151,7 @@ public RedisRegistry(URL url) {
group = group + Constants.PATH_SEPARATOR;
}
this.root = group;

this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
Expand All @@ -152,10 +163,11 @@ public void run() {
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

private void deferExpired() {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
boolean isBroken = false;
try {
Jedis jedis = jedisPool.getResource();
try {
Expand All @@ -170,18 +182,24 @@ private void deferExpired() {
if (admin) {
clean(jedis);
}
if (! replicate) {
break;//  如果服务器端已同步数据,只需写入单台机器
if (!replicate) {
break;// 如果服务器端已同步数据,只需写入单台机器
}
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}

// 监控中心负责删除过期脏数据
private void clean(Jedis jedis) {
Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
Expand All @@ -202,7 +220,7 @@ private void clean(Jedis jedis) {
logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
}
}
}
}
}
if (delete) {
jedis.publish(key, Constants.UNREGISTER);
Expand All @@ -214,16 +232,20 @@ private void clean(Jedis jedis) {

public boolean isAvailable() {
for (JedisPool jedisPool : jedisPools.values()) {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
Jedis jedis = jedisPool.getResource();
try {
if (jedis.isConnected()) {
return true; // 至少需单台机器可用
}
} finally {
if (jedis.isConnected()) {
return true; // 至少需单台机器可用
}
} catch (JedisConnectionException e) {
isBroken = true;
} finally {
if (isBroken) {
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
}
}
return false;
Expand Down Expand Up @@ -265,15 +287,22 @@ public void doRegister(URL url) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
success = true;
if (! replicate) {
break; //  如果服务器端已同步数据,只需写入单台机器
break; // 如果服务器端已同步数据,只需写入单台机器
}
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
Expand All @@ -298,15 +327,22 @@ public void doUnregister(URL url) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
success = true;
if (! replicate) {
break; //  如果服务器端已同步数据,只需写入单台机器
break; // 如果服务器端已同步数据,只需写入单台机器
}
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
Expand All @@ -320,7 +356,7 @@ public void doUnregister(URL url) {
}
}
}

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
Expand All @@ -339,6 +375,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
Expand All @@ -363,8 +400,14 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
}
success = true;
break; // 只需读一个服务器的数据
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch(Throwable t) { // 尝试下一个服务器
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
Expand Down Expand Up @@ -470,7 +513,7 @@ private String toCategoryPath(URL url) {
}

private class NotifySub extends JedisPubSub {

private final JedisPool jedisPool;

public NotifySub(JedisPool jedisPool) {
Expand All @@ -482,14 +525,21 @@ public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
if (msg.equals(Constants.REGISTER)
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
boolean isBroken = false;
try {
doNotify(jedis, key);
} catch (JedisConnectionException e){
isBroken = true;
} finally {
jedisPool.returnResource(jedis);
if(isBroken){
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
}
} catch (Throwable t) { // TODO 通知失败没有恢复机制保障
logger.error(t.getMessage(), t);
Expand Down Expand Up @@ -527,23 +577,23 @@ private class Notifier extends Thread {
private volatile Jedis jedis;

private volatile boolean first = true;

private volatile boolean running = true;

private final AtomicInteger connectSkip = new AtomicInteger();

private final AtomicInteger connectSkiped = new AtomicInteger();

private final Random random = new Random();

private volatile int connectRandom;

private void resetSkip() {
connectSkip.set(0);
connectSkiped.set(0);
connectRandom = 0;
}

private boolean isSkip() {
int skip = connectSkip.get(); // 跳过次数增长
if (skip >= 10) { // 如果跳过次数增长超过10,取随机数
Expand All @@ -560,13 +610,13 @@ private boolean isSkip() {
connectRandom = 0;
return false;
}

public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}

@Override
public void run() {
while (running) {
Expand Down Expand Up @@ -618,7 +668,7 @@ public void run() {
}
}
}

public void shutdown() {
try {
running = false;
Expand All @@ -627,7 +677,7 @@ public void shutdown() {
logger.warn(t.getMessage(), t);
}
}

}

}
}
Loading

0 comments on commit 45a3335

Please sign in to comment.