Skip to content

Commit

Permalink
Add Consul synchronization to Nacos function for issue #17
Browse files Browse the repository at this point in the history
  • Loading branch information
paderlol committed Dec 31, 2018
1 parent f7320d2 commit 57e60d2
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 28 deletions.
8 changes: 8 additions & 0 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,13 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<!-- consul -->
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>1.3.1</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ public enum ClusterTypeEnum {

CS("CS", "configserver集群"),

NACOS("NACOS", "nacos集群"), EUREKA("EUREKA", "eureka集群"),
NACOS("NACOS", "nacos集群"),

EUREKA("EUREKA", "eureka集群"),

CONSUL("CONSUL", "consul集群"),

ZK("ZK", "zookeeper集群");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.alibaba.nacossync.extension.holder;

import com.ecwid.consul.v1.ConsulClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.net.URL;

/**
* @author paderlol
* @date: 2018-12-31 16:26
*/
@Service
@Slf4j
public class ConsulServerHolder extends AbstractServerHolder<ConsulClient> {

public static final String HTTP = "http://";

@Override
ConsulClient createServer(String serverAddress, String namespace) throws Exception {
serverAddress = serverAddress.startsWith(HTTP) ? serverAddress : HTTP + serverAddress;
URL url = new URL(serverAddress);
return new ConsulClient(url.getHost(), url.getPort());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.alibaba.nacossync.extension.impl;

import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.ConsulServerHolder;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.health.model.HealthService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.*;

/**
* Consul 同步 Nacos
*
* @author paderlol
* @date: 2018-12-31 16:25
*/
@Slf4j
@NacosSyncService(clusterType = ClusterTypeEnum.CONSUL)
public class ConsulSyncServiceImpl implements SyncService {

@Autowired
private ConsulServerHolder consulServerHolder;
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;

@Autowired
private NacosServerHolder nacosServerHolder;

@Override
public boolean delete(TaskDO taskDO) {

try {
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}

} catch (Exception e) {
log.error("delete task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
return false;
}
return true;
}

@Override
public boolean sync(TaskDO taskDO) {
try {
ConsulClient consulClient = consulServerHolder.get(taskDO.getSourceClusterId(), null);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
Response<List<HealthService>> response =
consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT);
List<HealthService> healthServiceList = response.getValue();
Set<String> instanceKeySet = new HashSet<>();
for (HealthService healthService : healthServiceList) {
if (needSync(healthService.getNode().getMeta())) {

destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(healthService, taskDO));
instanceKeySet.add(composeInstanceKey(healthService.getService().getAddress(),
healthService.getService().getPort()));
}
}
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeySet.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}

} catch (Exception e) {
log.error("sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
return false;
}
return true;
}

/**
* 判断当前实例数据是否源集群信息是一致的, 一致才会进行删除
*
* @param destMetaData
* @param taskDO
* @return
*/
private boolean needDelete(Map<String, String> destMetaData, TaskDO taskDO) {
return StringUtils.equals(destMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY),
taskDO.getSourceClusterId());
}

private Instance buildSyncInstance(HealthService instance, TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(instance.getService().getAddress());
temp.setPort(instance.getService().getPort());

Map<String, String> metaData = new HashMap<>(instance.getNode().getMeta());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
temp.setMetadata(metaData);
return temp;
}

/**
* 判断当前实例数据是否是其他地方同步过来的, 如果是则不进行同步操作
*
* @param sourceMetaData
* @return
*/
private boolean needSync(Map<String, String> sourceMetaData) {
return StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
}

private String composeInstanceKey(String ip, int port) {
return ip + ":" + port;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.EurekaServerHolder;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
Expand All @@ -18,8 +19,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -33,7 +32,7 @@
*/
@Slf4j
@NacosSyncService(clusterType = ClusterTypeEnum.EUREKA)
public class EurekaSyncServiceImpl implements com.alibaba.nacossync.extension.SyncService {
public class EurekaSyncServiceImpl implements SyncService {

@Autowired
private EurekaServerHolder eurekaServerHolder;
Expand Down Expand Up @@ -72,6 +71,9 @@ public boolean sync(TaskDO taskDO) {
if (Objects.requireNonNull(HttpStatus.resolve(eurekaHttpResponse.getStatusCode())).is2xxSuccessful()) {
Application application = eurekaHttpResponse.getEntity();
for (InstanceInfo instanceInfo : application.getInstances()) {
if (needSync(instanceInfo.getMetadata())) {
continue;
}
if (InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus())) {
destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(instanceInfo, taskDO));
Expand Down Expand Up @@ -119,7 +121,14 @@ private Instance buildSyncInstance(InstanceInfo instance, TaskDO taskDO) {
return temp;
}

public static void main(String[] args) throws MalformedURLException {
System.out.println(new URL("127.0.0.1:18033/eureka").getProtocol());
/**
* 判断当前实例数据是否是其他地方同步过来的, 如果是则不进行同步操作
*
* @param sourceMetaData
* @return
*/
private boolean needSync(Map<String, String> sourceMetaData) {
return StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.pojo.model.TaskDO;
Expand All @@ -24,7 +25,7 @@

@Slf4j
@NacosSyncService(clusterType = ClusterTypeEnum.NACOS)
public class NacosSyncServiceImpl implements com.alibaba.nacossync.extension.SyncService {
public class NacosSyncServiceImpl implements SyncService {
private Map<String, EventListener> nacosListenerMap = new ConcurrentHashMap<>();
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
Expand Down Expand Up @@ -111,10 +112,7 @@ private String composeInstanceKey(Instance instance) {
* @return
*/
private boolean needSync(Map<String, String> sourceMetaData) {
if (StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY))) {
return true;
}
return false;
return StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
package com.alibaba.nacossync.extension.impl;

import static com.alibaba.nacossync.util.DubboConstants.*;
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.CloseableUtils;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.CloseableUtils;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import static com.alibaba.nacossync.util.DubboConstants.*;
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
import lombok.extern.slf4j.Slf4j;

/**
* @author paderlol
Expand All @@ -34,7 +37,7 @@
*/
@Slf4j
@NacosSyncService(clusterType = ClusterTypeEnum.ZK)
public class ZookeeperSyncServiceImpl implements com.alibaba.nacossync.extension.SyncService {
public class ZookeeperSyncServiceImpl implements SyncService {

/**
* 存放zk监听缓存 格式taskId -> PathChildrenCache实例
Expand Down Expand Up @@ -70,7 +73,7 @@ public boolean sync(TaskDO taskDO) {
String path = event.getData().getPath();
Map<String, String> queryParam = parseQueryString(path);

if (isMatch(taskDO, queryParam)) {
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
switch (event.getType()) {
Expand Down Expand Up @@ -229,4 +232,14 @@ private String getServiceNameFromCache(String taskId, Map<String, String> queryP
queryParam.get(INTERFACE_KEY), queryParam.get(VERSION_KEY), queryParam.get(GROUP_KEY)));
}

/**
* 判断当前实例数据是否是其他地方同步过来的, 如果是则不进行同步操作
*
* @param sourceMetaData
* @return
*/
private boolean needSync(Map<String, String> sourceMetaData) {
return StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
}

}

0 comments on commit 57e60d2

Please sign in to comment.